Files
llmx/codex-rs/mcp-server/src/main.rs
Michael Bolin 2b72d05c5e feat: make Codex available as a tool when running it as an MCP server (#811)
This PR replaces the placeholder `"echo"` tool call in the MCP server
with a `"codex"` tool that calls Codex. Events such as
`ExecApprovalRequest` and `ApplyPatchApprovalRequest` are not handled
properly yet, but I have `approval_policy = "never"` set in my
`~/.codex/config.toml` such that those codepaths are not exercised.

The schema for this MPC tool is defined by a new `CodexToolCallParam`
struct introduced in this PR. It is fairly similar to `ConfigOverrides`,
as the param is used to help create the `Config` used to start the Codex
session, though it also includes the `prompt` used to kick off the
session.

This PR also introduces the use of the third-party `schemars` crate to
generate the JSON schema, which is verified in the
`verify_codex_tool_json_schema()` unit test.

Events that are dispatched during the Codex session are sent back to the
MCP client as MCP notifications. This gives the client a way to monitor
progress as the tool call itself may take minutes to complete depending
on the complexity of the task requested by the user.

In the video below, I launched the server via:

```shell
mcp-server$ RUST_LOG=debug npx @modelcontextprotocol/inspector cargo run --
```

In the video, you can see the flow of:

* requesting the list of tools
* choosing the **codex** tool
* entering a value for **prompt** and then making the tool call

Note that I left the other fields blank because when unspecified, the
values in my `~/.codex/config.toml` were used:


https://github.com/user-attachments/assets/1975058c-b004-43ef-8c8d-800a953b8192

Note that while using the inspector, I did run into
https://github.com/modelcontextprotocol/inspector/issues/293, though the
tip about ensuring I had only one instance of the **MCP Inspector** tab
open in my browser seemed to fix things.
2025-05-05 07:16:19 -07:00

115 lines
4.2 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
//! Prototype MCP server.
#![deny(clippy::print_stdout, clippy::print_stderr)]
use std::io::Result as IoResult;
use mcp_types::JSONRPCMessage;
use tokio::io::AsyncBufReadExt;
use tokio::io::AsyncWriteExt;
use tokio::io::BufReader;
use tokio::io::{self};
use tokio::sync::mpsc;
use tracing::debug;
use tracing::error;
use tracing::info;
mod codex_tool_config;
mod codex_tool_runner;
mod message_processor;
use crate::message_processor::MessageProcessor;
/// Size of the bounded channels used to communicate between tasks. The value
/// is a balance between throughput and memory usage 128 messages should be
/// plenty for an interactive CLI.
const CHANNEL_CAPACITY: usize = 128;
#[tokio::main]
async fn main() -> IoResult<()> {
// Install a simple subscriber so `tracing` output is visible. Users can
// control the log level with `RUST_LOG`.
tracing_subscriber::fmt()
.with_writer(std::io::stderr)
.init();
// Set up channels.
let (incoming_tx, mut incoming_rx) = mpsc::channel::<JSONRPCMessage>(CHANNEL_CAPACITY);
let (outgoing_tx, mut outgoing_rx) = mpsc::channel::<JSONRPCMessage>(CHANNEL_CAPACITY);
// Task: read from stdin, push to `incoming_tx`.
let stdin_reader_handle = tokio::spawn({
let incoming_tx = incoming_tx.clone();
async move {
let stdin = io::stdin();
let reader = BufReader::new(stdin);
let mut lines = reader.lines();
while let Some(line) = lines.next_line().await.unwrap_or_default() {
match serde_json::from_str::<JSONRPCMessage>(&line) {
Ok(msg) => {
if incoming_tx.send(msg).await.is_err() {
// Receiver gone nothing left to do.
break;
}
}
Err(e) => error!("Failed to deserialize JSONRPCMessage: {e}"),
}
}
debug!("stdin reader finished (EOF)");
}
});
// Task: process incoming messages.
let processor_handle = tokio::spawn({
let mut processor = MessageProcessor::new(outgoing_tx.clone());
async move {
while let Some(msg) = incoming_rx.recv().await {
match msg {
JSONRPCMessage::Request(r) => processor.process_request(r),
JSONRPCMessage::Response(r) => processor.process_response(r),
JSONRPCMessage::Notification(n) => processor.process_notification(n),
JSONRPCMessage::BatchRequest(b) => processor.process_batch_request(b),
JSONRPCMessage::Error(e) => processor.process_error(e),
JSONRPCMessage::BatchResponse(b) => processor.process_batch_response(b),
}
}
info!("processor task exited (channel closed)");
}
});
// Task: write outgoing messages to stdout.
let stdout_writer_handle = tokio::spawn(async move {
let mut stdout = io::stdout();
while let Some(msg) = outgoing_rx.recv().await {
match serde_json::to_string(&msg) {
Ok(json) => {
if let Err(e) = stdout.write_all(json.as_bytes()).await {
error!("Failed to write to stdout: {e}");
break;
}
if let Err(e) = stdout.write_all(b"\n").await {
error!("Failed to write newline to stdout: {e}");
break;
}
if let Err(e) = stdout.flush().await {
error!("Failed to flush stdout: {e}");
break;
}
}
Err(e) => error!("Failed to serialize JSONRPCMessage: {e}"),
}
}
info!("stdout writer exited (channel closed)");
});
// Wait for all tasks to finish. The typical exit path is the stdin reader
// hitting EOF which, once it drops `incoming_tx`, propagates shutdown to
// the processor and then to the stdout task.
let _ = tokio::join!(stdin_reader_handle, processor_handle, stdout_writer_handle);
Ok(())
}