[codex exec] Add item.started and support it for command execution (#4250)
Adds a new `item.started` event to `codex exec` and implements it for the
`command_execution` item type.
```jsonl
{"type":"session.created","session_id":"019982d1-75f0-7920-b051-e0d3731a5ed8"}
{"type":"item.completed","item":{"id":"item_0","item_type":"reasoning","text":"**Executing commands securely**\n\nI'm thinking about how the default harness typically uses \"bash -lc,\" while historically \"bash\" is what we've been using. The command should be executed as a string in our CLI, so using \"bash -lc 'echo hello'\" is optimal but calling \"echo hello\" directly feels safer. The sandbox makes sure environment variables like CODEX_SANDBOX_NETWORK_DISABLED=1 are set, so I won't ask for approval. I just need to run \"echo hello\" and correctly present the output."}}
{"type":"item.completed","item":{"id":"item_1","item_type":"reasoning","text":"**Preparing for tool calls**\n\nI realize that I need to include a preamble before making any tool calls. So, I'll first state the preamble in the commentary channel, then proceed with the tool call. After that, I need to present the final message along with the output. It's possible that the CLI will show the output inline, but I must ensure that I present the result clearly regardless. Let's move forward and get this organized!"}}
{"type":"item.completed","item":{"id":"item_2","item_type":"assistant_message","text":"Running `echo` to confirm shell access and print output."}}
{"type":"item.started","item":{"id":"item_3","item_type":"command_execution","command":"bash -lc echo hello","aggregated_output":"","exit_code":null,"status":"in_progress"}}
{"type":"item.completed","item":{"id":"item_3","item_type":"command_execution","command":"bash -lc echo hello","aggregated_output":"hello\n","exit_code":0,"status":"completed"}}
{"type":"item.completed","item":{"id":"item_4","item_type":"assistant_message","text":"hello"}}
```
This commit is contained in:
@@ -8,6 +8,8 @@ use ts_rs::TS;
|
|||||||
pub enum ConversationEvent {
|
pub enum ConversationEvent {
|
||||||
#[serde(rename = "session.created")]
|
#[serde(rename = "session.created")]
|
||||||
SessionCreated(SessionCreatedEvent),
|
SessionCreated(SessionCreatedEvent),
|
||||||
|
#[serde(rename = "item.started")]
|
||||||
|
ItemStarted(ItemStartedEvent),
|
||||||
#[serde(rename = "item.completed")]
|
#[serde(rename = "item.completed")]
|
||||||
ItemCompleted(ItemCompletedEvent),
|
ItemCompleted(ItemCompletedEvent),
|
||||||
#[serde(rename = "error")]
|
#[serde(rename = "error")]
|
||||||
@@ -20,6 +22,12 @@ pub struct SessionCreatedEvent {
|
|||||||
pub session_id: String,
|
pub session_id: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Payload describing the start of a new conversation item.
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)]
|
||||||
|
pub struct ItemStartedEvent {
|
||||||
|
pub item: ConversationItem,
|
||||||
|
}
|
||||||
|
|
||||||
/// Payload describing the completion of an existing conversation item.
|
/// Payload describing the completion of an existing conversation item.
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)]
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)]
|
||||||
pub struct ItemCompletedEvent {
|
pub struct ItemCompletedEvent {
|
||||||
@@ -85,7 +93,8 @@ pub enum CommandExecutionStatus {
|
|||||||
pub struct CommandExecutionItem {
|
pub struct CommandExecutionItem {
|
||||||
pub command: String,
|
pub command: String,
|
||||||
pub aggregated_output: String,
|
pub aggregated_output: String,
|
||||||
pub exit_code: i32,
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub exit_code: Option<i32>,
|
||||||
pub status: CommandExecutionStatus,
|
pub status: CommandExecutionStatus,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -15,6 +15,7 @@ use crate::exec_events::ConversationItemDetails;
|
|||||||
use crate::exec_events::FileChangeItem;
|
use crate::exec_events::FileChangeItem;
|
||||||
use crate::exec_events::FileUpdateChange;
|
use crate::exec_events::FileUpdateChange;
|
||||||
use crate::exec_events::ItemCompletedEvent;
|
use crate::exec_events::ItemCompletedEvent;
|
||||||
|
use crate::exec_events::ItemStartedEvent;
|
||||||
use crate::exec_events::PatchApplyStatus;
|
use crate::exec_events::PatchApplyStatus;
|
||||||
use crate::exec_events::PatchChangeKind;
|
use crate::exec_events::PatchChangeKind;
|
||||||
use crate::exec_events::ReasoningItem;
|
use crate::exec_events::ReasoningItem;
|
||||||
@@ -32,14 +33,22 @@ use codex_core::protocol::PatchApplyEndEvent;
|
|||||||
use codex_core::protocol::SessionConfiguredEvent;
|
use codex_core::protocol::SessionConfiguredEvent;
|
||||||
use codex_core::protocol::TaskCompleteEvent;
|
use codex_core::protocol::TaskCompleteEvent;
|
||||||
use tracing::error;
|
use tracing::error;
|
||||||
|
use tracing::warn;
|
||||||
|
|
||||||
pub struct ExperimentalEventProcessorWithJsonOutput {
|
pub struct ExperimentalEventProcessorWithJsonOutput {
|
||||||
last_message_path: Option<PathBuf>,
|
last_message_path: Option<PathBuf>,
|
||||||
next_event_id: AtomicU64,
|
next_event_id: AtomicU64,
|
||||||
running_commands: HashMap<String, Vec<String>>,
|
// Tracks running commands by call_id, including the associated item id.
|
||||||
|
running_commands: HashMap<String, RunningCommand>,
|
||||||
running_patch_applies: HashMap<String, PatchApplyBeginEvent>,
|
running_patch_applies: HashMap<String, PatchApplyBeginEvent>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
struct RunningCommand {
|
||||||
|
command: String,
|
||||||
|
item_id: String,
|
||||||
|
}
|
||||||
|
|
||||||
impl ExperimentalEventProcessorWithJsonOutput {
|
impl ExperimentalEventProcessorWithJsonOutput {
|
||||||
pub fn new(last_message_path: Option<PathBuf>) -> Self {
|
pub fn new(last_message_path: Option<PathBuf>) -> Self {
|
||||||
Self {
|
Self {
|
||||||
@@ -114,10 +123,38 @@ impl ExperimentalEventProcessorWithJsonOutput {
|
|||||||
})]
|
})]
|
||||||
}
|
}
|
||||||
fn handle_exec_command_begin(&mut self, ev: &ExecCommandBeginEvent) -> Vec<ConversationEvent> {
|
fn handle_exec_command_begin(&mut self, ev: &ExecCommandBeginEvent) -> Vec<ConversationEvent> {
|
||||||
self.running_commands
|
let item_id = self.get_next_item_id();
|
||||||
.insert(ev.call_id.clone(), ev.command.clone());
|
|
||||||
|
|
||||||
Vec::new()
|
let command_string = match shlex::try_join(ev.command.iter().map(String::as_str)) {
|
||||||
|
Ok(command_string) => command_string,
|
||||||
|
Err(e) => {
|
||||||
|
warn!(
|
||||||
|
call_id = ev.call_id,
|
||||||
|
"Failed to stringify command: {e:?}; skipping item.started"
|
||||||
|
);
|
||||||
|
ev.command.join(" ")
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
self.running_commands.insert(
|
||||||
|
ev.call_id.clone(),
|
||||||
|
RunningCommand {
|
||||||
|
command: command_string.clone(),
|
||||||
|
item_id: item_id.clone(),
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
let item = ConversationItem {
|
||||||
|
id: item_id,
|
||||||
|
details: ConversationItemDetails::CommandExecution(CommandExecutionItem {
|
||||||
|
command: command_string,
|
||||||
|
aggregated_output: String::new(),
|
||||||
|
exit_code: None,
|
||||||
|
status: CommandExecutionStatus::InProgress,
|
||||||
|
}),
|
||||||
|
};
|
||||||
|
|
||||||
|
vec![ConversationEvent::ItemStarted(ItemStartedEvent { item })]
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handle_patch_apply_begin(&mut self, ev: &PatchApplyBeginEvent) -> Vec<ConversationEvent> {
|
fn handle_patch_apply_begin(&mut self, ev: &PatchApplyBeginEvent) -> Vec<ConversationEvent> {
|
||||||
@@ -167,23 +204,26 @@ impl ExperimentalEventProcessorWithJsonOutput {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn handle_exec_command_end(&mut self, ev: &ExecCommandEndEvent) -> Vec<ConversationEvent> {
|
fn handle_exec_command_end(&mut self, ev: &ExecCommandEndEvent) -> Vec<ConversationEvent> {
|
||||||
let command = self
|
let Some(RunningCommand { command, item_id }) = self.running_commands.remove(&ev.call_id)
|
||||||
.running_commands
|
else {
|
||||||
.remove(&ev.call_id)
|
warn!(
|
||||||
.map(|command| command.join(" "))
|
call_id = ev.call_id,
|
||||||
.unwrap_or_default();
|
"ExecCommandEnd without matching ExecCommandBegin; skipping item.completed"
|
||||||
|
);
|
||||||
|
return Vec::new();
|
||||||
|
};
|
||||||
let status = if ev.exit_code == 0 {
|
let status = if ev.exit_code == 0 {
|
||||||
CommandExecutionStatus::Completed
|
CommandExecutionStatus::Completed
|
||||||
} else {
|
} else {
|
||||||
CommandExecutionStatus::Failed
|
CommandExecutionStatus::Failed
|
||||||
};
|
};
|
||||||
let item = ConversationItem {
|
let item = ConversationItem {
|
||||||
id: self.get_next_item_id(),
|
id: item_id,
|
||||||
|
|
||||||
details: ConversationItemDetails::CommandExecution(CommandExecutionItem {
|
details: ConversationItemDetails::CommandExecution(CommandExecutionItem {
|
||||||
command,
|
command,
|
||||||
aggregated_output: ev.aggregated_output.clone(),
|
aggregated_output: ev.aggregated_output.clone(),
|
||||||
exit_code: ev.exit_code,
|
exit_code: Some(ev.exit_code),
|
||||||
status,
|
status,
|
||||||
}),
|
}),
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -16,6 +16,7 @@ use codex_exec::exec_events::ConversationEvent;
|
|||||||
use codex_exec::exec_events::ConversationItem;
|
use codex_exec::exec_events::ConversationItem;
|
||||||
use codex_exec::exec_events::ConversationItemDetails;
|
use codex_exec::exec_events::ConversationItemDetails;
|
||||||
use codex_exec::exec_events::ItemCompletedEvent;
|
use codex_exec::exec_events::ItemCompletedEvent;
|
||||||
|
use codex_exec::exec_events::ItemStartedEvent;
|
||||||
use codex_exec::exec_events::PatchApplyStatus;
|
use codex_exec::exec_events::PatchApplyStatus;
|
||||||
use codex_exec::exec_events::PatchChangeKind;
|
use codex_exec::exec_events::PatchChangeKind;
|
||||||
use codex_exec::exec_events::ReasoningItem;
|
use codex_exec::exec_events::ReasoningItem;
|
||||||
@@ -156,7 +157,20 @@ fn exec_command_end_success_produces_completed_command_item() {
|
|||||||
}),
|
}),
|
||||||
);
|
);
|
||||||
let out_begin = ep.collect_conversation_events(&begin);
|
let out_begin = ep.collect_conversation_events(&begin);
|
||||||
assert!(out_begin.is_empty());
|
assert_eq!(
|
||||||
|
out_begin,
|
||||||
|
vec![ConversationEvent::ItemStarted(ItemStartedEvent {
|
||||||
|
item: ConversationItem {
|
||||||
|
id: "item_0".to_string(),
|
||||||
|
details: ConversationItemDetails::CommandExecution(CommandExecutionItem {
|
||||||
|
command: "bash -lc 'echo hi'".to_string(),
|
||||||
|
aggregated_output: String::new(),
|
||||||
|
exit_code: None,
|
||||||
|
status: CommandExecutionStatus::InProgress,
|
||||||
|
}),
|
||||||
|
},
|
||||||
|
})]
|
||||||
|
);
|
||||||
|
|
||||||
// End (success) -> item.completed (item_0)
|
// End (success) -> item.completed (item_0)
|
||||||
let end_ok = event(
|
let end_ok = event(
|
||||||
@@ -178,9 +192,9 @@ fn exec_command_end_success_produces_completed_command_item() {
|
|||||||
item: ConversationItem {
|
item: ConversationItem {
|
||||||
id: "item_0".to_string(),
|
id: "item_0".to_string(),
|
||||||
details: ConversationItemDetails::CommandExecution(CommandExecutionItem {
|
details: ConversationItemDetails::CommandExecution(CommandExecutionItem {
|
||||||
command: "bash -lc echo hi".to_string(),
|
command: "bash -lc 'echo hi'".to_string(),
|
||||||
aggregated_output: "hi\n".to_string(),
|
aggregated_output: "hi\n".to_string(),
|
||||||
exit_code: 0,
|
exit_code: Some(0),
|
||||||
status: CommandExecutionStatus::Completed,
|
status: CommandExecutionStatus::Completed,
|
||||||
}),
|
}),
|
||||||
},
|
},
|
||||||
@@ -202,7 +216,20 @@ fn exec_command_end_failure_produces_failed_command_item() {
|
|||||||
parsed_cmd: Vec::new(),
|
parsed_cmd: Vec::new(),
|
||||||
}),
|
}),
|
||||||
);
|
);
|
||||||
assert!(ep.collect_conversation_events(&begin).is_empty());
|
assert_eq!(
|
||||||
|
ep.collect_conversation_events(&begin),
|
||||||
|
vec![ConversationEvent::ItemStarted(ItemStartedEvent {
|
||||||
|
item: ConversationItem {
|
||||||
|
id: "item_0".to_string(),
|
||||||
|
details: ConversationItemDetails::CommandExecution(CommandExecutionItem {
|
||||||
|
command: "sh -c 'exit 1'".to_string(),
|
||||||
|
aggregated_output: String::new(),
|
||||||
|
exit_code: None,
|
||||||
|
status: CommandExecutionStatus::InProgress,
|
||||||
|
}),
|
||||||
|
},
|
||||||
|
})]
|
||||||
|
);
|
||||||
|
|
||||||
// End (failure) -> item.completed (item_0)
|
// End (failure) -> item.completed (item_0)
|
||||||
let end_fail = event(
|
let end_fail = event(
|
||||||
@@ -224,9 +251,9 @@ fn exec_command_end_failure_produces_failed_command_item() {
|
|||||||
item: ConversationItem {
|
item: ConversationItem {
|
||||||
id: "item_0".to_string(),
|
id: "item_0".to_string(),
|
||||||
details: ConversationItemDetails::CommandExecution(CommandExecutionItem {
|
details: ConversationItemDetails::CommandExecution(CommandExecutionItem {
|
||||||
command: "sh -c exit 1".to_string(),
|
command: "sh -c 'exit 1'".to_string(),
|
||||||
aggregated_output: String::new(),
|
aggregated_output: String::new(),
|
||||||
exit_code: 1,
|
exit_code: Some(1),
|
||||||
status: CommandExecutionStatus::Failed,
|
status: CommandExecutionStatus::Failed,
|
||||||
}),
|
}),
|
||||||
},
|
},
|
||||||
@@ -234,6 +261,27 @@ fn exec_command_end_failure_produces_failed_command_item() {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn exec_command_end_without_begin_is_ignored() {
|
||||||
|
let mut ep = ExperimentalEventProcessorWithJsonOutput::new(None);
|
||||||
|
|
||||||
|
// End event arrives without a prior Begin; should produce no conversation events.
|
||||||
|
let end_only = event(
|
||||||
|
"c1",
|
||||||
|
EventMsg::ExecCommandEnd(ExecCommandEndEvent {
|
||||||
|
call_id: "no-begin".to_string(),
|
||||||
|
stdout: String::new(),
|
||||||
|
stderr: String::new(),
|
||||||
|
aggregated_output: String::new(),
|
||||||
|
exit_code: 0,
|
||||||
|
duration: Duration::from_millis(1),
|
||||||
|
formatted_output: String::new(),
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
let out = ep.collect_conversation_events(&end_only);
|
||||||
|
assert!(out.is_empty());
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn patch_apply_success_produces_item_completed_patchapply() {
|
fn patch_apply_success_produces_item_completed_patchapply() {
|
||||||
let mut ep = ExperimentalEventProcessorWithJsonOutput::new(None);
|
let mut ep = ExperimentalEventProcessorWithJsonOutput::new(None);
|
||||||
|
|||||||
Reference in New Issue
Block a user