remove mcp-server/src/mcp_protocol.rs and the code that depends on it (#2360)

This commit is contained in:
Michael Bolin
2025-08-18 00:29:18 -07:00
committed by GitHub
parent b581498882
commit a269754668
16 changed files with 274 additions and 1858 deletions

View File

@@ -20,11 +20,11 @@ use mcp_test_support::McpProcess;
use mcp_test_support::create_final_assistant_message_sse_response;
use mcp_test_support::create_mock_chat_completions_server;
use mcp_test_support::create_shell_sse_response;
use mcp_test_support::to_response;
use mcp_types::JSONRPCNotification;
use mcp_types::JSONRPCResponse;
use mcp_types::RequestId;
use pretty_assertions::assert_eq;
use serde::de::DeserializeOwned;
use std::env;
use tempfile::TempDir;
use tokio::time::timeout;
@@ -168,12 +168,6 @@ async fn test_codex_jsonrpc_conversation_flow() {
to_response(remove_listener_resp).expect("deserialize removeConversationListener response");
}
fn to_response<T: DeserializeOwned>(response: JSONRPCResponse) -> anyhow::Result<T> {
let value = serde_json::to_value(response.result)?;
let codex_response = serde_json::from_value(value)?;
Ok(codex_response)
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_send_user_turn_changes_approval_policy_behavior() {
if env::var(CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR).is_ok() {

View File

@@ -1,7 +1,7 @@
[package]
edition = "2024"
name = "mcp_test_support"
version = { workspace = true }
edition = "2024"
[lib]
path = "lib.rs"
@@ -9,10 +9,11 @@ path = "lib.rs"
[dependencies]
anyhow = "1"
assert_cmd = "2"
codex-mcp-server = { path = "../.." }
codex-core = { path = "../../../core" }
codex-mcp-server = { path = "../.." }
mcp-types = { path = "../../../mcp-types" }
pretty_assertions = "1.4.1"
serde = { version = "1" }
serde_json = "1"
shlex = "1.3.0"
tempfile = "3"
@@ -22,5 +23,5 @@ tokio = { version = "1", features = [
"process",
"rt-multi-thread",
] }
uuid = { version = "1", features = ["serde", "v4"] }
wiremock = "0.6"
uuid = { version = "1", features = ["serde", "v4"] }

View File

@@ -3,7 +3,15 @@ mod mock_model_server;
mod responses;
pub use mcp_process::McpProcess;
use mcp_types::JSONRPCResponse;
pub use mock_model_server::create_mock_chat_completions_server;
pub use responses::create_apply_patch_sse_response;
pub use responses::create_final_assistant_message_sse_response;
pub use responses::create_shell_sse_response;
use serde::de::DeserializeOwned;
pub fn to_response<T: DeserializeOwned>(response: JSONRPCResponse) -> anyhow::Result<T> {
let value = serde_json::to_value(response.result)?;
let codex_response = serde_json::from_value(value)?;
Ok(codex_response)
}

View File

@@ -11,14 +11,9 @@ use tokio::process::ChildStdout;
use anyhow::Context;
use assert_cmd::prelude::*;
use codex_core::protocol::InputItem;
use codex_mcp_server::CodexToolCallParam;
use codex_mcp_server::CodexToolCallReplyParam;
use codex_mcp_server::mcp_protocol::ConversationCreateArgs;
use codex_mcp_server::mcp_protocol::ConversationId;
use codex_mcp_server::mcp_protocol::ConversationSendMessageArgs;
use codex_mcp_server::mcp_protocol::ToolCallRequestParams;
use codex_mcp_server::wire_format::AddConversationListenerParams;
use codex_mcp_server::wire_format::InterruptConversationParams;
use codex_mcp_server::wire_format::NewConversationParams;
use codex_mcp_server::wire_format::RemoveConversationListenerParams;
use codex_mcp_server::wire_format::SendUserMessageParams;
@@ -40,7 +35,6 @@ use pretty_assertions::assert_eq;
use serde_json::json;
use std::process::Command as StdCommand;
use tokio::process::Command;
use uuid::Uuid;
pub struct McpProcess {
next_request_id: AtomicI64,
@@ -167,83 +161,6 @@ impl McpProcess {
.await
}
pub async fn send_codex_reply_tool_call(
&mut self,
session_id: &str,
prompt: &str,
) -> anyhow::Result<i64> {
let codex_tool_call_params = CallToolRequestParams {
name: "codex-reply".to_string(),
arguments: Some(serde_json::to_value(CodexToolCallReplyParam {
prompt: prompt.to_string(),
session_id: session_id.to_string(),
})?),
};
self.send_request(
mcp_types::CallToolRequest::METHOD,
Some(serde_json::to_value(codex_tool_call_params)?),
)
.await
}
pub async fn send_user_message_tool_call(
&mut self,
message: &str,
session_id: &str,
) -> anyhow::Result<i64> {
let params = ToolCallRequestParams::ConversationSendMessage(ConversationSendMessageArgs {
conversation_id: ConversationId(Uuid::parse_str(session_id)?),
content: vec![InputItem::Text {
text: message.to_string(),
}],
parent_message_id: None,
conversation_overrides: None,
});
self.send_request(
mcp_types::CallToolRequest::METHOD,
Some(serde_json::to_value(params)?),
)
.await
}
pub async fn send_conversation_create_tool_call(
&mut self,
prompt: &str,
model: &str,
cwd: &str,
) -> anyhow::Result<i64> {
let params = ToolCallRequestParams::ConversationCreate(ConversationCreateArgs {
prompt: prompt.to_string(),
model: model.to_string(),
cwd: cwd.to_string(),
approval_policy: None,
sandbox: None,
config: None,
profile: None,
base_instructions: None,
});
self.send_request(
mcp_types::CallToolRequest::METHOD,
Some(serde_json::to_value(params)?),
)
.await
}
pub async fn send_conversation_create_with_args(
&mut self,
args: ConversationCreateArgs,
) -> anyhow::Result<i64> {
let params = ToolCallRequestParams::ConversationCreate(args);
self.send_request(
mcp_types::CallToolRequest::METHOD,
Some(serde_json::to_value(params)?),
)
.await
}
// ---------------------------------------------------------------------
// Codex JSON-RPC (non-tool) helpers
// ---------------------------------------------------------------------
/// Send a `newConversation` JSON-RPC request.
pub async fn send_new_conversation_request(
&mut self,
@@ -291,6 +208,15 @@ impl McpProcess {
self.send_request("sendUserTurn", params).await
}
/// Send a `interruptConversation` JSON-RPC request.
pub async fn send_interrupt_conversation_request(
&mut self,
params: InterruptConversationParams,
) -> anyhow::Result<i64> {
let params = Some(serde_json::to_value(params)?);
self.send_request("interruptConversation", params).await
}
async fn send_request(
&mut self,
method: &str,
@@ -335,6 +261,7 @@ impl McpProcess {
let message = serde_json::from_str::<JSONRPCMessage>(&line)?;
Ok(message)
}
pub async fn read_stream_until_request_message(&mut self) -> anyhow::Result<JSONRPCRequest> {
loop {
let message = self.read_jsonrpc_message().await?;
@@ -384,6 +311,33 @@ impl McpProcess {
}
}
pub async fn read_stream_until_error_message(
&mut self,
request_id: RequestId,
) -> anyhow::Result<mcp_types::JSONRPCError> {
loop {
let message = self.read_jsonrpc_message().await?;
eprint!("message: {message:?}");
match message {
JSONRPCMessage::Notification(_) => {
eprintln!("notification: {message:?}");
}
JSONRPCMessage::Request(_) => {
anyhow::bail!("unexpected JSONRPCMessage::Request: {message:?}");
}
JSONRPCMessage::Response(_) => {
// Keep scanning; we're waiting for an error with matching id.
}
JSONRPCMessage::Error(err) => {
if err.id == request_id {
return Ok(err);
}
}
}
}
}
pub async fn read_stream_until_notification_message(
&mut self,
method: &str,
@@ -411,80 +365,6 @@ impl McpProcess {
}
}
pub async fn read_stream_until_configured_response_message(
&mut self,
) -> anyhow::Result<String> {
let mut sid_old: Option<String> = None;
let mut sid_new: Option<String> = None;
loop {
let message = self.read_jsonrpc_message().await?;
eprint!("message: {message:?}");
match message {
JSONRPCMessage::Notification(notification) => {
if let Some(params) = notification.params {
// Back-compat schema: method == "codex/event" and msg.type == "session_configured"
if notification.method == "codex/event" {
if let Some(msg) = params.get("msg") {
if msg.get("type").and_then(|v| v.as_str())
== Some("session_configured")
{
if let Some(session_id) =
msg.get("session_id").and_then(|v| v.as_str())
{
sid_old = Some(session_id.to_string());
}
}
}
}
// New schema: method is the Display of EventMsg::SessionConfigured => "SessionConfigured"
if notification.method == "session_configured" {
if let Some(msg) = params.get("msg") {
if let Some(session_id) =
msg.get("session_id").and_then(|v| v.as_str())
{
sid_new = Some(session_id.to_string());
}
}
}
}
if sid_old.is_some() && sid_new.is_some() {
// Both seen, they must match
assert_eq!(
sid_old.as_ref().unwrap(),
sid_new.as_ref().unwrap(),
"session_id mismatch between old and new schema"
);
return Ok(sid_old.unwrap());
}
}
JSONRPCMessage::Request(_) => {
anyhow::bail!("unexpected JSONRPCMessage::Request: {message:?}");
}
JSONRPCMessage::Error(_) => {
anyhow::bail!("unexpected JSONRPCMessage::Error: {message:?}");
}
JSONRPCMessage::Response(_) => {
anyhow::bail!("unexpected JSONRPCMessage::Response: {message:?}");
}
}
}
}
pub async fn send_notification(
&mut self,
method: &str,
params: Option<serde_json::Value>,
) -> anyhow::Result<()> {
self.send_jsonrpc_message(JSONRPCMessage::Notification(JSONRPCNotification {
jsonrpc: JSONRPC_VERSION.into(),
method: method.to_string(),
params,
}))
.await
}
/// Reads notifications until a legacy TaskComplete event is observed:
/// Method "codex/event" with params.msg.type == "task_complete".
pub async fn read_stream_until_legacy_task_complete_notification(

View File

@@ -1,8 +1,16 @@
use std::path::Path;
use codex_mcp_server::wire_format::AddConversationListenerParams;
use codex_mcp_server::wire_format::AddConversationSubscriptionResponse;
use codex_mcp_server::wire_format::InputItem;
use codex_mcp_server::wire_format::NewConversationParams;
use codex_mcp_server::wire_format::NewConversationResponse;
use codex_mcp_server::wire_format::SendUserMessageParams;
use codex_mcp_server::wire_format::SendUserMessageResponse;
use mcp_test_support::McpProcess;
use mcp_test_support::create_final_assistant_message_sse_response;
use mcp_test_support::create_mock_chat_completions_server;
use mcp_test_support::to_response;
use mcp_types::JSONRPCResponse;
use mcp_types::RequestId;
use pretty_assertions::assert_eq;
@@ -33,43 +41,64 @@ async fn test_conversation_create_and_send_message_ok() {
.expect("init timeout")
.expect("init failed");
// Create a conversation via the new tool.
let req_id = mcp
.send_conversation_create_tool_call("", "o3", "/repo")
// Create a conversation via the new JSON-RPC API.
let new_conv_id = mcp
.send_new_conversation_request(NewConversationParams {
model: Some("o3".to_string()),
..Default::default()
})
.await
.expect("send conversationCreate");
let resp: JSONRPCResponse = timeout(
.expect("send newConversation");
let new_conv_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(req_id)),
mcp.read_stream_until_response_message(RequestId::Integer(new_conv_id)),
)
.await
.expect("create response timeout")
.expect("create response error");
.expect("newConversation timeout")
.expect("newConversation resp");
let NewConversationResponse {
conversation_id,
model,
} = to_response::<NewConversationResponse>(new_conv_resp)
.expect("deserialize newConversation response");
assert_eq!(model, "o3");
// Structured content must include status=ok, a UUID conversation_id and the model we passed.
let sc = &resp.result["structuredContent"];
let conv_id = sc["conversation_id"].as_str().expect("uuid string");
assert!(!conv_id.is_empty());
assert_eq!(sc["model"], json!("o3"));
// Now send a message to the created conversation and expect an OK result.
let send_id = mcp
.send_user_message_tool_call("Hello", conv_id)
// Add a listener so we receive notifications for this conversation (not strictly required for this test).
let add_listener_id = mcp
.send_add_conversation_listener_request(AddConversationListenerParams { conversation_id })
.await
.expect("send message");
.expect("send addConversationListener");
let _sub: AddConversationSubscriptionResponse =
to_response::<AddConversationSubscriptionResponse>(
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(add_listener_id)),
)
.await
.expect("addConversationListener timeout")
.expect("addConversationListener resp"),
)
.expect("deserialize addConversationListener response");
// Now send a user message via the wire API and expect an OK (empty object) result.
let send_id = mcp
.send_send_user_message_request(SendUserMessageParams {
conversation_id,
items: vec![InputItem::Text {
text: "Hello".to_string(),
}],
})
.await
.expect("send sendUserMessage");
let send_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(send_id)),
)
.await
.expect("send response timeout")
.expect("send response error");
assert_eq!(
send_resp.result["structuredContent"],
json!({ "status": "ok" })
);
.expect("sendUserMessage timeout")
.expect("sendUserMessage resp");
let _ok: SendUserMessageResponse = to_response::<SendUserMessageResponse>(send_resp)
.expect("deserialize sendUserMessage response");
// avoid race condition by waiting for the mock server to receive the chat.completions request
let deadline = std::time::Instant::now() + DEFAULT_READ_TIMEOUT;

View File

@@ -3,15 +3,24 @@
use std::path::Path;
use codex_core::protocol::TurnAbortReason;
use codex_core::spawn::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR;
use codex_mcp_server::CodexToolCallParam;
use serde_json::json;
use codex_mcp_server::wire_format::AddConversationListenerParams;
use codex_mcp_server::wire_format::InterruptConversationParams;
use codex_mcp_server::wire_format::InterruptConversationResponse;
use codex_mcp_server::wire_format::NewConversationParams;
use codex_mcp_server::wire_format::NewConversationResponse;
use codex_mcp_server::wire_format::SendUserMessageParams;
use codex_mcp_server::wire_format::SendUserMessageResponse;
use mcp_types::JSONRPCResponse;
use mcp_types::RequestId;
use tempfile::TempDir;
use tokio::time::timeout;
use mcp_test_support::McpProcess;
use mcp_test_support::create_mock_chat_completions_server;
use mcp_test_support::create_shell_sse_response;
use mcp_test_support::to_response;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
@@ -39,93 +48,91 @@ async fn shell_command_interruption() -> anyhow::Result<()> {
let shell_command = vec![
"powershell".to_string(),
"-Command".to_string(),
"Start-Sleep -Seconds 60".to_string(),
"Start-Sleep -Seconds 10".to_string(),
];
#[cfg(not(target_os = "windows"))]
let shell_command = vec!["sleep".to_string(), "60".to_string()];
let workdir_for_shell_function_call = TempDir::new()?;
let shell_command = vec!["sleep".to_string(), "10".to_string()];
let tmp = TempDir::new()?;
// Temporary Codex home with config pointing at the mock server.
let codex_home = tmp.path().join("codex_home");
std::fs::create_dir(&codex_home)?;
let working_directory = tmp.path().join("workdir");
std::fs::create_dir(&working_directory)?;
// Create mock server with a single SSE response: the long sleep command
let server = create_mock_chat_completions_server(vec![
create_shell_sse_response(
shell_command.clone(),
Some(workdir_for_shell_function_call.path()),
Some(60_000), // 60 seconds timeout in ms
"call_sleep",
)?,
create_shell_sse_response(
shell_command.clone(),
Some(workdir_for_shell_function_call.path()),
Some(60_000), // 60 seconds timeout in ms
"call_sleep",
)?,
])
let server = create_mock_chat_completions_server(vec![create_shell_sse_response(
shell_command.clone(),
Some(&working_directory),
Some(10_000), // 10 seconds timeout in ms
"call_sleep",
)?])
.await;
create_config_toml(&codex_home, server.uri())?;
// Create Codex configuration
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), server.uri())?;
let mut mcp_process = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp_process.initialize()).await??;
// Start MCP server and initialize.
let mut mcp = McpProcess::new(&codex_home).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
// Send codex tool call that triggers "sleep 60"
let codex_request_id = mcp_process
.send_codex_tool_call(CodexToolCallParam {
cwd: None,
prompt: "First Run: run `sleep 60`".to_string(),
model: None,
profile: None,
approval_policy: None,
sandbox: None,
config: None,
base_instructions: None,
include_plan_tool: None,
// 1) newConversation
let new_conv_id = mcp
.send_new_conversation_request(NewConversationParams {
cwd: Some(working_directory.to_string_lossy().into_owned()),
..Default::default()
})
.await?;
let new_conv_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(new_conv_id)),
)
.await??;
let new_conv_resp = to_response::<NewConversationResponse>(new_conv_resp)?;
let NewConversationResponse {
conversation_id, ..
} = new_conv_resp;
let session_id = mcp_process
.read_stream_until_configured_response_message()
// 2) addConversationListener
let add_listener_id = mcp
.send_add_conversation_listener_request(AddConversationListenerParams { conversation_id })
.await?;
let _add_listener_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(add_listener_id)),
)
.await??;
// 3) sendUserMessage (should trigger notifications; we only validate an OK response)
let send_user_id = mcp
.send_send_user_message_request(SendUserMessageParams {
conversation_id,
items: vec![codex_mcp_server::wire_format::InputItem::Text {
text: "run first sleep command".to_string(),
}],
})
.await?;
let send_user_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(send_user_id)),
)
.await??;
let SendUserMessageResponse {} = to_response::<SendUserMessageResponse>(send_user_resp)?;
// Give the command a moment to start
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
// Send interrupt notification
mcp_process
.send_notification(
"notifications/cancelled",
Some(json!({ "requestId": codex_request_id })),
)
// 4) send interrupt request
let interrupt_id = mcp
.send_interrupt_conversation_request(InterruptConversationParams { conversation_id })
.await?;
// Expect Codex to emit a TurnAborted event notification
let _turn_aborted = timeout(
let interrupt_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp_process.read_stream_until_notification_message("turn_aborted"),
mcp.read_stream_until_response_message(RequestId::Integer(interrupt_id)),
)
.await??;
let InterruptConversationResponse { abort_reason } =
to_response::<InterruptConversationResponse>(interrupt_resp)?;
assert_eq!(TurnAbortReason::Interrupted, abort_reason);
let codex_reply_request_id = mcp_process
.send_codex_reply_tool_call(&session_id, "Second Run: run `sleep 60`")
.await?;
// Give the command a moment to start
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
// Send interrupt notification
mcp_process
.send_notification(
"notifications/cancelled",
Some(json!({ "requestId": codex_reply_request_id })),
)
.await?;
// Expect Codex to emit a TurnAborted event notification
let _turn_aborted = timeout(
DEFAULT_READ_TIMEOUT,
mcp_process.read_stream_until_notification_message("turn_aborted"),
)
.await??;
Ok(())
}

