From 97ab8fb6101dc5203104117023453862320e91ca Mon Sep 17 00:00:00 2001 From: aibrahim-oai Date: Fri, 1 Aug 2025 15:18:36 -0700 Subject: [PATCH] MCP: add conversation.create tool [Stack 2/2] (#1783) Introduce conversation.create handler (handle_create_conversation) and wire it in MessageProcessor. Stack: Top: #1783 Bottom: #1784 --------- Co-authored-by: Gabriel Peal --- codex-rs/mcp-server/src/conversation_loop.rs | 121 +++++++++++++ codex-rs/mcp-server/src/lib.rs | 1 + codex-rs/mcp-server/src/message_processor.rs | 8 + .../src/tool_handlers/create_conversation.rs | 160 ++++++++++++++++++ codex-rs/mcp-server/src/tool_handlers/mod.rs | 1 + .../mcp-server/tests/common/mcp_process.rs | 36 ++++ .../mcp-server/tests/create_conversation.rs | 128 ++++++++++++++ 7 files changed, 455 insertions(+) create mode 100644 codex-rs/mcp-server/src/conversation_loop.rs create mode 100644 codex-rs/mcp-server/src/tool_handlers/create_conversation.rs create mode 100644 codex-rs/mcp-server/tests/create_conversation.rs diff --git a/codex-rs/mcp-server/src/conversation_loop.rs b/codex-rs/mcp-server/src/conversation_loop.rs new file mode 100644 index 00000000..53427518 --- /dev/null +++ b/codex-rs/mcp-server/src/conversation_loop.rs @@ -0,0 +1,121 @@ +use std::sync::Arc; + +use crate::exec_approval::handle_exec_approval_request; +use crate::outgoing_message::OutgoingMessageSender; +use crate::outgoing_message::OutgoingNotificationMeta; +use crate::patch_approval::handle_patch_approval_request; +use codex_core::Codex; +use codex_core::protocol::AgentMessageEvent; +use codex_core::protocol::ApplyPatchApprovalRequestEvent; +use codex_core::protocol::EventMsg; +use codex_core::protocol::ExecApprovalRequestEvent; +use mcp_types::RequestId; +use tracing::error; + +pub async fn run_conversation_loop( + codex: Arc, + outgoing: Arc, + request_id: RequestId, +) { + let request_id_str = match &request_id { + RequestId::String(s) => s.clone(), + RequestId::Integer(n) => n.to_string(), + }; + + // Stream events until the task needs to pause for user interaction or + // completes. + loop { + match codex.next_event().await { + Ok(event) => { + outgoing + .send_event_as_notification( + &event, + Some(OutgoingNotificationMeta::new(Some(request_id.clone()))), + ) + .await; + + match event.msg { + EventMsg::ExecApprovalRequest(ExecApprovalRequestEvent { + command, + cwd, + call_id, + reason: _, + }) => { + handle_exec_approval_request( + command, + cwd, + outgoing.clone(), + codex.clone(), + request_id.clone(), + request_id_str.clone(), + event.id.clone(), + call_id, + ) + .await; + continue; + } + EventMsg::Error(_) => { + error!("Codex runtime error"); + } + EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent { + call_id, + reason, + grant_root, + changes, + }) => { + handle_patch_approval_request( + call_id, + reason, + grant_root, + changes, + outgoing.clone(), + codex.clone(), + request_id.clone(), + request_id_str.clone(), + event.id.clone(), + ) + .await; + continue; + } + EventMsg::TaskComplete(_) => {} + EventMsg::SessionConfigured(_) => { + tracing::error!("unexpected SessionConfigured event"); + } + EventMsg::AgentMessageDelta(_) => { + // TODO: think how we want to support this in the MCP + } + EventMsg::AgentReasoningDelta(_) => { + // TODO: think how we want to support this in the MCP + } + EventMsg::AgentMessage(AgentMessageEvent { .. }) => { + // TODO: think how we want to support this in the MCP + } + EventMsg::TaskStarted + | EventMsg::TokenCount(_) + | EventMsg::AgentReasoning(_) + | EventMsg::McpToolCallBegin(_) + | EventMsg::McpToolCallEnd(_) + | EventMsg::ExecCommandBegin(_) + | EventMsg::ExecCommandEnd(_) + | EventMsg::BackgroundEvent(_) + | EventMsg::ExecCommandOutputDelta(_) + | EventMsg::PatchApplyBegin(_) + | EventMsg::PatchApplyEnd(_) + | EventMsg::GetHistoryEntryResponse(_) + | EventMsg::PlanUpdate(_) + | EventMsg::ShutdownComplete => { + // For now, we do not do anything extra for these + // events. Note that + // send(codex_event_to_notification(&event)) above has + // already dispatched these events as notifications, + // though we may want to do give different treatment to + // individual events in the future. + } + } + } + Err(e) => { + error!("Codex runtime error: {e}"); + } + } + } +} diff --git a/codex-rs/mcp-server/src/lib.rs b/codex-rs/mcp-server/src/lib.rs index 6b3c0ddb..f6d0838e 100644 --- a/codex-rs/mcp-server/src/lib.rs +++ b/codex-rs/mcp-server/src/lib.rs @@ -17,6 +17,7 @@ use tracing_subscriber::EnvFilter; mod codex_tool_config; mod codex_tool_runner; +mod conversation_loop; mod exec_approval; mod json_to_toml; pub mod mcp_protocol; diff --git a/codex-rs/mcp-server/src/message_processor.rs b/codex-rs/mcp-server/src/message_processor.rs index 14e9bb38..2f99cda7 100644 --- a/codex-rs/mcp-server/src/message_processor.rs +++ b/codex-rs/mcp-server/src/message_processor.rs @@ -11,6 +11,7 @@ use crate::mcp_protocol::ToolCallRequestParams; use crate::mcp_protocol::ToolCallResponse; use crate::mcp_protocol::ToolCallResponseResult; use crate::outgoing_message::OutgoingMessageSender; +use crate::tool_handlers::create_conversation::handle_create_conversation; use crate::tool_handlers::send_message::handle_send_message; use codex_core::Codex; @@ -67,6 +68,10 @@ impl MessageProcessor { self.session_map.clone() } + pub(crate) fn outgoing(&self) -> Arc { + self.outgoing.clone() + } + pub(crate) fn running_session_ids(&self) -> Arc>> { self.running_session_ids.clone() } @@ -349,6 +354,9 @@ impl MessageProcessor { } async fn handle_new_tool_calls(&self, request_id: RequestId, params: ToolCallRequestParams) { match params { + ToolCallRequestParams::ConversationCreate(args) => { + handle_create_conversation(self, request_id, args).await; + } ToolCallRequestParams::ConversationSendMessage(args) => { handle_send_message(self, request_id, args).await; } diff --git a/codex-rs/mcp-server/src/tool_handlers/create_conversation.rs b/codex-rs/mcp-server/src/tool_handlers/create_conversation.rs new file mode 100644 index 00000000..28a89651 --- /dev/null +++ b/codex-rs/mcp-server/src/tool_handlers/create_conversation.rs @@ -0,0 +1,160 @@ +use std::collections::HashMap; +use std::path::PathBuf; +use std::sync::Arc; + +use codex_core::Codex; +use codex_core::codex_wrapper::init_codex; +use codex_core::config::Config as CodexConfig; +use codex_core::config::ConfigOverrides; +use codex_core::protocol::EventMsg; +use codex_core::protocol::SessionConfiguredEvent; +use mcp_types::RequestId; +use tokio::sync::Mutex; +use uuid::Uuid; + +use crate::conversation_loop::run_conversation_loop; +use crate::json_to_toml::json_to_toml; +use crate::mcp_protocol::ConversationCreateArgs; +use crate::mcp_protocol::ConversationCreateResult; +use crate::mcp_protocol::ConversationId; +use crate::mcp_protocol::ToolCallResponseResult; +use crate::message_processor::MessageProcessor; + +pub(crate) async fn handle_create_conversation( + message_processor: &MessageProcessor, + id: RequestId, + args: ConversationCreateArgs, +) { + // Build ConfigOverrides from args + let ConversationCreateArgs { + prompt: _, // not used here; creation only establishes the session + model, + cwd, + approval_policy, + sandbox, + config, + profile, + base_instructions, + } = args; + + // Convert config overrides JSON into CLI-style TOML overrides + let cli_overrides: Vec<(String, toml::Value)> = match config { + Some(v) => match v.as_object() { + Some(map) => map + .into_iter() + .map(|(k, v)| (k.clone(), json_to_toml(v.clone()))) + .collect(), + None => Vec::new(), + }, + None => Vec::new(), + }; + + let overrides = ConfigOverrides { + model: Some(model.clone()), + cwd: Some(PathBuf::from(cwd)), + approval_policy, + sandbox_mode: sandbox, + model_provider: None, + config_profile: profile, + codex_linux_sandbox_exe: None, + base_instructions, + include_plan_tool: None, + }; + + let cfg: CodexConfig = match CodexConfig::load_with_cli_overrides(cli_overrides, overrides) { + Ok(cfg) => cfg, + Err(e) => { + message_processor + .send_response_with_optional_error( + id, + Some(ToolCallResponseResult::ConversationCreate( + ConversationCreateResult::Error { + message: format!("Failed to load config: {e}"), + }, + )), + Some(true), + ) + .await; + return; + } + }; + + // Initialize Codex session + let codex_conversation = match init_codex(cfg).await { + Ok(conv) => conv, + Err(e) => { + message_processor + .send_response_with_optional_error( + id, + Some(ToolCallResponseResult::ConversationCreate( + ConversationCreateResult::Error { + message: format!("Failed to initialize session: {e}"), + }, + )), + Some(true), + ) + .await; + return; + } + }; + + // Expect SessionConfigured; if not, return error. + let EventMsg::SessionConfigured(SessionConfiguredEvent { model, .. }) = + &codex_conversation.session_configured.msg + else { + message_processor + .send_response_with_optional_error( + id, + Some(ToolCallResponseResult::ConversationCreate( + ConversationCreateResult::Error { + message: "Expected SessionConfigured event".to_string(), + }, + )), + Some(true), + ) + .await; + return; + }; + + let effective_model = model.clone(); + + let session_id = codex_conversation.session_id; + let codex_arc = Arc::new(codex_conversation.codex); + + // Store session for future calls + insert_session( + session_id, + codex_arc.clone(), + message_processor.session_map(), + ) + .await; + // Run the conversation loop in the background so this request can return immediately. + let outgoing = message_processor.outgoing(); + let spawn_id = id.clone(); + tokio::spawn(async move { + run_conversation_loop(codex_arc.clone(), outgoing, spawn_id).await; + }); + + // Reply with the new conversation id and effective model + message_processor + .send_response_with_optional_error( + id, + Some(ToolCallResponseResult::ConversationCreate( + ConversationCreateResult::Ok { + conversation_id: ConversationId(session_id), + model: effective_model, + }, + )), + Some(false), + ) + .await; +} + +async fn insert_session( + session_id: Uuid, + codex: Arc, + session_map: Arc>>>, +) { + let mut guard = session_map.lock().await; + guard.insert(session_id, codex); +} diff --git a/codex-rs/mcp-server/src/tool_handlers/mod.rs b/codex-rs/mcp-server/src/tool_handlers/mod.rs index 1907ec64..5863bdc2 100644 --- a/codex-rs/mcp-server/src/tool_handlers/mod.rs +++ b/codex-rs/mcp-server/src/tool_handlers/mod.rs @@ -1 +1,2 @@ +pub(crate) mod create_conversation; pub(crate) mod send_message; diff --git a/codex-rs/mcp-server/tests/common/mcp_process.rs b/codex-rs/mcp-server/tests/common/mcp_process.rs index d7144af4..83cf085c 100644 --- a/codex-rs/mcp-server/tests/common/mcp_process.rs +++ b/codex-rs/mcp-server/tests/common/mcp_process.rs @@ -14,6 +14,7 @@ use assert_cmd::prelude::*; use codex_core::protocol::InputItem; use codex_mcp_server::CodexToolCallParam; use codex_mcp_server::CodexToolCallReplyParam; +use codex_mcp_server::mcp_protocol::ConversationCreateArgs; use codex_mcp_server::mcp_protocol::ConversationId; use codex_mcp_server::mcp_protocol::ConversationSendMessageArgs; use codex_mcp_server::mcp_protocol::ToolCallRequestParams; @@ -200,6 +201,41 @@ impl McpProcess { .await } + pub async fn send_conversation_create_tool_call( + &mut self, + prompt: &str, + model: &str, + cwd: &str, + ) -> anyhow::Result { + let params = ToolCallRequestParams::ConversationCreate(ConversationCreateArgs { + prompt: prompt.to_string(), + model: model.to_string(), + cwd: cwd.to_string(), + approval_policy: None, + sandbox: None, + config: None, + profile: None, + base_instructions: None, + }); + self.send_request( + mcp_types::CallToolRequest::METHOD, + Some(serde_json::to_value(params)?), + ) + .await + } + + pub async fn send_conversation_create_with_args( + &mut self, + args: ConversationCreateArgs, + ) -> anyhow::Result { + let params = ToolCallRequestParams::ConversationCreate(args); + self.send_request( + mcp_types::CallToolRequest::METHOD, + Some(serde_json::to_value(params)?), + ) + .await + } + async fn send_request( &mut self, method: &str, diff --git a/codex-rs/mcp-server/tests/create_conversation.rs b/codex-rs/mcp-server/tests/create_conversation.rs new file mode 100644 index 00000000..41052ea7 --- /dev/null +++ b/codex-rs/mcp-server/tests/create_conversation.rs @@ -0,0 +1,128 @@ +#![allow(clippy::expect_used, clippy::unwrap_used)] + +use std::path::Path; + +use mcp_test_support::McpProcess; +use mcp_test_support::create_final_assistant_message_sse_response; +use mcp_test_support::create_mock_chat_completions_server; +use mcp_types::JSONRPCResponse; +use mcp_types::RequestId; +use pretty_assertions::assert_eq; +use serde_json::json; +use tempfile::TempDir; +use tokio::time::timeout; + +const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_conversation_create_and_send_message_ok() { + // Mock server – we won't strictly rely on it, but provide one to satisfy any model wiring. + let responses = vec![ + create_final_assistant_message_sse_response("Done").expect("build mock assistant message"), + ]; + let server = create_mock_chat_completions_server(responses).await; + + // Temporary Codex home with config pointing at the mock server. + let codex_home = TempDir::new().expect("create temp dir"); + create_config_toml(codex_home.path(), &server.uri()).expect("write config.toml"); + + // Start MCP server process and initialize. + let mut mcp = McpProcess::new(codex_home.path()) + .await + .expect("spawn mcp process"); + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()) + .await + .expect("init timeout") + .expect("init failed"); + + // Create a conversation via the new tool. + let req_id = mcp + .send_conversation_create_tool_call("", "o3", "/repo") + .await + .expect("send conversationCreate"); + + let resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(req_id)), + ) + .await + .expect("create response timeout") + .expect("create response error"); + + // Structured content must include status=ok, a UUID conversation_id and the model we passed. + let sc = &resp.result["structuredContent"]; + let conv_id = sc["conversation_id"].as_str().expect("uuid string"); + assert!(!conv_id.is_empty()); + assert_eq!(sc["model"], json!("o3")); + + // Now send a message to the created conversation and expect an OK result. + let send_id = mcp + .send_user_message_tool_call("Hello", conv_id) + .await + .expect("send message"); + + let send_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(send_id)), + ) + .await + .expect("send response timeout") + .expect("send response error"); + assert_eq!( + send_resp.result["structuredContent"], + json!({ "status": "ok" }) + ); + + // avoid race condition by waiting for the mock server to receive the chat.completions request + let deadline = std::time::Instant::now() + DEFAULT_READ_TIMEOUT; + loop { + let requests = server.received_requests().await.unwrap_or_default(); + if !requests.is_empty() { + break; + } + if std::time::Instant::now() >= deadline { + panic!("mock server did not receive the chat.completions request in time"); + } + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + } + + // Verify the outbound request body matches expectations for Chat Completions. + let request = &server.received_requests().await.unwrap()[0]; + let body = request + .body_json::() + .expect("parse request body as JSON"); + assert_eq!(body["model"], json!("o3")); + assert!(body["stream"].as_bool().unwrap_or(false)); + let messages = body["messages"] + .as_array() + .expect("messages should be array"); + let last = messages.last().expect("at least one message"); + assert_eq!(last["role"], json!("user")); + assert_eq!(last["content"], json!("Hello")); + + drop(server); +} + +// Helper to create a config.toml pointing at the mock model server. +fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> { + let config_toml = codex_home.join("config.toml"); + std::fs::write( + config_toml, + format!( + r#" +model = "mock-model" +approval_policy = "never" +sandbox_mode = "danger-full-access" + +model_provider = "mock_provider" + +[model_providers.mock_provider] +name = "Mock provider for test" +base_url = "{server_uri}/v1" +wire_api = "chat" +request_max_retries = 0 +stream_max_retries = 0 +"# + ), + ) +}