From 4a80059b1b2a427ab5e0ac742c3657cb6e8bf7ac Mon Sep 17 00:00:00 2001 From: pakrym-oai Date: Mon, 29 Sep 2025 18:38:04 -0700 Subject: [PATCH] Add turn.failed and rename session created to thread started (#4478) Don't produce completed when turn failed. --- codex-rs/exec/src/exec_events.rs | 15 +++++-- ...mental_event_processor_with_json_output.rs | 32 +++++++++----- .../tests/event_processor_with_json_output.rs | 43 +++++++++++++++++-- 3 files changed, 72 insertions(+), 18 deletions(-) diff --git a/codex-rs/exec/src/exec_events.rs b/codex-rs/exec/src/exec_events.rs index 07a9c015..0681111e 100644 --- a/codex-rs/exec/src/exec_events.rs +++ b/codex-rs/exec/src/exec_events.rs @@ -6,12 +6,14 @@ use ts_rs::TS; #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)] #[serde(tag = "type")] pub enum ConversationEvent { - #[serde(rename = "session.created")] - SessionCreated(SessionCreatedEvent), + #[serde(rename = "thread.started")] + ThreadStarted(ThreadStartedEvent), #[serde(rename = "turn.started")] TurnStarted(TurnStartedEvent), #[serde(rename = "turn.completed")] TurnCompleted(TurnCompletedEvent), + #[serde(rename = "turn.failed")] + TurnFailed(TurnFailedEvent), #[serde(rename = "item.started")] ItemStarted(ItemStartedEvent), #[serde(rename = "item.updated")] @@ -23,8 +25,8 @@ pub enum ConversationEvent { } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)] -pub struct SessionCreatedEvent { - pub session_id: String, +pub struct ThreadStartedEvent { + pub thread_id: String, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS, Default)] @@ -35,6 +37,11 @@ pub struct TurnCompletedEvent { pub usage: Usage, } +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)] +pub struct TurnFailedEvent { + pub error: ConversationErrorEvent, +} + /// Minimal usage summary for a turn. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS, Default)] pub struct Usage { diff --git a/codex-rs/exec/src/experimental_event_processor_with_json_output.rs b/codex-rs/exec/src/experimental_event_processor_with_json_output.rs index 55af87a2..7d4f12f7 100644 --- a/codex-rs/exec/src/experimental_event_processor_with_json_output.rs +++ b/codex-rs/exec/src/experimental_event_processor_with_json_output.rs @@ -20,10 +20,11 @@ use crate::exec_events::ItemUpdatedEvent; use crate::exec_events::PatchApplyStatus; use crate::exec_events::PatchChangeKind; use crate::exec_events::ReasoningItem; -use crate::exec_events::SessionCreatedEvent; +use crate::exec_events::ThreadStartedEvent; use crate::exec_events::TodoItem; use crate::exec_events::TodoListItem; use crate::exec_events::TurnCompletedEvent; +use crate::exec_events::TurnFailedEvent; use crate::exec_events::TurnStartedEvent; use crate::exec_events::Usage; use codex_core::config::Config; @@ -53,6 +54,7 @@ pub struct ExperimentalEventProcessorWithJsonOutput { // Tracks the todo list for the current turn (at most one per turn). running_todo_list: Option, last_total_token_usage: Option, + last_critical_error: Option, } #[derive(Debug, Clone)] @@ -76,6 +78,7 @@ impl ExperimentalEventProcessorWithJsonOutput { running_patch_applies: HashMap::new(), running_todo_list: None, last_total_token_usage: None, + last_critical_error: None, } } @@ -96,9 +99,13 @@ impl ExperimentalEventProcessorWithJsonOutput { } EventMsg::TaskStarted(ev) => self.handle_task_started(ev), EventMsg::TaskComplete(_) => self.handle_task_complete(), - EventMsg::Error(ev) => vec![ConversationEvent::Error(ConversationErrorEvent { - message: ev.message.clone(), - })], + EventMsg::Error(ev) => { + let error = ConversationErrorEvent { + message: ev.message.clone(), + }; + self.last_critical_error = Some(error.clone()); + vec![ConversationEvent::Error(error)] + } EventMsg::StreamError(ev) => vec![ConversationEvent::Error(ConversationErrorEvent { message: ev.message.clone(), })], @@ -119,8 +126,8 @@ impl ExperimentalEventProcessorWithJsonOutput { &self, payload: &SessionConfiguredEvent, ) -> Vec { - vec![ConversationEvent::SessionCreated(SessionCreatedEvent { - session_id: payload.session_id.to_string(), + vec![ConversationEvent::ThreadStarted(ThreadStartedEvent { + thread_id: payload.session_id.to_string(), })] } @@ -296,7 +303,8 @@ impl ExperimentalEventProcessorWithJsonOutput { vec![ConversationEvent::ItemStarted(ItemStartedEvent { item })] } - fn handle_task_started(&self, _: &TaskStartedEvent) -> Vec { + fn handle_task_started(&mut self, _: &TaskStartedEvent) -> Vec { + self.last_critical_error = None; vec![ConversationEvent::TurnStarted(TurnStartedEvent {})] } @@ -325,9 +333,13 @@ impl ExperimentalEventProcessorWithJsonOutput { })); } - items.push(ConversationEvent::TurnCompleted(TurnCompletedEvent { - usage, - })); + if let Some(error) = self.last_critical_error.take() { + items.push(ConversationEvent::TurnFailed(TurnFailedEvent { error })); + } else { + items.push(ConversationEvent::TurnCompleted(TurnCompletedEvent { + usage, + })); + } items } diff --git a/codex-rs/exec/tests/event_processor_with_json_output.rs b/codex-rs/exec/tests/event_processor_with_json_output.rs index ebfb667b..f6f38d32 100644 --- a/codex-rs/exec/tests/event_processor_with_json_output.rs +++ b/codex-rs/exec/tests/event_processor_with_json_output.rs @@ -1,5 +1,6 @@ use codex_core::protocol::AgentMessageEvent; use codex_core::protocol::AgentReasoningEvent; +use codex_core::protocol::ErrorEvent; use codex_core::protocol::Event; use codex_core::protocol::EventMsg; use codex_core::protocol::ExecCommandBeginEvent; @@ -21,10 +22,11 @@ use codex_exec::exec_events::ItemUpdatedEvent; use codex_exec::exec_events::PatchApplyStatus; use codex_exec::exec_events::PatchChangeKind; use codex_exec::exec_events::ReasoningItem; -use codex_exec::exec_events::SessionCreatedEvent; +use codex_exec::exec_events::ThreadStartedEvent; use codex_exec::exec_events::TodoItem as ExecTodoItem; use codex_exec::exec_events::TodoListItem as ExecTodoListItem; use codex_exec::exec_events::TurnCompletedEvent; +use codex_exec::exec_events::TurnFailedEvent; use codex_exec::exec_events::TurnStartedEvent; use codex_exec::exec_events::Usage; use codex_exec::experimental_event_processor_with_json_output::ExperimentalEventProcessorWithJsonOutput; @@ -40,7 +42,7 @@ fn event(id: &str, msg: EventMsg) -> Event { } #[test] -fn session_configured_produces_session_created_event() { +fn session_configured_produces_thread_started_event() { let mut ep = ExperimentalEventProcessorWithJsonOutput::new(None); let session_id = codex_protocol::mcp_protocol::ConversationId::from_string( "67e55044-10b1-426f-9247-bb680e5fe0c8", @@ -62,8 +64,8 @@ fn session_configured_produces_session_created_event() { let out = ep.collect_conversation_events(&ev); assert_eq!( out, - vec![ConversationEvent::SessionCreated(SessionCreatedEvent { - session_id: "67e55044-10b1-426f-9247-bb680e5fe0c8".to_string(), + vec![ConversationEvent::ThreadStarted(ThreadStartedEvent { + thread_id: "67e55044-10b1-426f-9247-bb680e5fe0c8".to_string(), })] ); } @@ -334,6 +336,39 @@ fn stream_error_event_produces_error() { ); } +#[test] +fn error_followed_by_task_complete_produces_turn_failed() { + let mut ep = ExperimentalEventProcessorWithJsonOutput::new(None); + + let error_event = event( + "e1", + EventMsg::Error(ErrorEvent { + message: "boom".to_string(), + }), + ); + assert_eq!( + ep.collect_conversation_events(&error_event), + vec![ConversationEvent::Error(ConversationErrorEvent { + message: "boom".to_string(), + })] + ); + + let complete_event = event( + "e2", + EventMsg::TaskComplete(codex_core::protocol::TaskCompleteEvent { + last_agent_message: None, + }), + ); + assert_eq!( + ep.collect_conversation_events(&complete_event), + vec![ConversationEvent::TurnFailed(TurnFailedEvent { + error: ConversationErrorEvent { + message: "boom".to_string(), + }, + })] + ); +} + #[test] fn exec_command_end_success_produces_completed_command_item() { let mut ep = ExperimentalEventProcessorWithJsonOutput::new(None);