Files
llmx/codex-rs/mcp-server/tests/common/mcp_process.rs

260 lines
8.6 KiB
Rust
Raw Normal View History

test: add integration test for MCP server (#1633) This PR introduces a single integration test for `cargo mcp`, though it also introduces a number of reusable components so that it should be easier to introduce more integration tests going forward. The new test is introduced in `codex-rs/mcp-server/tests/elicitation.rs` and the reusable pieces are in `codex-rs/mcp-server/tests/common`. The test itself verifies new functionality around elicitations introduced in https://github.com/openai/codex/pull/1623 (and the fix introduced in https://github.com/openai/codex/pull/1629) by doing the following: - starts a mock model provider with canned responses for `/v1/chat/completions` - starts the MCP server with a `config.toml` to use that model provider (and `approval_policy = "untrusted"`) - sends the `codex` tool call which causes the mock model provider to request a shell call for `git init` - the MCP server sends an elicitation to the client to approve the request - the client replies to the elicitation with `"approved"` - the MCP server runs the command and re-samples the model, getting a `"finish_reason": "stop"` - in turn, the MCP server sends the final response to the original `codex` tool call - verifies that `git init` ran as expected To test: ``` cargo test shell_command_approval_triggers_elicitation ``` In writing this test, I discovered that `ExecApprovalResponse` does not conform to `ElicitResult`, so I added a TODO to fix that, since I think that should be updated in a separate PR. As it stands, this PR does not update any business logic, though it does make a number of members of the `mcp-server` crate `pub` so they can be used in the test. One additional learning from this PR is that `std::process::Command::cargo_bin()` from the `assert_cmd` trait is only available for `std::process::Command`, but we really want to use `tokio::process::Command` so that everything is async and we can leverage utilities like `tokio::time::timeout()`. The trick I came up with was to use `cargo_bin()` to locate the program, and then to use `std::process::Command::get_program()` when constructing the `tokio::process::Command`.
2025-07-21 10:27:07 -07:00
use std::path::Path;
use std::process::Stdio;
use std::sync::atomic::AtomicI64;
use std::sync::atomic::Ordering;
use tokio::io::AsyncBufReadExt;
use tokio::io::AsyncWriteExt;
use tokio::io::BufReader;
use tokio::process::Child;
use tokio::process::ChildStdin;
use tokio::process::ChildStdout;
use anyhow::Context;
use assert_cmd::prelude::*;
use codex_mcp_server::CodexToolCallParam;
use mcp_types::CallToolRequestParams;
use mcp_types::ClientCapabilities;
use mcp_types::Implementation;
use mcp_types::InitializeRequestParams;
use mcp_types::JSONRPC_VERSION;
use mcp_types::JSONRPCMessage;
use mcp_types::JSONRPCNotification;
use mcp_types::JSONRPCRequest;
use mcp_types::JSONRPCResponse;
use mcp_types::ModelContextProtocolNotification;
use mcp_types::ModelContextProtocolRequest;
use mcp_types::RequestId;
use pretty_assertions::assert_eq;
use serde_json::json;
use std::process::Command as StdCommand;
use tokio::process::Command;
pub struct McpProcess {
next_request_id: AtomicI64,
/// Retain this child process until the client is dropped. The Tokio runtime
/// will make a "best effort" to reap the process after it exits, but it is
/// not a guarantee. See the `kill_on_drop` documentation for details.
#[allow(dead_code)]
process: Child,
stdin: ChildStdin,
stdout: BufReader<ChildStdout>,
}
impl McpProcess {
pub async fn new(codex_home: &Path) -> anyhow::Result<Self> {
// Use assert_cmd to locate the binary path and then switch to tokio::process::Command
let std_cmd = StdCommand::cargo_bin("codex-mcp-server")
.context("should find binary for codex-mcp-server")?;
let program = std_cmd.get_program().to_owned();
let mut cmd = Command::new(program);
cmd.stdin(Stdio::piped());
cmd.stdout(Stdio::piped());
cmd.env("CODEX_HOME", codex_home);
cmd.env("RUST_LOG", "debug");
let mut process = cmd
.kill_on_drop(true)
.spawn()
.context("codex-mcp-server proc should start")?;
let stdin = process
.stdin
.take()
.ok_or_else(|| anyhow::format_err!("mcp should have stdin fd"))?;
let stdout = process
.stdout
.take()
.ok_or_else(|| anyhow::format_err!("mcp should have stdout fd"))?;
let stdout = BufReader::new(stdout);
Ok(Self {
next_request_id: AtomicI64::new(0),
process,
stdin,
stdout,
})
}
/// Performs the initialization handshake with the MCP server.
pub async fn initialize(&mut self) -> anyhow::Result<()> {
let request_id = self.next_request_id.fetch_add(1, Ordering::Relaxed);
let params = InitializeRequestParams {
capabilities: ClientCapabilities {
elicitation: Some(json!({})),
experimental: None,
roots: None,
sampling: None,
},
client_info: Implementation {
name: "elicitation test".into(),
title: Some("Elicitation Test".into()),
version: "0.0.0".into(),
},
protocol_version: mcp_types::MCP_SCHEMA_VERSION.into(),
};
let params_value = serde_json::to_value(params)?;
self.send_jsonrpc_message(JSONRPCMessage::Request(JSONRPCRequest {
jsonrpc: JSONRPC_VERSION.into(),
id: RequestId::Integer(request_id),
method: mcp_types::InitializeRequest::METHOD.into(),
params: Some(params_value),
}))
.await?;
let initialized = self.read_jsonrpc_message().await?;
assert_eq!(
JSONRPCMessage::Response(JSONRPCResponse {
jsonrpc: JSONRPC_VERSION.into(),
id: RequestId::Integer(request_id),
result: json!({
"capabilities": {
"tools": {
"listChanged": true
},
},
"serverInfo": {
"name": "codex-mcp-server",
"title": "Codex",
"version": "0.0.0"
},
"protocolVersion": mcp_types::MCP_SCHEMA_VERSION
})
}),
initialized
);
// Send notifications/initialized to ack the response.
self.send_jsonrpc_message(JSONRPCMessage::Notification(JSONRPCNotification {
jsonrpc: JSONRPC_VERSION.into(),
method: mcp_types::InitializedNotification::METHOD.into(),
params: None,
}))
.await?;
Ok(())
}
/// Returns the id used to make the request so it can be used when
/// correlating notifications.
pub async fn send_codex_tool_call(
&mut self,
cwd: Option<String>,
prompt: &str,
) -> anyhow::Result<i64> {
test: add integration test for MCP server (#1633) This PR introduces a single integration test for `cargo mcp`, though it also introduces a number of reusable components so that it should be easier to introduce more integration tests going forward. The new test is introduced in `codex-rs/mcp-server/tests/elicitation.rs` and the reusable pieces are in `codex-rs/mcp-server/tests/common`. The test itself verifies new functionality around elicitations introduced in https://github.com/openai/codex/pull/1623 (and the fix introduced in https://github.com/openai/codex/pull/1629) by doing the following: - starts a mock model provider with canned responses for `/v1/chat/completions` - starts the MCP server with a `config.toml` to use that model provider (and `approval_policy = "untrusted"`) - sends the `codex` tool call which causes the mock model provider to request a shell call for `git init` - the MCP server sends an elicitation to the client to approve the request - the client replies to the elicitation with `"approved"` - the MCP server runs the command and re-samples the model, getting a `"finish_reason": "stop"` - in turn, the MCP server sends the final response to the original `codex` tool call - verifies that `git init` ran as expected To test: ``` cargo test shell_command_approval_triggers_elicitation ``` In writing this test, I discovered that `ExecApprovalResponse` does not conform to `ElicitResult`, so I added a TODO to fix that, since I think that should be updated in a separate PR. As it stands, this PR does not update any business logic, though it does make a number of members of the `mcp-server` crate `pub` so they can be used in the test. One additional learning from this PR is that `std::process::Command::cargo_bin()` from the `assert_cmd` trait is only available for `std::process::Command`, but we really want to use `tokio::process::Command` so that everything is async and we can leverage utilities like `tokio::time::timeout()`. The trick I came up with was to use `cargo_bin()` to locate the program, and then to use `std::process::Command::get_program()` when constructing the `tokio::process::Command`.
2025-07-21 10:27:07 -07:00
let codex_tool_call_params = CallToolRequestParams {
name: "codex".to_string(),
arguments: Some(serde_json::to_value(CodexToolCallParam {
cwd,
test: add integration test for MCP server (#1633) This PR introduces a single integration test for `cargo mcp`, though it also introduces a number of reusable components so that it should be easier to introduce more integration tests going forward. The new test is introduced in `codex-rs/mcp-server/tests/elicitation.rs` and the reusable pieces are in `codex-rs/mcp-server/tests/common`. The test itself verifies new functionality around elicitations introduced in https://github.com/openai/codex/pull/1623 (and the fix introduced in https://github.com/openai/codex/pull/1629) by doing the following: - starts a mock model provider with canned responses for `/v1/chat/completions` - starts the MCP server with a `config.toml` to use that model provider (and `approval_policy = "untrusted"`) - sends the `codex` tool call which causes the mock model provider to request a shell call for `git init` - the MCP server sends an elicitation to the client to approve the request - the client replies to the elicitation with `"approved"` - the MCP server runs the command and re-samples the model, getting a `"finish_reason": "stop"` - in turn, the MCP server sends the final response to the original `codex` tool call - verifies that `git init` ran as expected To test: ``` cargo test shell_command_approval_triggers_elicitation ``` In writing this test, I discovered that `ExecApprovalResponse` does not conform to `ElicitResult`, so I added a TODO to fix that, since I think that should be updated in a separate PR. As it stands, this PR does not update any business logic, though it does make a number of members of the `mcp-server` crate `pub` so they can be used in the test. One additional learning from this PR is that `std::process::Command::cargo_bin()` from the `assert_cmd` trait is only available for `std::process::Command`, but we really want to use `tokio::process::Command` so that everything is async and we can leverage utilities like `tokio::time::timeout()`. The trick I came up with was to use `cargo_bin()` to locate the program, and then to use `std::process::Command::get_program()` when constructing the `tokio::process::Command`.
2025-07-21 10:27:07 -07:00
prompt: prompt.to_string(),
model: None,
profile: None,
approval_policy: None,
sandbox: None,
config: None,
})?),
};
self.send_request(
mcp_types::CallToolRequest::METHOD,
Some(serde_json::to_value(codex_tool_call_params)?),
)
.await
}
async fn send_request(
&mut self,
method: &str,
params: Option<serde_json::Value>,
) -> anyhow::Result<i64> {
let request_id = self.next_request_id.fetch_add(1, Ordering::Relaxed);
let message = JSONRPCMessage::Request(JSONRPCRequest {
jsonrpc: JSONRPC_VERSION.into(),
id: RequestId::Integer(request_id),
method: method.to_string(),
params,
});
self.send_jsonrpc_message(message).await?;
Ok(request_id)
}
pub async fn send_response(
&mut self,
id: RequestId,
result: serde_json::Value,
) -> anyhow::Result<()> {
self.send_jsonrpc_message(JSONRPCMessage::Response(JSONRPCResponse {
jsonrpc: JSONRPC_VERSION.into(),
id,
result,
}))
.await
}
async fn send_jsonrpc_message(&mut self, message: JSONRPCMessage) -> anyhow::Result<()> {
let payload = serde_json::to_string(&message)?;
self.stdin.write_all(payload.as_bytes()).await?;
self.stdin.write_all(b"\n").await?;
self.stdin.flush().await?;
Ok(())
}
async fn read_jsonrpc_message(&mut self) -> anyhow::Result<JSONRPCMessage> {
let mut line = String::new();
self.stdout.read_line(&mut line).await?;
let message = serde_json::from_str::<JSONRPCMessage>(&line)?;
Ok(message)
}
pub async fn read_stream_until_request_message(&mut self) -> anyhow::Result<JSONRPCRequest> {
loop {
let message = self.read_jsonrpc_message().await?;
eprint!("message: {message:?}");
match message {
JSONRPCMessage::Notification(_) => {
eprintln!("notification: {message:?}");
}
JSONRPCMessage::Request(jsonrpc_request) => {
return Ok(jsonrpc_request);
}
JSONRPCMessage::Error(_) => {
anyhow::bail!("unexpected JSONRPCMessage::Error: {message:?}");
}
JSONRPCMessage::Response(_) => {
anyhow::bail!("unexpected JSONRPCMessage::Response: {message:?}");
}
}
}
}
pub async fn read_stream_until_response_message(
&mut self,
request_id: RequestId,
) -> anyhow::Result<JSONRPCResponse> {
loop {
let message = self.read_jsonrpc_message().await?;
eprint!("message: {message:?}");
match message {
JSONRPCMessage::Notification(_) => {
eprintln!("notification: {message:?}");
}
JSONRPCMessage::Request(_) => {
anyhow::bail!("unexpected JSONRPCMessage::Request: {message:?}");
}
JSONRPCMessage::Error(_) => {
anyhow::bail!("unexpected JSONRPCMessage::Error: {message:?}");
}
JSONRPCMessage::Response(jsonrpc_response) => {
if jsonrpc_response.id == request_id {
return Ok(jsonrpc_response);
}
}
}
}
}
}