feat: support traditional JSON-RPC request/response in MCP server (#2264)
This introduces a new set of request types that our `codex mcp`
supports. Note that these do not conform to MCP tool calls so that
instead of having to send something like this:
```json
{
"jsonrpc": "2.0",
"method": "tools/call",
"id": 42,
"params": {
"name": "newConversation",
"arguments": {
"model": "gpt-5",
"approvalPolicy": "on-request"
}
}
}
```
we can send something like this:
```json
{
"jsonrpc": "2.0",
"method": "newConversation",
"id": 42,
"params": {
"model": "gpt-5",
"approvalPolicy": "on-request"
}
}
```
Admittedly, this new format is not a valid MCP tool call, but we are OK
with that right now. (That is, not everything we might want to request
of `codex mcp` is something that is appropriate for an autonomous agent
to do.)
To start, this introduces four request types:
- `newConversation`
- `sendUserMessage`
- `addConversationListener`
- `removeConversationListener`
The new `mcp-server/tests/codex_message_processor_flow.rs` shows how
these can be used.
The types are defined on the `CodexRequest` enum, so we introduce a new
`CodexMessageProcessor` that is responsible for dealing with requests
from this enum. The top-level `MessageProcessor` has been updated so
that when `process_request()` is called, it first checks whether the
request conforms to `CodexRequest` and dispatches it to
`CodexMessageProcessor` if so.
Note that I also decided to use `camelCase` for the on-the-wire format,
as that seems to be the convention for MCP.
For the moment, the new protocol is defined in `wire_format.rs` within
the `mcp-server` crate, but in a subsequent PR, I will probably move it
to its own crate to ensure the protocol has minimal dependencies and
that we can codegen a schema from it.
---
[//]: # (BEGIN SAPLING FOOTER)
Stack created with [Sapling](https://sapling-scm.com). Best reviewed
with [ReviewStack](https://reviewstack.dev/openai/codex/pull/2264).
* #2278
* __->__ #2264
This commit is contained in:
269
codex-rs/mcp-server/src/codex_message_processor.rs
Normal file
269
codex-rs/mcp-server/src/codex_message_processor.rs
Normal file
@@ -0,0 +1,269 @@
|
||||
use std::collections::HashMap;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
|
||||
use codex_core::ConversationManager;
|
||||
use codex_core::NewConversation;
|
||||
use codex_core::config::Config;
|
||||
use codex_core::config::ConfigOverrides;
|
||||
use mcp_types::JSONRPCErrorError;
|
||||
use mcp_types::RequestId;
|
||||
use tokio::sync::oneshot;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::error_code::INTERNAL_ERROR_CODE;
|
||||
use crate::error_code::INVALID_REQUEST_ERROR_CODE;
|
||||
use crate::json_to_toml::json_to_toml;
|
||||
use crate::outgoing_message::OutgoingMessageSender;
|
||||
use crate::outgoing_message::OutgoingNotificationMeta;
|
||||
use crate::wire_format::AddConversationListenerParams;
|
||||
use crate::wire_format::AddConversationSubscriptionResponse;
|
||||
use crate::wire_format::CodexRequest;
|
||||
use crate::wire_format::ConversationId;
|
||||
use crate::wire_format::InputItem as WireInputItem;
|
||||
use crate::wire_format::NewConversationParams;
|
||||
use crate::wire_format::NewConversationResponse;
|
||||
use crate::wire_format::RemoveConversationListenerParams;
|
||||
use crate::wire_format::RemoveConversationSubscriptionResponse;
|
||||
use crate::wire_format::SendUserMessageParams;
|
||||
use crate::wire_format::SendUserMessageResponse;
|
||||
use codex_core::protocol::InputItem as CoreInputItem;
|
||||
use codex_core::protocol::Op;
|
||||
|
||||
/// Handles JSON-RPC messages for Codex conversations.
|
||||
pub(crate) struct CodexMessageProcessor {
|
||||
conversation_manager: Arc<ConversationManager>,
|
||||
outgoing: Arc<OutgoingMessageSender>,
|
||||
codex_linux_sandbox_exe: Option<PathBuf>,
|
||||
conversation_listeners: HashMap<Uuid, oneshot::Sender<()>>,
|
||||
}
|
||||
|
||||
impl CodexMessageProcessor {
|
||||
pub fn new(
|
||||
conversation_manager: Arc<ConversationManager>,
|
||||
outgoing: Arc<OutgoingMessageSender>,
|
||||
codex_linux_sandbox_exe: Option<PathBuf>,
|
||||
) -> Self {
|
||||
Self {
|
||||
conversation_manager,
|
||||
outgoing,
|
||||
codex_linux_sandbox_exe,
|
||||
conversation_listeners: HashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn process_request(&mut self, request: CodexRequest) {
|
||||
match request {
|
||||
CodexRequest::NewConversation { request_id, params } => {
|
||||
// Do not tokio::spawn() to process new_conversation()
|
||||
// asynchronously because we need to ensure the conversation is
|
||||
// created before processing any subsequent messages.
|
||||
self.process_new_conversation(request_id, params).await;
|
||||
}
|
||||
CodexRequest::SendUserMessage { request_id, params } => {
|
||||
self.send_user_message(request_id, params).await;
|
||||
}
|
||||
CodexRequest::AddConversationListener { request_id, params } => {
|
||||
self.add_conversation_listener(request_id, params).await;
|
||||
}
|
||||
CodexRequest::RemoveConversationListener { request_id, params } => {
|
||||
self.remove_conversation_listener(request_id, params).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn process_new_conversation(&self, request_id: RequestId, params: NewConversationParams) {
|
||||
let config = match derive_config_from_params(params, self.codex_linux_sandbox_exe.clone()) {
|
||||
Ok(config) => config,
|
||||
Err(err) => {
|
||||
let error = JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message: format!("error deriving config: {err}"),
|
||||
data: None,
|
||||
};
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
match self.conversation_manager.new_conversation(config).await {
|
||||
Ok(conversation_id) => {
|
||||
let NewConversation {
|
||||
conversation_id,
|
||||
session_configured,
|
||||
..
|
||||
} = conversation_id;
|
||||
let response = NewConversationResponse {
|
||||
conversation_id: ConversationId(conversation_id),
|
||||
model: session_configured.model,
|
||||
};
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
}
|
||||
Err(err) => {
|
||||
let error = JSONRPCErrorError {
|
||||
code: INTERNAL_ERROR_CODE,
|
||||
message: format!("error creating conversation: {err}"),
|
||||
data: None,
|
||||
};
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn send_user_message(&self, request_id: RequestId, params: SendUserMessageParams) {
|
||||
let SendUserMessageParams {
|
||||
conversation_id,
|
||||
items,
|
||||
} = params;
|
||||
let Ok(conversation) = self
|
||||
.conversation_manager
|
||||
.get_conversation(conversation_id.0)
|
||||
.await
|
||||
else {
|
||||
let error = JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message: format!("conversation not found: {conversation_id}"),
|
||||
data: None,
|
||||
};
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
};
|
||||
|
||||
let mapped_items: Vec<CoreInputItem> = items
|
||||
.into_iter()
|
||||
.map(|item| match item {
|
||||
WireInputItem::Text { text } => CoreInputItem::Text { text },
|
||||
WireInputItem::Image { image_url } => CoreInputItem::Image { image_url },
|
||||
WireInputItem::LocalImage { path } => CoreInputItem::LocalImage { path },
|
||||
})
|
||||
.collect();
|
||||
|
||||
// Submit user input to the conversation.
|
||||
let _ = conversation
|
||||
.submit(Op::UserInput {
|
||||
items: mapped_items,
|
||||
})
|
||||
.await;
|
||||
|
||||
// Acknowledge with an empty result.
|
||||
self.outgoing
|
||||
.send_response(request_id, SendUserMessageResponse {})
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn add_conversation_listener(
|
||||
&mut self,
|
||||
request_id: RequestId,
|
||||
params: AddConversationListenerParams,
|
||||
) {
|
||||
let AddConversationListenerParams { conversation_id } = params;
|
||||
let Ok(conversation) = self
|
||||
.conversation_manager
|
||||
.get_conversation(conversation_id.0)
|
||||
.await
|
||||
else {
|
||||
let error = JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message: format!("conversation not found: {}", conversation_id.0),
|
||||
data: None,
|
||||
};
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
};
|
||||
|
||||
let subscription_id = Uuid::new_v4();
|
||||
let (cancel_tx, mut cancel_rx) = oneshot::channel();
|
||||
self.conversation_listeners
|
||||
.insert(subscription_id, cancel_tx);
|
||||
let outgoing_for_task = self.outgoing.clone();
|
||||
let add_listener_request_id = request_id.clone();
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = &mut cancel_rx => {
|
||||
// User has unsubscribed, so exit this task.
|
||||
break;
|
||||
}
|
||||
event = conversation.next_event() => {
|
||||
let event = match event {
|
||||
Ok(event) => event,
|
||||
Err(err) => {
|
||||
tracing::warn!("conversation.next_event() failed with: {err}");
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
outgoing_for_task.send_event_as_notification(
|
||||
&event,
|
||||
Some(OutgoingNotificationMeta::new(Some(add_listener_request_id.clone()))),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
let response = AddConversationSubscriptionResponse { subscription_id };
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
}
|
||||
|
||||
async fn remove_conversation_listener(
|
||||
&mut self,
|
||||
request_id: RequestId,
|
||||
params: RemoveConversationListenerParams,
|
||||
) {
|
||||
let RemoveConversationListenerParams { subscription_id } = params;
|
||||
match self.conversation_listeners.remove(&subscription_id) {
|
||||
Some(sender) => {
|
||||
// Signal the spawned task to exit and acknowledge.
|
||||
let _ = sender.send(());
|
||||
let response = RemoveConversationSubscriptionResponse {};
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
}
|
||||
None => {
|
||||
let error = JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message: format!("subscription not found: {subscription_id}"),
|
||||
data: None,
|
||||
};
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn derive_config_from_params(
|
||||
params: NewConversationParams,
|
||||
codex_linux_sandbox_exe: Option<PathBuf>,
|
||||
) -> std::io::Result<Config> {
|
||||
let NewConversationParams {
|
||||
model,
|
||||
profile,
|
||||
cwd,
|
||||
approval_policy,
|
||||
sandbox,
|
||||
config: cli_overrides,
|
||||
base_instructions,
|
||||
include_plan_tool,
|
||||
} = params;
|
||||
let overrides = ConfigOverrides {
|
||||
model,
|
||||
config_profile: profile,
|
||||
cwd: cwd.map(PathBuf::from),
|
||||
approval_policy: approval_policy.map(Into::into),
|
||||
sandbox_mode: sandbox.map(Into::into),
|
||||
model_provider: None,
|
||||
codex_linux_sandbox_exe,
|
||||
base_instructions,
|
||||
include_plan_tool,
|
||||
disable_response_storage: None,
|
||||
show_raw_agent_reasoning: None,
|
||||
};
|
||||
|
||||
let cli_overrides = cli_overrides
|
||||
.unwrap_or_default()
|
||||
.into_iter()
|
||||
.map(|(k, v)| (k, json_to_toml(v)))
|
||||
.collect();
|
||||
|
||||
Config::load_with_cli_overrides(cli_overrides, overrides)
|
||||
}
|
||||
Reference in New Issue
Block a user