fix: add support for exec and apply_patch approvals in the new wire format (#2286)
Now when `CodexMessageProcessor` receives either a `EventMsg::ApplyPatchApprovalRequest` or a `EventMsg::ExecApprovalRequest`, it sends the appropriate request from the server to the client. When it gets a response, it forwards it on to the `CodexConversation`. Note this takes a lot of code from: https://github.com/openai/codex/blob/main/codex-rs/mcp-server/src/conversation_loop.rs https://github.com/openai/codex/blob/main/codex-rs/mcp-server/src/exec_approval.rs https://github.com/openai/codex/blob/main/codex-rs/mcp-server/src/patch_approval.rs I am copy/pasting for now because I am trying to consolidate around the new `wire_format.rs`, so I plan to delete these other files soon. Now that we have requests going both from client-to-server and server-to-client, I renamed `CodexRequest` to `ClientRequest`. --- [//]: # (BEGIN SAPLING FOOTER) Stack created with [Sapling](https://sapling-scm.com). Best reviewed with [ReviewStack](https://reviewstack.dev/openai/codex/pull/2286). * #2287 * __->__ #2286 * #2285
This commit is contained in:
@@ -2,13 +2,20 @@ use std::collections::HashMap;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
|
||||
use codex_core::CodexConversation;
|
||||
use codex_core::ConversationManager;
|
||||
use codex_core::NewConversation;
|
||||
use codex_core::config::Config;
|
||||
use codex_core::config::ConfigOverrides;
|
||||
use codex_core::protocol::ApplyPatchApprovalRequestEvent;
|
||||
use codex_core::protocol::Event;
|
||||
use codex_core::protocol::EventMsg;
|
||||
use codex_core::protocol::ExecApprovalRequestEvent;
|
||||
use codex_core::protocol::ReviewDecision;
|
||||
use mcp_types::JSONRPCErrorError;
|
||||
use mcp_types::RequestId;
|
||||
use tokio::sync::oneshot;
|
||||
use tracing::error;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::error_code::INTERNAL_ERROR_CODE;
|
||||
@@ -16,10 +23,16 @@ use crate::error_code::INVALID_REQUEST_ERROR_CODE;
|
||||
use crate::json_to_toml::json_to_toml;
|
||||
use crate::outgoing_message::OutgoingMessageSender;
|
||||
use crate::outgoing_message::OutgoingNotification;
|
||||
use crate::wire_format::APPLY_PATCH_APPROVAL_METHOD;
|
||||
use crate::wire_format::AddConversationListenerParams;
|
||||
use crate::wire_format::AddConversationSubscriptionResponse;
|
||||
use crate::wire_format::CodexRequest;
|
||||
use crate::wire_format::ApplyPatchApprovalParams;
|
||||
use crate::wire_format::ApplyPatchApprovalResponse;
|
||||
use crate::wire_format::ClientRequest;
|
||||
use crate::wire_format::ConversationId;
|
||||
use crate::wire_format::EXEC_COMMAND_APPROVAL_METHOD;
|
||||
use crate::wire_format::ExecCommandApprovalParams;
|
||||
use crate::wire_format::ExecCommandApprovalResponse;
|
||||
use crate::wire_format::InputItem as WireInputItem;
|
||||
use crate::wire_format::NewConversationParams;
|
||||
use crate::wire_format::NewConversationResponse;
|
||||
@@ -52,21 +65,21 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn process_request(&mut self, request: CodexRequest) {
|
||||
pub async fn process_request(&mut self, request: ClientRequest) {
|
||||
match request {
|
||||
CodexRequest::NewConversation { request_id, params } => {
|
||||
ClientRequest::NewConversation { request_id, params } => {
|
||||
// Do not tokio::spawn() to process new_conversation()
|
||||
// asynchronously because we need to ensure the conversation is
|
||||
// created before processing any subsequent messages.
|
||||
self.process_new_conversation(request_id, params).await;
|
||||
}
|
||||
CodexRequest::SendUserMessage { request_id, params } => {
|
||||
ClientRequest::SendUserMessage { request_id, params } => {
|
||||
self.send_user_message(request_id, params).await;
|
||||
}
|
||||
CodexRequest::AddConversationListener { request_id, params } => {
|
||||
ClientRequest::AddConversationListener { request_id, params } => {
|
||||
self.add_conversation_listener(request_id, params).await;
|
||||
}
|
||||
CodexRequest::RemoveConversationListener { request_id, params } => {
|
||||
ClientRequest::RemoveConversationListener { request_id, params } => {
|
||||
self.remove_conversation_listener(request_id, params).await;
|
||||
}
|
||||
}
|
||||
@@ -192,8 +205,12 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
};
|
||||
|
||||
// For now, we send a notification for every event,
|
||||
// JSON-serializing the `Event` as-is, but we will move
|
||||
// to creating a special enum for notifications with a
|
||||
// stable wire format.
|
||||
let method = format!("codex/event/{}", event.msg);
|
||||
let mut params = match serde_json::to_value(event) {
|
||||
let mut params = match serde_json::to_value(event.clone()) {
|
||||
Ok(serde_json::Value::Object(map)) => map,
|
||||
Ok(_) => {
|
||||
tracing::error!("event did not serialize to an object");
|
||||
@@ -211,6 +228,8 @@ impl CodexMessageProcessor {
|
||||
params: Some(params.into()),
|
||||
})
|
||||
.await;
|
||||
|
||||
apply_bespoke_event_handling(event, conversation_id, conversation.clone(), outgoing_for_task.clone()).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -244,6 +263,61 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
}
|
||||
|
||||
async fn apply_bespoke_event_handling(
|
||||
event: Event,
|
||||
conversation_id: ConversationId,
|
||||
conversation: Arc<CodexConversation>,
|
||||
outgoing: Arc<OutgoingMessageSender>,
|
||||
) {
|
||||
let Event { id: event_id, msg } = event;
|
||||
match msg {
|
||||
EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
|
||||
call_id: _,
|
||||
changes,
|
||||
reason,
|
||||
grant_root,
|
||||
}) => {
|
||||
let params = ApplyPatchApprovalParams {
|
||||
conversation_id,
|
||||
file_changes: changes,
|
||||
reason,
|
||||
grant_root,
|
||||
};
|
||||
let value = serde_json::to_value(¶ms).unwrap_or_default();
|
||||
let rx = outgoing
|
||||
.send_request(APPLY_PATCH_APPROVAL_METHOD, Some(value))
|
||||
.await;
|
||||
// TODO(mbolin): Enforce a timeout so this task does not live indefinitely?
|
||||
tokio::spawn(async move {
|
||||
on_patch_approval_response(event_id, rx, conversation).await;
|
||||
});
|
||||
}
|
||||
EventMsg::ExecApprovalRequest(ExecApprovalRequestEvent {
|
||||
call_id: _,
|
||||
command,
|
||||
cwd,
|
||||
reason,
|
||||
}) => {
|
||||
let params = ExecCommandApprovalParams {
|
||||
conversation_id,
|
||||
command,
|
||||
cwd,
|
||||
reason,
|
||||
};
|
||||
let value = serde_json::to_value(¶ms).unwrap_or_default();
|
||||
let rx = outgoing
|
||||
.send_request(EXEC_COMMAND_APPROVAL_METHOD, Some(value))
|
||||
.await;
|
||||
|
||||
// TODO(mbolin): Enforce a timeout so this task does not live indefinitely?
|
||||
tokio::spawn(async move {
|
||||
on_exec_approval_response(event_id, rx, conversation).await;
|
||||
});
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
fn derive_config_from_params(
|
||||
params: NewConversationParams,
|
||||
codex_linux_sandbox_exe: Option<PathBuf>,
|
||||
@@ -280,3 +354,81 @@ fn derive_config_from_params(
|
||||
|
||||
Config::load_with_cli_overrides(cli_overrides, overrides)
|
||||
}
|
||||
|
||||
async fn on_patch_approval_response(
|
||||
event_id: String,
|
||||
receiver: tokio::sync::oneshot::Receiver<mcp_types::Result>,
|
||||
codex: Arc<CodexConversation>,
|
||||
) {
|
||||
let response = receiver.await;
|
||||
let value = match response {
|
||||
Ok(value) => value,
|
||||
Err(err) => {
|
||||
error!("request failed: {err:?}");
|
||||
if let Err(submit_err) = codex
|
||||
.submit(Op::PatchApproval {
|
||||
id: event_id.clone(),
|
||||
decision: ReviewDecision::Denied,
|
||||
})
|
||||
.await
|
||||
{
|
||||
error!("failed to submit denied PatchApproval after request failure: {submit_err}");
|
||||
}
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let response =
|
||||
serde_json::from_value::<ApplyPatchApprovalResponse>(value).unwrap_or_else(|err| {
|
||||
error!("failed to deserialize ApplyPatchApprovalResponse: {err}");
|
||||
ApplyPatchApprovalResponse {
|
||||
decision: ReviewDecision::Denied,
|
||||
}
|
||||
});
|
||||
|
||||
if let Err(err) = codex
|
||||
.submit(Op::PatchApproval {
|
||||
id: event_id,
|
||||
decision: response.decision,
|
||||
})
|
||||
.await
|
||||
{
|
||||
error!("failed to submit PatchApproval: {err}");
|
||||
}
|
||||
}
|
||||
|
||||
async fn on_exec_approval_response(
|
||||
event_id: String,
|
||||
receiver: tokio::sync::oneshot::Receiver<mcp_types::Result>,
|
||||
conversation: Arc<CodexConversation>,
|
||||
) {
|
||||
let response = receiver.await;
|
||||
let value = match response {
|
||||
Ok(value) => value,
|
||||
Err(err) => {
|
||||
tracing::error!("request failed: {err:?}");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
// Try to deserialize `value` and then make the appropriate call to `codex`.
|
||||
let response =
|
||||
serde_json::from_value::<ExecCommandApprovalResponse>(value).unwrap_or_else(|err| {
|
||||
error!("failed to deserialize ExecCommandApprovalResponse: {err}");
|
||||
// If we cannot deserialize the response, we deny the request to be
|
||||
// conservative.
|
||||
ExecCommandApprovalResponse {
|
||||
decision: ReviewDecision::Denied,
|
||||
}
|
||||
});
|
||||
|
||||
if let Err(err) = conversation
|
||||
.submit(Op::ExecApproval {
|
||||
id: event_id,
|
||||
decision: response.decision,
|
||||
})
|
||||
.await
|
||||
{
|
||||
error!("failed to submit ExecApproval: {err}");
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user