diff --git a/codex-rs/exec/src/exec_events.rs b/codex-rs/exec/src/exec_events.rs index 0681111e..f52071d4 100644 --- a/codex-rs/exec/src/exec_events.rs +++ b/codex-rs/exec/src/exec_events.rs @@ -2,10 +2,10 @@ use serde::Deserialize; use serde::Serialize; use ts_rs::TS; -/// Top-level events emitted on the Codex Exec conversation stream. +/// Top-level events emitted on the Codex Exec thread stream. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)] #[serde(tag = "type")] -pub enum ConversationEvent { +pub enum ThreadEvent { #[serde(rename = "thread.started")] ThreadStarted(ThreadStartedEvent), #[serde(rename = "turn.started")] @@ -21,7 +21,7 @@ pub enum ConversationEvent { #[serde(rename = "item.completed")] ItemCompleted(ItemCompletedEvent), #[serde(rename = "error")] - Error(ConversationErrorEvent), + Error(ThreadErrorEvent), } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)] @@ -39,7 +39,7 @@ pub struct TurnCompletedEvent { #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)] pub struct TurnFailedEvent { - pub error: ConversationErrorEvent, + pub error: ThreadErrorEvent, } /// Minimal usage summary for a turn. @@ -52,37 +52,37 @@ pub struct Usage { #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)] pub struct ItemStartedEvent { - pub item: ConversationItem, + pub item: ThreadItem, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)] pub struct ItemCompletedEvent { - pub item: ConversationItem, + pub item: ThreadItem, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)] pub struct ItemUpdatedEvent { - pub item: ConversationItem, + pub item: ThreadItem, } /// Fatal error emitted by the stream. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)] -pub struct ConversationErrorEvent { +pub struct ThreadErrorEvent { pub message: String, } -/// Canonical representation of a conversation item and its domain-specific payload. +/// Canonical representation of a thread item and its domain-specific payload. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)] -pub struct ConversationItem { +pub struct ThreadItem { pub id: String, #[serde(flatten)] - pub details: ConversationItemDetails, + pub details: ThreadItemDetails, } -/// Typed payloads for each supported conversation item type. +/// Typed payloads for each supported thread item type. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)] #[serde(tag = "item_type", rename_all = "snake_case")] -pub enum ConversationItemDetails { +pub enum ThreadItemDetails { AssistantMessage(AssistantMessageItem), Reasoning(ReasoningItem), CommandExecution(CommandExecutionItem), @@ -93,7 +93,7 @@ pub enum ConversationItemDetails { Error(ErrorItem), } -/// Session conversation metadata. +/// Session metadata. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)] pub struct SessionItem { pub session_id: String, diff --git a/codex-rs/exec/src/experimental_event_processor_with_json_output.rs b/codex-rs/exec/src/experimental_event_processor_with_json_output.rs index 89c10c34..164ebece 100644 --- a/codex-rs/exec/src/experimental_event_processor_with_json_output.rs +++ b/codex-rs/exec/src/experimental_event_processor_with_json_output.rs @@ -8,10 +8,6 @@ use crate::event_processor::handle_last_message; use crate::exec_events::AssistantMessageItem; use crate::exec_events::CommandExecutionItem; use crate::exec_events::CommandExecutionStatus; -use crate::exec_events::ConversationErrorEvent; -use crate::exec_events::ConversationEvent; -use crate::exec_events::ConversationItem; -use crate::exec_events::ConversationItemDetails; use crate::exec_events::FileChangeItem; use crate::exec_events::FileUpdateChange; use crate::exec_events::ItemCompletedEvent; @@ -22,6 +18,10 @@ use crate::exec_events::McpToolCallStatus; use crate::exec_events::PatchApplyStatus; use crate::exec_events::PatchChangeKind; use crate::exec_events::ReasoningItem; +use crate::exec_events::ThreadErrorEvent; +use crate::exec_events::ThreadEvent; +use crate::exec_events::ThreadItem; +use crate::exec_events::ThreadItemDetails; use crate::exec_events::ThreadStartedEvent; use crate::exec_events::TodoItem; use crate::exec_events::TodoListItem; @@ -59,7 +59,7 @@ pub struct ExperimentalEventProcessorWithJsonOutput { running_todo_list: Option, last_total_token_usage: Option, running_mcp_tool_calls: HashMap, - last_critical_error: Option, + last_critical_error: Option, } #[derive(Debug, Clone)] @@ -95,7 +95,7 @@ impl ExperimentalEventProcessorWithJsonOutput { } } - pub fn collect_conversation_events(&mut self, event: &Event) -> Vec { + pub fn collect_thread_events(&mut self, event: &Event) -> Vec { match &event.msg { EventMsg::SessionConfigured(ev) => self.handle_session_configured(ev), EventMsg::AgentMessage(ev) => self.handle_agent_message(ev), @@ -115,13 +115,13 @@ impl ExperimentalEventProcessorWithJsonOutput { EventMsg::TaskStarted(ev) => self.handle_task_started(ev), EventMsg::TaskComplete(_) => self.handle_task_complete(), EventMsg::Error(ev) => { - let error = ConversationErrorEvent { + let error = ThreadErrorEvent { message: ev.message.clone(), }; self.last_critical_error = Some(error.clone()); - vec![ConversationEvent::Error(error)] + vec![ThreadEvent::Error(error)] } - EventMsg::StreamError(ev) => vec![ConversationEvent::Error(ConversationErrorEvent { + EventMsg::StreamError(ev) => vec![ThreadEvent::Error(ThreadErrorEvent { message: ev.message.clone(), })], EventMsg::PlanUpdate(ev) => self.handle_plan_update(ev), @@ -137,43 +137,36 @@ impl ExperimentalEventProcessorWithJsonOutput { ) } - fn handle_session_configured( - &self, - payload: &SessionConfiguredEvent, - ) -> Vec { - vec![ConversationEvent::ThreadStarted(ThreadStartedEvent { + fn handle_session_configured(&self, payload: &SessionConfiguredEvent) -> Vec { + vec![ThreadEvent::ThreadStarted(ThreadStartedEvent { thread_id: payload.session_id.to_string(), })] } - fn handle_agent_message(&self, payload: &AgentMessageEvent) -> Vec { - let item = ConversationItem { + fn handle_agent_message(&self, payload: &AgentMessageEvent) -> Vec { + let item = ThreadItem { id: self.get_next_item_id(), - details: ConversationItemDetails::AssistantMessage(AssistantMessageItem { + details: ThreadItemDetails::AssistantMessage(AssistantMessageItem { text: payload.message.clone(), }), }; - vec![ConversationEvent::ItemCompleted(ItemCompletedEvent { - item, - })] + vec![ThreadEvent::ItemCompleted(ItemCompletedEvent { item })] } - fn handle_reasoning_event(&self, ev: &AgentReasoningEvent) -> Vec { - let item = ConversationItem { + fn handle_reasoning_event(&self, ev: &AgentReasoningEvent) -> Vec { + let item = ThreadItem { id: self.get_next_item_id(), - details: ConversationItemDetails::Reasoning(ReasoningItem { + details: ThreadItemDetails::Reasoning(ReasoningItem { text: ev.text.clone(), }), }; - vec![ConversationEvent::ItemCompleted(ItemCompletedEvent { - item, - })] + vec![ThreadEvent::ItemCompleted(ItemCompletedEvent { item })] } - fn handle_exec_command_begin(&mut self, ev: &ExecCommandBeginEvent) -> Vec { + fn handle_exec_command_begin(&mut self, ev: &ExecCommandBeginEvent) -> Vec { let item_id = self.get_next_item_id(); let command_string = match shlex::try_join(ev.command.iter().map(String::as_str)) { @@ -195,9 +188,9 @@ impl ExperimentalEventProcessorWithJsonOutput { }, ); - let item = ConversationItem { + let item = ThreadItem { id: item_id, - details: ConversationItemDetails::CommandExecution(CommandExecutionItem { + details: ThreadItemDetails::CommandExecution(CommandExecutionItem { command: command_string, aggregated_output: String::new(), exit_code: None, @@ -205,10 +198,10 @@ impl ExperimentalEventProcessorWithJsonOutput { }), }; - vec![ConversationEvent::ItemStarted(ItemStartedEvent { item })] + vec![ThreadEvent::ItemStarted(ItemStartedEvent { item })] } - fn handle_mcp_tool_call_begin(&mut self, ev: &McpToolCallBeginEvent) -> Vec { + fn handle_mcp_tool_call_begin(&mut self, ev: &McpToolCallBeginEvent) -> Vec { let item_id = self.get_next_item_id(); let server = ev.invocation.server.clone(); let tool = ev.invocation.tool.clone(); @@ -222,19 +215,19 @@ impl ExperimentalEventProcessorWithJsonOutput { }, ); - let item = ConversationItem { + let item = ThreadItem { id: item_id, - details: ConversationItemDetails::McpToolCall(McpToolCallItem { + details: ThreadItemDetails::McpToolCall(McpToolCallItem { server, tool, status: McpToolCallStatus::InProgress, }), }; - vec![ConversationEvent::ItemStarted(ItemStartedEvent { item })] + vec![ThreadEvent::ItemStarted(ItemStartedEvent { item })] } - fn handle_mcp_tool_call_end(&mut self, ev: &McpToolCallEndEvent) -> Vec { + fn handle_mcp_tool_call_end(&mut self, ev: &McpToolCallEndEvent) -> Vec { let status = if ev.is_success() { McpToolCallStatus::Completed } else { @@ -256,21 +249,19 @@ impl ExperimentalEventProcessorWithJsonOutput { } }; - let item = ConversationItem { + let item = ThreadItem { id: item_id, - details: ConversationItemDetails::McpToolCall(McpToolCallItem { + details: ThreadItemDetails::McpToolCall(McpToolCallItem { server, tool, status, }), }; - vec![ConversationEvent::ItemCompleted(ItemCompletedEvent { - item, - })] + vec![ThreadEvent::ItemCompleted(ItemCompletedEvent { item })] } - fn handle_patch_apply_begin(&mut self, ev: &PatchApplyBeginEvent) -> Vec { + fn handle_patch_apply_begin(&mut self, ev: &PatchApplyBeginEvent) -> Vec { self.running_patch_applies .insert(ev.call_id.clone(), ev.clone()); @@ -285,17 +276,17 @@ impl ExperimentalEventProcessorWithJsonOutput { } } - fn handle_patch_apply_end(&mut self, ev: &PatchApplyEndEvent) -> Vec { + fn handle_patch_apply_end(&mut self, ev: &PatchApplyEndEvent) -> Vec { if let Some(running_patch_apply) = self.running_patch_applies.remove(&ev.call_id) { let status = if ev.success { PatchApplyStatus::Completed } else { PatchApplyStatus::Failed }; - let item = ConversationItem { + let item = ThreadItem { id: self.get_next_item_id(), - details: ConversationItemDetails::FileChange(FileChangeItem { + details: ThreadItemDetails::FileChange(FileChangeItem { changes: running_patch_apply .changes .iter() @@ -308,15 +299,13 @@ impl ExperimentalEventProcessorWithJsonOutput { }), }; - return vec![ConversationEvent::ItemCompleted(ItemCompletedEvent { - item, - })]; + return vec![ThreadEvent::ItemCompleted(ItemCompletedEvent { item })]; } Vec::new() } - fn handle_exec_command_end(&mut self, ev: &ExecCommandEndEvent) -> Vec { + fn handle_exec_command_end(&mut self, ev: &ExecCommandEndEvent) -> Vec { let Some(RunningCommand { command, item_id }) = self.running_commands.remove(&ev.call_id) else { warn!( @@ -330,10 +319,10 @@ impl ExperimentalEventProcessorWithJsonOutput { } else { CommandExecutionStatus::Failed }; - let item = ConversationItem { + let item = ThreadItem { id: item_id, - details: ConversationItemDetails::CommandExecution(CommandExecutionItem { + details: ThreadItemDetails::CommandExecution(CommandExecutionItem { command, aggregated_output: ev.aggregated_output.clone(), exit_code: Some(ev.exit_code), @@ -341,9 +330,7 @@ impl ExperimentalEventProcessorWithJsonOutput { }), }; - vec![ConversationEvent::ItemCompleted(ItemCompletedEvent { - item, - })] + vec![ThreadEvent::ItemCompleted(ItemCompletedEvent { item })] } fn todo_items_from_plan(&self, args: &UpdatePlanArgs) -> Vec { @@ -356,16 +343,16 @@ impl ExperimentalEventProcessorWithJsonOutput { .collect() } - fn handle_plan_update(&mut self, args: &UpdatePlanArgs) -> Vec { + fn handle_plan_update(&mut self, args: &UpdatePlanArgs) -> Vec { let items = self.todo_items_from_plan(args); if let Some(running) = &mut self.running_todo_list { running.items = items.clone(); - let item = ConversationItem { + let item = ThreadItem { id: running.item_id.clone(), - details: ConversationItemDetails::TodoList(TodoListItem { items }), + details: ThreadItemDetails::TodoList(TodoListItem { items }), }; - return vec![ConversationEvent::ItemUpdated(ItemUpdatedEvent { item })]; + return vec![ThreadEvent::ItemUpdated(ItemUpdatedEvent { item })]; } let item_id = self.get_next_item_id(); @@ -373,19 +360,19 @@ impl ExperimentalEventProcessorWithJsonOutput { item_id: item_id.clone(), items: items.clone(), }); - let item = ConversationItem { + let item = ThreadItem { id: item_id, - details: ConversationItemDetails::TodoList(TodoListItem { items }), + details: ThreadItemDetails::TodoList(TodoListItem { items }), }; - vec![ConversationEvent::ItemStarted(ItemStartedEvent { item })] + vec![ThreadEvent::ItemStarted(ItemStartedEvent { item })] } - fn handle_task_started(&mut self, _: &TaskStartedEvent) -> Vec { + fn handle_task_started(&mut self, _: &TaskStartedEvent) -> Vec { self.last_critical_error = None; - vec![ConversationEvent::TurnStarted(TurnStartedEvent {})] + vec![ThreadEvent::TurnStarted(TurnStartedEvent {})] } - fn handle_task_complete(&mut self) -> Vec { + fn handle_task_complete(&mut self) -> Vec { let usage = if let Some(u) = &self.last_total_token_usage { Usage { input_tokens: u.input_tokens, @@ -399,23 +386,19 @@ impl ExperimentalEventProcessorWithJsonOutput { let mut items = Vec::new(); if let Some(running) = self.running_todo_list.take() { - let item = ConversationItem { + let item = ThreadItem { id: running.item_id, - details: ConversationItemDetails::TodoList(TodoListItem { + details: ThreadItemDetails::TodoList(TodoListItem { items: running.items, }), }; - items.push(ConversationEvent::ItemCompleted(ItemCompletedEvent { - item, - })); + items.push(ThreadEvent::ItemCompleted(ItemCompletedEvent { item })); } if let Some(error) = self.last_critical_error.take() { - items.push(ConversationEvent::TurnFailed(TurnFailedEvent { error })); + items.push(ThreadEvent::TurnFailed(TurnFailedEvent { error })); } else { - items.push(ConversationEvent::TurnCompleted(TurnCompletedEvent { - usage, - })); + items.push(ThreadEvent::TurnCompleted(TurnCompletedEvent { usage })); } items @@ -431,7 +414,7 @@ impl EventProcessor for ExperimentalEventProcessorWithJsonOutput { } fn process_event(&mut self, event: Event) -> CodexStatus { - let aggregated = self.collect_conversation_events(&event); + let aggregated = self.collect_thread_events(&event); for conv_event in aggregated { match serde_json::to_string(&conv_event) { Ok(line) => { diff --git a/codex-rs/exec/tests/event_processor_with_json_output.rs b/codex-rs/exec/tests/event_processor_with_json_output.rs index 198be44a..c4628797 100644 --- a/codex-rs/exec/tests/event_processor_with_json_output.rs +++ b/codex-rs/exec/tests/event_processor_with_json_output.rs @@ -15,10 +15,6 @@ use codex_core::protocol::SessionConfiguredEvent; use codex_exec::exec_events::AssistantMessageItem; use codex_exec::exec_events::CommandExecutionItem; use codex_exec::exec_events::CommandExecutionStatus; -use codex_exec::exec_events::ConversationErrorEvent; -use codex_exec::exec_events::ConversationEvent; -use codex_exec::exec_events::ConversationItem; -use codex_exec::exec_events::ConversationItemDetails; use codex_exec::exec_events::ItemCompletedEvent; use codex_exec::exec_events::ItemStartedEvent; use codex_exec::exec_events::ItemUpdatedEvent; @@ -27,6 +23,10 @@ use codex_exec::exec_events::McpToolCallStatus; use codex_exec::exec_events::PatchApplyStatus; use codex_exec::exec_events::PatchChangeKind; use codex_exec::exec_events::ReasoningItem; +use codex_exec::exec_events::ThreadErrorEvent; +use codex_exec::exec_events::ThreadEvent; +use codex_exec::exec_events::ThreadItem; +use codex_exec::exec_events::ThreadItemDetails; use codex_exec::exec_events::ThreadStartedEvent; use codex_exec::exec_events::TodoItem as ExecTodoItem; use codex_exec::exec_events::TodoListItem as ExecTodoListItem; @@ -67,10 +67,10 @@ fn session_configured_produces_thread_started_event() { rollout_path, }), ); - let out = ep.collect_conversation_events(&ev); + let out = ep.collect_thread_events(&ev); assert_eq!( out, - vec![ConversationEvent::ThreadStarted(ThreadStartedEvent { + vec![ThreadEvent::ThreadStarted(ThreadStartedEvent { thread_id: "67e55044-10b1-426f-9247-bb680e5fe0c8".to_string(), })] ); @@ -79,17 +79,14 @@ fn session_configured_produces_thread_started_event() { #[test] fn task_started_produces_turn_started_event() { let mut ep = ExperimentalEventProcessorWithJsonOutput::new(None); - let out = ep.collect_conversation_events(&event( + let out = ep.collect_thread_events(&event( "t1", EventMsg::TaskStarted(codex_core::protocol::TaskStartedEvent { model_context_window: Some(32_000), }), )); - assert_eq!( - out, - vec![ConversationEvent::TurnStarted(TurnStartedEvent {})] - ); + assert_eq!(out, vec![ThreadEvent::TurnStarted(TurnStartedEvent {})]); } #[test] @@ -117,13 +114,13 @@ fn plan_update_emits_todo_list_started_updated_and_completed() { ], }), ); - let out_first = ep.collect_conversation_events(&first); + let out_first = ep.collect_thread_events(&first); assert_eq!( out_first, - vec![ConversationEvent::ItemStarted(ItemStartedEvent { - item: ConversationItem { + vec![ThreadEvent::ItemStarted(ItemStartedEvent { + item: ThreadItem { id: "item_0".to_string(), - details: ConversationItemDetails::TodoList(ExecTodoListItem { + details: ThreadItemDetails::TodoList(ExecTodoListItem { items: vec![ ExecTodoItem { text: "step one".to_string(), @@ -156,13 +153,13 @@ fn plan_update_emits_todo_list_started_updated_and_completed() { ], }), ); - let out_second = ep.collect_conversation_events(&second); + let out_second = ep.collect_thread_events(&second); assert_eq!( out_second, - vec![ConversationEvent::ItemUpdated(ItemUpdatedEvent { - item: ConversationItem { + vec![ThreadEvent::ItemUpdated(ItemUpdatedEvent { + item: ThreadItem { id: "item_0".to_string(), - details: ConversationItemDetails::TodoList(ExecTodoListItem { + details: ThreadItemDetails::TodoList(ExecTodoListItem { items: vec![ ExecTodoItem { text: "step one".to_string(), @@ -185,14 +182,14 @@ fn plan_update_emits_todo_list_started_updated_and_completed() { last_agent_message: None, }), ); - let out_complete = ep.collect_conversation_events(&complete); + let out_complete = ep.collect_thread_events(&complete); assert_eq!( out_complete, vec![ - ConversationEvent::ItemCompleted(ItemCompletedEvent { - item: ConversationItem { + ThreadEvent::ItemCompleted(ItemCompletedEvent { + item: ThreadItem { id: "item_0".to_string(), - details: ConversationItemDetails::TodoList(ExecTodoListItem { + details: ThreadItemDetails::TodoList(ExecTodoListItem { items: vec![ ExecTodoItem { text: "step one".to_string(), @@ -206,7 +203,7 @@ fn plan_update_emits_todo_list_started_updated_and_completed() { }), }, }), - ConversationEvent::TurnCompleted(TurnCompletedEvent { + ThreadEvent::TurnCompleted(TurnCompletedEvent { usage: Usage::default(), }), ] @@ -229,13 +226,13 @@ fn mcp_tool_call_begin_and_end_emit_item_events() { invocation: invocation.clone(), }), ); - let begin_events = ep.collect_conversation_events(&begin); + let begin_events = ep.collect_thread_events(&begin); assert_eq!( begin_events, - vec![ConversationEvent::ItemStarted(ItemStartedEvent { - item: ConversationItem { + vec![ThreadEvent::ItemStarted(ItemStartedEvent { + item: ThreadItem { id: "item_0".to_string(), - details: ConversationItemDetails::McpToolCall(McpToolCallItem { + details: ThreadItemDetails::McpToolCall(McpToolCallItem { server: "server_a".to_string(), tool: "tool_x".to_string(), status: McpToolCallStatus::InProgress, @@ -257,13 +254,13 @@ fn mcp_tool_call_begin_and_end_emit_item_events() { }), }), ); - let end_events = ep.collect_conversation_events(&end); + let end_events = ep.collect_thread_events(&end); assert_eq!( end_events, - vec![ConversationEvent::ItemCompleted(ItemCompletedEvent { - item: ConversationItem { + vec![ThreadEvent::ItemCompleted(ItemCompletedEvent { + item: ThreadItem { id: "item_0".to_string(), - details: ConversationItemDetails::McpToolCall(McpToolCallItem { + details: ThreadItemDetails::McpToolCall(McpToolCallItem { server: "server_a".to_string(), tool: "tool_x".to_string(), status: McpToolCallStatus::Completed, @@ -289,7 +286,7 @@ fn mcp_tool_call_failure_sets_failed_status() { invocation: invocation.clone(), }), ); - ep.collect_conversation_events(&begin); + ep.collect_thread_events(&begin); let end = event( "m4", @@ -300,13 +297,13 @@ fn mcp_tool_call_failure_sets_failed_status() { result: Err("tool exploded".to_string()), }), ); - let events = ep.collect_conversation_events(&end); + let events = ep.collect_thread_events(&end); assert_eq!( events, - vec![ConversationEvent::ItemCompleted(ItemCompletedEvent { - item: ConversationItem { + vec![ThreadEvent::ItemCompleted(ItemCompletedEvent { + item: ThreadItem { id: "item_0".to_string(), - details: ConversationItemDetails::McpToolCall(McpToolCallItem { + details: ThreadItemDetails::McpToolCall(McpToolCallItem { server: "server_b".to_string(), tool: "tool_y".to_string(), status: McpToolCallStatus::Failed, @@ -335,14 +332,14 @@ fn plan_update_after_complete_starts_new_todo_list_with_new_id() { }], }), ); - let _ = ep.collect_conversation_events(&start); + let _ = ep.collect_thread_events(&start); let complete = event( "t2", EventMsg::TaskComplete(codex_core::protocol::TaskCompleteEvent { last_agent_message: None, }), ); - let _ = ep.collect_conversation_events(&complete); + let _ = ep.collect_thread_events(&complete); // Second turn: a new todo list should have a new id let start_again = event( @@ -355,10 +352,10 @@ fn plan_update_after_complete_starts_new_todo_list_with_new_id() { }], }), ); - let out = ep.collect_conversation_events(&start_again); + let out = ep.collect_thread_events(&start_again); match &out[0] { - ConversationEvent::ItemStarted(ItemStartedEvent { item }) => { + ThreadEvent::ItemStarted(ItemStartedEvent { item }) => { assert_eq!(&item.id, "item_1"); } other => panic!("unexpected event: {other:?}"), @@ -374,13 +371,13 @@ fn agent_reasoning_produces_item_completed_reasoning() { text: "thinking...".to_string(), }), ); - let out = ep.collect_conversation_events(&ev); + let out = ep.collect_thread_events(&ev); assert_eq!( out, - vec![ConversationEvent::ItemCompleted(ItemCompletedEvent { - item: ConversationItem { + vec![ThreadEvent::ItemCompleted(ItemCompletedEvent { + item: ThreadItem { id: "item_0".to_string(), - details: ConversationItemDetails::Reasoning(ReasoningItem { + details: ThreadItemDetails::Reasoning(ReasoningItem { text: "thinking...".to_string(), }), }, @@ -397,13 +394,13 @@ fn agent_message_produces_item_completed_assistant_message() { message: "hello".to_string(), }), ); - let out = ep.collect_conversation_events(&ev); + let out = ep.collect_thread_events(&ev); assert_eq!( out, - vec![ConversationEvent::ItemCompleted(ItemCompletedEvent { - item: ConversationItem { + vec![ThreadEvent::ItemCompleted(ItemCompletedEvent { + item: ThreadItem { id: "item_0".to_string(), - details: ConversationItemDetails::AssistantMessage(AssistantMessageItem { + details: ThreadItemDetails::AssistantMessage(AssistantMessageItem { text: "hello".to_string(), }), }, @@ -414,7 +411,7 @@ fn agent_message_produces_item_completed_assistant_message() { #[test] fn error_event_produces_error() { let mut ep = ExperimentalEventProcessorWithJsonOutput::new(None); - let out = ep.collect_conversation_events(&event( + let out = ep.collect_thread_events(&event( "e1", EventMsg::Error(codex_core::protocol::ErrorEvent { message: "boom".to_string(), @@ -422,7 +419,7 @@ fn error_event_produces_error() { )); assert_eq!( out, - vec![ConversationEvent::Error(ConversationErrorEvent { + vec![ThreadEvent::Error(ThreadErrorEvent { message: "boom".to_string(), })] ); @@ -431,7 +428,7 @@ fn error_event_produces_error() { #[test] fn stream_error_event_produces_error() { let mut ep = ExperimentalEventProcessorWithJsonOutput::new(None); - let out = ep.collect_conversation_events(&event( + let out = ep.collect_thread_events(&event( "e1", EventMsg::StreamError(codex_core::protocol::StreamErrorEvent { message: "retrying".to_string(), @@ -439,7 +436,7 @@ fn stream_error_event_produces_error() { )); assert_eq!( out, - vec![ConversationEvent::Error(ConversationErrorEvent { + vec![ThreadEvent::Error(ThreadErrorEvent { message: "retrying".to_string(), })] ); @@ -456,8 +453,8 @@ fn error_followed_by_task_complete_produces_turn_failed() { }), ); assert_eq!( - ep.collect_conversation_events(&error_event), - vec![ConversationEvent::Error(ConversationErrorEvent { + ep.collect_thread_events(&error_event), + vec![ThreadEvent::Error(ThreadErrorEvent { message: "boom".to_string(), })] ); @@ -469,9 +466,9 @@ fn error_followed_by_task_complete_produces_turn_failed() { }), ); assert_eq!( - ep.collect_conversation_events(&complete_event), - vec![ConversationEvent::TurnFailed(TurnFailedEvent { - error: ConversationErrorEvent { + ep.collect_thread_events(&complete_event), + vec![ThreadEvent::TurnFailed(TurnFailedEvent { + error: ThreadErrorEvent { message: "boom".to_string(), }, })] @@ -492,13 +489,13 @@ fn exec_command_end_success_produces_completed_command_item() { parsed_cmd: Vec::new(), }), ); - let out_begin = ep.collect_conversation_events(&begin); + let out_begin = ep.collect_thread_events(&begin); assert_eq!( out_begin, - vec![ConversationEvent::ItemStarted(ItemStartedEvent { - item: ConversationItem { + vec![ThreadEvent::ItemStarted(ItemStartedEvent { + item: ThreadItem { id: "item_0".to_string(), - details: ConversationItemDetails::CommandExecution(CommandExecutionItem { + details: ThreadItemDetails::CommandExecution(CommandExecutionItem { command: "bash -lc 'echo hi'".to_string(), aggregated_output: String::new(), exit_code: None, @@ -521,13 +518,13 @@ fn exec_command_end_success_produces_completed_command_item() { formatted_output: String::new(), }), ); - let out_ok = ep.collect_conversation_events(&end_ok); + let out_ok = ep.collect_thread_events(&end_ok); assert_eq!( out_ok, - vec![ConversationEvent::ItemCompleted(ItemCompletedEvent { - item: ConversationItem { + vec![ThreadEvent::ItemCompleted(ItemCompletedEvent { + item: ThreadItem { id: "item_0".to_string(), - details: ConversationItemDetails::CommandExecution(CommandExecutionItem { + details: ThreadItemDetails::CommandExecution(CommandExecutionItem { command: "bash -lc 'echo hi'".to_string(), aggregated_output: "hi\n".to_string(), exit_code: Some(0), @@ -553,11 +550,11 @@ fn exec_command_end_failure_produces_failed_command_item() { }), ); assert_eq!( - ep.collect_conversation_events(&begin), - vec![ConversationEvent::ItemStarted(ItemStartedEvent { - item: ConversationItem { + ep.collect_thread_events(&begin), + vec![ThreadEvent::ItemStarted(ItemStartedEvent { + item: ThreadItem { id: "item_0".to_string(), - details: ConversationItemDetails::CommandExecution(CommandExecutionItem { + details: ThreadItemDetails::CommandExecution(CommandExecutionItem { command: "sh -c 'exit 1'".to_string(), aggregated_output: String::new(), exit_code: None, @@ -580,13 +577,13 @@ fn exec_command_end_failure_produces_failed_command_item() { formatted_output: String::new(), }), ); - let out_fail = ep.collect_conversation_events(&end_fail); + let out_fail = ep.collect_thread_events(&end_fail); assert_eq!( out_fail, - vec![ConversationEvent::ItemCompleted(ItemCompletedEvent { - item: ConversationItem { + vec![ThreadEvent::ItemCompleted(ItemCompletedEvent { + item: ThreadItem { id: "item_0".to_string(), - details: ConversationItemDetails::CommandExecution(CommandExecutionItem { + details: ThreadItemDetails::CommandExecution(CommandExecutionItem { command: "sh -c 'exit 1'".to_string(), aggregated_output: String::new(), exit_code: Some(1), @@ -601,7 +598,7 @@ fn exec_command_end_failure_produces_failed_command_item() { fn exec_command_end_without_begin_is_ignored() { let mut ep = ExperimentalEventProcessorWithJsonOutput::new(None); - // End event arrives without a prior Begin; should produce no conversation events. + // End event arrives without a prior Begin; should produce no thread events. let end_only = event( "c1", EventMsg::ExecCommandEnd(ExecCommandEndEvent { @@ -614,7 +611,7 @@ fn exec_command_end_without_begin_is_ignored() { formatted_output: String::new(), }), ); - let out = ep.collect_conversation_events(&end_only); + let out = ep.collect_thread_events(&end_only); assert!(out.is_empty()); } @@ -653,7 +650,7 @@ fn patch_apply_success_produces_item_completed_patchapply() { changes: changes.clone(), }), ); - let out_begin = ep.collect_conversation_events(&begin); + let out_begin = ep.collect_thread_events(&begin); assert!(out_begin.is_empty()); // End (success) -> item.completed (item_0) @@ -666,15 +663,15 @@ fn patch_apply_success_produces_item_completed_patchapply() { success: true, }), ); - let out_end = ep.collect_conversation_events(&end); + let out_end = ep.collect_thread_events(&end); assert_eq!(out_end.len(), 1); // Validate structure without relying on HashMap iteration order match &out_end[0] { - ConversationEvent::ItemCompleted(ItemCompletedEvent { item }) => { + ThreadEvent::ItemCompleted(ItemCompletedEvent { item }) => { assert_eq!(&item.id, "item_0"); match &item.details { - ConversationItemDetails::FileChange(file_update) => { + ThreadItemDetails::FileChange(file_update) => { assert_eq!(file_update.status, PatchApplyStatus::Completed); let mut actual: Vec<(String, PatchChangeKind)> = file_update @@ -722,7 +719,7 @@ fn patch_apply_failure_produces_item_completed_patchapply_failed() { changes: changes.clone(), }), ); - assert!(ep.collect_conversation_events(&begin).is_empty()); + assert!(ep.collect_thread_events(&begin).is_empty()); // End (failure) -> item.completed (item_0) with Failed status let end = event( @@ -734,14 +731,14 @@ fn patch_apply_failure_produces_item_completed_patchapply_failed() { success: false, }), ); - let out_end = ep.collect_conversation_events(&end); + let out_end = ep.collect_thread_events(&end); assert_eq!(out_end.len(), 1); match &out_end[0] { - ConversationEvent::ItemCompleted(ItemCompletedEvent { item }) => { + ThreadEvent::ItemCompleted(ItemCompletedEvent { item }) => { assert_eq!(&item.id, "item_0"); match &item.details { - ConversationItemDetails::FileChange(file_update) => { + ThreadItemDetails::FileChange(file_update) => { assert_eq!(file_update.status, PatchApplyStatus::Failed); assert_eq!(file_update.changes.len(), 1); assert_eq!(file_update.changes[0].path, "file.txt".to_string()); @@ -778,10 +775,7 @@ fn task_complete_produces_turn_completed_with_usage() { rate_limits: None, }), ); - assert!( - ep.collect_conversation_events(&token_count_event) - .is_empty() - ); + assert!(ep.collect_thread_events(&token_count_event).is_empty()); // Then TaskComplete should produce turn.completed with the captured usage. let complete_event = event( @@ -790,10 +784,10 @@ fn task_complete_produces_turn_completed_with_usage() { last_agent_message: Some("done".to_string()), }), ); - let out = ep.collect_conversation_events(&complete_event); + let out = ep.collect_thread_events(&complete_event); assert_eq!( out, - vec![ConversationEvent::TurnCompleted(TurnCompletedEvent { + vec![ThreadEvent::TurnCompleted(TurnCompletedEvent { usage: Usage { input_tokens: 1200, cached_input_tokens: 200, diff --git a/sdk/typescript/samples/basic_streaming.ts b/sdk/typescript/samples/basic_streaming.ts index 41499b3e..801b52e7 100755 --- a/sdk/typescript/samples/basic_streaming.ts +++ b/sdk/typescript/samples/basic_streaming.ts @@ -4,7 +4,7 @@ import { createInterface } from "node:readline/promises"; import { stdin as input, stdout as output } from "node:process"; import { Codex } from "@openai/codex-sdk"; -import type { ConversationEvent, ConversationItem } from "@openai/codex-sdk"; +import type { ThreadEvent, ThreadItem } from "@openai/codex-sdk"; import path from "node:path"; const executablePath = @@ -15,7 +15,7 @@ const codex = new Codex({ executablePath }); const thread = codex.startThread(); const rl = createInterface({ input, output }); -const handleItemCompleted = (item: ConversationItem): void => { +const handleItemCompleted = (item: ThreadItem): void => { switch (item.item_type) { case "assistant_message": console.log(`Assistant: ${item.text}`); @@ -37,7 +37,7 @@ const handleItemCompleted = (item: ConversationItem): void => { } }; -const handleItemUpdated = (item: ConversationItem): void => { +const handleItemUpdated = (item: ThreadItem): void => { switch (item.item_type) { case "todo_list": { console.log(`Todo:`); @@ -49,7 +49,7 @@ const handleItemUpdated = (item: ConversationItem): void => { } }; -const handleEvent = (event: ConversationEvent): void => { +const handleEvent = (event: ThreadEvent): void => { switch (event.type) { case "item.completed": handleItemCompleted(event.item); @@ -63,6 +63,9 @@ const handleEvent = (event: ConversationEvent): void => { `Used ${event.usage.input_tokens} input tokens, ${event.usage.cached_input_tokens} cached input tokens, ${event.usage.output_tokens} output tokens.`, ); break; + case "turn.failed": + console.error(`Turn failed: ${event.error.message}`); + break; } }; diff --git a/sdk/typescript/src/events.ts b/sdk/typescript/src/events.ts index 89402bc7..344e348a 100644 --- a/sdk/typescript/src/events.ts +++ b/sdk/typescript/src/events.ts @@ -1,10 +1,10 @@ // based on event types from codex-rs/exec/src/exec_events.rs -import type { ConversationItem } from "./items"; +import type { ThreadItem } from "./items"; -export type SessionCreatedEvent = { - type: "session.created"; - session_id: string; +export type ThreadStartedEvent = { + type: "thread.started"; + thread_id: string; }; export type TurnStartedEvent = { @@ -22,31 +22,41 @@ export type TurnCompletedEvent = { usage: Usage; }; +export type TurnFailedEvent = { + type: "turn.failed"; + error: ThreadError; +}; + export type ItemStartedEvent = { type: "item.started"; - item: ConversationItem; + item: ThreadItem; }; export type ItemUpdatedEvent = { type: "item.updated"; - item: ConversationItem; + item: ThreadItem; }; export type ItemCompletedEvent = { type: "item.completed"; - item: ConversationItem; + item: ThreadItem; }; -export type ConversationErrorEvent = { +export type ThreadError = { + message: string; +}; + +export type ThreadErrorEvent = { type: "error"; message: string; }; -export type ConversationEvent = - | SessionCreatedEvent +export type ThreadEvent = + | ThreadStartedEvent | TurnStartedEvent | TurnCompletedEvent + | TurnFailedEvent | ItemStartedEvent | ItemUpdatedEvent | ItemCompletedEvent - | ConversationErrorEvent; + | ThreadErrorEvent; diff --git a/sdk/typescript/src/exec.ts b/sdk/typescript/src/exec.ts index 9a078aa9..0369f867 100644 --- a/sdk/typescript/src/exec.ts +++ b/sdk/typescript/src/exec.ts @@ -6,7 +6,7 @@ export type CodexExecArgs = { baseUrl?: string; apiKey?: string; - sessionId?: string | null; + threadId?: string | null; }; export class CodexExec { @@ -17,8 +17,8 @@ export class CodexExec { async *run(args: CodexExecArgs): AsyncGenerator { const commandArgs: string[] = ["exec", "--experimental-json"]; - if (args.sessionId) { - commandArgs.push("resume", args.sessionId, args.input); + if (args.threadId) { + commandArgs.push("resume", args.threadId, args.input); } else { commandArgs.push(args.input); } diff --git a/sdk/typescript/src/index.ts b/sdk/typescript/src/index.ts index f46471fd..02bbfa1b 100644 --- a/sdk/typescript/src/index.ts +++ b/sdk/typescript/src/index.ts @@ -1,15 +1,17 @@ export type { - ConversationEvent, - SessionCreatedEvent, + ThreadEvent, + ThreadStartedEvent, TurnStartedEvent, TurnCompletedEvent, + TurnFailedEvent, ItemStartedEvent, ItemUpdatedEvent, ItemCompletedEvent, - ConversationErrorEvent, + ThreadError, + ThreadErrorEvent, } from "./events"; export type { - ConversationItem, + ThreadItem, AssistantMessageItem, ReasoningItem, CommandExecutionItem, diff --git a/sdk/typescript/src/items.ts b/sdk/typescript/src/items.ts index 82fb0bb9..546dd66f 100644 --- a/sdk/typescript/src/items.ts +++ b/sdk/typescript/src/items.ts @@ -78,17 +78,7 @@ export type SessionItem = { session_id: string; }; -export type ConversationItem = - | AssistantMessageItem - | ReasoningItem - | CommandExecutionItem - | FileChangeItem - | McpToolCallItem - | WebSearchItem - | TodoListItem - | ErrorItem; - -export type ConversationItemDetails = +export type ThreadItem = | AssistantMessageItem | ReasoningItem | CommandExecutionItem diff --git a/sdk/typescript/src/thread.ts b/sdk/typescript/src/thread.ts index 2091f677..cd19a942 100644 --- a/sdk/typescript/src/thread.ts +++ b/sdk/typescript/src/thread.ts @@ -1,15 +1,15 @@ import { CodexOptions } from "./codexOptions"; -import { ConversationEvent } from "./events"; +import { ThreadEvent } from "./events"; import { CodexExec } from "./exec"; -import { ConversationItem } from "./items"; +import { ThreadItem } from "./items"; export type RunResult = { - items: ConversationItem[]; + items: ThreadItem[]; finalResponse: string; }; export type RunStreamedResult = { - events: AsyncGenerator; + events: AsyncGenerator; }; export type Input = string; @@ -29,17 +29,17 @@ export class Thread { return { events: this.runStreamedInternal(input) }; } - private async *runStreamedInternal(input: string): AsyncGenerator { + private async *runStreamedInternal(input: string): AsyncGenerator { const generator = this.exec.run({ input, baseUrl: this.options.baseUrl, apiKey: this.options.apiKey, - sessionId: this.id, + threadId: this.id, }); for await (const item of generator) { - const parsed = JSON.parse(item) as ConversationEvent; - if (parsed.type === "session.created") { - this.id = parsed.session_id; + const parsed = JSON.parse(item) as ThreadEvent; + if (parsed.type === "thread.started") { + this.id = parsed.thread_id; } yield parsed; } @@ -47,7 +47,7 @@ export class Thread { async run(input: string): Promise { const generator = this.runStreamedInternal(input); - const items: ConversationItem[] = []; + const items: ThreadItem[] = []; let finalResponse: string = ""; for await (const event of generator) { if (event.type === "item.completed") { diff --git a/sdk/typescript/tests/run.test.ts b/sdk/typescript/tests/run.test.ts index 9f7fb792..7f6c57c2 100644 --- a/sdk/typescript/tests/run.test.ts +++ b/sdk/typescript/tests/run.test.ts @@ -15,7 +15,7 @@ import { const codexExecPath = path.join(process.cwd(), "..", "..", "codex-rs", "target", "debug", "codex"); describe("Codex", () => { - it("returns session events", async () => { + it("returns thread events", async () => { const { url, close } = await startResponsesTestProxy({ statusCode: 200, responseBodies: [sse(responseStarted(), assistantMessage("Hi!"), responseCompleted())], @@ -65,7 +65,7 @@ describe("Codex", () => { await thread.run("first input"); await thread.run("second input"); - // Check second request continues conversation + // Check second request continues the same thread expect(requests.length).toBeGreaterThanOrEqual(2); const secondRequest = requests[1]; expect(secondRequest).toBeDefined(); diff --git a/sdk/typescript/tests/runStreamed.test.ts b/sdk/typescript/tests/runStreamed.test.ts index cb10435a..85b5cf38 100644 --- a/sdk/typescript/tests/runStreamed.test.ts +++ b/sdk/typescript/tests/runStreamed.test.ts @@ -3,7 +3,7 @@ import path from "path"; import { describe, expect, it } from "@jest/globals"; import { Codex } from "../src/codex"; -import { ConversationEvent } from "../src/index"; +import { ThreadEvent } from "../src/index"; import { assistantMessage, @@ -16,7 +16,7 @@ import { const codexExecPath = path.join(process.cwd(), "..", "..", "codex-rs", "target", "debug", "codex"); describe("Codex", () => { - it("returns session events", async () => { + it("returns thread events", async () => { const { url, close } = await startResponsesTestProxy({ statusCode: 200, responseBodies: [sse(responseStarted(), assistantMessage("Hi!"), responseCompleted())], @@ -28,15 +28,15 @@ describe("Codex", () => { const thread = client.startThread(); const result = await thread.runStreamed("Hello, world!"); - const events: ConversationEvent[] = []; + const events: ThreadEvent[] = []; for await (const event of result.events) { events.push(event); } expect(events).toEqual([ { - type: "session.created", - session_id: expect.any(String), + type: "thread.started", + thread_id: expect.any(String), }, { type: "turn.started", @@ -91,7 +91,7 @@ describe("Codex", () => { const second = await thread.runStreamed("second input"); await drainEvents(second.events); - // Check second request continues conversation + // Check second request continues the same thread expect(requests.length).toBeGreaterThanOrEqual(2); const secondRequest = requests[1]; expect(secondRequest).toBeDefined(); @@ -159,7 +159,7 @@ describe("Codex", () => { }); }); -async function drainEvents(events: AsyncGenerator): Promise { +async function drainEvents(events: AsyncGenerator): Promise { let done = false; do { done = (await events.next()).done ?? false;