From 539f4b290e2e960c4347c1e9ce4648c4578e1729 Mon Sep 17 00:00:00 2001 From: Michael Bolin Date: Wed, 13 Aug 2025 23:00:50 -0700 Subject: [PATCH] fix: add support for exec and apply_patch approvals in the new wire format (#2286) Now when `CodexMessageProcessor` receives either a `EventMsg::ApplyPatchApprovalRequest` or a `EventMsg::ExecApprovalRequest`, it sends the appropriate request from the server to the client. When it gets a response, it forwards it on to the `CodexConversation`. Note this takes a lot of code from: https://github.com/openai/codex/blob/main/codex-rs/mcp-server/src/conversation_loop.rs https://github.com/openai/codex/blob/main/codex-rs/mcp-server/src/exec_approval.rs https://github.com/openai/codex/blob/main/codex-rs/mcp-server/src/patch_approval.rs I am copy/pasting for now because I am trying to consolidate around the new `wire_format.rs`, so I plan to delete these other files soon. Now that we have requests going both from client-to-server and server-to-client, I renamed `CodexRequest` to `ClientRequest`. --- [//]: # (BEGIN SAPLING FOOTER) Stack created with [Sapling](https://sapling-scm.com). Best reviewed with [ReviewStack](https://reviewstack.dev/openai/codex/pull/2286). * #2287 * __->__ #2286 * #2285 --- .../mcp-server/src/codex_message_processor.rs | 166 +++++++++++++++++- codex-rs/mcp-server/src/message_processor.rs | 34 ++-- codex-rs/mcp-server/src/wire_format.rs | 64 ++++++- 3 files changed, 237 insertions(+), 27 deletions(-) diff --git a/codex-rs/mcp-server/src/codex_message_processor.rs b/codex-rs/mcp-server/src/codex_message_processor.rs index 5e0e4cad..74b69fbc 100644 --- a/codex-rs/mcp-server/src/codex_message_processor.rs +++ b/codex-rs/mcp-server/src/codex_message_processor.rs @@ -2,13 +2,20 @@ use std::collections::HashMap; use std::path::PathBuf; use std::sync::Arc; +use codex_core::CodexConversation; use codex_core::ConversationManager; use codex_core::NewConversation; use codex_core::config::Config; use codex_core::config::ConfigOverrides; +use codex_core::protocol::ApplyPatchApprovalRequestEvent; +use codex_core::protocol::Event; +use codex_core::protocol::EventMsg; +use codex_core::protocol::ExecApprovalRequestEvent; +use codex_core::protocol::ReviewDecision; use mcp_types::JSONRPCErrorError; use mcp_types::RequestId; use tokio::sync::oneshot; +use tracing::error; use uuid::Uuid; use crate::error_code::INTERNAL_ERROR_CODE; @@ -16,10 +23,16 @@ use crate::error_code::INVALID_REQUEST_ERROR_CODE; use crate::json_to_toml::json_to_toml; use crate::outgoing_message::OutgoingMessageSender; use crate::outgoing_message::OutgoingNotification; +use crate::wire_format::APPLY_PATCH_APPROVAL_METHOD; use crate::wire_format::AddConversationListenerParams; use crate::wire_format::AddConversationSubscriptionResponse; -use crate::wire_format::CodexRequest; +use crate::wire_format::ApplyPatchApprovalParams; +use crate::wire_format::ApplyPatchApprovalResponse; +use crate::wire_format::ClientRequest; use crate::wire_format::ConversationId; +use crate::wire_format::EXEC_COMMAND_APPROVAL_METHOD; +use crate::wire_format::ExecCommandApprovalParams; +use crate::wire_format::ExecCommandApprovalResponse; use crate::wire_format::InputItem as WireInputItem; use crate::wire_format::NewConversationParams; use crate::wire_format::NewConversationResponse; @@ -52,21 +65,21 @@ impl CodexMessageProcessor { } } - pub async fn process_request(&mut self, request: CodexRequest) { + pub async fn process_request(&mut self, request: ClientRequest) { match request { - CodexRequest::NewConversation { request_id, params } => { + ClientRequest::NewConversation { request_id, params } => { // Do not tokio::spawn() to process new_conversation() // asynchronously because we need to ensure the conversation is // created before processing any subsequent messages. self.process_new_conversation(request_id, params).await; } - CodexRequest::SendUserMessage { request_id, params } => { + ClientRequest::SendUserMessage { request_id, params } => { self.send_user_message(request_id, params).await; } - CodexRequest::AddConversationListener { request_id, params } => { + ClientRequest::AddConversationListener { request_id, params } => { self.add_conversation_listener(request_id, params).await; } - CodexRequest::RemoveConversationListener { request_id, params } => { + ClientRequest::RemoveConversationListener { request_id, params } => { self.remove_conversation_listener(request_id, params).await; } } @@ -192,8 +205,12 @@ impl CodexMessageProcessor { } }; + // For now, we send a notification for every event, + // JSON-serializing the `Event` as-is, but we will move + // to creating a special enum for notifications with a + // stable wire format. let method = format!("codex/event/{}", event.msg); - let mut params = match serde_json::to_value(event) { + let mut params = match serde_json::to_value(event.clone()) { Ok(serde_json::Value::Object(map)) => map, Ok(_) => { tracing::error!("event did not serialize to an object"); @@ -211,6 +228,8 @@ impl CodexMessageProcessor { params: Some(params.into()), }) .await; + + apply_bespoke_event_handling(event, conversation_id, conversation.clone(), outgoing_for_task.clone()).await; } } } @@ -244,6 +263,61 @@ impl CodexMessageProcessor { } } +async fn apply_bespoke_event_handling( + event: Event, + conversation_id: ConversationId, + conversation: Arc, + outgoing: Arc, +) { + let Event { id: event_id, msg } = event; + match msg { + EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent { + call_id: _, + changes, + reason, + grant_root, + }) => { + let params = ApplyPatchApprovalParams { + conversation_id, + file_changes: changes, + reason, + grant_root, + }; + let value = serde_json::to_value(¶ms).unwrap_or_default(); + let rx = outgoing + .send_request(APPLY_PATCH_APPROVAL_METHOD, Some(value)) + .await; + // TODO(mbolin): Enforce a timeout so this task does not live indefinitely? + tokio::spawn(async move { + on_patch_approval_response(event_id, rx, conversation).await; + }); + } + EventMsg::ExecApprovalRequest(ExecApprovalRequestEvent { + call_id: _, + command, + cwd, + reason, + }) => { + let params = ExecCommandApprovalParams { + conversation_id, + command, + cwd, + reason, + }; + let value = serde_json::to_value(¶ms).unwrap_or_default(); + let rx = outgoing + .send_request(EXEC_COMMAND_APPROVAL_METHOD, Some(value)) + .await; + + // TODO(mbolin): Enforce a timeout so this task does not live indefinitely? + tokio::spawn(async move { + on_exec_approval_response(event_id, rx, conversation).await; + }); + } + _ => {} + } +} + fn derive_config_from_params( params: NewConversationParams, codex_linux_sandbox_exe: Option, @@ -280,3 +354,81 @@ fn derive_config_from_params( Config::load_with_cli_overrides(cli_overrides, overrides) } + +async fn on_patch_approval_response( + event_id: String, + receiver: tokio::sync::oneshot::Receiver, + codex: Arc, +) { + let response = receiver.await; + let value = match response { + Ok(value) => value, + Err(err) => { + error!("request failed: {err:?}"); + if let Err(submit_err) = codex + .submit(Op::PatchApproval { + id: event_id.clone(), + decision: ReviewDecision::Denied, + }) + .await + { + error!("failed to submit denied PatchApproval after request failure: {submit_err}"); + } + return; + } + }; + + let response = + serde_json::from_value::(value).unwrap_or_else(|err| { + error!("failed to deserialize ApplyPatchApprovalResponse: {err}"); + ApplyPatchApprovalResponse { + decision: ReviewDecision::Denied, + } + }); + + if let Err(err) = codex + .submit(Op::PatchApproval { + id: event_id, + decision: response.decision, + }) + .await + { + error!("failed to submit PatchApproval: {err}"); + } +} + +async fn on_exec_approval_response( + event_id: String, + receiver: tokio::sync::oneshot::Receiver, + conversation: Arc, +) { + let response = receiver.await; + let value = match response { + Ok(value) => value, + Err(err) => { + tracing::error!("request failed: {err:?}"); + return; + } + }; + + // Try to deserialize `value` and then make the appropriate call to `codex`. + let response = + serde_json::from_value::(value).unwrap_or_else(|err| { + error!("failed to deserialize ExecCommandApprovalResponse: {err}"); + // If we cannot deserialize the response, we deny the request to be + // conservative. + ExecCommandApprovalResponse { + decision: ReviewDecision::Denied, + } + }); + + if let Err(err) = conversation + .submit(Op::ExecApproval { + id: event_id, + decision: response.decision, + }) + .await + { + error!("failed to submit ExecApproval: {err}"); + } +} diff --git a/codex-rs/mcp-server/src/message_processor.rs b/codex-rs/mcp-server/src/message_processor.rs index d043493c..763e51bf 100644 --- a/codex-rs/mcp-server/src/message_processor.rs +++ b/codex-rs/mcp-server/src/message_processor.rs @@ -15,7 +15,7 @@ use crate::mcp_protocol::ToolCallResponseResult; use crate::outgoing_message::OutgoingMessageSender; use crate::tool_handlers::create_conversation::handle_create_conversation; use crate::tool_handlers::send_message::handle_send_message; -use crate::wire_format::CodexRequest; +use crate::wire_format::ClientRequest; use codex_core::ConversationManager; use codex_core::config::Config as CodexConfig; @@ -23,7 +23,7 @@ use codex_core::protocol::Submission; use mcp_types::CallToolRequest; use mcp_types::CallToolRequestParams; use mcp_types::CallToolResult; -use mcp_types::ClientRequest; +use mcp_types::ClientRequest as McpClientRequest; use mcp_types::ContentBlock; use mcp_types::JSONRPCError; use mcp_types::JSONRPCErrorError; @@ -90,7 +90,7 @@ impl MessageProcessor { pub(crate) async fn process_request(&mut self, request: JSONRPCRequest) { if let Ok(request_json) = serde_json::to_value(request.clone()) - && let Ok(codex_request) = serde_json::from_value::(request_json) + && let Ok(codex_request) = serde_json::from_value::(request_json) { // If the request is a Codex request, handle it with the Codex // message processor. @@ -103,7 +103,7 @@ impl MessageProcessor { // Hold on to the ID so we can respond. let request_id = request.id.clone(); - let client_request = match ClientRequest::try_from(request) { + let client_request = match McpClientRequest::try_from(request) { Ok(client_request) => client_request, Err(e) => { tracing::warn!("Failed to convert request: {e}"); @@ -113,43 +113,43 @@ impl MessageProcessor { // Dispatch to a dedicated handler for each request type. match client_request { - ClientRequest::InitializeRequest(params) => { + McpClientRequest::InitializeRequest(params) => { self.handle_initialize(request_id, params).await; } - ClientRequest::PingRequest(params) => { + McpClientRequest::PingRequest(params) => { self.handle_ping(request_id, params).await; } - ClientRequest::ListResourcesRequest(params) => { + McpClientRequest::ListResourcesRequest(params) => { self.handle_list_resources(params); } - ClientRequest::ListResourceTemplatesRequest(params) => { + McpClientRequest::ListResourceTemplatesRequest(params) => { self.handle_list_resource_templates(params); } - ClientRequest::ReadResourceRequest(params) => { + McpClientRequest::ReadResourceRequest(params) => { self.handle_read_resource(params); } - ClientRequest::SubscribeRequest(params) => { + McpClientRequest::SubscribeRequest(params) => { self.handle_subscribe(params); } - ClientRequest::UnsubscribeRequest(params) => { + McpClientRequest::UnsubscribeRequest(params) => { self.handle_unsubscribe(params); } - ClientRequest::ListPromptsRequest(params) => { + McpClientRequest::ListPromptsRequest(params) => { self.handle_list_prompts(params); } - ClientRequest::GetPromptRequest(params) => { + McpClientRequest::GetPromptRequest(params) => { self.handle_get_prompt(params); } - ClientRequest::ListToolsRequest(params) => { + McpClientRequest::ListToolsRequest(params) => { self.handle_list_tools(request_id, params).await; } - ClientRequest::CallToolRequest(params) => { + McpClientRequest::CallToolRequest(params) => { self.handle_call_tool(request_id, params).await; } - ClientRequest::SetLevelRequest(params) => { + McpClientRequest::SetLevelRequest(params) => { self.handle_set_level(params); } - ClientRequest::CompleteRequest(params) => { + McpClientRequest::CompleteRequest(params) => { self.handle_complete(params); } } diff --git a/codex-rs/mcp-server/src/wire_format.rs b/codex-rs/mcp-server/src/wire_format.rs index 61cd881b..5b223e60 100644 --- a/codex-rs/mcp-server/src/wire_format.rs +++ b/codex-rs/mcp-server/src/wire_format.rs @@ -1,6 +1,9 @@ use std::collections::HashMap; use std::fmt::Display; +use std::path::PathBuf; +use codex_core::protocol::FileChange; +use codex_core::protocol::ReviewDecision; use mcp_types::RequestId; use serde::Deserialize; use serde::Serialize; @@ -22,7 +25,7 @@ impl Display for ConversationId { /// Request from the client to the server. #[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] #[serde(tag = "method", rename_all = "camelCase")] -pub enum CodexRequest { +pub enum ClientRequest { NewConversation { #[serde(rename = "id")] request_id: RequestId, @@ -139,10 +142,65 @@ pub enum InputItem { /// Local image path provided by the user. This will be converted to an /// `Image` variant (base64 data URL) during request serialization. LocalImage { - path: std::path::PathBuf, + path: PathBuf, }, } +// TODO(mbolin): Need test to ensure these constants match the enum variants. + +pub const APPLY_PATCH_APPROVAL_METHOD: &str = "applyPatchApproval"; +pub const EXEC_COMMAND_APPROVAL_METHOD: &str = "execCommandApproval"; + +/// Request initiated from the server and sent to the client. +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +#[serde(tag = "method", rename_all = "camelCase")] +pub enum ServerRequest { + /// Request to approve a patch. + ApplyPatchApproval { + #[serde(rename = "id")] + request_id: RequestId, + params: ApplyPatchApprovalParams, + }, + /// Request to exec a command. + ExecCommandApproval { + #[serde(rename = "id")] + request_id: RequestId, + params: ExecCommandApprovalParams, + }, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +pub struct ApplyPatchApprovalParams { + pub conversation_id: ConversationId, + pub file_changes: HashMap, + /// Optional explanatory reason (e.g. request for extra write access). + #[serde(skip_serializing_if = "Option::is_none")] + pub reason: Option, + /// When set, the agent is asking the user to allow writes under this root + /// for the remainder of the session (unclear if this is honored today). + #[serde(skip_serializing_if = "Option::is_none")] + pub grant_root: Option, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +pub struct ExecCommandApprovalParams { + pub conversation_id: ConversationId, + pub command: Vec, + pub cwd: PathBuf, + #[serde(skip_serializing_if = "Option::is_none")] + pub reason: Option, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +pub struct ExecCommandApprovalResponse { + pub decision: ReviewDecision, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +pub struct ApplyPatchApprovalResponse { + pub decision: ReviewDecision, +} + #[allow(clippy::unwrap_used)] #[cfg(test)] mod tests { @@ -152,7 +210,7 @@ mod tests { #[test] fn serialize_new_conversation() { - let request = CodexRequest::NewConversation { + let request = ClientRequest::NewConversation { request_id: RequestId::Integer(42), params: NewConversationParams { model: Some("gpt-5".to_string()),