From eda50d8372c9b6ddf40d676d4b792d915241e5fb Mon Sep 17 00:00:00 2001 From: Michael Bolin Date: Fri, 15 Aug 2025 10:05:58 -0700 Subject: [PATCH] feat: introduce ClientRequest::SendUserTurn (#2345) This adds a new request type, `SendUserTurn`, that makes it possible to submit an `Op::UserTurn` operation (introduced in #2329) to a conversation. This PR also adds a new integration test that verifies that changing from `AskForApproval::UnlessTrusted` to `AskForApproval::Never` mid-conversation ensures that an elicitation is no longer sent for running `python3 -c print(42)`. --- [//]: # (BEGIN SAPLING FOOTER) Stack created with [Sapling](https://sapling-scm.com). Best reviewed with [ReviewStack](https://reviewstack.dev/openai/codex/pull/2345). * __->__ #2345 * #2329 * #2343 * #2340 * #2338 --- .../mcp-server/src/codex_message_processor.rs | 57 ++++++ codex-rs/mcp-server/src/wire_format.rs | 26 +++ .../tests/codex_message_processor_flow.rs | 187 +++++++++++++++++- .../mcp-server/tests/common/mcp_process.rs | 10 + 4 files changed, 279 insertions(+), 1 deletion(-) diff --git a/codex-rs/mcp-server/src/codex_message_processor.rs b/codex-rs/mcp-server/src/codex_message_processor.rs index cdf24d92..d930c03b 100644 --- a/codex-rs/mcp-server/src/codex_message_processor.rs +++ b/codex-rs/mcp-server/src/codex_message_processor.rs @@ -42,6 +42,8 @@ use crate::wire_format::RemoveConversationListenerParams; use crate::wire_format::RemoveConversationSubscriptionResponse; use crate::wire_format::SendUserMessageParams; use crate::wire_format::SendUserMessageResponse; +use crate::wire_format::SendUserTurnParams; +use crate::wire_format::SendUserTurnResponse; use codex_core::protocol::InputItem as CoreInputItem; use codex_core::protocol::Op; @@ -78,6 +80,9 @@ impl CodexMessageProcessor { ClientRequest::SendUserMessage { request_id, params } => { self.send_user_message(request_id, params).await; } + ClientRequest::SendUserTurn { request_id, params } => { + self.send_user_turn(request_id, 
params).await; + } ClientRequest::InterruptConversation { request_id, params } => { self.interrupt_conversation(request_id, params).await; } @@ -169,6 +174,58 @@ impl CodexMessageProcessor { .await; } + async fn send_user_turn(&self, request_id: RequestId, params: SendUserTurnParams) { + let SendUserTurnParams { + conversation_id, + items, + cwd, + approval_policy, + sandbox_policy, + model, + effort, + summary, + } = params; + + let Ok(conversation) = self + .conversation_manager + .get_conversation(conversation_id.0) + .await + else { + let error = JSONRPCErrorError { + code: INVALID_REQUEST_ERROR_CODE, + message: format!("conversation not found: {conversation_id}"), + data: None, + }; + self.outgoing.send_error(request_id, error).await; + return; + }; + + let mapped_items: Vec = items + .into_iter() + .map(|item| match item { + WireInputItem::Text { text } => CoreInputItem::Text { text }, + WireInputItem::Image { image_url } => CoreInputItem::Image { image_url }, + WireInputItem::LocalImage { path } => CoreInputItem::LocalImage { path }, + }) + .collect(); + + let _ = conversation + .submit(Op::UserTurn { + items: mapped_items, + cwd, + approval_policy, + sandbox_policy, + model, + effort, + summary, + }) + .await; + + self.outgoing + .send_response(request_id, SendUserTurnResponse {}) + .await; + } + async fn interrupt_conversation( &mut self, request_id: RequestId, diff --git a/codex-rs/mcp-server/src/wire_format.rs b/codex-rs/mcp-server/src/wire_format.rs index e2ba729e..68d9aeb9 100644 --- a/codex-rs/mcp-server/src/wire_format.rs +++ b/codex-rs/mcp-server/src/wire_format.rs @@ -2,8 +2,12 @@ use std::collections::HashMap; use std::fmt::Display; use std::path::PathBuf; +use codex_core::config_types::ReasoningEffort; +use codex_core::config_types::ReasoningSummary; +use codex_core::protocol::AskForApproval; use codex_core::protocol::FileChange; use codex_core::protocol::ReviewDecision; +use codex_core::protocol::SandboxPolicy; use mcp_types::RequestId; use 
serde::Deserialize; use serde::Serialize; @@ -36,6 +40,11 @@ pub enum ClientRequest { request_id: RequestId, params: SendUserMessageParams, }, + SendUserTurn { + #[serde(rename = "id")] + request_id: RequestId, + params: SendUserTurnParams, + }, InterruptConversation { #[serde(rename = "id")] request_id: RequestId, @@ -120,6 +129,23 @@ pub struct SendUserMessageParams { pub items: Vec, } +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +#[serde(rename_all = "camelCase")] +pub struct SendUserTurnParams { + pub conversation_id: ConversationId, + pub items: Vec, + pub cwd: PathBuf, + pub approval_policy: AskForApproval, + pub sandbox_policy: SandboxPolicy, + pub model: String, + pub effort: ReasoningEffort, + pub summary: ReasoningSummary, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +#[serde(rename_all = "camelCase")] +pub struct SendUserTurnResponse {} + #[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] #[serde(rename_all = "camelCase")] pub struct InterruptConversationParams { diff --git a/codex-rs/mcp-server/tests/codex_message_processor_flow.rs b/codex-rs/mcp-server/tests/codex_message_processor_flow.rs index 2cc55c6d..e0c7a832 100644 --- a/codex-rs/mcp-server/tests/codex_message_processor_flow.rs +++ b/codex-rs/mcp-server/tests/codex_message_processor_flow.rs @@ -1,14 +1,21 @@ use std::path::Path; +use codex_core::config_types::ReasoningEffort; +use codex_core::config_types::ReasoningSummary; +use codex_core::protocol::AskForApproval; +use codex_core::protocol::SandboxPolicy; use codex_core::spawn::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR; use codex_mcp_server::wire_format::AddConversationListenerParams; use codex_mcp_server::wire_format::AddConversationSubscriptionResponse; +use codex_mcp_server::wire_format::EXEC_COMMAND_APPROVAL_METHOD; use codex_mcp_server::wire_format::NewConversationParams; use codex_mcp_server::wire_format::NewConversationResponse; use codex_mcp_server::wire_format::RemoveConversationListenerParams; 
use codex_mcp_server::wire_format::RemoveConversationSubscriptionResponse; use codex_mcp_server::wire_format::SendUserMessageParams; use codex_mcp_server::wire_format::SendUserMessageResponse; +use codex_mcp_server::wire_format::SendUserTurnParams; +use codex_mcp_server::wire_format::SendUserTurnResponse; use mcp_test_support::McpProcess; use mcp_test_support::create_final_assistant_message_sse_response; use mcp_test_support::create_mock_chat_completions_server; @@ -167,6 +174,184 @@ fn to_response(response: JSONRPCResponse) -> anyhow::Result Ok(codex_response) } +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn test_send_user_turn_changes_approval_policy_behavior() { + if env::var(CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR).is_ok() { + println!( + "Skipping test because it cannot execute when network is disabled in a Codex sandbox." + ); + return; + } + + let tmp = TempDir::new().expect("tmp dir"); + let codex_home = tmp.path().join("codex_home"); + std::fs::create_dir(&codex_home).expect("create codex home dir"); + let working_directory = tmp.path().join("workdir"); + std::fs::create_dir(&working_directory).expect("create working directory"); + + // Mock server will request a python shell call for the first and second turn, then finish. 
+ let responses = vec![ + create_shell_sse_response( + vec![ + "python3".to_string(), + "-c".to_string(), + "print(42)".to_string(), + ], + Some(&working_directory), + Some(5000), + "call1", + ) + .expect("create first shell sse response"), + create_final_assistant_message_sse_response("done 1") + .expect("create final assistant message 1"), + create_shell_sse_response( + vec![ + "python3".to_string(), + "-c".to_string(), + "print(42)".to_string(), + ], + Some(&working_directory), + Some(5000), + "call2", + ) + .expect("create second shell sse response"), + create_final_assistant_message_sse_response("done 2") + .expect("create final assistant message 2"), + ]; + let server = create_mock_chat_completions_server(responses).await; + create_config_toml(&codex_home, &server.uri()).expect("write config"); + + // Start MCP server and initialize. + let mut mcp = McpProcess::new(&codex_home).await.expect("spawn mcp"); + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()) + .await + .expect("init timeout") + .expect("init error"); + + // 1) Start conversation with approval_policy=untrusted + let new_conv_id = mcp + .send_new_conversation_request(NewConversationParams { + cwd: Some(working_directory.to_string_lossy().into_owned()), + ..Default::default() + }) + .await + .expect("send newConversation"); + let new_conv_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(new_conv_id)), + ) + .await + .expect("newConversation timeout") + .expect("newConversation resp"); + let NewConversationResponse { + conversation_id, .. 
+ } = to_response::(new_conv_resp) + .expect("deserialize newConversation response"); + + // 2) addConversationListener + let add_listener_id = mcp + .send_add_conversation_listener_request(AddConversationListenerParams { conversation_id }) + .await + .expect("send addConversationListener"); + let _: AddConversationSubscriptionResponse = + to_response::( + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(add_listener_id)), + ) + .await + .expect("addConversationListener timeout") + .expect("addConversationListener resp"), + ) + .expect("deserialize addConversationListener response"); + + // 3) sendUserMessage triggers a shell call; approval policy is Untrusted so we should get an elicitation + let send_user_id = mcp + .send_send_user_message_request(SendUserMessageParams { + conversation_id, + items: vec![codex_mcp_server::wire_format::InputItem::Text { + text: "run python".to_string(), + }], + }) + .await + .expect("send sendUserMessage"); + let _send_user_resp: SendUserMessageResponse = to_response::( + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(send_user_id)), + ) + .await + .expect("sendUserMessage timeout") + .expect("sendUserMessage resp"), + ) + .expect("deserialize sendUserMessage response"); + + // Expect an ExecCommandApproval request (elicitation) + let request = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_request_message(), + ) + .await + .expect("waiting for exec approval request timeout") + .expect("exec approval request"); + assert_eq!(request.method, EXEC_COMMAND_APPROVAL_METHOD); + + // Approve so the first turn can complete + mcp.send_response( + request.id, + serde_json::json!({ "decision": codex_core::protocol::ReviewDecision::Approved }), + ) + .await + .expect("send approval response"); + + // Wait for first TaskComplete + let _ = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("codex/event/task_complete"), + 
) + .await + .expect("task_complete 1 timeout") + .expect("task_complete 1 notification"); + + // 4) sendUserTurn with approval_policy=never should run without elicitation + let send_turn_id = mcp + .send_send_user_turn_request(SendUserTurnParams { + conversation_id, + items: vec![codex_mcp_server::wire_format::InputItem::Text { + text: "run python again".to_string(), + }], + cwd: working_directory.clone(), + approval_policy: AskForApproval::Never, + sandbox_policy: SandboxPolicy::new_read_only_policy(), + model: "mock-model".to_string(), + effort: ReasoningEffort::Medium, + summary: ReasoningSummary::Auto, + }) + .await + .expect("send sendUserTurn"); + // Acknowledge sendUserTurn + let _send_turn_resp: SendUserTurnResponse = to_response::( + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(send_turn_id)), + ) + .await + .expect("sendUserTurn timeout") + .expect("sendUserTurn resp"), + ) + .expect("deserialize sendUserTurn response"); + + // Ensure we do NOT receive an ExecCommandApproval request before the task completes. + // If any Request is seen while waiting for task_complete, the helper will error and the test fails. + let _ = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("codex/event/task_complete"), + ) + .await + .expect("task_complete 2 timeout") + .expect("task_complete 2 notification"); +} + // Helper: minimal config.toml pointing at mock provider. 
fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> { let config_toml = codex_home.join("config.toml"); @@ -175,7 +360,7 @@ fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<() format!( r#" model = "mock-model" -approval_policy = "never" +approval_policy = "untrusted" model_provider = "mock_provider" diff --git a/codex-rs/mcp-server/tests/common/mcp_process.rs b/codex-rs/mcp-server/tests/common/mcp_process.rs index 35484264..dc783344 100644 --- a/codex-rs/mcp-server/tests/common/mcp_process.rs +++ b/codex-rs/mcp-server/tests/common/mcp_process.rs @@ -22,6 +22,7 @@ use codex_mcp_server::wire_format::AddConversationListenerParams; use codex_mcp_server::wire_format::NewConversationParams; use codex_mcp_server::wire_format::RemoveConversationListenerParams; use codex_mcp_server::wire_format::SendUserMessageParams; +use codex_mcp_server::wire_format::SendUserTurnParams; use mcp_types::CallToolRequestParams; use mcp_types::ClientCapabilities; @@ -281,6 +282,15 @@ impl McpProcess { .await } + /// Send a `sendUserTurn` JSON-RPC request. + pub async fn send_send_user_turn_request( + &mut self, + params: SendUserTurnParams, + ) -> anyhow::Result { + let params = Some(serde_json::to_value(params)?); + self.send_request("sendUserTurn", params).await + } + async fn send_request( &mut self, method: &str,