From 497c5396c05a698247b711e8b6f93898f0882e33 Mon Sep 17 00:00:00 2001 From: Michael Bolin Date: Wed, 14 May 2025 13:15:41 -0700 Subject: [PATCH] feat: add mcp subcommand to CLI to run Codex as an MCP server (#934) Previously, running Codex as an MCP server required a standalone binary in our Cargo workspace, but this PR makes it available as a subcommand (`mcp`) of the main CLI. Ran this with: ``` RUST_LOG=debug npx @modelcontextprotocol/inspector cargo run --bin codex -- mcp ``` and verified it worked as expected in the inspector at `http://127.0.0.1:6274/`. --- codex-rs/Cargo.lock | 1 + codex-rs/cli/Cargo.toml | 1 + codex-rs/cli/src/main.rs | 6 ++ codex-rs/mcp-server/Cargo.toml | 8 +++ codex-rs/mcp-server/src/lib.rs | 113 ++++++++++++++++++++++++++++++ codex-rs/mcp-server/src/main.rs | 113 +----------------------------- codex-rs/tui/src/slash_command.rs | 4 +- 7 files changed, 135 insertions(+), 111 deletions(-) create mode 100644 codex-rs/mcp-server/src/lib.rs diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index d4abcd3d..a4f64eaf 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -491,6 +491,7 @@ dependencies = [ "codex-common", "codex-core", "codex-exec", + "codex-mcp-server", "codex-tui", "serde_json", "tokio", diff --git a/codex-rs/cli/Cargo.toml b/codex-rs/cli/Cargo.toml index d10bf02d..f7ad70e9 100644 --- a/codex-rs/cli/Cargo.toml +++ b/codex-rs/cli/Cargo.toml @@ -24,6 +24,7 @@ clap = { version = "4", features = ["derive"] } codex-core = { path = "../core" } codex-common = { path = "../common", features = ["cli"] } codex-exec = { path = "../exec" } +codex-mcp-server = { path = "../mcp-server" } codex-tui = { path = "../tui" } serde_json = "1" tokio = { version = "1", features = [ diff --git a/codex-rs/cli/src/main.rs b/codex-rs/cli/src/main.rs index 70d122fc..aa0691d8 100644 --- a/codex-rs/cli/src/main.rs +++ b/codex-rs/cli/src/main.rs @@ -33,6 +33,9 @@ enum Subcommand { #[clap(visible_alias = "e")] Exec(ExecCli), + /// Experimental: run Codex as an MCP server. + Mcp, + /// Run the Protocol stream via stdin/stdout #[clap(visible_alias = "p")] Proto(ProtoCli), @@ -70,6 +73,9 @@ async fn main() -> anyhow::Result<()> { Some(Subcommand::Exec(exec_cli)) => { codex_exec::run_main(exec_cli).await?; } + Some(Subcommand::Mcp) => { + codex_mcp_server::run_main().await?; + } Some(Subcommand::Proto(proto_cli)) => { proto::run_main(proto_cli).await?; } diff --git a/codex-rs/mcp-server/Cargo.toml b/codex-rs/mcp-server/Cargo.toml index aa5721e4..9b5153a5 100644 --- a/codex-rs/mcp-server/Cargo.toml +++ b/codex-rs/mcp-server/Cargo.toml @@ -3,6 +3,14 @@ name = "codex-mcp-server" version = { workspace = true } edition = "2024" +[[bin]] +name = "codex-mcp-server" +path = "src/main.rs" + +[lib] +name = "codex_mcp_server" +path = "src/lib.rs" + [lints] workspace = true diff --git a/codex-rs/mcp-server/src/lib.rs b/codex-rs/mcp-server/src/lib.rs new file mode 100644 index 00000000..e621f779 --- /dev/null +++ b/codex-rs/mcp-server/src/lib.rs @@ -0,0 +1,113 @@ +//! Prototype MCP server. +#![deny(clippy::print_stdout, clippy::print_stderr)] + +use std::io::Result as IoResult; + +use mcp_types::JSONRPCMessage; +use tokio::io::AsyncBufReadExt; +use tokio::io::AsyncWriteExt; +use tokio::io::BufReader; +use tokio::io::{self}; +use tokio::sync::mpsc; +use tracing::debug; +use tracing::error; +use tracing::info; + +mod codex_tool_config; +mod codex_tool_runner; +mod message_processor; + +use crate::message_processor::MessageProcessor; + +/// Size of the bounded channels used to communicate between tasks. The value +/// is a balance between throughput and memory usage – 128 messages should be +/// plenty for an interactive CLI. +const CHANNEL_CAPACITY: usize = 128; + +pub async fn run_main() -> IoResult<()> { + // Install a simple subscriber so `tracing` output is visible. Users can + // control the log level with `RUST_LOG`. + tracing_subscriber::fmt() + .with_writer(std::io::stderr) + .init(); + + // Set up channels. + let (incoming_tx, mut incoming_rx) = mpsc::channel::(CHANNEL_CAPACITY); + let (outgoing_tx, mut outgoing_rx) = mpsc::channel::(CHANNEL_CAPACITY); + + // Task: read from stdin, push to `incoming_tx`. + let stdin_reader_handle = tokio::spawn({ + let incoming_tx = incoming_tx.clone(); + async move { + let stdin = io::stdin(); + let reader = BufReader::new(stdin); + let mut lines = reader.lines(); + + while let Some(line) = lines.next_line().await.unwrap_or_default() { + match serde_json::from_str::(&line) { + Ok(msg) => { + if incoming_tx.send(msg).await.is_err() { + // Receiver gone – nothing left to do. + break; + } + } + Err(e) => error!("Failed to deserialize JSONRPCMessage: {e}"), + } + } + + debug!("stdin reader finished (EOF)"); + } + }); + + // Task: process incoming messages. + let processor_handle = tokio::spawn({ + let mut processor = MessageProcessor::new(outgoing_tx.clone()); + async move { + while let Some(msg) = incoming_rx.recv().await { + match msg { + JSONRPCMessage::Request(r) => processor.process_request(r), + JSONRPCMessage::Response(r) => processor.process_response(r), + JSONRPCMessage::Notification(n) => processor.process_notification(n), + JSONRPCMessage::BatchRequest(b) => processor.process_batch_request(b), + JSONRPCMessage::Error(e) => processor.process_error(e), + JSONRPCMessage::BatchResponse(b) => processor.process_batch_response(b), + } + } + + info!("processor task exited (channel closed)"); + } + }); + + // Task: write outgoing messages to stdout. + let stdout_writer_handle = tokio::spawn(async move { + let mut stdout = io::stdout(); + while let Some(msg) = outgoing_rx.recv().await { + match serde_json::to_string(&msg) { + Ok(json) => { + if let Err(e) = stdout.write_all(json.as_bytes()).await { + error!("Failed to write to stdout: {e}"); + break; + } + if let Err(e) = stdout.write_all(b"\n").await { + error!("Failed to write newline to stdout: {e}"); + break; + } + if let Err(e) = stdout.flush().await { + error!("Failed to flush stdout: {e}"); + break; + } + } + Err(e) => error!("Failed to serialize JSONRPCMessage: {e}"), + } + } + + info!("stdout writer exited (channel closed)"); + }); + + // Wait for all tasks to finish. The typical exit path is the stdin reader + // hitting EOF which, once it drops `incoming_tx`, propagates shutdown to + // the processor and then to the stdout task. + let _ = tokio::join!(stdin_reader_handle, processor_handle, stdout_writer_handle); + + Ok(()) +} diff --git a/codex-rs/mcp-server/src/main.rs b/codex-rs/mcp-server/src/main.rs index 87e8d7bb..baef8587 100644 --- a/codex-rs/mcp-server/src/main.rs +++ b/codex-rs/mcp-server/src/main.rs @@ -1,114 +1,7 @@ -//! Prototype MCP server. -#![deny(clippy::print_stdout, clippy::print_stderr)] - -use std::io::Result as IoResult; - -use mcp_types::JSONRPCMessage; -use tokio::io::AsyncBufReadExt; -use tokio::io::AsyncWriteExt; -use tokio::io::BufReader; -use tokio::io::{self}; -use tokio::sync::mpsc; -use tracing::debug; -use tracing::error; -use tracing::info; - -mod codex_tool_config; -mod codex_tool_runner; -mod message_processor; - -use crate::message_processor::MessageProcessor; - -/// Size of the bounded channels used to communicate between tasks. The value -/// is a balance between throughput and memory usage – 128 messages should be -/// plenty for an interactive CLI. -const CHANNEL_CAPACITY: usize = 128; +use codex_mcp_server::run_main; #[tokio::main] -async fn main() -> IoResult<()> { - // Install a simple subscriber so `tracing` output is visible. Users can - // control the log level with `RUST_LOG`. - tracing_subscriber::fmt() - .with_writer(std::io::stderr) - .init(); - - // Set up channels. - let (incoming_tx, mut incoming_rx) = mpsc::channel::(CHANNEL_CAPACITY); - let (outgoing_tx, mut outgoing_rx) = mpsc::channel::(CHANNEL_CAPACITY); - - // Task: read from stdin, push to `incoming_tx`. - let stdin_reader_handle = tokio::spawn({ - let incoming_tx = incoming_tx.clone(); - async move { - let stdin = io::stdin(); - let reader = BufReader::new(stdin); - let mut lines = reader.lines(); - - while let Some(line) = lines.next_line().await.unwrap_or_default() { - match serde_json::from_str::(&line) { - Ok(msg) => { - if incoming_tx.send(msg).await.is_err() { - // Receiver gone – nothing left to do. - break; - } - } - Err(e) => error!("Failed to deserialize JSONRPCMessage: {e}"), - } - } - - debug!("stdin reader finished (EOF)"); - } - }); - - // Task: process incoming messages. - let processor_handle = tokio::spawn({ - let mut processor = MessageProcessor::new(outgoing_tx.clone()); - async move { - while let Some(msg) = incoming_rx.recv().await { - match msg { - JSONRPCMessage::Request(r) => processor.process_request(r), - JSONRPCMessage::Response(r) => processor.process_response(r), - JSONRPCMessage::Notification(n) => processor.process_notification(n), - JSONRPCMessage::BatchRequest(b) => processor.process_batch_request(b), - JSONRPCMessage::Error(e) => processor.process_error(e), - JSONRPCMessage::BatchResponse(b) => processor.process_batch_response(b), - } - } - - info!("processor task exited (channel closed)"); - } - }); - - // Task: write outgoing messages to stdout. - let stdout_writer_handle = tokio::spawn(async move { - let mut stdout = io::stdout(); - while let Some(msg) = outgoing_rx.recv().await { - match serde_json::to_string(&msg) { - Ok(json) => { - if let Err(e) = stdout.write_all(json.as_bytes()).await { - error!("Failed to write to stdout: {e}"); - break; - } - if let Err(e) = stdout.write_all(b"\n").await { - error!("Failed to write newline to stdout: {e}"); - break; - } - if let Err(e) = stdout.flush().await { - error!("Failed to flush stdout: {e}"); - break; - } - } - Err(e) => error!("Failed to serialize JSONRPCMessage: {e}"), - } - } - - info!("stdout writer exited (channel closed)"); - }); - - // Wait for all tasks to finish. The typical exit path is the stdin reader - // hitting EOF which, once it drops `incoming_tx`, propagates shutdown to - // the processor and then to the stdout task. - let _ = tokio::join!(stdin_reader_handle, processor_handle, stdout_writer_handle); - +async fn main() -> std::io::Result<()> { + run_main().await?; Ok(()) } diff --git a/codex-rs/tui/src/slash_command.rs b/codex-rs/tui/src/slash_command.rs index d5befd9d..c56f2d94 100644 --- a/codex-rs/tui/src/slash_command.rs +++ b/codex-rs/tui/src/slash_command.rs @@ -7,7 +7,9 @@ use strum_macros::EnumString; use strum_macros::IntoStaticStr; /// Commands that can be invoked by starting a message with a leading slash. -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EnumString, EnumIter, AsRefStr, IntoStaticStr)] +#[derive( + Debug, Clone, Copy, PartialEq, Eq, Hash, EnumString, EnumIter, AsRefStr, IntoStaticStr, +)] #[strum(serialize_all = "kebab-case")] pub enum SlashCommand { Clear,