fix: verify notifications are sent with the conversationId set (#2278)

This updates `CodexMessageProcessor` so that each notification it sends
for a `EventMsg` from a `CodexConversation` such that:

- The `params` always has an appropriate `conversationId` field.
- The `method` is now includes the name of the `EventMsg` type rather
than using `codex/event` as the `method` type for all notifications. (We
currently prefix the method name with `codex/event/`, but I think that
should go away once we formalize the notification schema in
`wire_format.rs`.)

As part of this, we update `test_codex_jsonrpc_conversation_flow()` to
verify that the `task_finished` notification has made it through the
system instead of sleeping for 5s and "hoping" the server finished
processing the task. Note we have seen some flakiness in some of our
other, similar integration tests, and I expect adding a similar check
would help in those cases, as well.
This commit is contained in:
Michael Bolin
2025-08-13 17:54:12 -07:00
committed by GitHub
parent e7bad650ff
commit a62510e0ae
4 changed files with 75 additions and 13 deletions

View File

@@ -15,7 +15,7 @@ use crate::error_code::INTERNAL_ERROR_CODE;
use crate::error_code::INVALID_REQUEST_ERROR_CODE;
use crate::json_to_toml::json_to_toml;
use crate::outgoing_message::OutgoingMessageSender;
use crate::outgoing_message::OutgoingNotificationMeta;
use crate::outgoing_message::OutgoingNotification;
use crate::wire_format::AddConversationListenerParams;
use crate::wire_format::AddConversationSubscriptionResponse;
use crate::wire_format::CodexRequest;
@@ -176,7 +176,6 @@ impl CodexMessageProcessor {
self.conversation_listeners
.insert(subscription_id, cancel_tx);
let outgoing_for_task = self.outgoing.clone();
let add_listener_request_id = request_id.clone();
tokio::spawn(async move {
loop {
tokio::select! {
@@ -193,10 +192,24 @@ impl CodexMessageProcessor {
}
};
outgoing_for_task.send_event_as_notification(
&event,
Some(OutgoingNotificationMeta::new(Some(add_listener_request_id.clone()))),
)
let method = format!("codex/event/{}", event.msg);
let mut params = match serde_json::to_value(event) {
Ok(serde_json::Value::Object(map)) => map,
Ok(_) => {
tracing::error!("event did not serialize to an object");
continue;
}
Err(err) => {
tracing::error!("failed to serialize event: {err}");
continue;
}
};
params.insert("conversationId".to_string(), conversation_id.to_string().into());
outgoing_for_task.send_notification(OutgoingNotification {
method,
params: Some(params.into()),
})
.await;
}
}

View File

@@ -114,16 +114,21 @@ impl OutgoingMessageSender {
event_json
};
let outgoing_message = OutgoingMessage::Notification(OutgoingNotification {
self.send_notification(OutgoingNotification {
method: "codex/event".to_string(),
params: Some(params.clone()),
});
let _ = self.sender.send(outgoing_message).await;
})
.await;
self.send_event_as_notification_new_schema(event, Some(params.clone()))
.await;
}
pub(crate) async fn send_notification(&self, notification: OutgoingNotification) {
let outgoing_message = OutgoingMessage::Notification(notification);
let _ = self.sender.send(outgoing_message).await;
}
// should be backwards compatible.
// it will replace send_event_as_notification eventually.
async fn send_event_as_notification_new_schema(

View File

@@ -15,6 +15,7 @@ use mcp_test_support::McpProcess;
use mcp_test_support::create_final_assistant_message_sse_response;
use mcp_test_support::create_mock_chat_completions_server;
use mcp_test_support::create_shell_sse_response;
use mcp_types::JSONRPCNotification;
use mcp_types::JSONRPCResponse;
use mcp_types::RequestId;
use pretty_assertions::assert_eq;
@@ -123,10 +124,26 @@ async fn test_codex_jsonrpc_conversation_flow() {
let SendUserMessageResponse {} = to_response::<SendUserMessageResponse>(send_user_resp)
.expect("deserialize sendUserMessage response");
// Give the server time to process the user's request.
tokio::time::sleep(std::time::Duration::from_millis(5_000)).await;
// Could verify that some notifications were received?
// Verify the task_finished notification is received.
// Note this also ensures that the final request to the server was made.
let task_finished_notification: JSONRPCNotification = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("codex/event/task_complete"),
)
.await
.expect("task_finished_notification timeout")
.expect("task_finished_notification resp");
let serde_json::Value::Object(map) = task_finished_notification
.params
.expect("notification should have params")
else {
panic!("task_finished_notification should have params");
};
assert_eq!(
map.get("conversationId")
.expect("should have conversationId"),
&serde_json::Value::String(conversation_id.to_string())
);
// 4) removeConversationListener
let remove_listener_id = mcp

View File

@@ -374,6 +374,33 @@ impl McpProcess {
}
}
pub async fn read_stream_until_notification_message(
&mut self,
method: &str,
) -> anyhow::Result<JSONRPCNotification> {
loop {
let message = self.read_jsonrpc_message().await?;
eprint!("message: {message:?}");
match message {
JSONRPCMessage::Notification(notification) => {
if notification.method == method {
return Ok(notification);
}
}
JSONRPCMessage::Request(_) => {
anyhow::bail!("unexpected JSONRPCMessage::Request: {message:?}");
}
JSONRPCMessage::Error(_) => {
anyhow::bail!("unexpected JSONRPCMessage::Error: {message:?}");
}
JSONRPCMessage::Response(_) => {
anyhow::bail!("unexpected JSONRPCMessage::Response: {message:?}");
}
}
}
}
pub async fn read_stream_until_configured_response_message(
&mut self,
) -> anyhow::Result<String> {