//! Very small proof-of-concept request router for the MCP prototype server. use mcp_types::CallToolRequestParams; use mcp_types::CallToolResultContent; use mcp_types::ClientRequest; use mcp_types::JSONRPCBatchRequest; use mcp_types::JSONRPCBatchResponse; use mcp_types::JSONRPCError; use mcp_types::JSONRPCErrorError; use mcp_types::JSONRPCMessage; use mcp_types::JSONRPCNotification; use mcp_types::JSONRPCRequest; use mcp_types::JSONRPCResponse; use mcp_types::ListToolsResult; use mcp_types::ModelContextProtocolRequest; use mcp_types::RequestId; use mcp_types::ServerCapabilitiesTools; use mcp_types::ServerNotification; use mcp_types::TextContent; use mcp_types::Tool; use mcp_types::ToolInputSchema; use mcp_types::JSONRPC_VERSION; use serde_json::json; use tokio::sync::mpsc; pub(crate) struct MessageProcessor { outgoing: mpsc::Sender, initialized: bool, } impl MessageProcessor { /// Create a new `MessageProcessor`, retaining a handle to the outgoing /// `Sender` so handlers can enqueue messages to be written to stdout. pub(crate) fn new(outgoing: mpsc::Sender) -> Self { Self { outgoing, initialized: false, } } pub(crate) fn process_request(&mut self, request: JSONRPCRequest) { // Hold on to the ID so we can respond. let request_id = request.id.clone(); let client_request = match ClientRequest::try_from(request) { Ok(client_request) => client_request, Err(e) => { tracing::warn!("Failed to convert request: {e}"); return; } }; // Dispatch to a dedicated handler for each request type. match client_request { ClientRequest::InitializeRequest(params) => { self.handle_initialize(request_id, params); } ClientRequest::PingRequest(params) => { self.handle_ping(request_id, params); } ClientRequest::ListResourcesRequest(params) => { self.handle_list_resources(params); } ClientRequest::ListResourceTemplatesRequest(params) => { self.handle_list_resource_templates(params); } ClientRequest::ReadResourceRequest(params) => { self.handle_read_resource(params); } ClientRequest::SubscribeRequest(params) => { self.handle_subscribe(params); } ClientRequest::UnsubscribeRequest(params) => { self.handle_unsubscribe(params); } ClientRequest::ListPromptsRequest(params) => { self.handle_list_prompts(params); } ClientRequest::GetPromptRequest(params) => { self.handle_get_prompt(params); } ClientRequest::ListToolsRequest(params) => { self.handle_list_tools(request_id, params); } ClientRequest::CallToolRequest(params) => { self.handle_call_tool(request_id, params); } ClientRequest::SetLevelRequest(params) => { self.handle_set_level(params); } ClientRequest::CompleteRequest(params) => { self.handle_complete(params); } } } /// Handle a standalone JSON-RPC response originating from the peer. pub(crate) fn process_response(&mut self, response: JSONRPCResponse) { tracing::info!("<- response: {:?}", response); } /// Handle a fire-and-forget JSON-RPC notification. pub(crate) fn process_notification(&mut self, notification: JSONRPCNotification) { let server_notification = match ServerNotification::try_from(notification) { Ok(n) => n, Err(e) => { tracing::warn!("Failed to convert notification: {e}"); return; } }; // Similar to requests, route each notification type to its own stub // handler so additional logic can be implemented incrementally. match server_notification { ServerNotification::CancelledNotification(params) => { self.handle_cancelled_notification(params); } ServerNotification::ProgressNotification(params) => { self.handle_progress_notification(params); } ServerNotification::ResourceListChangedNotification(params) => { self.handle_resource_list_changed(params); } ServerNotification::ResourceUpdatedNotification(params) => { self.handle_resource_updated(params); } ServerNotification::PromptListChangedNotification(params) => { self.handle_prompt_list_changed(params); } ServerNotification::ToolListChangedNotification(params) => { self.handle_tool_list_changed(params); } ServerNotification::LoggingMessageNotification(params) => { self.handle_logging_message(params); } } } /// Handle a batch of requests and/or notifications. pub(crate) fn process_batch_request(&mut self, batch: JSONRPCBatchRequest) { tracing::info!("<- batch request containing {} item(s)", batch.len()); for item in batch { match item { mcp_types::JSONRPCBatchRequestItem::JSONRPCRequest(req) => { self.process_request(req); } mcp_types::JSONRPCBatchRequestItem::JSONRPCNotification(note) => { self.process_notification(note); } } } } /// Handle an error object received from the peer. pub(crate) fn process_error(&mut self, err: JSONRPCError) { tracing::error!("<- error: {:?}", err); } /// Handle a batch of responses/errors. pub(crate) fn process_batch_response(&mut self, batch: JSONRPCBatchResponse) { tracing::info!("<- batch response containing {} item(s)", batch.len()); for item in batch { match item { mcp_types::JSONRPCBatchResponseItem::JSONRPCResponse(resp) => { self.process_response(resp); } mcp_types::JSONRPCBatchResponseItem::JSONRPCError(err) => { self.process_error(err); } } } } fn handle_initialize( &mut self, id: RequestId, params: ::Params, ) { tracing::info!("initialize -> params: {:?}", params); if self.initialized { // Already initialised: send JSON-RPC error response. let error_msg = JSONRPCMessage::Error(JSONRPCError { jsonrpc: JSONRPC_VERSION.into(), id, error: JSONRPCErrorError { code: -32600, // Invalid Request message: "initialize called more than once".to_string(), data: None, }, }); if let Err(e) = self.outgoing.try_send(error_msg) { tracing::error!("Failed to send initialization error: {e}"); } return; } self.initialized = true; // Build a minimal InitializeResult. Fill with placeholders. let result = mcp_types::InitializeResult { capabilities: mcp_types::ServerCapabilities { completions: None, experimental: None, logging: None, prompts: None, resources: None, tools: Some(ServerCapabilitiesTools { list_changed: Some(true), }), }, instructions: None, protocol_version: params.protocol_version.clone(), server_info: mcp_types::Implementation { name: "codex-mcp-server".to_string(), version: mcp_types::MCP_SCHEMA_VERSION.to_string(), }, }; self.send_response::(id, result); } fn send_response(&self, id: RequestId, result: T::Result) where T: ModelContextProtocolRequest, { let response = JSONRPCMessage::Response(JSONRPCResponse { jsonrpc: JSONRPC_VERSION.into(), id, result: serde_json::to_value(result).unwrap(), }); if let Err(e) = self.outgoing.try_send(response) { tracing::error!("Failed to send response: {e}"); } } fn handle_ping( &self, id: RequestId, params: ::Params, ) { tracing::info!("ping -> params: {:?}", params); let result = json!({}); self.send_response::(id, result); } fn handle_list_resources( &self, params: ::Params, ) { tracing::info!("resources/list -> params: {:?}", params); } fn handle_list_resource_templates( &self, params: ::Params, ) { tracing::info!("resources/templates/list -> params: {:?}", params); } fn handle_read_resource( &self, params: ::Params, ) { tracing::info!("resources/read -> params: {:?}", params); } fn handle_subscribe( &self, params: ::Params, ) { tracing::info!("resources/subscribe -> params: {:?}", params); } fn handle_unsubscribe( &self, params: ::Params, ) { tracing::info!("resources/unsubscribe -> params: {:?}", params); } fn handle_list_prompts( &self, params: ::Params, ) { tracing::info!("prompts/list -> params: {:?}", params); } fn handle_get_prompt( &self, params: ::Params, ) { tracing::info!("prompts/get -> params: {:?}", params); } fn handle_list_tools( &self, id: RequestId, params: ::Params, ) { tracing::trace!("tools/list -> {params:?}"); let result = ListToolsResult { tools: vec![Tool { name: "echo".to_string(), input_schema: ToolInputSchema { r#type: "object".to_string(), properties: Some(json!({ "input": { "type": "string", "description": "The input to echo back" } })), required: Some(vec!["input".to_string()]), }, description: Some("Echoes the request back".to_string()), annotations: None, }], next_cursor: None, }; self.send_response::(id, result); } fn handle_call_tool( &self, id: RequestId, params: ::Params, ) { tracing::info!("tools/call -> params: {:?}", params); let CallToolRequestParams { name, arguments } = params; match name.as_str() { "echo" => { let result = mcp_types::CallToolResult { content: vec![CallToolResultContent::TextContent(TextContent { r#type: "text".to_string(), text: format!("Echo: {arguments:?}"), annotations: None, })], is_error: None, }; self.send_response::(id, result); } _ => { let result = mcp_types::CallToolResult { content: vec![], is_error: Some(true), }; self.send_response::(id, result); } } } fn handle_set_level( &self, params: ::Params, ) { tracing::info!("logging/setLevel -> params: {:?}", params); } fn handle_complete( &self, params: ::Params, ) { tracing::info!("completion/complete -> params: {:?}", params); } // --------------------------------------------------------------------- // Notification handlers // --------------------------------------------------------------------- fn handle_cancelled_notification( &self, params: ::Params, ) { tracing::info!("notifications/cancelled -> params: {:?}", params); } fn handle_progress_notification( &self, params: ::Params, ) { tracing::info!("notifications/progress -> params: {:?}", params); } fn handle_resource_list_changed( &self, params: ::Params, ) { tracing::info!( "notifications/resources/list_changed -> params: {:?}", params ); } fn handle_resource_updated( &self, params: ::Params, ) { tracing::info!("notifications/resources/updated -> params: {:?}", params); } fn handle_prompt_list_changed( &self, params: ::Params, ) { tracing::info!("notifications/prompts/list_changed -> params: {:?}", params); } fn handle_tool_list_changed( &self, params: ::Params, ) { tracing::info!("notifications/tools/list_changed -> params: {:?}", params); } fn handle_logging_message( &self, params: ::Params, ) { tracing::info!("notifications/message -> params: {:?}", params); } }