fix: separate codex mcp into codex mcp-server and codex app-server (#4471)
This is a very large PR with some non-backwards-compatible changes. Historically, `codex mcp` (or `codex mcp serve`) started a JSON-RPC-ish server that had two overlapping responsibilities: - Running an MCP server, providing some basic tool calls. - Running the app server used to power experiences such as the VS Code extension. This PR aims to separate these into distinct concepts: - `codex mcp-server` for the MCP server - `codex app-server` for the "application server" Note `codex mcp` still exists because it already has its own subcommands for MCP management (`list`, `add`, etc.) The MCP logic continues to live in `codex-rs/mcp-server` whereas the refactored app server logic is in the new `codex-rs/app-server` folder. Note that most of the existing integration tests in `codex-rs/mcp-server/tests/suite` were actually for the app server, so all the tests have been moved with the exception of `codex-rs/mcp-server/tests/suite/mod.rs`. Because this is already a large diff, I tried not to change more than I had to, so `codex-rs/app-server/tests/common/mcp_process.rs` still uses the name `McpProcess` for now, but I will do some mechanical renamings to things like `AppServer` in subsequent PRs. While `mcp-server` and `app-server` share some overlapping functionality (like reading streams of JSONL and dispatching based on message types) and some differences (completely different message types), I ended up doing a bit of copypasta between the two crates, as both have somewhat similar `message_processor.rs` and `outgoing_message.rs` files for now, though I expect them to diverge more in the near future. One material change is that of the initialize handshake for `codex app-server`, as we no longer use the MCP types for that handshake. Instead, we update `codex-rs/protocol/src/mcp_protocol.rs` to add an `Initialize` variant to `ClientRequest`, which takes the `ClientInfo` object we need to update the `USER_AGENT_SUFFIX` in `codex-rs/app-server/src/message_processor.rs`. One other material change is in `codex-rs/app-server/src/codex_message_processor.rs` where I eliminated a use of the `send_event_as_notification()` method I am generally trying to deprecate (because it blindly maps an `EventMsg` into a `JSONNotification`) in favor of `send_server_notification()`, which takes a `ServerNotification`, as that is intended to be a custom enum of all notification types supported by the app server. So to make this update, I had to introduce a new variant of `ServerNotification`, `SessionConfigured`, which is a non-backwards compatible change with the old `codex mcp`, and clients will have to be updated after the next release that contains this PR. Note that `codex-rs/app-server/tests/suite/list_resume.rs` also had to be update to reflect this change. I introduced `codex-rs/utils/json-to-toml/src/lib.rs` as a small utility crate to avoid some of the copying between `mcp-server` and `app-server`.
This commit is contained in:
File diff suppressed because it is too large
Load Diff
@@ -2,6 +2,7 @@
|
||||
|
||||
use codex_core::protocol::AskForApproval;
|
||||
use codex_protocol::config_types::SandboxMode;
|
||||
use codex_utils_json_to_toml::json_to_toml;
|
||||
use mcp_types::Tool;
|
||||
use mcp_types::ToolInputSchema;
|
||||
use schemars::JsonSchema;
|
||||
@@ -11,8 +12,6 @@ use serde::Serialize;
|
||||
use std::collections::HashMap;
|
||||
use std::path::PathBuf;
|
||||
|
||||
use crate::json_to_toml::json_to_toml;
|
||||
|
||||
/// Client-supplied configuration for a `codex` tool-call.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, Default)]
|
||||
#[serde(rename_all = "kebab-case")]
|
||||
|
||||
@@ -1,84 +0,0 @@
|
||||
use std::num::NonZero;
|
||||
use std::num::NonZeroUsize;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
|
||||
use codex_file_search as file_search;
|
||||
use codex_protocol::mcp_protocol::FuzzyFileSearchResult;
|
||||
use tokio::task::JoinSet;
|
||||
use tracing::warn;
|
||||
|
||||
const LIMIT_PER_ROOT: usize = 50;
|
||||
const MAX_THREADS: usize = 12;
|
||||
const COMPUTE_INDICES: bool = true;
|
||||
|
||||
pub(crate) async fn run_fuzzy_file_search(
|
||||
query: String,
|
||||
roots: Vec<String>,
|
||||
cancellation_flag: Arc<AtomicBool>,
|
||||
) -> Vec<FuzzyFileSearchResult> {
|
||||
#[expect(clippy::expect_used)]
|
||||
let limit_per_root =
|
||||
NonZero::new(LIMIT_PER_ROOT).expect("LIMIT_PER_ROOT should be a valid non-zero usize");
|
||||
|
||||
let cores = std::thread::available_parallelism()
|
||||
.map(std::num::NonZero::get)
|
||||
.unwrap_or(1);
|
||||
let threads = cores.min(MAX_THREADS);
|
||||
let threads_per_root = (threads / roots.len()).max(1);
|
||||
let threads = NonZero::new(threads_per_root).unwrap_or(NonZeroUsize::MIN);
|
||||
|
||||
let mut files: Vec<FuzzyFileSearchResult> = Vec::new();
|
||||
let mut join_set = JoinSet::new();
|
||||
|
||||
for root in roots {
|
||||
let search_dir = PathBuf::from(&root);
|
||||
let query = query.clone();
|
||||
let cancel_flag = cancellation_flag.clone();
|
||||
join_set.spawn_blocking(move || {
|
||||
match file_search::run(
|
||||
query.as_str(),
|
||||
limit_per_root,
|
||||
&search_dir,
|
||||
Vec::new(),
|
||||
threads,
|
||||
cancel_flag,
|
||||
COMPUTE_INDICES,
|
||||
) {
|
||||
Ok(res) => Ok((root, res)),
|
||||
Err(err) => Err((root, err)),
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
while let Some(res) = join_set.join_next().await {
|
||||
match res {
|
||||
Ok(Ok((root, res))) => {
|
||||
for m in res.matches {
|
||||
let result = FuzzyFileSearchResult {
|
||||
root: root.clone(),
|
||||
path: m.path,
|
||||
score: m.score,
|
||||
indices: m.indices,
|
||||
};
|
||||
files.push(result);
|
||||
}
|
||||
}
|
||||
Ok(Err((root, err))) => {
|
||||
warn!("fuzzy-file-search in dir '{root}' failed: {err}");
|
||||
}
|
||||
Err(err) => {
|
||||
warn!("fuzzy-file-search join_next failed: {err}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
files.sort_by(file_search::cmp_by_score_desc_then_path_asc::<
|
||||
FuzzyFileSearchResult,
|
||||
_,
|
||||
_,
|
||||
>(|f| f.score, |f| f.path.as_str()));
|
||||
|
||||
files
|
||||
}
|
||||
@@ -1,83 +0,0 @@
|
||||
use serde_json::Value as JsonValue;
|
||||
use toml::Value as TomlValue;
|
||||
|
||||
/// Convert a `serde_json::Value` into a semantically equivalent `toml::Value`.
|
||||
pub(crate) fn json_to_toml(v: JsonValue) -> TomlValue {
|
||||
match v {
|
||||
JsonValue::Null => TomlValue::String(String::new()),
|
||||
JsonValue::Bool(b) => TomlValue::Boolean(b),
|
||||
JsonValue::Number(n) => {
|
||||
if let Some(i) = n.as_i64() {
|
||||
TomlValue::Integer(i)
|
||||
} else if let Some(f) = n.as_f64() {
|
||||
TomlValue::Float(f)
|
||||
} else {
|
||||
TomlValue::String(n.to_string())
|
||||
}
|
||||
}
|
||||
JsonValue::String(s) => TomlValue::String(s),
|
||||
JsonValue::Array(arr) => TomlValue::Array(arr.into_iter().map(json_to_toml).collect()),
|
||||
JsonValue::Object(map) => {
|
||||
let tbl = map
|
||||
.into_iter()
|
||||
.map(|(k, v)| (k, json_to_toml(v)))
|
||||
.collect::<toml::value::Table>();
|
||||
TomlValue::Table(tbl)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::json;
|
||||
|
||||
#[test]
|
||||
fn json_number_to_toml() {
|
||||
let json_value = json!(123);
|
||||
assert_eq!(TomlValue::Integer(123), json_to_toml(json_value));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn json_array_to_toml() {
|
||||
let json_value = json!([true, 1]);
|
||||
assert_eq!(
|
||||
TomlValue::Array(vec![TomlValue::Boolean(true), TomlValue::Integer(1)]),
|
||||
json_to_toml(json_value)
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn json_bool_to_toml() {
|
||||
let json_value = json!(false);
|
||||
assert_eq!(TomlValue::Boolean(false), json_to_toml(json_value));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn json_float_to_toml() {
|
||||
let json_value = json!(1.25);
|
||||
assert_eq!(TomlValue::Float(1.25), json_to_toml(json_value));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn json_null_to_toml() {
|
||||
let json_value = serde_json::Value::Null;
|
||||
assert_eq!(TomlValue::String(String::new()), json_to_toml(json_value));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn json_object_nested() {
|
||||
let json_value = json!({ "outer": { "inner": 2 } });
|
||||
let expected = {
|
||||
let mut inner = toml::value::Table::new();
|
||||
inner.insert("inner".into(), TomlValue::Integer(2));
|
||||
|
||||
let mut outer = toml::value::Table::new();
|
||||
outer.insert("outer".into(), TomlValue::Table(inner));
|
||||
TomlValue::Table(outer)
|
||||
};
|
||||
|
||||
assert_eq!(json_to_toml(json_value), expected);
|
||||
}
|
||||
}
|
||||
@@ -20,13 +20,10 @@ use tracing::error;
|
||||
use tracing::info;
|
||||
use tracing_subscriber::EnvFilter;
|
||||
|
||||
mod codex_message_processor;
|
||||
mod codex_tool_config;
|
||||
mod codex_tool_runner;
|
||||
mod error_code;
|
||||
mod exec_approval;
|
||||
mod fuzzy_file_search;
|
||||
mod json_to_toml;
|
||||
pub(crate) mod message_processor;
|
||||
mod outgoing_message;
|
||||
mod patch_approval;
|
||||
|
||||
@@ -1,14 +1,12 @@
|
||||
use std::collections::HashMap;
|
||||
use std::path::PathBuf;
|
||||
|
||||
use crate::codex_message_processor::CodexMessageProcessor;
|
||||
use crate::codex_tool_config::CodexToolCallParam;
|
||||
use crate::codex_tool_config::CodexToolCallReplyParam;
|
||||
use crate::codex_tool_config::create_tool_for_codex_tool_call_param;
|
||||
use crate::codex_tool_config::create_tool_for_codex_tool_call_reply_param;
|
||||
use crate::error_code::INVALID_REQUEST_ERROR_CODE;
|
||||
use crate::outgoing_message::OutgoingMessageSender;
|
||||
use codex_protocol::mcp_protocol::ClientRequest;
|
||||
use codex_protocol::mcp_protocol::ConversationId;
|
||||
|
||||
use codex_core::AuthManager;
|
||||
@@ -38,7 +36,6 @@ use tokio::sync::Mutex;
|
||||
use tokio::task;
|
||||
|
||||
pub(crate) struct MessageProcessor {
|
||||
codex_message_processor: CodexMessageProcessor,
|
||||
outgoing: Arc<OutgoingMessageSender>,
|
||||
initialized: bool,
|
||||
codex_linux_sandbox_exe: Option<PathBuf>,
|
||||
@@ -56,16 +53,8 @@ impl MessageProcessor {
|
||||
) -> Self {
|
||||
let outgoing = Arc::new(outgoing);
|
||||
let auth_manager = AuthManager::shared(config.codex_home.clone());
|
||||
let conversation_manager = Arc::new(ConversationManager::new(auth_manager.clone()));
|
||||
let codex_message_processor = CodexMessageProcessor::new(
|
||||
auth_manager,
|
||||
conversation_manager.clone(),
|
||||
outgoing.clone(),
|
||||
codex_linux_sandbox_exe.clone(),
|
||||
config,
|
||||
);
|
||||
let conversation_manager = Arc::new(ConversationManager::new(auth_manager));
|
||||
Self {
|
||||
codex_message_processor,
|
||||
outgoing,
|
||||
initialized: false,
|
||||
codex_linux_sandbox_exe,
|
||||
@@ -75,17 +64,6 @@ impl MessageProcessor {
|
||||
}
|
||||
|
||||
pub(crate) async fn process_request(&mut self, request: JSONRPCRequest) {
|
||||
if let Ok(request_json) = serde_json::to_value(request.clone())
|
||||
&& let Ok(codex_request) = serde_json::from_value::<ClientRequest>(request_json)
|
||||
{
|
||||
// If the request is a Codex request, handle it with the Codex
|
||||
// message processor.
|
||||
self.codex_message_processor
|
||||
.process_request(codex_request)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
|
||||
// Hold on to the ID so we can respond.
|
||||
let request_id = request.id.clone();
|
||||
|
||||
|
||||
@@ -3,7 +3,6 @@ use std::sync::atomic::AtomicI64;
|
||||
use std::sync::atomic::Ordering;
|
||||
|
||||
use codex_core::protocol::Event;
|
||||
use codex_protocol::mcp_protocol::ServerNotification;
|
||||
use mcp_types::JSONRPC_VERSION;
|
||||
use mcp_types::JSONRPCError;
|
||||
use mcp_types::JSONRPCErrorError;
|
||||
@@ -125,12 +124,6 @@ impl OutgoingMessageSender {
|
||||
.await;
|
||||
}
|
||||
|
||||
pub(crate) async fn send_server_notification(&self, notification: ServerNotification) {
|
||||
let _ = self
|
||||
.sender
|
||||
.send(OutgoingMessage::AppServerNotification(notification));
|
||||
}
|
||||
|
||||
pub(crate) async fn send_notification(&self, notification: OutgoingNotification) {
|
||||
let outgoing_message = OutgoingMessage::Notification(notification);
|
||||
let _ = self.sender.send(outgoing_message);
|
||||
@@ -146,9 +139,6 @@ impl OutgoingMessageSender {
|
||||
pub(crate) enum OutgoingMessage {
|
||||
Request(OutgoingRequest),
|
||||
Notification(OutgoingNotification),
|
||||
/// AppServerNotification is specific to the case where this is run as an
|
||||
/// "app server" as opposed to an MCP server.
|
||||
AppServerNotification(ServerNotification),
|
||||
Response(OutgoingResponse),
|
||||
Error(OutgoingError),
|
||||
}
|
||||
@@ -172,21 +162,6 @@ impl From<OutgoingMessage> for JSONRPCMessage {
|
||||
params,
|
||||
})
|
||||
}
|
||||
AppServerNotification(notification) => {
|
||||
let method = notification.to_string();
|
||||
let params = match notification.to_params() {
|
||||
Ok(params) => Some(params),
|
||||
Err(err) => {
|
||||
warn!("failed to serialize notification params: {err}");
|
||||
None
|
||||
}
|
||||
};
|
||||
JSONRPCMessage::Notification(JSONRPCNotification {
|
||||
jsonrpc: JSONRPC_VERSION.into(),
|
||||
method,
|
||||
params,
|
||||
})
|
||||
}
|
||||
Response(OutgoingResponse { id, result }) => {
|
||||
JSONRPCMessage::Response(JSONRPCResponse {
|
||||
jsonrpc: JSONRPC_VERSION.into(),
|
||||
@@ -261,11 +236,9 @@ mod tests {
|
||||
use codex_core::protocol::SessionConfiguredEvent;
|
||||
use codex_protocol::config_types::ReasoningEffort;
|
||||
use codex_protocol::mcp_protocol::ConversationId;
|
||||
use codex_protocol::mcp_protocol::LoginChatGptCompleteNotification;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::json;
|
||||
use tempfile::NamedTempFile;
|
||||
use uuid::Uuid;
|
||||
|
||||
use super::*;
|
||||
|
||||
@@ -357,29 +330,4 @@ mod tests {
|
||||
assert_eq!(params.unwrap(), expected_params);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn verify_server_notification_serialization() {
|
||||
let notification =
|
||||
ServerNotification::LoginChatGptComplete(LoginChatGptCompleteNotification {
|
||||
login_id: Uuid::nil(),
|
||||
success: true,
|
||||
error: None,
|
||||
});
|
||||
|
||||
let jsonrpc_notification: JSONRPCMessage =
|
||||
OutgoingMessage::AppServerNotification(notification).into();
|
||||
assert_eq!(
|
||||
JSONRPCMessage::Notification(JSONRPCNotification {
|
||||
jsonrpc: "2.0".into(),
|
||||
method: "loginChatGptComplete".into(),
|
||||
params: Some(json!({
|
||||
"loginId": Uuid::nil(),
|
||||
"success": true,
|
||||
})),
|
||||
}),
|
||||
jsonrpc_notification,
|
||||
"ensure the strum macros serialize the method field correctly"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user