diff --git a/codex-rs/exec/src/exec_events.rs b/codex-rs/exec/src/exec_events.rs index 68c790b8..82f15be3 100644 --- a/codex-rs/exec/src/exec_events.rs +++ b/codex-rs/exec/src/exec_events.rs @@ -8,6 +8,8 @@ use ts_rs::TS; pub enum ConversationEvent { #[serde(rename = "session.created")] SessionCreated(SessionCreatedEvent), + #[serde(rename = "item.started")] + ItemStarted(ItemStartedEvent), #[serde(rename = "item.completed")] ItemCompleted(ItemCompletedEvent), #[serde(rename = "error")] @@ -20,6 +22,12 @@ pub struct SessionCreatedEvent { pub session_id: String, } +/// Payload describing the start of an existing conversation item. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)] +pub struct ItemStartedEvent { + pub item: ConversationItem, +} + /// Payload describing the completion of an existing conversation item. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)] pub struct ItemCompletedEvent { @@ -85,7 +93,8 @@ pub enum CommandExecutionStatus { pub struct CommandExecutionItem { pub command: String, pub aggregated_output: String, - pub exit_code: i32, + #[serde(skip_serializing_if = "Option::is_none")] + pub exit_code: Option, pub status: CommandExecutionStatus, } diff --git a/codex-rs/exec/src/experimental_event_processor_with_json_output.rs b/codex-rs/exec/src/experimental_event_processor_with_json_output.rs index 6abbda7c..ea4ba022 100644 --- a/codex-rs/exec/src/experimental_event_processor_with_json_output.rs +++ b/codex-rs/exec/src/experimental_event_processor_with_json_output.rs @@ -15,6 +15,7 @@ use crate::exec_events::ConversationItemDetails; use crate::exec_events::FileChangeItem; use crate::exec_events::FileUpdateChange; use crate::exec_events::ItemCompletedEvent; +use crate::exec_events::ItemStartedEvent; use crate::exec_events::PatchApplyStatus; use crate::exec_events::PatchChangeKind; use crate::exec_events::ReasoningItem; @@ -32,14 +33,22 @@ use codex_core::protocol::PatchApplyEndEvent; use codex_core::protocol::SessionConfiguredEvent; use codex_core::protocol::TaskCompleteEvent; use tracing::error; +use tracing::warn; pub struct ExperimentalEventProcessorWithJsonOutput { last_message_path: Option, next_event_id: AtomicU64, - running_commands: HashMap>, + // Tracks running commands by call_id, including the associated item id. + running_commands: HashMap, running_patch_applies: HashMap, } +#[derive(Debug, Clone)] +struct RunningCommand { + command: String, + item_id: String, +} + impl ExperimentalEventProcessorWithJsonOutput { pub fn new(last_message_path: Option) -> Self { Self { @@ -114,10 +123,38 @@ impl ExperimentalEventProcessorWithJsonOutput { })] } fn handle_exec_command_begin(&mut self, ev: &ExecCommandBeginEvent) -> Vec { - self.running_commands - .insert(ev.call_id.clone(), ev.command.clone()); + let item_id = self.get_next_item_id(); - Vec::new() + let command_string = match shlex::try_join(ev.command.iter().map(String::as_str)) { + Ok(command_string) => command_string, + Err(e) => { + warn!( + call_id = ev.call_id, + "Failed to stringify command: {e:?}; skipping item.started" + ); + ev.command.join(" ") + } + }; + + self.running_commands.insert( + ev.call_id.clone(), + RunningCommand { + command: command_string.clone(), + item_id: item_id.clone(), + }, + ); + + let item = ConversationItem { + id: item_id, + details: ConversationItemDetails::CommandExecution(CommandExecutionItem { + command: command_string, + aggregated_output: String::new(), + exit_code: None, + status: CommandExecutionStatus::InProgress, + }), + }; + + vec![ConversationEvent::ItemStarted(ItemStartedEvent { item })] } fn handle_patch_apply_begin(&mut self, ev: &PatchApplyBeginEvent) -> Vec { @@ -167,23 +204,26 @@ impl ExperimentalEventProcessorWithJsonOutput { } fn handle_exec_command_end(&mut self, ev: &ExecCommandEndEvent) -> Vec { - let command = self - .running_commands - .remove(&ev.call_id) - .map(|command| command.join(" ")) - .unwrap_or_default(); + let Some(RunningCommand { command, item_id }) = self.running_commands.remove(&ev.call_id) + else { + warn!( + call_id = ev.call_id, + "ExecCommandEnd without matching ExecCommandBegin; skipping item.completed" + ); + return Vec::new(); + }; let status = if ev.exit_code == 0 { CommandExecutionStatus::Completed } else { CommandExecutionStatus::Failed }; let item = ConversationItem { - id: self.get_next_item_id(), + id: item_id, details: ConversationItemDetails::CommandExecution(CommandExecutionItem { command, aggregated_output: ev.aggregated_output.clone(), - exit_code: ev.exit_code, + exit_code: Some(ev.exit_code), status, }), }; diff --git a/codex-rs/exec/tests/event_processor_with_json_output.rs b/codex-rs/exec/tests/event_processor_with_json_output.rs index 0ff8ed13..d8911aae 100644 --- a/codex-rs/exec/tests/event_processor_with_json_output.rs +++ b/codex-rs/exec/tests/event_processor_with_json_output.rs @@ -16,6 +16,7 @@ use codex_exec::exec_events::ConversationEvent; use codex_exec::exec_events::ConversationItem; use codex_exec::exec_events::ConversationItemDetails; use codex_exec::exec_events::ItemCompletedEvent; +use codex_exec::exec_events::ItemStartedEvent; use codex_exec::exec_events::PatchApplyStatus; use codex_exec::exec_events::PatchChangeKind; use codex_exec::exec_events::ReasoningItem; @@ -156,7 +157,20 @@ fn exec_command_end_success_produces_completed_command_item() { }), ); let out_begin = ep.collect_conversation_events(&begin); - assert!(out_begin.is_empty()); + assert_eq!( + out_begin, + vec![ConversationEvent::ItemStarted(ItemStartedEvent { + item: ConversationItem { + id: "item_0".to_string(), + details: ConversationItemDetails::CommandExecution(CommandExecutionItem { + command: "bash -lc 'echo hi'".to_string(), + aggregated_output: String::new(), + exit_code: None, + status: CommandExecutionStatus::InProgress, + }), + }, + })] + ); // End (success) -> item.completed (item_0) let end_ok = event( @@ -178,9 +192,9 @@ fn exec_command_end_success_produces_completed_command_item() { item: ConversationItem { id: "item_0".to_string(), details: ConversationItemDetails::CommandExecution(CommandExecutionItem { - command: "bash -lc echo hi".to_string(), + command: "bash -lc 'echo hi'".to_string(), aggregated_output: "hi\n".to_string(), - exit_code: 0, + exit_code: Some(0), status: CommandExecutionStatus::Completed, }), }, @@ -202,7 +216,20 @@ fn exec_command_end_failure_produces_failed_command_item() { parsed_cmd: Vec::new(), }), ); - assert!(ep.collect_conversation_events(&begin).is_empty()); + assert_eq!( + ep.collect_conversation_events(&begin), + vec![ConversationEvent::ItemStarted(ItemStartedEvent { + item: ConversationItem { + id: "item_0".to_string(), + details: ConversationItemDetails::CommandExecution(CommandExecutionItem { + command: "sh -c 'exit 1'".to_string(), + aggregated_output: String::new(), + exit_code: None, + status: CommandExecutionStatus::InProgress, + }), + }, + })] + ); // End (failure) -> item.completed (item_0) let end_fail = event( @@ -224,9 +251,9 @@ fn exec_command_end_failure_produces_failed_command_item() { item: ConversationItem { id: "item_0".to_string(), details: ConversationItemDetails::CommandExecution(CommandExecutionItem { - command: "sh -c exit 1".to_string(), + command: "sh -c 'exit 1'".to_string(), aggregated_output: String::new(), - exit_code: 1, + exit_code: Some(1), status: CommandExecutionStatus::Failed, }), }, @@ -234,6 +261,27 @@ fn exec_command_end_failure_produces_failed_command_item() { ); } +#[test] +fn exec_command_end_without_begin_is_ignored() { + let mut ep = ExperimentalEventProcessorWithJsonOutput::new(None); + + // End event arrives without a prior Begin; should produce no conversation events. + let end_only = event( + "c1", + EventMsg::ExecCommandEnd(ExecCommandEndEvent { + call_id: "no-begin".to_string(), + stdout: String::new(), + stderr: String::new(), + aggregated_output: String::new(), + exit_code: 0, + duration: Duration::from_millis(1), + formatted_output: String::new(), + }), + ); + let out = ep.collect_conversation_events(&end_only); + assert!(out.is_empty()); +} + #[test] fn patch_apply_success_produces_item_completed_patchapply() { let mut ep = ExperimentalEventProcessorWithJsonOutput::new(None);