feat: add mcp subcommand to CLI to run Codex as an MCP server (#934)
Previously, running Codex as an MCP server required a standalone binary in our Cargo workspace, but this PR makes it available as a subcommand (`mcp`) of the main CLI. Ran this with: ``` RUST_LOG=debug npx @modelcontextprotocol/inspector cargo run --bin codex -- mcp ``` and verified it worked as expected in the inspector at `http://127.0.0.1:6274/`.
This commit is contained in:
1
codex-rs/Cargo.lock
generated
1
codex-rs/Cargo.lock
generated
@@ -491,6 +491,7 @@ dependencies = [
|
||||
"codex-common",
|
||||
"codex-core",
|
||||
"codex-exec",
|
||||
"codex-mcp-server",
|
||||
"codex-tui",
|
||||
"serde_json",
|
||||
"tokio",
|
||||
|
||||
@@ -24,6 +24,7 @@ clap = { version = "4", features = ["derive"] }
|
||||
codex-core = { path = "../core" }
|
||||
codex-common = { path = "../common", features = ["cli"] }
|
||||
codex-exec = { path = "../exec" }
|
||||
codex-mcp-server = { path = "../mcp-server" }
|
||||
codex-tui = { path = "../tui" }
|
||||
serde_json = "1"
|
||||
tokio = { version = "1", features = [
|
||||
|
||||
@@ -33,6 +33,9 @@ enum Subcommand {
|
||||
#[clap(visible_alias = "e")]
|
||||
Exec(ExecCli),
|
||||
|
||||
/// Experimental: run Codex as an MCP server.
|
||||
Mcp,
|
||||
|
||||
/// Run the Protocol stream via stdin/stdout
|
||||
#[clap(visible_alias = "p")]
|
||||
Proto(ProtoCli),
|
||||
@@ -70,6 +73,9 @@ async fn main() -> anyhow::Result<()> {
|
||||
Some(Subcommand::Exec(exec_cli)) => {
|
||||
codex_exec::run_main(exec_cli).await?;
|
||||
}
|
||||
Some(Subcommand::Mcp) => {
|
||||
codex_mcp_server::run_main().await?;
|
||||
}
|
||||
Some(Subcommand::Proto(proto_cli)) => {
|
||||
proto::run_main(proto_cli).await?;
|
||||
}
|
||||
|
||||
@@ -3,6 +3,14 @@ name = "codex-mcp-server"
|
||||
version = { workspace = true }
|
||||
edition = "2024"
|
||||
|
||||
[[bin]]
|
||||
name = "codex-mcp-server"
|
||||
path = "src/main.rs"
|
||||
|
||||
[lib]
|
||||
name = "codex_mcp_server"
|
||||
path = "src/lib.rs"
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
|
||||
113
codex-rs/mcp-server/src/lib.rs
Normal file
113
codex-rs/mcp-server/src/lib.rs
Normal file
@@ -0,0 +1,113 @@
|
||||
//! Prototype MCP server.
|
||||
#![deny(clippy::print_stdout, clippy::print_stderr)]
|
||||
|
||||
use std::io::Result as IoResult;
|
||||
|
||||
use mcp_types::JSONRPCMessage;
|
||||
use tokio::io::AsyncBufReadExt;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio::io::BufReader;
|
||||
use tokio::io::{self};
|
||||
use tokio::sync::mpsc;
|
||||
use tracing::debug;
|
||||
use tracing::error;
|
||||
use tracing::info;
|
||||
|
||||
mod codex_tool_config;
|
||||
mod codex_tool_runner;
|
||||
mod message_processor;
|
||||
|
||||
use crate::message_processor::MessageProcessor;
|
||||
|
||||
/// Size of the bounded channels used to communicate between tasks. The value
|
||||
/// is a balance between throughput and memory usage – 128 messages should be
|
||||
/// plenty for an interactive CLI.
|
||||
const CHANNEL_CAPACITY: usize = 128;
|
||||
|
||||
pub async fn run_main() -> IoResult<()> {
|
||||
// Install a simple subscriber so `tracing` output is visible. Users can
|
||||
// control the log level with `RUST_LOG`.
|
||||
tracing_subscriber::fmt()
|
||||
.with_writer(std::io::stderr)
|
||||
.init();
|
||||
|
||||
// Set up channels.
|
||||
let (incoming_tx, mut incoming_rx) = mpsc::channel::<JSONRPCMessage>(CHANNEL_CAPACITY);
|
||||
let (outgoing_tx, mut outgoing_rx) = mpsc::channel::<JSONRPCMessage>(CHANNEL_CAPACITY);
|
||||
|
||||
// Task: read from stdin, push to `incoming_tx`.
|
||||
let stdin_reader_handle = tokio::spawn({
|
||||
let incoming_tx = incoming_tx.clone();
|
||||
async move {
|
||||
let stdin = io::stdin();
|
||||
let reader = BufReader::new(stdin);
|
||||
let mut lines = reader.lines();
|
||||
|
||||
while let Some(line) = lines.next_line().await.unwrap_or_default() {
|
||||
match serde_json::from_str::<JSONRPCMessage>(&line) {
|
||||
Ok(msg) => {
|
||||
if incoming_tx.send(msg).await.is_err() {
|
||||
// Receiver gone – nothing left to do.
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(e) => error!("Failed to deserialize JSONRPCMessage: {e}"),
|
||||
}
|
||||
}
|
||||
|
||||
debug!("stdin reader finished (EOF)");
|
||||
}
|
||||
});
|
||||
|
||||
// Task: process incoming messages.
|
||||
let processor_handle = tokio::spawn({
|
||||
let mut processor = MessageProcessor::new(outgoing_tx.clone());
|
||||
async move {
|
||||
while let Some(msg) = incoming_rx.recv().await {
|
||||
match msg {
|
||||
JSONRPCMessage::Request(r) => processor.process_request(r),
|
||||
JSONRPCMessage::Response(r) => processor.process_response(r),
|
||||
JSONRPCMessage::Notification(n) => processor.process_notification(n),
|
||||
JSONRPCMessage::BatchRequest(b) => processor.process_batch_request(b),
|
||||
JSONRPCMessage::Error(e) => processor.process_error(e),
|
||||
JSONRPCMessage::BatchResponse(b) => processor.process_batch_response(b),
|
||||
}
|
||||
}
|
||||
|
||||
info!("processor task exited (channel closed)");
|
||||
}
|
||||
});
|
||||
|
||||
// Task: write outgoing messages to stdout.
|
||||
let stdout_writer_handle = tokio::spawn(async move {
|
||||
let mut stdout = io::stdout();
|
||||
while let Some(msg) = outgoing_rx.recv().await {
|
||||
match serde_json::to_string(&msg) {
|
||||
Ok(json) => {
|
||||
if let Err(e) = stdout.write_all(json.as_bytes()).await {
|
||||
error!("Failed to write to stdout: {e}");
|
||||
break;
|
||||
}
|
||||
if let Err(e) = stdout.write_all(b"\n").await {
|
||||
error!("Failed to write newline to stdout: {e}");
|
||||
break;
|
||||
}
|
||||
if let Err(e) = stdout.flush().await {
|
||||
error!("Failed to flush stdout: {e}");
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(e) => error!("Failed to serialize JSONRPCMessage: {e}"),
|
||||
}
|
||||
}
|
||||
|
||||
info!("stdout writer exited (channel closed)");
|
||||
});
|
||||
|
||||
// Wait for all tasks to finish. The typical exit path is the stdin reader
|
||||
// hitting EOF which, once it drops `incoming_tx`, propagates shutdown to
|
||||
// the processor and then to the stdout task.
|
||||
let _ = tokio::join!(stdin_reader_handle, processor_handle, stdout_writer_handle);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -1,114 +1,7 @@
|
||||
//! Prototype MCP server.
|
||||
#![deny(clippy::print_stdout, clippy::print_stderr)]
|
||||
|
||||
use std::io::Result as IoResult;
|
||||
|
||||
use mcp_types::JSONRPCMessage;
|
||||
use tokio::io::AsyncBufReadExt;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio::io::BufReader;
|
||||
use tokio::io::{self};
|
||||
use tokio::sync::mpsc;
|
||||
use tracing::debug;
|
||||
use tracing::error;
|
||||
use tracing::info;
|
||||
|
||||
mod codex_tool_config;
|
||||
mod codex_tool_runner;
|
||||
mod message_processor;
|
||||
|
||||
use crate::message_processor::MessageProcessor;
|
||||
|
||||
/// Size of the bounded channels used to communicate between tasks. The value
|
||||
/// is a balance between throughput and memory usage – 128 messages should be
|
||||
/// plenty for an interactive CLI.
|
||||
const CHANNEL_CAPACITY: usize = 128;
|
||||
use codex_mcp_server::run_main;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> IoResult<()> {
|
||||
// Install a simple subscriber so `tracing` output is visible. Users can
|
||||
// control the log level with `RUST_LOG`.
|
||||
tracing_subscriber::fmt()
|
||||
.with_writer(std::io::stderr)
|
||||
.init();
|
||||
|
||||
// Set up channels.
|
||||
let (incoming_tx, mut incoming_rx) = mpsc::channel::<JSONRPCMessage>(CHANNEL_CAPACITY);
|
||||
let (outgoing_tx, mut outgoing_rx) = mpsc::channel::<JSONRPCMessage>(CHANNEL_CAPACITY);
|
||||
|
||||
// Task: read from stdin, push to `incoming_tx`.
|
||||
let stdin_reader_handle = tokio::spawn({
|
||||
let incoming_tx = incoming_tx.clone();
|
||||
async move {
|
||||
let stdin = io::stdin();
|
||||
let reader = BufReader::new(stdin);
|
||||
let mut lines = reader.lines();
|
||||
|
||||
while let Some(line) = lines.next_line().await.unwrap_or_default() {
|
||||
match serde_json::from_str::<JSONRPCMessage>(&line) {
|
||||
Ok(msg) => {
|
||||
if incoming_tx.send(msg).await.is_err() {
|
||||
// Receiver gone – nothing left to do.
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(e) => error!("Failed to deserialize JSONRPCMessage: {e}"),
|
||||
}
|
||||
}
|
||||
|
||||
debug!("stdin reader finished (EOF)");
|
||||
}
|
||||
});
|
||||
|
||||
// Task: process incoming messages.
|
||||
let processor_handle = tokio::spawn({
|
||||
let mut processor = MessageProcessor::new(outgoing_tx.clone());
|
||||
async move {
|
||||
while let Some(msg) = incoming_rx.recv().await {
|
||||
match msg {
|
||||
JSONRPCMessage::Request(r) => processor.process_request(r),
|
||||
JSONRPCMessage::Response(r) => processor.process_response(r),
|
||||
JSONRPCMessage::Notification(n) => processor.process_notification(n),
|
||||
JSONRPCMessage::BatchRequest(b) => processor.process_batch_request(b),
|
||||
JSONRPCMessage::Error(e) => processor.process_error(e),
|
||||
JSONRPCMessage::BatchResponse(b) => processor.process_batch_response(b),
|
||||
}
|
||||
}
|
||||
|
||||
info!("processor task exited (channel closed)");
|
||||
}
|
||||
});
|
||||
|
||||
// Task: write outgoing messages to stdout.
|
||||
let stdout_writer_handle = tokio::spawn(async move {
|
||||
let mut stdout = io::stdout();
|
||||
while let Some(msg) = outgoing_rx.recv().await {
|
||||
match serde_json::to_string(&msg) {
|
||||
Ok(json) => {
|
||||
if let Err(e) = stdout.write_all(json.as_bytes()).await {
|
||||
error!("Failed to write to stdout: {e}");
|
||||
break;
|
||||
}
|
||||
if let Err(e) = stdout.write_all(b"\n").await {
|
||||
error!("Failed to write newline to stdout: {e}");
|
||||
break;
|
||||
}
|
||||
if let Err(e) = stdout.flush().await {
|
||||
error!("Failed to flush stdout: {e}");
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(e) => error!("Failed to serialize JSONRPCMessage: {e}"),
|
||||
}
|
||||
}
|
||||
|
||||
info!("stdout writer exited (channel closed)");
|
||||
});
|
||||
|
||||
// Wait for all tasks to finish. The typical exit path is the stdin reader
|
||||
// hitting EOF which, once it drops `incoming_tx`, propagates shutdown to
|
||||
// the processor and then to the stdout task.
|
||||
let _ = tokio::join!(stdin_reader_handle, processor_handle, stdout_writer_handle);
|
||||
|
||||
async fn main() -> std::io::Result<()> {
|
||||
run_main().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -7,7 +7,9 @@ use strum_macros::EnumString;
|
||||
use strum_macros::IntoStaticStr;
|
||||
|
||||
/// Commands that can be invoked by starting a message with a leading slash.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EnumString, EnumIter, AsRefStr, IntoStaticStr)]
|
||||
#[derive(
|
||||
Debug, Clone, Copy, PartialEq, Eq, Hash, EnumString, EnumIter, AsRefStr, IntoStaticStr,
|
||||
)]
|
||||
#[strum(serialize_all = "kebab-case")]
|
||||
pub enum SlashCommand {
|
||||
Clear,
|
||||
|
||||
Reference in New Issue
Block a user