[mcp-server] Populate notifications._meta with requestId (#1704)

## Summary
Per the [latest MCP
spec](https://modelcontextprotocol.io/specification/2025-06-18/basic#meta),
the `_meta` field is reserved for metadata. In the [Typescript
Schema](0695a497eb/schema/2025-06-18/schema.ts (L37-L40)),
`progressToken` is defined as a value to be attached to subsequent
notifications for that request.

The
[CallToolRequestParams](0695a497eb/schema/2025-06-18/schema.ts (L806-L817))
extends this definition but overwrites the params field. This ambiguity
makes our generated type definitions tricky, so I'm going to skip
`progressToken` field for now and just send back the `requestId`
instead.
 
In a future PR, we can clarify, update our `generate_mcp_types.py`
script, and update our progressToken logic accordingly.

## Testing
- [x] Added unit tests
- [x] Manually tested with mcp client
This commit is contained in:
Dylan
2025-07-28 13:32:09 -07:00
committed by GitHub
parent 2d2df891bb
commit 094d7af8c3
2 changed files with 167 additions and 8 deletions

View File

@@ -27,6 +27,7 @@ use uuid::Uuid;
use crate::exec_approval::handle_exec_approval_request;
use crate::outgoing_message::OutgoingMessageSender;
use crate::outgoing_message::OutgoingNotificationMeta;
use crate::patch_approval::handle_patch_approval_request;
pub(crate) const INVALID_PARAMS_ERROR_CODE: i64 = -32602;
@@ -71,9 +72,11 @@ pub async fn run_codex_tool_session(
session_map.lock().await.insert(session_id, codex.clone());
drop(session_map);
// Send initial SessionConfigured event.
outgoing
.send_event_as_notification(&session_configured)
.send_event_as_notification(
&session_configured,
Some(OutgoingNotificationMeta::new(Some(id.clone()))),
)
.await;
// Use the original MCP request ID as the `sub_id` for the Codex submission so that
@@ -158,7 +161,12 @@ async fn run_codex_tool_session_inner(
loop {
match codex.next_event().await {
Ok(event) => {
outgoing.send_event_as_notification(&event).await;
outgoing
.send_event_as_notification(
&event,
Some(OutgoingNotificationMeta::new(Some(request_id.clone()))),
)
.await;
match event.msg {
EventMsg::ExecApprovalRequest(ExecApprovalRequestEvent {

View File

@@ -18,6 +18,7 @@ use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tracing::warn;
/// Sends messages to the client and manages request callbacks.
pub(crate) struct OutgoingMessageSender {
next_request_id: AtomicI64,
sender: mpsc::Sender<OutgoingMessage>,
@@ -78,18 +79,34 @@ impl OutgoingMessageSender {
let _ = self.sender.send(outgoing_message).await;
}
pub(crate) async fn send_event_as_notification(&self, event: &Event) {
#[expect(clippy::expect_used)]
let params = Some(serde_json::to_value(event).expect("Event must serialize"));
pub(crate) async fn send_event_as_notification(
&self,
event: &Event,
meta: Option<OutgoingNotificationMeta>,
) {
#[allow(clippy::expect_used)]
let event_json = serde_json::to_value(event).expect("Event must serialize");
let params = if let Ok(params) = serde_json::to_value(OutgoingNotificationParams {
meta,
event: event_json.clone(),
}) {
params
} else {
warn!("Failed to serialize event as OutgoingNotificationParams");
event_json
};
let outgoing_message = OutgoingMessage::Notification(OutgoingNotification {
method: "codex/event".to_string(),
params: params.clone(),
params: Some(params.clone()),
});
let _ = self.sender.send(outgoing_message).await;
self.send_event_as_notification_new_schema(event, params)
self.send_event_as_notification_new_schema(event, Some(params.clone()))
.await;
}
// should be backwards compatible.
// it will replace send_event_as_notification eventually.
async fn send_event_as_notification_new_schema(
@@ -167,6 +184,30 @@ pub(crate) struct OutgoingNotification {
pub params: Option<serde_json::Value>,
}
#[derive(Debug, Clone, PartialEq, Serialize)]
pub(crate) struct OutgoingNotificationParams {
#[serde(rename = "_meta", default, skip_serializing_if = "Option::is_none")]
pub meta: Option<OutgoingNotificationMeta>,
#[serde(flatten)]
pub event: serde_json::Value,
}
// Additional mcp-specific data to be added to a [`codex_core::protocol::Event`] as notification.params._meta
// MCP Spec: https://modelcontextprotocol.io/specification/2025-06-18/basic#meta
// Typescript Schema: https://github.com/modelcontextprotocol/modelcontextprotocol/blob/0695a497eb50a804fc0e88c18a93a21a675d6b3e/schema/2025-06-18/schema.ts
#[derive(Debug, Clone, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct OutgoingNotificationMeta {
pub request_id: Option<RequestId>,
}
impl OutgoingNotificationMeta {
pub(crate) fn new(request_id: Option<RequestId>) -> Self {
Self { request_id }
}
}
#[derive(Debug, Clone, PartialEq, Serialize)]
pub(crate) struct OutgoingResponse {
pub id: RequestId,
@@ -178,3 +219,113 @@ pub(crate) struct OutgoingError {
pub error: JSONRPCErrorError,
pub id: RequestId,
}
#[cfg(test)]
mod tests {
#![allow(clippy::unwrap_used)]
use codex_core::protocol::EventMsg;
use codex_core::protocol::SessionConfiguredEvent;
use pretty_assertions::assert_eq;
use serde_json::json;
use uuid::Uuid;
use super::*;
#[tokio::test]
async fn test_send_event_as_notification() {
let (outgoing_tx, mut outgoing_rx) = mpsc::channel::<OutgoingMessage>(2);
let outgoing_message_sender = OutgoingMessageSender::new(outgoing_tx);
let event = Event {
id: "1".to_string(),
msg: EventMsg::SessionConfigured(SessionConfiguredEvent {
session_id: Uuid::new_v4(),
model: "gpt-4o".to_string(),
history_log_id: 1,
history_entry_count: 1000,
}),
};
outgoing_message_sender
.send_event_as_notification(&event, None)
.await;
let result = outgoing_rx.recv().await.unwrap();
let OutgoingMessage::Notification(OutgoingNotification { method, params }) = result else {
panic!("expected Notification for first message");
};
assert_eq!(method, "codex/event");
let Ok(expected_params) = serde_json::to_value(&event) else {
panic!("Event must serialize");
};
assert_eq!(params, Some(expected_params.clone()));
let result2 = outgoing_rx.recv().await.unwrap();
let OutgoingMessage::Notification(OutgoingNotification {
method: method2,
params: params2,
}) = result2
else {
panic!("expected Notification for second message");
};
assert_eq!(method2, event.msg.to_string());
assert_eq!(params2, Some(expected_params));
}
#[tokio::test]
async fn test_send_event_as_notification_with_meta() {
let (outgoing_tx, mut outgoing_rx) = mpsc::channel::<OutgoingMessage>(2);
let outgoing_message_sender = OutgoingMessageSender::new(outgoing_tx);
let session_configured_event = SessionConfiguredEvent {
session_id: Uuid::new_v4(),
model: "gpt-4o".to_string(),
history_log_id: 1,
history_entry_count: 1000,
};
let event = Event {
id: "1".to_string(),
msg: EventMsg::SessionConfigured(session_configured_event.clone()),
};
let meta = OutgoingNotificationMeta {
request_id: Some(RequestId::String("123".to_string())),
};
outgoing_message_sender
.send_event_as_notification(&event, Some(meta))
.await;
let result = outgoing_rx.recv().await.unwrap();
let OutgoingMessage::Notification(OutgoingNotification { method, params }) = result else {
panic!("expected Notification for first message");
};
assert_eq!(method, "codex/event");
let expected_params = json!({
"_meta": {
"requestId": "123",
},
"id": "1",
"msg": {
"session_id": session_configured_event.session_id,
"model": session_configured_event.model,
"history_log_id": session_configured_event.history_log_id,
"history_entry_count": session_configured_event.history_entry_count,
"type": "session_configured",
}
});
assert_eq!(params.unwrap(), expected_params);
let result2 = outgoing_rx.recv().await.unwrap();
let OutgoingMessage::Notification(OutgoingNotification {
method: method2,
params: params2,
}) = result2
else {
panic!("expected Notification for second message");
};
assert_eq!(method2, event.msg.to_string());
assert_eq!(params2.unwrap(), expected_params);
}
}