View File

@@ -1,16 +1,21 @@
use std::path::Path;
use std::thread::sleep;
use std::time::Duration;
use codex_mcp_server::CodexToolCallParam;
use codex_mcp_server::wire_format::AddConversationListenerParams;
use codex_mcp_server::wire_format::AddConversationSubscriptionResponse;
use codex_mcp_server::wire_format::ConversationId;
use codex_mcp_server::wire_format::InputItem;
use codex_mcp_server::wire_format::NewConversationParams;
use codex_mcp_server::wire_format::NewConversationResponse;
use codex_mcp_server::wire_format::SendUserMessageParams;
use codex_mcp_server::wire_format::SendUserMessageResponse;
use mcp_test_support::McpProcess;
use mcp_test_support::create_final_assistant_message_sse_response;
use mcp_test_support::create_mock_chat_completions_server;
use mcp_types::JSONRPC_VERSION;
use mcp_test_support::to_response;
use mcp_types::JSONRPCNotification;
use mcp_types::JSONRPCResponse;
use mcp_types::RequestId;
use pretty_assertions::assert_eq;
use serde_json::json;
use tempfile::TempDir;
use tokio::time::timeout;
@@ -31,76 +36,94 @@ async fn test_send_message_success() {
create_config_toml(codex_home.path(), &server.uri()).expect("write config.toml");
// Start MCP server process and initialize.
let mut mcp_process = McpProcess::new(codex_home.path())
let mut mcp = McpProcess::new(codex_home.path())
.await
.expect("spawn mcp process");
timeout(DEFAULT_READ_TIMEOUT, mcp_process.initialize())
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize())
.await
.expect("init timed out")
.expect("init failed");
// Kick off a Codex session so we have a valid session_id.
let codex_request_id = mcp_process
.send_codex_tool_call(CodexToolCallParam {
prompt: "Start a session".to_string(),
..Default::default()
})
// Start a conversation using the new wire API.
let new_conv_id = mcp
.send_new_conversation_request(NewConversationParams::default())
.await
.expect("send codex tool call");
// Wait for the session_configured event to get the session_id.
let session_id = mcp_process
.read_stream_until_configured_response_message()
.await
.expect("read session_configured");
// The original codex call will finish quickly given our mock; consume its response.
timeout(
.expect("send newConversation");
let new_conv_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp_process.read_stream_until_response_message(RequestId::Integer(codex_request_id)),
mcp.read_stream_until_response_message(RequestId::Integer(new_conv_id)),
)
.await
.expect("codex response timeout")
.expect("codex response error");
.expect("newConversation timeout")
.expect("newConversation resp");
let NewConversationResponse {
conversation_id, ..
} = to_response::<_>(new_conv_resp).expect("deserialize newConversation response");
// Now exercise the send-user-message tool.
let send_msg_request_id = mcp_process
.send_user_message_tool_call("Hello again", &session_id)
// 2) addConversationListener
let add_listener_id = mcp
.send_add_conversation_listener_request(AddConversationListenerParams { conversation_id })
.await
.expect("send send-message tool call");
.expect("send addConversationListener");
let add_listener_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(add_listener_id)),
)
.await
.expect("addConversationListener timeout")
.expect("addConversationListener resp");
let AddConversationSubscriptionResponse { subscription_id: _ } =
to_response::<_>(add_listener_resp).expect("deserialize addConversationListener response");
// Now exercise sendUserMessage twice.
send_message("Hello", conversation_id, &mut mcp).await;
send_message("Hello again", conversation_id, &mut mcp).await;
}
#[expect(clippy::expect_used)]
async fn send_message(message: &str, conversation_id: ConversationId, mcp: &mut McpProcess) {
// Now exercise sendUserMessage.
let send_id = mcp
.send_send_user_message_request(SendUserMessageParams {
conversation_id,
items: vec![InputItem::Text {
text: message.to_string(),
}],
})
.await
.expect("send sendUserMessage");
let response: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp_process.read_stream_until_response_message(RequestId::Integer(send_msg_request_id)),
mcp.read_stream_until_response_message(RequestId::Integer(send_id)),
)
.await
.expect("send-user-message response timeout")
.expect("send-user-message response error");
.expect("sendUserMessage response timeout")
.expect("sendUserMessage response error");
let _ok: SendUserMessageResponse = to_response::<SendUserMessageResponse>(response)
.expect("deserialize sendUserMessage response");
// Verify the task_finished notification is received.
// Note this also ensures that the final request to the server was made.
let task_finished_notification: JSONRPCNotification = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("codex/event/task_complete"),
)
.await
.expect("task_finished_notification timeout")
.expect("task_finished_notification resp");
let serde_json::Value::Object(map) = task_finished_notification
.params
.expect("notification should have params")
else {
panic!("task_finished_notification should have params");
};
assert_eq!(
JSONRPCResponse {
jsonrpc: JSONRPC_VERSION.into(),
id: RequestId::Integer(send_msg_request_id),
result: json!({
"content": [
{
"text": "{\"status\":\"ok\"}",
"type": "text",
}
],
"isError": false,
"structuredContent": {
"status": "ok"
}
}),
},
response
map.get("conversationId")
.expect("should have conversationId"),
&serde_json::Value::String(conversation_id.to_string())
);
// wait for the server to hear the user message
sleep(Duration::from_secs(5));
// Ensure the server and tempdir live until end of test
drop(server);
}
#[tokio::test]
@@ -113,24 +136,26 @@ async fn test_send_message_session_not_found() {
.expect("timeout")
.expect("init");
let unknown = uuid::Uuid::new_v4().to_string();
let unknown = ConversationId(uuid::Uuid::new_v4());
let req_id = mcp
.send_user_message_tool_call("ping", &unknown)
.send_send_user_message_request(SendUserMessageParams {
conversation_id: unknown,
items: vec![InputItem::Text {
text: "ping".to_string(),
}],
})
.await
.expect("send tool");
.expect("send sendUserMessage");
let resp: JSONRPCResponse = timeout(
// Expect an error response for unknown conversation.
let err = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(req_id)),
mcp.read_stream_until_error_message(RequestId::Integer(req_id)),
)
.await
.expect("timeout")
.expect("resp");
let result = resp.result.clone();
let content = result["content"][0]["text"].as_str().unwrap_or("");
assert!(content.contains("Session does not exist"));
assert_eq!(result["isError"], json!(true));
.expect("error");
assert_eq!(err.id, RequestId::Integer(req_id));
}
// ---------------------------------------------------------------------------