//! Prototype MCP server.

#![deny(clippy::print_stdout, clippy::print_stderr)]

use std::io::ErrorKind;
use std::io::Result as IoResult;
use std::path::PathBuf;

use codex_common::CliConfigOverrides;
use codex_core::config::Config;
use codex_core::config::ConfigOverrides;
use mcp_types::JSONRPCMessage;
use tokio::io::AsyncBufReadExt;
use tokio::io::AsyncWriteExt;
use tokio::io::BufReader;
use tokio::io::{self};
use tokio::sync::mpsc;
use tracing::debug;
use tracing::error;
use tracing::info;
use tracing_subscriber::EnvFilter;

mod codex_tool_config;
mod codex_tool_runner;
mod error_code;
mod exec_approval;
pub(crate) mod message_processor;
mod outgoing_message;
mod patch_approval;

use crate::message_processor::MessageProcessor;
use crate::outgoing_message::OutgoingMessage;
use crate::outgoing_message::OutgoingMessageSender;

pub use crate::codex_tool_config::CodexToolCallParam;
pub use crate::codex_tool_config::CodexToolCallReplyParam;
pub use crate::exec_approval::ExecApprovalElicitRequestParams;
pub use crate::exec_approval::ExecApprovalResponse;
pub use crate::patch_approval::PatchApprovalElicitRequestParams;
pub use crate::patch_approval::PatchApprovalResponse;

/// Size of the bounded channel used to deliver incoming JSON-RPC messages
/// from the stdin reader to the processor task. The value is a balance
/// between throughput and memory usage – 128 messages should be plenty for
/// an interactive CLI.
const CHANNEL_CAPACITY: usize = 128;

pub async fn run_main(
    codex_linux_sandbox_exe: Option<PathBuf>,
    cli_config_overrides: CliConfigOverrides,
) -> IoResult<()> {
    // Install a simple subscriber so `tracing` output is visible. Users can
    // control the log level with `RUST_LOG`.
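    // Logs go to stderr because stdout is reserved for JSON-RPC traffic.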
    tracing_subscriber::fmt()
        .with_writer(std::io::stderr)
        .with_env_filter(EnvFilter::from_default_env())
        .init();

    // Set up channels.
    let (incoming_tx, mut incoming_rx) = mpsc::channel::<JSONRPCMessage>(CHANNEL_CAPACITY);
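    // The outgoing channel is unbounded so that sending a message can never
    // block the processor (or, transitively, the stdin reader) behind a slow
    // stdout writer.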
    let (outgoing_tx, mut outgoing_rx) = mpsc::unbounded_channel::<OutgoingMessage>();

    // Task: read from stdin, push to `incoming_tx`.
    let stdin_reader_handle = tokio::spawn({
        async move {
            let stdin = io::stdin();
            let reader = BufReader::new(stdin);
            let mut lines = reader.lines();

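            // `unwrap_or_default()` maps a read error to `None`, so an I/O
            // error terminates this loop the same way EOF does.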
            while let Some(line) = lines.next_line().await.unwrap_or_default() {
                match serde_json::from_str::<JSONRPCMessage>(&line) {
                    Ok(msg) => {
                        if incoming_tx.send(msg).await.is_err() {
                            // Receiver gone – nothing left to do.
                            break;
                        }
                    }
                    Err(e) => error!("Failed to deserialize JSONRPCMessage: {e}"),
                }
            }

            debug!("stdin reader finished (EOF)");
        }
    });

    // Parse CLI overrides once and derive the base Config eagerly so later
    // components do not need to work with raw TOML values.
    let cli_kv_overrides = cli_config_overrides.parse_overrides().map_err(|e| {
        std::io::Error::new(
            ErrorKind::InvalidInput,
            format!("error parsing -c overrides: {e}"),
        )
    })?;
    let config = Config::load_with_cli_overrides(cli_kv_overrides, ConfigOverrides::default())
        .map_err(|e| {
            std::io::Error::new(ErrorKind::InvalidData, format!("error loading config: {e}"))
        })?;

    // Task: process incoming messages.
    let processor_handle = tokio::spawn({
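        // `OutgoingMessageSender` centralizes all traffic back to the client
        // and mints unique ids for requests sent to the client.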
        let outgoing_message_sender = OutgoingMessageSender::new(outgoing_tx);
        let mut processor = MessageProcessor::new(
            outgoing_message_sender,
            codex_linux_sandbox_exe,
            std::sync::Arc::new(config),
        );
        async move {
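            // Each handler is awaited inline; a handler that blocks for long
            // would stall this loop and delay all subsequent messages.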
            while let Some(msg) = incoming_rx.recv().await {
                match msg {
                    JSONRPCMessage::Request(r) => processor.process_request(r).await,
                    JSONRPCMessage::Response(r) => processor.process_response(r).await,
                    JSONRPCMessage::Notification(n) => processor.process_notification(n).await,
                    JSONRPCMessage::Error(e) => processor.process_error(e),
                }
            }

            info!("processor task exited (channel closed)");
        }
    });

    // Task: write outgoing messages to stdout.
    let stdout_writer_handle = tokio::spawn(async move {
        let mut stdout = io::stdout();
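        // Messages are framed as line-delimited JSON: each serialized message
        // is written followed by a single newline.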
        while let Some(outgoing_message) = outgoing_rx.recv().await {
            let msg: JSONRPCMessage = outgoing_message.into();
            match serde_json::to_string(&msg) {
                Ok(json) => {
                    if let Err(e) = stdout.write_all(json.as_bytes()).await {
                        error!("Failed to write to stdout: {e}");
                        break;
                    }
                    if let Err(e) = stdout.write_all(b"\n").await {
                        error!("Failed to write newline to stdout: {e}");
                        break;
                    }
                }
                Err(e) => error!("Failed to serialize JSONRPCMessage: {e}"),
            }
        }

        info!("stdout writer exited (channel closed)");
    });

    // Wait for all tasks to finish. The typical exit path is the stdin reader
    // hitting EOF which, once it drops `incoming_tx`, propagates shutdown to
    // the processor and then to the stdout task.
    let _ = tokio::join!(stdin_reader_handle, processor_handle, stdout_writer_handle);

    Ok(())
}