diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 7d056adc..5cd5a679 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -561,15 +561,35 @@ async fn submission_loop( let writable_roots = Mutex::new(get_writable_roots(&cwd)); - let mcp_connection_manager = + // Error messages to dispatch after SessionConfigured is sent. + let mut mcp_connection_errors = Vec::::new(); + let (mcp_connection_manager, failed_clients) = match McpConnectionManager::new(config.mcp_servers.clone()).await { - Ok(mgr) => mgr, + Ok((mgr, failures)) => (mgr, failures), Err(e) => { - error!("Failed to create MCP connection manager: {e:#}"); - McpConnectionManager::default() + let message = format!("Failed to create MCP connection manager: {e:#}"); + error!("{message}"); + mcp_connection_errors.push(Event { + id: sub.id.clone(), + msg: EventMsg::Error { message }, + }); + (McpConnectionManager::default(), Default::default()) } }; + // Surface individual client start-up failures to the user. + if !failed_clients.is_empty() { + for (server_name, err) in failed_clients { + let message = + format!("MCP client for `{server_name}` failed to start: {err:#}"); + error!("{message}"); + mcp_connection_errors.push(Event { + id: sub.id.clone(), + msg: EventMsg::Error { message }, + }); + } + } + // Attempt to create a RolloutRecorder *before* moving the // `instructions` value into the Session struct. let rollout_recorder = match RolloutRecorder::new(instructions.clone()).await { @@ -596,12 +616,15 @@ async fn submission_loop( })); // ack - let event = Event { - id: sub.id, + let events = std::iter::once(Event { + id: sub.id.clone(), msg: EventMsg::SessionConfigured { model }, - }; - if tx_event.send(event).await.is_err() { - return; + }) + .chain(mcp_connection_errors.into_iter()); + for event in events { + if let Err(e) = tx_event.send(event).await { + error!("failed to send event: {e:?}"); + } } } Op::UserInput { items } => { diff --git a/codex-rs/core/src/mcp_connection_manager.rs b/codex-rs/core/src/mcp_connection_manager.rs index 734c3514..e4124b90 100644 --- a/codex-rs/core/src/mcp_connection_manager.rs +++ b/codex-rs/core/src/mcp_connection_manager.rs @@ -29,6 +29,10 @@ const MCP_TOOL_NAME_DELIMITER: &str = "__OAI_CODEX_MCP__"; /// Timeout for the `tools/list` request. const LIST_TOOLS_TIMEOUT: Duration = Duration::from_secs(10); +/// Map that holds a startup error for every MCP server that could **not** be +/// spawned successfully. +pub type ClientStartErrors = HashMap; + fn fully_qualified_tool_name(server: &str, tool: &str) -> String { format!("{server}{MCP_TOOL_NAME_DELIMITER}{tool}") } @@ -60,40 +64,49 @@ impl McpConnectionManager { /// * `mcp_servers` – Map loaded from the user configuration where *keys* /// are human-readable server identifiers and *values* are the spawn /// instructions. - pub async fn new(mcp_servers: HashMap) -> Result { + /// + /// Servers that fail to start are reported in `ClientStartErrors`: the + /// user should be informed about these errors. + pub async fn new( + mcp_servers: HashMap, + ) -> Result<(Self, ClientStartErrors)> { // Early exit if no servers are configured. if mcp_servers.is_empty() { - return Ok(Self::default()); + return Ok((Self::default(), ClientStartErrors::default())); } - // Spin up all servers concurrently. + // Launch all configured servers concurrently. let mut join_set = JoinSet::new(); - // Spawn tasks to launch each server. for (server_name, cfg) in mcp_servers { // TODO: Verify server name: require `^[a-zA-Z0-9_-]+$`? join_set.spawn(async move { let McpServerConfig { command, args, env } = cfg; let client_res = McpClient::new_stdio_client(command, args, env).await; - (server_name, client_res) }); } let mut clients: HashMap> = HashMap::with_capacity(join_set.len()); + let mut errors = ClientStartErrors::new(); + while let Some(res) = join_set.join_next().await { - let (server_name, client_res) = res?; + let (server_name, client_res) = res?; // JoinError propagation - let client = client_res - .with_context(|| format!("failed to spawn MCP server `{server_name}`"))?; - - clients.insert(server_name, std::sync::Arc::new(client)); + match client_res { + Ok(client) => { + clients.insert(server_name, std::sync::Arc::new(client)); + } + Err(e) => { + errors.insert(server_name, e.into()); + } + } } let tools = list_all_tools(&clients).await?; - Ok(Self { clients, tools }) + Ok((Self { clients, tools }, errors)) } /// Returns a single map that contains **all** tools. Each key is the