use std::collections::HashMap; use std::path::PathBuf; use std::sync::Arc; use crate::codex_tool_config::CodexToolCallParam; use crate::codex_tool_config::CodexToolCallReplyParam; use crate::codex_tool_config::create_tool_for_codex_tool_call_param; use crate::codex_tool_config::create_tool_for_codex_tool_call_reply_param; use crate::outgoing_message::OutgoingMessageSender; use codex_core::Codex; use codex_core::config::Config as CodexConfig; use mcp_types::CallToolRequestParams; use mcp_types::CallToolResult; use mcp_types::ClientRequest; use mcp_types::ContentBlock; use mcp_types::JSONRPCError; use mcp_types::JSONRPCErrorError; use mcp_types::JSONRPCNotification; use mcp_types::JSONRPCRequest; use mcp_types::JSONRPCResponse; use mcp_types::ListToolsResult; use mcp_types::ModelContextProtocolRequest; use mcp_types::RequestId; use mcp_types::ServerCapabilitiesTools; use mcp_types::ServerNotification; use mcp_types::TextContent; use serde_json::json; use tokio::sync::Mutex; use tokio::task; use uuid::Uuid; pub(crate) struct MessageProcessor { outgoing: Arc, initialized: bool, codex_linux_sandbox_exe: Option, session_map: Arc>>>, } impl MessageProcessor { /// Create a new `MessageProcessor`, retaining a handle to the outgoing /// `Sender` so handlers can enqueue messages to be written to stdout. pub(crate) fn new( outgoing: OutgoingMessageSender, codex_linux_sandbox_exe: Option, ) -> Self { Self { outgoing: Arc::new(outgoing), initialized: false, codex_linux_sandbox_exe, session_map: Arc::new(Mutex::new(HashMap::new())), } } pub(crate) async fn process_request(&mut self, request: JSONRPCRequest) { // Hold on to the ID so we can respond. let request_id = request.id.clone(); let client_request = match ClientRequest::try_from(request) { Ok(client_request) => client_request, Err(e) => { tracing::warn!("Failed to convert request: {e}"); return; } }; // Dispatch to a dedicated handler for each request type. match client_request { ClientRequest::InitializeRequest(params) => { self.handle_initialize(request_id, params).await; } ClientRequest::PingRequest(params) => { self.handle_ping(request_id, params).await; } ClientRequest::ListResourcesRequest(params) => { self.handle_list_resources(params); } ClientRequest::ListResourceTemplatesRequest(params) => { self.handle_list_resource_templates(params); } ClientRequest::ReadResourceRequest(params) => { self.handle_read_resource(params); } ClientRequest::SubscribeRequest(params) => { self.handle_subscribe(params); } ClientRequest::UnsubscribeRequest(params) => { self.handle_unsubscribe(params); } ClientRequest::ListPromptsRequest(params) => { self.handle_list_prompts(params); } ClientRequest::GetPromptRequest(params) => { self.handle_get_prompt(params); } ClientRequest::ListToolsRequest(params) => { self.handle_list_tools(request_id, params).await; } ClientRequest::CallToolRequest(params) => { self.handle_call_tool(request_id, params).await; } ClientRequest::SetLevelRequest(params) => { self.handle_set_level(params); } ClientRequest::CompleteRequest(params) => { self.handle_complete(params); } } } /// Handle a standalone JSON-RPC response originating from the peer. pub(crate) async fn process_response(&mut self, response: JSONRPCResponse) { tracing::info!("<- response: {:?}", response); let JSONRPCResponse { id, result, .. } = response; self.outgoing.notify_client_response(id, result).await } /// Handle a fire-and-forget JSON-RPC notification. pub(crate) fn process_notification(&mut self, notification: JSONRPCNotification) { let server_notification = match ServerNotification::try_from(notification) { Ok(n) => n, Err(e) => { tracing::warn!("Failed to convert notification: {e}"); return; } }; // Similar to requests, route each notification type to its own stub // handler so additional logic can be implemented incrementally. match server_notification { ServerNotification::CancelledNotification(params) => { self.handle_cancelled_notification(params); } ServerNotification::ProgressNotification(params) => { self.handle_progress_notification(params); } ServerNotification::ResourceListChangedNotification(params) => { self.handle_resource_list_changed(params); } ServerNotification::ResourceUpdatedNotification(params) => { self.handle_resource_updated(params); } ServerNotification::PromptListChangedNotification(params) => { self.handle_prompt_list_changed(params); } ServerNotification::ToolListChangedNotification(params) => { self.handle_tool_list_changed(params); } ServerNotification::LoggingMessageNotification(params) => { self.handle_logging_message(params); } } } /// Handle an error object received from the peer. pub(crate) fn process_error(&mut self, err: JSONRPCError) { tracing::error!("<- error: {:?}", err); } async fn handle_initialize( &mut self, id: RequestId, params: ::Params, ) { tracing::info!("initialize -> params: {:?}", params); if self.initialized { // Already initialised: send JSON-RPC error response. let error = JSONRPCErrorError { code: -32600, // Invalid Request message: "initialize called more than once".to_string(), data: None, }; self.outgoing.send_error(id, error).await; return; } self.initialized = true; // Build a minimal InitializeResult. Fill with placeholders. let result = mcp_types::InitializeResult { capabilities: mcp_types::ServerCapabilities { completions: None, experimental: None, logging: None, prompts: None, resources: None, tools: Some(ServerCapabilitiesTools { list_changed: Some(true), }), }, instructions: None, protocol_version: params.protocol_version.clone(), server_info: mcp_types::Implementation { name: "codex-mcp-server".to_string(), version: env!("CARGO_PKG_VERSION").to_string(), title: Some("Codex".to_string()), }, }; self.send_response::(id, result) .await; } async fn send_response(&self, id: RequestId, result: T::Result) where T: ModelContextProtocolRequest, { // result has `Serialized` instance so should never fail #[expect(clippy::unwrap_used)] let result = serde_json::to_value(result).unwrap(); self.outgoing.send_response(id, result).await; } async fn handle_ping( &self, id: RequestId, params: ::Params, ) { tracing::info!("ping -> params: {:?}", params); let result = json!({}); self.send_response::(id, result) .await; } fn handle_list_resources( &self, params: ::Params, ) { tracing::info!("resources/list -> params: {:?}", params); } fn handle_list_resource_templates( &self, params: ::Params, ) { tracing::info!("resources/templates/list -> params: {:?}", params); } fn handle_read_resource( &self, params: ::Params, ) { tracing::info!("resources/read -> params: {:?}", params); } fn handle_subscribe( &self, params: ::Params, ) { tracing::info!("resources/subscribe -> params: {:?}", params); } fn handle_unsubscribe( &self, params: ::Params, ) { tracing::info!("resources/unsubscribe -> params: {:?}", params); } fn handle_list_prompts( &self, params: ::Params, ) { tracing::info!("prompts/list -> params: {:?}", params); } fn handle_get_prompt( &self, params: ::Params, ) { tracing::info!("prompts/get -> params: {:?}", params); } async fn handle_list_tools( &self, id: RequestId, params: ::Params, ) { tracing::trace!("tools/list -> {params:?}"); let result = ListToolsResult { tools: vec![ create_tool_for_codex_tool_call_param(), create_tool_for_codex_tool_call_reply_param(), ], next_cursor: None, }; self.send_response::(id, result) .await; } async fn handle_call_tool( &self, id: RequestId, params: ::Params, ) { tracing::info!("tools/call -> params: {:?}", params); let CallToolRequestParams { name, arguments } = params; match name.as_str() { "codex" => self.handle_tool_call_codex(id, arguments).await, "codex-reply" => { self.handle_tool_call_codex_session_reply(id, arguments) .await } _ => { let result = CallToolResult { content: vec![ContentBlock::TextContent(TextContent { r#type: "text".to_string(), text: format!("Unknown tool '{name}'"), annotations: None, })], is_error: Some(true), structured_content: None, }; self.send_response::(id, result) .await; } } } async fn handle_tool_call_codex(&self, id: RequestId, arguments: Option) { let (initial_prompt, config): (String, CodexConfig) = match arguments { Some(json_val) => match serde_json::from_value::(json_val) { Ok(tool_cfg) => match tool_cfg.into_config(self.codex_linux_sandbox_exe.clone()) { Ok(cfg) => cfg, Err(e) => { let result = CallToolResult { content: vec![ContentBlock::TextContent(TextContent { r#type: "text".to_owned(), text: format!( "Failed to load Codex configuration from overrides: {e}" ), annotations: None, })], is_error: Some(true), structured_content: None, }; self.send_response::(id, result) .await; return; } }, Err(e) => { let result = CallToolResult { content: vec![ContentBlock::TextContent(TextContent { r#type: "text".to_owned(), text: format!("Failed to parse configuration for Codex tool: {e}"), annotations: None, })], is_error: Some(true), structured_content: None, }; self.send_response::(id, result) .await; return; } }, None => { let result = CallToolResult { content: vec![ContentBlock::TextContent(TextContent { r#type: "text".to_string(), text: "Missing arguments for codex tool-call; the `prompt` field is required." .to_string(), annotations: None, })], is_error: Some(true), structured_content: None, }; self.send_response::(id, result) .await; return; } }; // Clone outgoing and session map to move into async task. let outgoing = self.outgoing.clone(); let session_map = self.session_map.clone(); // Spawn an async task to handle the Codex session so that we do not // block the synchronous message-processing loop. task::spawn(async move { // Run the Codex session and stream events back to the client. crate::codex_tool_runner::run_codex_tool_session( id, initial_prompt, config, outgoing, session_map, ) .await; }); } async fn handle_tool_call_codex_session_reply( &self, request_id: RequestId, arguments: Option, ) { tracing::info!("tools/call -> params: {:?}", arguments); // parse arguments let CodexToolCallReplyParam { session_id, prompt } = match arguments { Some(json_val) => match serde_json::from_value::(json_val) { Ok(params) => params, Err(e) => { tracing::error!("Failed to parse Codex tool call reply parameters: {e}"); let result = CallToolResult { content: vec![ContentBlock::TextContent(TextContent { r#type: "text".to_owned(), text: format!("Failed to parse configuration for Codex tool: {e}"), annotations: None, })], is_error: Some(true), structured_content: None, }; self.send_response::(request_id, result) .await; return; } }, None => { tracing::error!( "Missing arguments for codex-reply tool-call; the `session_id` and `prompt` fields are required." ); let result = CallToolResult { content: vec![ContentBlock::TextContent(TextContent { r#type: "text".to_owned(), text: "Missing arguments for codex-reply tool-call; the `session_id` and `prompt` fields are required.".to_owned(), annotations: None, })], is_error: Some(true), structured_content: None, }; self.send_response::(request_id, result) .await; return; } }; let session_id = match Uuid::parse_str(&session_id) { Ok(id) => id, Err(e) => { tracing::error!("Failed to parse session_id: {e}"); let result = CallToolResult { content: vec![ContentBlock::TextContent(TextContent { r#type: "text".to_owned(), text: format!("Failed to parse session_id: {e}"), annotations: None, })], is_error: Some(true), structured_content: None, }; self.send_response::(request_id, result) .await; return; } }; // load codex from session map let session_map_mutex = Arc::clone(&self.session_map); // Clone outgoing and session map to move into async task. let outgoing = self.outgoing.clone(); // Spawn an async task to handle the Codex session so that we do not // block the synchronous message-processing loop. task::spawn(async move { let session_map = session_map_mutex.lock().await; let codex = match session_map.get(&session_id) { Some(codex) => codex, None => { tracing::warn!("Session not found for session_id: {session_id}"); let result = CallToolResult { content: vec![ContentBlock::TextContent(TextContent { r#type: "text".to_owned(), text: format!("Session not found for session_id: {session_id}"), annotations: None, })], is_error: Some(true), structured_content: None, }; // unwrap_or_default is fine here because we know the result is valid JSON outgoing .send_response(request_id, serde_json::to_value(result).unwrap_or_default()) .await; return; } }; crate::codex_tool_runner::run_codex_tool_session_reply( codex.clone(), outgoing, request_id, prompt.clone(), ) .await; }); } fn handle_set_level( &self, params: ::Params, ) { tracing::info!("logging/setLevel -> params: {:?}", params); } fn handle_complete( &self, params: ::Params, ) { tracing::info!("completion/complete -> params: {:?}", params); } // --------------------------------------------------------------------- // Notification handlers // --------------------------------------------------------------------- fn handle_cancelled_notification( &self, params: ::Params, ) { tracing::info!("notifications/cancelled -> params: {:?}", params); } fn handle_progress_notification( &self, params: ::Params, ) { tracing::info!("notifications/progress -> params: {:?}", params); } fn handle_resource_list_changed( &self, params: ::Params, ) { tracing::info!( "notifications/resources/list_changed -> params: {:?}", params ); } fn handle_resource_updated( &self, params: ::Params, ) { tracing::info!("notifications/resources/updated -> params: {:?}", params); } fn handle_prompt_list_changed( &self, params: ::Params, ) { tracing::info!("notifications/prompts/list_changed -> params: {:?}", params); } fn handle_tool_list_changed( &self, params: ::Params, ) { tracing::info!("notifications/tools/list_changed -> params: {:?}", params); } fn handle_logging_message( &self, params: ::Params, ) { tracing::info!("notifications/message -> params: {:?}", params); } }