diff --git a/codex-rs/exec/src/exec_events.rs b/codex-rs/exec/src/exec_events.rs index 82f15be3..38a43186 100644 --- a/codex-rs/exec/src/exec_events.rs +++ b/codex-rs/exec/src/exec_events.rs @@ -10,30 +10,34 @@ pub enum ConversationEvent { SessionCreated(SessionCreatedEvent), #[serde(rename = "item.started")] ItemStarted(ItemStartedEvent), + #[serde(rename = "item.updated")] + ItemUpdated(ItemUpdatedEvent), #[serde(rename = "item.completed")] ItemCompleted(ItemCompletedEvent), #[serde(rename = "error")] Error(ConversationErrorEvent), } -/// Payload describing a newly created conversation item. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)] pub struct SessionCreatedEvent { pub session_id: String, } -/// Payload describing the start of an existing conversation item. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)] pub struct ItemStartedEvent { pub item: ConversationItem, } -/// Payload describing the completion of an existing conversation item. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)] pub struct ItemCompletedEvent { pub item: ConversationItem, } +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)] +pub struct ItemUpdatedEvent { + pub item: ConversationItem, +} + /// Fatal error emitted by the stream. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)] pub struct ConversationErrorEvent { @@ -58,6 +62,7 @@ pub enum ConversationItemDetails { FileChange(FileChangeItem), McpToolCall(McpToolCallItem), WebSearch(WebSearchItem), + TodoList(TodoListItem), Error(ErrorItem), } @@ -153,3 +158,14 @@ pub struct WebSearchItem { pub struct ErrorItem { pub message: String, } + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)] +pub struct TodoItem { + pub text: String, + pub completed: bool, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)] +pub struct TodoListItem { + pub items: Vec, +} diff --git a/codex-rs/exec/src/experimental_event_processor_with_json_output.rs b/codex-rs/exec/src/experimental_event_processor_with_json_output.rs index ea4ba022..a2eddc17 100644 --- a/codex-rs/exec/src/experimental_event_processor_with_json_output.rs +++ b/codex-rs/exec/src/experimental_event_processor_with_json_output.rs @@ -16,11 +16,16 @@ use crate::exec_events::FileChangeItem; use crate::exec_events::FileUpdateChange; use crate::exec_events::ItemCompletedEvent; use crate::exec_events::ItemStartedEvent; +use crate::exec_events::ItemUpdatedEvent; use crate::exec_events::PatchApplyStatus; use crate::exec_events::PatchChangeKind; use crate::exec_events::ReasoningItem; use crate::exec_events::SessionCreatedEvent; +use crate::exec_events::TodoItem; +use crate::exec_events::TodoListItem; use codex_core::config::Config; +use codex_core::plan_tool::StepStatus; +use codex_core::plan_tool::UpdatePlanArgs; use codex_core::protocol::AgentMessageEvent; use codex_core::protocol::AgentReasoningEvent; use codex_core::protocol::Event; @@ -41,6 +46,8 @@ pub struct ExperimentalEventProcessorWithJsonOutput { // Tracks running commands by call_id, including the associated item id. running_commands: HashMap, running_patch_applies: HashMap, + // Tracks the todo list for the current turn (at most one per turn). + running_todo_list: Option, } #[derive(Debug, Clone)] @@ -49,6 +56,12 @@ struct RunningCommand { item_id: String, } +#[derive(Debug, Clone)] +struct RunningTodoList { + item_id: String, + items: Vec, +} + impl ExperimentalEventProcessorWithJsonOutput { pub fn new(last_message_path: Option) -> Self { Self { @@ -56,6 +69,7 @@ impl ExperimentalEventProcessorWithJsonOutput { next_event_id: AtomicU64::new(0), running_commands: HashMap::new(), running_patch_applies: HashMap::new(), + running_todo_list: None, } } @@ -74,6 +88,8 @@ impl ExperimentalEventProcessorWithJsonOutput { EventMsg::StreamError(ev) => vec![ConversationEvent::Error(ConversationErrorEvent { message: ev.message.clone(), })], + EventMsg::PlanUpdate(ev) => self.handle_plan_update(ev), + EventMsg::TaskComplete(_) => self.handle_task_complete(), _ => Vec::new(), } } @@ -232,6 +248,55 @@ impl ExperimentalEventProcessorWithJsonOutput { item, })] } + + fn todo_items_from_plan(&self, args: &UpdatePlanArgs) -> Vec { + args.plan + .iter() + .map(|p| TodoItem { + text: p.step.clone(), + completed: matches!(p.status, StepStatus::Completed), + }) + .collect() + } + + fn handle_plan_update(&mut self, args: &UpdatePlanArgs) -> Vec { + let items = self.todo_items_from_plan(args); + + if let Some(running) = &mut self.running_todo_list { + running.items = items.clone(); + let item = ConversationItem { + id: running.item_id.clone(), + details: ConversationItemDetails::TodoList(TodoListItem { items }), + }; + return vec![ConversationEvent::ItemUpdated(ItemUpdatedEvent { item })]; + } + + let item_id = self.get_next_item_id(); + self.running_todo_list = Some(RunningTodoList { + item_id: item_id.clone(), + items: items.clone(), + }); + let item = ConversationItem { + id: item_id, + details: ConversationItemDetails::TodoList(TodoListItem { items }), + }; + vec![ConversationEvent::ItemStarted(ItemStartedEvent { item })] + } + + fn handle_task_complete(&mut self) -> Vec { + if let Some(running) = self.running_todo_list.take() { + let item = ConversationItem { + id: running.item_id, + details: ConversationItemDetails::TodoList(TodoListItem { + items: running.items, + }), + }; + return vec![ConversationEvent::ItemCompleted(ItemCompletedEvent { + item, + })]; + } + Vec::new() + } } impl EventProcessor for ExperimentalEventProcessorWithJsonOutput { diff --git a/codex-rs/exec/tests/event_processor_with_json_output.rs b/codex-rs/exec/tests/event_processor_with_json_output.rs index d8911aae..fcdabce5 100644 --- a/codex-rs/exec/tests/event_processor_with_json_output.rs +++ b/codex-rs/exec/tests/event_processor_with_json_output.rs @@ -17,10 +17,13 @@ use codex_exec::exec_events::ConversationItem; use codex_exec::exec_events::ConversationItemDetails; use codex_exec::exec_events::ItemCompletedEvent; use codex_exec::exec_events::ItemStartedEvent; +use codex_exec::exec_events::ItemUpdatedEvent; use codex_exec::exec_events::PatchApplyStatus; use codex_exec::exec_events::PatchChangeKind; use codex_exec::exec_events::ReasoningItem; use codex_exec::exec_events::SessionCreatedEvent; +use codex_exec::exec_events::TodoItem as ExecTodoItem; +use codex_exec::exec_events::TodoListItem as ExecTodoListItem; use codex_exec::experimental_event_processor_with_json_output::ExperimentalEventProcessorWithJsonOutput; use pretty_assertions::assert_eq; use std::path::PathBuf; @@ -62,6 +65,171 @@ fn session_configured_produces_session_created_event() { ); } +#[test] +fn plan_update_emits_todo_list_started_updated_and_completed() { + use codex_core::plan_tool::PlanItemArg; + use codex_core::plan_tool::StepStatus; + use codex_core::plan_tool::UpdatePlanArgs; + + let mut ep = ExperimentalEventProcessorWithJsonOutput::new(None); + + // First plan update => item.started (todo_list) + let first = event( + "p1", + EventMsg::PlanUpdate(UpdatePlanArgs { + explanation: None, + plan: vec![ + PlanItemArg { + step: "step one".to_string(), + status: StepStatus::Pending, + }, + PlanItemArg { + step: "step two".to_string(), + status: StepStatus::InProgress, + }, + ], + }), + ); + let out_first = ep.collect_conversation_events(&first); + assert_eq!( + out_first, + vec![ConversationEvent::ItemStarted(ItemStartedEvent { + item: ConversationItem { + id: "item_0".to_string(), + details: ConversationItemDetails::TodoList(ExecTodoListItem { + items: vec![ + ExecTodoItem { + text: "step one".to_string(), + completed: false + }, + ExecTodoItem { + text: "step two".to_string(), + completed: false + }, + ], + }), + }, + })] + ); + + // Second plan update in same turn => item.updated (same id) + let second = event( + "p2", + EventMsg::PlanUpdate(UpdatePlanArgs { + explanation: None, + plan: vec![ + PlanItemArg { + step: "step one".to_string(), + status: StepStatus::Completed, + }, + PlanItemArg { + step: "step two".to_string(), + status: StepStatus::InProgress, + }, + ], + }), + ); + let out_second = ep.collect_conversation_events(&second); + assert_eq!( + out_second, + vec![ConversationEvent::ItemUpdated(ItemUpdatedEvent { + item: ConversationItem { + id: "item_0".to_string(), + details: ConversationItemDetails::TodoList(ExecTodoListItem { + items: vec![ + ExecTodoItem { + text: "step one".to_string(), + completed: true + }, + ExecTodoItem { + text: "step two".to_string(), + completed: false + }, + ], + }), + }, + })] + ); + + // Task completes => item.completed (same id, latest state) + let complete = event( + "p3", + EventMsg::TaskComplete(codex_core::protocol::TaskCompleteEvent { + last_agent_message: None, + }), + ); + let out_complete = ep.collect_conversation_events(&complete); + assert_eq!( + out_complete, + vec![ConversationEvent::ItemCompleted(ItemCompletedEvent { + item: ConversationItem { + id: "item_0".to_string(), + details: ConversationItemDetails::TodoList(ExecTodoListItem { + items: vec![ + ExecTodoItem { + text: "step one".to_string(), + completed: true + }, + ExecTodoItem { + text: "step two".to_string(), + completed: false + }, + ], + }), + }, + })] + ); +} + +#[test] +fn plan_update_after_complete_starts_new_todo_list_with_new_id() { + use codex_core::plan_tool::PlanItemArg; + use codex_core::plan_tool::StepStatus; + use codex_core::plan_tool::UpdatePlanArgs; + + let mut ep = ExperimentalEventProcessorWithJsonOutput::new(None); + + // First turn: start + complete + let start = event( + "t1", + EventMsg::PlanUpdate(UpdatePlanArgs { + explanation: None, + plan: vec![PlanItemArg { + step: "only".to_string(), + status: StepStatus::Pending, + }], + }), + ); + let _ = ep.collect_conversation_events(&start); + let complete = event( + "t2", + EventMsg::TaskComplete(codex_core::protocol::TaskCompleteEvent { + last_agent_message: None, + }), + ); + let _ = ep.collect_conversation_events(&complete); + + // Second turn: a new todo list should have a new id + let start_again = event( + "t3", + EventMsg::PlanUpdate(UpdatePlanArgs { + explanation: None, + plan: vec![PlanItemArg { + step: "again".to_string(), + status: StepStatus::Pending, + }], + }), + ); + let out = ep.collect_conversation_events(&start_again); + + match &out[0] { + ConversationEvent::ItemStarted(ItemStartedEvent { item }) => { + assert_eq!(&item.id, "item_1"); + } + other => panic!("unexpected event: {other:?}"), + } +} + #[test] fn agent_reasoning_produces_item_completed_reasoning() { let mut ep = ExperimentalEventProcessorWithJsonOutput::new(None);