diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index 0b811c36..c8f362aa 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -797,6 +797,7 @@ dependencies = [ "codex-protocol", "core_test_support", "libc", + "mcp-types", "opentelemetry-appender-tracing", "owo-colors", "predicates", diff --git a/codex-rs/exec/Cargo.toml b/codex-rs/exec/Cargo.toml index e37865b2..27ae067c 100644 --- a/codex-rs/exec/Cargo.toml +++ b/codex-rs/exec/Cargo.toml @@ -58,3 +58,4 @@ tempfile = { workspace = true } uuid = { workspace = true } walkdir = { workspace = true } wiremock = { workspace = true } +mcp-types = { workspace = true } diff --git a/codex-rs/exec/src/experimental_event_processor_with_json_output.rs b/codex-rs/exec/src/experimental_event_processor_with_json_output.rs index 7d4f12f7..89c10c34 100644 --- a/codex-rs/exec/src/experimental_event_processor_with_json_output.rs +++ b/codex-rs/exec/src/experimental_event_processor_with_json_output.rs @@ -17,6 +17,8 @@ use crate::exec_events::FileUpdateChange; use crate::exec_events::ItemCompletedEvent; use crate::exec_events::ItemStartedEvent; use crate::exec_events::ItemUpdatedEvent; +use crate::exec_events::McpToolCallItem; +use crate::exec_events::McpToolCallStatus; use crate::exec_events::PatchApplyStatus; use crate::exec_events::PatchChangeKind; use crate::exec_events::ReasoningItem; @@ -37,6 +39,8 @@ use codex_core::protocol::EventMsg; use codex_core::protocol::ExecCommandBeginEvent; use codex_core::protocol::ExecCommandEndEvent; use codex_core::protocol::FileChange; +use codex_core::protocol::McpToolCallBeginEvent; +use codex_core::protocol::McpToolCallEndEvent; use codex_core::protocol::PatchApplyBeginEvent; use codex_core::protocol::PatchApplyEndEvent; use codex_core::protocol::SessionConfiguredEvent; @@ -54,6 +58,7 @@ pub struct ExperimentalEventProcessorWithJsonOutput { // Tracks the todo list for the current turn (at most one per turn). running_todo_list: Option, last_total_token_usage: Option, + running_mcp_tool_calls: HashMap, last_critical_error: Option, } @@ -69,6 +74,13 @@ struct RunningTodoList { items: Vec, } +#[derive(Debug, Clone)] +struct RunningMcpToolCall { + server: String, + tool: String, + item_id: String, +} + impl ExperimentalEventProcessorWithJsonOutput { pub fn new(last_message_path: Option) -> Self { Self { @@ -78,6 +90,7 @@ impl ExperimentalEventProcessorWithJsonOutput { running_patch_applies: HashMap::new(), running_todo_list: None, last_total_token_usage: None, + running_mcp_tool_calls: HashMap::new(), last_critical_error: None, } } @@ -89,6 +102,8 @@ impl ExperimentalEventProcessorWithJsonOutput { EventMsg::AgentReasoning(ev) => self.handle_reasoning_event(ev), EventMsg::ExecCommandBegin(ev) => self.handle_exec_command_begin(ev), EventMsg::ExecCommandEnd(ev) => self.handle_exec_command_end(ev), + EventMsg::McpToolCallBegin(ev) => self.handle_mcp_tool_call_begin(ev), + EventMsg::McpToolCallEnd(ev) => self.handle_mcp_tool_call_end(ev), EventMsg::PatchApplyBegin(ev) => self.handle_patch_apply_begin(ev), EventMsg::PatchApplyEnd(ev) => self.handle_patch_apply_end(ev), EventMsg::TokenCount(ev) => { @@ -193,6 +208,68 @@ impl ExperimentalEventProcessorWithJsonOutput { vec![ConversationEvent::ItemStarted(ItemStartedEvent { item })] } + fn handle_mcp_tool_call_begin(&mut self, ev: &McpToolCallBeginEvent) -> Vec { + let item_id = self.get_next_item_id(); + let server = ev.invocation.server.clone(); + let tool = ev.invocation.tool.clone(); + + self.running_mcp_tool_calls.insert( + ev.call_id.clone(), + RunningMcpToolCall { + server: server.clone(), + tool: tool.clone(), + item_id: item_id.clone(), + }, + ); + + let item = ConversationItem { + id: item_id, + details: ConversationItemDetails::McpToolCall(McpToolCallItem { + server, + tool, + status: McpToolCallStatus::InProgress, + }), + }; + + vec![ConversationEvent::ItemStarted(ItemStartedEvent { item })] + } + + fn handle_mcp_tool_call_end(&mut self, ev: &McpToolCallEndEvent) -> Vec { + let status = if ev.is_success() { + McpToolCallStatus::Completed + } else { + McpToolCallStatus::Failed + }; + + let (server, tool, item_id) = match self.running_mcp_tool_calls.remove(&ev.call_id) { + Some(running) => (running.server, running.tool, running.item_id), + None => { + warn!( + call_id = ev.call_id, + "Received McpToolCallEnd without begin; synthesizing new item" + ); + ( + ev.invocation.server.clone(), + ev.invocation.tool.clone(), + self.get_next_item_id(), + ) + } + }; + + let item = ConversationItem { + id: item_id, + details: ConversationItemDetails::McpToolCall(McpToolCallItem { + server, + tool, + status, + }), + }; + + vec![ConversationEvent::ItemCompleted(ItemCompletedEvent { + item, + })] + } + fn handle_patch_apply_begin(&mut self, ev: &PatchApplyBeginEvent) -> Vec { self.running_patch_applies .insert(ev.call_id.clone(), ev.clone()); diff --git a/codex-rs/exec/tests/event_processor_with_json_output.rs b/codex-rs/exec/tests/event_processor_with_json_output.rs index f6f38d32..198be44a 100644 --- a/codex-rs/exec/tests/event_processor_with_json_output.rs +++ b/codex-rs/exec/tests/event_processor_with_json_output.rs @@ -6,6 +6,9 @@ use codex_core::protocol::EventMsg; use codex_core::protocol::ExecCommandBeginEvent; use codex_core::protocol::ExecCommandEndEvent; use codex_core::protocol::FileChange; +use codex_core::protocol::McpInvocation; +use codex_core::protocol::McpToolCallBeginEvent; +use codex_core::protocol::McpToolCallEndEvent; use codex_core::protocol::PatchApplyBeginEvent; use codex_core::protocol::PatchApplyEndEvent; use codex_core::protocol::SessionConfiguredEvent; @@ -19,6 +22,8 @@ use codex_exec::exec_events::ConversationItemDetails; use codex_exec::exec_events::ItemCompletedEvent; use codex_exec::exec_events::ItemStartedEvent; use codex_exec::exec_events::ItemUpdatedEvent; +use codex_exec::exec_events::McpToolCallItem; +use codex_exec::exec_events::McpToolCallStatus; use codex_exec::exec_events::PatchApplyStatus; use codex_exec::exec_events::PatchChangeKind; use codex_exec::exec_events::ReasoningItem; @@ -30,6 +35,7 @@ use codex_exec::exec_events::TurnFailedEvent; use codex_exec::exec_events::TurnStartedEvent; use codex_exec::exec_events::Usage; use codex_exec::experimental_event_processor_with_json_output::ExperimentalEventProcessorWithJsonOutput; +use mcp_types::CallToolResult; use pretty_assertions::assert_eq; use std::path::PathBuf; use std::time::Duration; @@ -207,6 +213,109 @@ fn plan_update_emits_todo_list_started_updated_and_completed() { ); } +#[test] +fn mcp_tool_call_begin_and_end_emit_item_events() { + let mut ep = ExperimentalEventProcessorWithJsonOutput::new(None); + let invocation = McpInvocation { + server: "server_a".to_string(), + tool: "tool_x".to_string(), + arguments: None, + }; + + let begin = event( + "m1", + EventMsg::McpToolCallBegin(McpToolCallBeginEvent { + call_id: "call-1".to_string(), + invocation: invocation.clone(), + }), + ); + let begin_events = ep.collect_conversation_events(&begin); + assert_eq!( + begin_events, + vec![ConversationEvent::ItemStarted(ItemStartedEvent { + item: ConversationItem { + id: "item_0".to_string(), + details: ConversationItemDetails::McpToolCall(McpToolCallItem { + server: "server_a".to_string(), + tool: "tool_x".to_string(), + status: McpToolCallStatus::InProgress, + }), + }, + })] + ); + + let end = event( + "m2", + EventMsg::McpToolCallEnd(McpToolCallEndEvent { + call_id: "call-1".to_string(), + invocation, + duration: Duration::from_secs(1), + result: Ok(CallToolResult { + content: Vec::new(), + is_error: None, + structured_content: None, + }), + }), + ); + let end_events = ep.collect_conversation_events(&end); + assert_eq!( + end_events, + vec![ConversationEvent::ItemCompleted(ItemCompletedEvent { + item: ConversationItem { + id: "item_0".to_string(), + details: ConversationItemDetails::McpToolCall(McpToolCallItem { + server: "server_a".to_string(), + tool: "tool_x".to_string(), + status: McpToolCallStatus::Completed, + }), + }, + })] + ); +} + +#[test] +fn mcp_tool_call_failure_sets_failed_status() { + let mut ep = ExperimentalEventProcessorWithJsonOutput::new(None); + let invocation = McpInvocation { + server: "server_b".to_string(), + tool: "tool_y".to_string(), + arguments: None, + }; + + let begin = event( + "m3", + EventMsg::McpToolCallBegin(McpToolCallBeginEvent { + call_id: "call-2".to_string(), + invocation: invocation.clone(), + }), + ); + ep.collect_conversation_events(&begin); + + let end = event( + "m4", + EventMsg::McpToolCallEnd(McpToolCallEndEvent { + call_id: "call-2".to_string(), + invocation, + duration: Duration::from_millis(5), + result: Err("tool exploded".to_string()), + }), + ); + let events = ep.collect_conversation_events(&end); + assert_eq!( + events, + vec![ConversationEvent::ItemCompleted(ItemCompletedEvent { + item: ConversationItem { + id: "item_0".to_string(), + details: ConversationItemDetails::McpToolCall(McpToolCallItem { + server: "server_b".to_string(), + tool: "tool_y".to_string(), + status: McpToolCallStatus::Failed, + }), + }, + })] + ); +} + #[test] fn plan_update_after_complete_starts_new_todo_list_with_new_id() { use codex_core::plan_tool::PlanItemArg;