chore: introduce OutgoingMessageSender (#1622)

Previous to this change, `MessageProcessor` had a
`tokio::sync::mpsc::Sender<JSONRPCMessage>` as an abstraction for server
code to send a message down to the MCP client. Because `Sender` is cheap
to `clone()`, it was straightforward to make it available to tasks
scheduled with `tokio::task::spawn()`.

This worked well when we were only sending notifications or responses
back down to the client, but we want to add support for sending
elicitations in #1623, which means that we need to be able to send
_requests_ to the client, and now we need a bit of centralization to
ensure all request ids are unique.

To that end, this PR introduces `OutgoingMessageSender`, which houses
the existing `Sender<OutgoingMessage>` as well as an `AtomicI64` to mint
out new, unique request ids. It has methods like `send_request()` and
`send_response()` so that callers do not have to deal with
`JSONRPCMessage` directly, as having to set the `jsonrpc` for each
message was a bit tedious (this cleans up `codex_tool_runner.rs` quite a
bit).

We do not have `OutgoingMessageSender` implement `Clone` because it is
important that the `AtomicI64` is shared across all users of
`OutgoingMessageSender`. As such, `Arc<OutgoingMessageSender>` must be
used instead, as it is frequently shared with new tokio tasks.

As part of this change, we update `message_processor.rs` to embrace
`await`, though we must be careful that no individual handler blocks the
main loop and prevents other messages from being handled.

---
[//]: # (BEGIN SAPLING FOOTER)
Stack created with [Sapling](https://sapling-scm.com). Best reviewed
with [ReviewStack](https://reviewstack.dev/openai/codex/pull/1622).
* #1623
* __->__ #1622
* #1621
* #1620
This commit is contained in:
Michael Bolin
2025-07-19 00:30:56 -04:00
committed by GitHub
parent e78ec00e73
commit 11fd3123be
4 changed files with 186 additions and 103 deletions

View File

@@ -1,17 +1,17 @@
use std::path::PathBuf;
use std::sync::Arc;
use crate::codex_tool_config::CodexToolCallParam;
use crate::codex_tool_config::create_tool_for_codex_tool_call_param;
use crate::outgoing_message::OutgoingMessageSender;
use codex_core::config::Config as CodexConfig;
use mcp_types::CallToolRequestParams;
use mcp_types::CallToolResult;
use mcp_types::ClientRequest;
use mcp_types::ContentBlock;
use mcp_types::JSONRPC_VERSION;
use mcp_types::JSONRPCError;
use mcp_types::JSONRPCErrorError;
use mcp_types::JSONRPCMessage;
use mcp_types::JSONRPCNotification;
use mcp_types::JSONRPCRequest;
use mcp_types::JSONRPCResponse;
@@ -22,11 +22,10 @@ use mcp_types::ServerCapabilitiesTools;
use mcp_types::ServerNotification;
use mcp_types::TextContent;
use serde_json::json;
use tokio::sync::mpsc;
use tokio::task;
pub(crate) struct MessageProcessor {
outgoing: mpsc::Sender<JSONRPCMessage>,
outgoing: Arc<OutgoingMessageSender>,
initialized: bool,
codex_linux_sandbox_exe: Option<PathBuf>,
}
@@ -35,17 +34,17 @@ impl MessageProcessor {
/// Create a new `MessageProcessor`, retaining a handle to the outgoing
/// `Sender` so handlers can enqueue messages to be written to stdout.
pub(crate) fn new(
outgoing: mpsc::Sender<JSONRPCMessage>,
outgoing: OutgoingMessageSender,
codex_linux_sandbox_exe: Option<PathBuf>,
) -> Self {
Self {
outgoing,
outgoing: Arc::new(outgoing),
initialized: false,
codex_linux_sandbox_exe,
}
}
pub(crate) fn process_request(&mut self, request: JSONRPCRequest) {
pub(crate) async fn process_request(&mut self, request: JSONRPCRequest) {
// Hold on to the ID so we can respond.
let request_id = request.id.clone();
@@ -60,10 +59,10 @@ impl MessageProcessor {
// Dispatch to a dedicated handler for each request type.
match client_request {
ClientRequest::InitializeRequest(params) => {
self.handle_initialize(request_id, params);
self.handle_initialize(request_id, params).await;
}
ClientRequest::PingRequest(params) => {
self.handle_ping(request_id, params);
self.handle_ping(request_id, params).await;
}
ClientRequest::ListResourcesRequest(params) => {
self.handle_list_resources(params);
@@ -87,10 +86,10 @@ impl MessageProcessor {
self.handle_get_prompt(params);
}
ClientRequest::ListToolsRequest(params) => {
self.handle_list_tools(request_id, params);
self.handle_list_tools(request_id, params).await;
}
ClientRequest::CallToolRequest(params) => {
self.handle_call_tool(request_id, params);
self.handle_call_tool(request_id, params).await;
}
ClientRequest::SetLevelRequest(params) => {
self.handle_set_level(params);
@@ -148,7 +147,7 @@ impl MessageProcessor {
tracing::error!("<- error: {:?}", err);
}
fn handle_initialize(
async fn handle_initialize(
&mut self,
id: RequestId,
params: <mcp_types::InitializeRequest as ModelContextProtocolRequest>::Params,
@@ -157,19 +156,12 @@ impl MessageProcessor {
if self.initialized {
// Already initialised: send JSON-RPC error response.
let error_msg = JSONRPCMessage::Error(JSONRPCError {
jsonrpc: JSONRPC_VERSION.into(),
id,
error: JSONRPCErrorError {
code: -32600, // Invalid Request
message: "initialize called more than once".to_string(),
data: None,
},
});
if let Err(e) = self.outgoing.try_send(error_msg) {
tracing::error!("Failed to send initialization error: {e}");
}
let error = JSONRPCErrorError {
code: -32600, // Invalid Request
message: "initialize called more than once".to_string(),
data: None,
};
self.outgoing.send_error(id, error).await;
return;
}
@@ -196,34 +188,29 @@ impl MessageProcessor {
},
};
self.send_response::<mcp_types::InitializeRequest>(id, result);
self.send_response::<mcp_types::InitializeRequest>(id, result)
.await;
}
fn send_response<T>(&self, id: RequestId, result: T::Result)
async fn send_response<T>(&self, id: RequestId, result: T::Result)
where
T: ModelContextProtocolRequest,
{
// result has `Serialized` instance so should never fail
#[expect(clippy::unwrap_used)]
let response = JSONRPCMessage::Response(JSONRPCResponse {
jsonrpc: JSONRPC_VERSION.into(),
id,
result: serde_json::to_value(result).unwrap(),
});
if let Err(e) = self.outgoing.try_send(response) {
tracing::error!("Failed to send response: {e}");
}
let result = serde_json::to_value(result).unwrap();
self.outgoing.send_response(id, result).await;
}
fn handle_ping(
async fn handle_ping(
&self,
id: RequestId,
params: <mcp_types::PingRequest as mcp_types::ModelContextProtocolRequest>::Params,
) {
tracing::info!("ping -> params: {:?}", params);
let result = json!({});
self.send_response::<mcp_types::PingRequest>(id, result);
self.send_response::<mcp_types::PingRequest>(id, result)
.await;
}
fn handle_list_resources(
@@ -276,7 +263,7 @@ impl MessageProcessor {
tracing::info!("prompts/get -> params: {:?}", params);
}
fn handle_list_tools(
async fn handle_list_tools(
&self,
id: RequestId,
params: <mcp_types::ListToolsRequest as mcp_types::ModelContextProtocolRequest>::Params,
@@ -287,10 +274,11 @@ impl MessageProcessor {
next_cursor: None,
};
self.send_response::<mcp_types::ListToolsRequest>(id, result);
self.send_response::<mcp_types::ListToolsRequest>(id, result)
.await;
}
fn handle_call_tool(
async fn handle_call_tool(
&self,
id: RequestId,
params: <mcp_types::CallToolRequest as mcp_types::ModelContextProtocolRequest>::Params,
@@ -310,7 +298,8 @@ impl MessageProcessor {
is_error: Some(true),
structured_content: None,
};
self.send_response::<mcp_types::CallToolRequest>(id, result);
self.send_response::<mcp_types::CallToolRequest>(id, result)
.await;
return;
}
@@ -330,7 +319,8 @@ impl MessageProcessor {
is_error: Some(true),
structured_content: None,
};
self.send_response::<mcp_types::CallToolRequest>(id, result);
self.send_response::<mcp_types::CallToolRequest>(id, result)
.await;
return;
}
},
@@ -344,7 +334,8 @@ impl MessageProcessor {
is_error: Some(true),
structured_content: None,
};
self.send_response::<mcp_types::CallToolRequest>(id, result);
self.send_response::<mcp_types::CallToolRequest>(id, result)
.await;
return;
}
},
@@ -360,7 +351,8 @@ impl MessageProcessor {
is_error: Some(true),
structured_content: None,
};
self.send_response::<mcp_types::CallToolRequest>(id, result);
self.send_response::<mcp_types::CallToolRequest>(id, result)
.await;
return;
}
};