From 094d7af8c3a60fcc54e83f9a286415b0582b9d3c Mon Sep 17 00:00:00 2001 From: Dylan Date: Mon, 28 Jul 2025 13:32:09 -0700 Subject: [PATCH] [mcp-server] Populate notifications._meta with requestId (#1704) ## Summary Per the [latest MCP spec](https://modelcontextprotocol.io/specification/2025-06-18/basic#meta), the `_meta` field is reserved for metadata. In the [Typescript Schema](https://github.com/modelcontextprotocol/modelcontextprotocol/blob/0695a497eb50a804fc0e88c18a93a21a675d6b3e/schema/2025-06-18/schema.ts#L37-L40), `progressToken` is defined as a value to be attached to subsequent notifications for that request. The [CallToolRequestParams](https://github.com/modelcontextprotocol/modelcontextprotocol/blob/0695a497eb50a804fc0e88c18a93a21a675d6b3e/schema/2025-06-18/schema.ts#L806-L817) extends this definition but overwrites the params field. This ambiguity makes our generated type definitions tricky, so I'm going to skip `progressToken` field for now and just send back the `requestId` instead. In a future PR, we can clarify, update our `generate_mcp_types.py` script, and update our progressToken logic accordingly. ## Testing - [x] Added unit tests - [x] Manually tested with mcp client --- codex-rs/mcp-server/src/codex_tool_runner.rs | 14 +- codex-rs/mcp-server/src/outgoing_message.rs | 161 ++++++++++++++++++- 2 files changed, 167 insertions(+), 8 deletions(-) diff --git a/codex-rs/mcp-server/src/codex_tool_runner.rs b/codex-rs/mcp-server/src/codex_tool_runner.rs index 22a36b83..c3cb39c4 100644 --- a/codex-rs/mcp-server/src/codex_tool_runner.rs +++ b/codex-rs/mcp-server/src/codex_tool_runner.rs @@ -27,6 +27,7 @@ use uuid::Uuid; use crate::exec_approval::handle_exec_approval_request; use crate::outgoing_message::OutgoingMessageSender; +use crate::outgoing_message::OutgoingNotificationMeta; use crate::patch_approval::handle_patch_approval_request; pub(crate) const INVALID_PARAMS_ERROR_CODE: i64 = -32602; @@ -71,9 +72,11 @@ pub async fn run_codex_tool_session( session_map.lock().await.insert(session_id, codex.clone()); drop(session_map); - // Send initial SessionConfigured event. outgoing - .send_event_as_notification(&session_configured) + .send_event_as_notification( + &session_configured, + Some(OutgoingNotificationMeta::new(Some(id.clone()))), + ) .await; // Use the original MCP request ID as the `sub_id` for the Codex submission so that @@ -158,7 +161,12 @@ async fn run_codex_tool_session_inner( loop { match codex.next_event().await { Ok(event) => { - outgoing.send_event_as_notification(&event).await; + outgoing + .send_event_as_notification( + &event, + Some(OutgoingNotificationMeta::new(Some(request_id.clone()))), + ) + .await; match event.msg { EventMsg::ExecApprovalRequest(ExecApprovalRequestEvent { diff --git a/codex-rs/mcp-server/src/outgoing_message.rs b/codex-rs/mcp-server/src/outgoing_message.rs index e4af1f78..e7b0b9b6 100644 --- a/codex-rs/mcp-server/src/outgoing_message.rs +++ b/codex-rs/mcp-server/src/outgoing_message.rs @@ -18,6 +18,7 @@ use tokio::sync::mpsc; use tokio::sync::oneshot; use tracing::warn; +/// Sends messages to the client and manages request callbacks. pub(crate) struct OutgoingMessageSender { next_request_id: AtomicI64, sender: mpsc::Sender, @@ -78,18 +79,34 @@ impl OutgoingMessageSender { let _ = self.sender.send(outgoing_message).await; } - pub(crate) async fn send_event_as_notification(&self, event: &Event) { - #[expect(clippy::expect_used)] - let params = Some(serde_json::to_value(event).expect("Event must serialize")); + pub(crate) async fn send_event_as_notification( + &self, + event: &Event, + meta: Option, + ) { + #[allow(clippy::expect_used)] + let event_json = serde_json::to_value(event).expect("Event must serialize"); + + let params = if let Ok(params) = serde_json::to_value(OutgoingNotificationParams { + meta, + event: event_json.clone(), + }) { + params + } else { + warn!("Failed to serialize event as OutgoingNotificationParams"); + event_json + }; + let outgoing_message = OutgoingMessage::Notification(OutgoingNotification { method: "codex/event".to_string(), - params: params.clone(), + params: Some(params.clone()), }); let _ = self.sender.send(outgoing_message).await; - self.send_event_as_notification_new_schema(event, params) + self.send_event_as_notification_new_schema(event, Some(params.clone())) .await; } + // should be backwards compatible. // it will replace send_event_as_notification eventually. async fn send_event_as_notification_new_schema( @@ -167,6 +184,30 @@ pub(crate) struct OutgoingNotification { pub params: Option, } +#[derive(Debug, Clone, PartialEq, Serialize)] +pub(crate) struct OutgoingNotificationParams { + #[serde(rename = "_meta", default, skip_serializing_if = "Option::is_none")] + pub meta: Option, + + #[serde(flatten)] + pub event: serde_json::Value, +} + +// Additional mcp-specific data to be added to a [`codex_core::protocol::Event`] as notification.params._meta +// MCP Spec: https://modelcontextprotocol.io/specification/2025-06-18/basic#meta +// Typescript Schema: https://github.com/modelcontextprotocol/modelcontextprotocol/blob/0695a497eb50a804fc0e88c18a93a21a675d6b3e/schema/2025-06-18/schema.ts +#[derive(Debug, Clone, PartialEq, Serialize)] +#[serde(rename_all = "camelCase")] +pub(crate) struct OutgoingNotificationMeta { + pub request_id: Option, +} + +impl OutgoingNotificationMeta { + pub(crate) fn new(request_id: Option) -> Self { + Self { request_id } + } +} + #[derive(Debug, Clone, PartialEq, Serialize)] pub(crate) struct OutgoingResponse { pub id: RequestId, @@ -178,3 +219,113 @@ pub(crate) struct OutgoingError { pub error: JSONRPCErrorError, pub id: RequestId, } + +#[cfg(test)] +mod tests { + #![allow(clippy::unwrap_used)] + + use codex_core::protocol::EventMsg; + use codex_core::protocol::SessionConfiguredEvent; + use pretty_assertions::assert_eq; + use serde_json::json; + use uuid::Uuid; + + use super::*; + + #[tokio::test] + async fn test_send_event_as_notification() { + let (outgoing_tx, mut outgoing_rx) = mpsc::channel::(2); + let outgoing_message_sender = OutgoingMessageSender::new(outgoing_tx); + + let event = Event { + id: "1".to_string(), + msg: EventMsg::SessionConfigured(SessionConfiguredEvent { + session_id: Uuid::new_v4(), + model: "gpt-4o".to_string(), + history_log_id: 1, + history_entry_count: 1000, + }), + }; + + outgoing_message_sender + .send_event_as_notification(&event, None) + .await; + + let result = outgoing_rx.recv().await.unwrap(); + let OutgoingMessage::Notification(OutgoingNotification { method, params }) = result else { + panic!("expected Notification for first message"); + }; + assert_eq!(method, "codex/event"); + + let Ok(expected_params) = serde_json::to_value(&event) else { + panic!("Event must serialize"); + }; + assert_eq!(params, Some(expected_params.clone())); + + let result2 = outgoing_rx.recv().await.unwrap(); + let OutgoingMessage::Notification(OutgoingNotification { + method: method2, + params: params2, + }) = result2 + else { + panic!("expected Notification for second message"); + }; + assert_eq!(method2, event.msg.to_string()); + assert_eq!(params2, Some(expected_params)); + } + + #[tokio::test] + async fn test_send_event_as_notification_with_meta() { + let (outgoing_tx, mut outgoing_rx) = mpsc::channel::(2); + let outgoing_message_sender = OutgoingMessageSender::new(outgoing_tx); + + let session_configured_event = SessionConfiguredEvent { + session_id: Uuid::new_v4(), + model: "gpt-4o".to_string(), + history_log_id: 1, + history_entry_count: 1000, + }; + let event = Event { + id: "1".to_string(), + msg: EventMsg::SessionConfigured(session_configured_event.clone()), + }; + let meta = OutgoingNotificationMeta { + request_id: Some(RequestId::String("123".to_string())), + }; + + outgoing_message_sender + .send_event_as_notification(&event, Some(meta)) + .await; + + let result = outgoing_rx.recv().await.unwrap(); + let OutgoingMessage::Notification(OutgoingNotification { method, params }) = result else { + panic!("expected Notification for first message"); + }; + assert_eq!(method, "codex/event"); + let expected_params = json!({ + "_meta": { + "requestId": "123", + }, + "id": "1", + "msg": { + "session_id": session_configured_event.session_id, + "model": session_configured_event.model, + "history_log_id": session_configured_event.history_log_id, + "history_entry_count": session_configured_event.history_entry_count, + "type": "session_configured", + } + }); + assert_eq!(params.unwrap(), expected_params); + + let result2 = outgoing_rx.recv().await.unwrap(); + let OutgoingMessage::Notification(OutgoingNotification { + method: method2, + params: params2, + }) = result2 + else { + panic!("expected Notification for second message"); + }; + assert_eq!(method2, event.msg.to_string()); + assert_eq!(params2.unwrap(), expected_params); + } +}