feat: introduce ClientRequest::SendUserTurn (#2345)
This adds a new request type, `SendUserTurn`, that makes it possible to submit a `Op::UserTurn` operation (introduced in #2329) to a conversation. This PR also adds a new integration test that verifies that changing from `AskForApproval::UnlessTrusted` to `AskForApproval::Never` mid-conversation ensures that an elicitation is no longer sent for running `python3 -c print(42)`. --- [//]: # (BEGIN SAPLING FOOTER) Stack created with [Sapling](https://sapling-scm.com). Best reviewed with [ReviewStack](https://reviewstack.dev/openai/codex/pull/2345). * __->__ #2345 * #2329 * #2343 * #2340 * #2338
This commit is contained in:
@@ -42,6 +42,8 @@ use crate::wire_format::RemoveConversationListenerParams;
|
||||
use crate::wire_format::RemoveConversationSubscriptionResponse;
|
||||
use crate::wire_format::SendUserMessageParams;
|
||||
use crate::wire_format::SendUserMessageResponse;
|
||||
use crate::wire_format::SendUserTurnParams;
|
||||
use crate::wire_format::SendUserTurnResponse;
|
||||
use codex_core::protocol::InputItem as CoreInputItem;
|
||||
use codex_core::protocol::Op;
|
||||
|
||||
@@ -78,6 +80,9 @@ impl CodexMessageProcessor {
|
||||
ClientRequest::SendUserMessage { request_id, params } => {
|
||||
self.send_user_message(request_id, params).await;
|
||||
}
|
||||
ClientRequest::SendUserTurn { request_id, params } => {
|
||||
self.send_user_turn(request_id, params).await;
|
||||
}
|
||||
ClientRequest::InterruptConversation { request_id, params } => {
|
||||
self.interrupt_conversation(request_id, params).await;
|
||||
}
|
||||
@@ -169,6 +174,58 @@ impl CodexMessageProcessor {
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn send_user_turn(&self, request_id: RequestId, params: SendUserTurnParams) {
|
||||
let SendUserTurnParams {
|
||||
conversation_id,
|
||||
items,
|
||||
cwd,
|
||||
approval_policy,
|
||||
sandbox_policy,
|
||||
model,
|
||||
effort,
|
||||
summary,
|
||||
} = params;
|
||||
|
||||
let Ok(conversation) = self
|
||||
.conversation_manager
|
||||
.get_conversation(conversation_id.0)
|
||||
.await
|
||||
else {
|
||||
let error = JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message: format!("conversation not found: {conversation_id}"),
|
||||
data: None,
|
||||
};
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
};
|
||||
|
||||
let mapped_items: Vec<CoreInputItem> = items
|
||||
.into_iter()
|
||||
.map(|item| match item {
|
||||
WireInputItem::Text { text } => CoreInputItem::Text { text },
|
||||
WireInputItem::Image { image_url } => CoreInputItem::Image { image_url },
|
||||
WireInputItem::LocalImage { path } => CoreInputItem::LocalImage { path },
|
||||
})
|
||||
.collect();
|
||||
|
||||
let _ = conversation
|
||||
.submit(Op::UserTurn {
|
||||
items: mapped_items,
|
||||
cwd,
|
||||
approval_policy,
|
||||
sandbox_policy,
|
||||
model,
|
||||
effort,
|
||||
summary,
|
||||
})
|
||||
.await;
|
||||
|
||||
self.outgoing
|
||||
.send_response(request_id, SendUserTurnResponse {})
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn interrupt_conversation(
|
||||
&mut self,
|
||||
request_id: RequestId,
|
||||
|
||||
@@ -2,8 +2,12 @@ use std::collections::HashMap;
|
||||
use std::fmt::Display;
|
||||
use std::path::PathBuf;
|
||||
|
||||
use codex_core::config_types::ReasoningEffort;
|
||||
use codex_core::config_types::ReasoningSummary;
|
||||
use codex_core::protocol::AskForApproval;
|
||||
use codex_core::protocol::FileChange;
|
||||
use codex_core::protocol::ReviewDecision;
|
||||
use codex_core::protocol::SandboxPolicy;
|
||||
use mcp_types::RequestId;
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
@@ -36,6 +40,11 @@ pub enum ClientRequest {
|
||||
request_id: RequestId,
|
||||
params: SendUserMessageParams,
|
||||
},
|
||||
SendUserTurn {
|
||||
#[serde(rename = "id")]
|
||||
request_id: RequestId,
|
||||
params: SendUserTurnParams,
|
||||
},
|
||||
InterruptConversation {
|
||||
#[serde(rename = "id")]
|
||||
request_id: RequestId,
|
||||
@@ -120,6 +129,23 @@ pub struct SendUserMessageParams {
|
||||
pub items: Vec<InputItem>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct SendUserTurnParams {
|
||||
pub conversation_id: ConversationId,
|
||||
pub items: Vec<InputItem>,
|
||||
pub cwd: PathBuf,
|
||||
pub approval_policy: AskForApproval,
|
||||
pub sandbox_policy: SandboxPolicy,
|
||||
pub model: String,
|
||||
pub effort: ReasoningEffort,
|
||||
pub summary: ReasoningSummary,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct SendUserTurnResponse {}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct InterruptConversationParams {
|
||||
|
||||
@@ -1,14 +1,21 @@
|
||||
use std::path::Path;
|
||||
|
||||
use codex_core::config_types::ReasoningEffort;
|
||||
use codex_core::config_types::ReasoningSummary;
|
||||
use codex_core::protocol::AskForApproval;
|
||||
use codex_core::protocol::SandboxPolicy;
|
||||
use codex_core::spawn::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR;
|
||||
use codex_mcp_server::wire_format::AddConversationListenerParams;
|
||||
use codex_mcp_server::wire_format::AddConversationSubscriptionResponse;
|
||||
use codex_mcp_server::wire_format::EXEC_COMMAND_APPROVAL_METHOD;
|
||||
use codex_mcp_server::wire_format::NewConversationParams;
|
||||
use codex_mcp_server::wire_format::NewConversationResponse;
|
||||
use codex_mcp_server::wire_format::RemoveConversationListenerParams;
|
||||
use codex_mcp_server::wire_format::RemoveConversationSubscriptionResponse;
|
||||
use codex_mcp_server::wire_format::SendUserMessageParams;
|
||||
use codex_mcp_server::wire_format::SendUserMessageResponse;
|
||||
use codex_mcp_server::wire_format::SendUserTurnParams;
|
||||
use codex_mcp_server::wire_format::SendUserTurnResponse;
|
||||
use mcp_test_support::McpProcess;
|
||||
use mcp_test_support::create_final_assistant_message_sse_response;
|
||||
use mcp_test_support::create_mock_chat_completions_server;
|
||||
@@ -167,6 +174,184 @@ fn to_response<T: DeserializeOwned>(response: JSONRPCResponse) -> anyhow::Result
|
||||
Ok(codex_response)
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||
async fn test_send_user_turn_changes_approval_policy_behavior() {
|
||||
if env::var(CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR).is_ok() {
|
||||
println!(
|
||||
"Skipping test because it cannot execute when network is disabled in a Codex sandbox."
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
let tmp = TempDir::new().expect("tmp dir");
|
||||
let codex_home = tmp.path().join("codex_home");
|
||||
std::fs::create_dir(&codex_home).expect("create codex home dir");
|
||||
let working_directory = tmp.path().join("workdir");
|
||||
std::fs::create_dir(&working_directory).expect("create working directory");
|
||||
|
||||
// Mock server will request a python shell call for the first and second turn, then finish.
|
||||
let responses = vec![
|
||||
create_shell_sse_response(
|
||||
vec![
|
||||
"python3".to_string(),
|
||||
"-c".to_string(),
|
||||
"print(42)".to_string(),
|
||||
],
|
||||
Some(&working_directory),
|
||||
Some(5000),
|
||||
"call1",
|
||||
)
|
||||
.expect("create first shell sse response"),
|
||||
create_final_assistant_message_sse_response("done 1")
|
||||
.expect("create final assistant message 1"),
|
||||
create_shell_sse_response(
|
||||
vec![
|
||||
"python3".to_string(),
|
||||
"-c".to_string(),
|
||||
"print(42)".to_string(),
|
||||
],
|
||||
Some(&working_directory),
|
||||
Some(5000),
|
||||
"call2",
|
||||
)
|
||||
.expect("create second shell sse response"),
|
||||
create_final_assistant_message_sse_response("done 2")
|
||||
.expect("create final assistant message 2"),
|
||||
];
|
||||
let server = create_mock_chat_completions_server(responses).await;
|
||||
create_config_toml(&codex_home, &server.uri()).expect("write config");
|
||||
|
||||
// Start MCP server and initialize.
|
||||
let mut mcp = McpProcess::new(&codex_home).await.expect("spawn mcp");
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize())
|
||||
.await
|
||||
.expect("init timeout")
|
||||
.expect("init error");
|
||||
|
||||
// 1) Start conversation with approval_policy=untrusted
|
||||
let new_conv_id = mcp
|
||||
.send_new_conversation_request(NewConversationParams {
|
||||
cwd: Some(working_directory.to_string_lossy().into_owned()),
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.expect("send newConversation");
|
||||
let new_conv_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(new_conv_id)),
|
||||
)
|
||||
.await
|
||||
.expect("newConversation timeout")
|
||||
.expect("newConversation resp");
|
||||
let NewConversationResponse {
|
||||
conversation_id, ..
|
||||
} = to_response::<NewConversationResponse>(new_conv_resp)
|
||||
.expect("deserialize newConversation response");
|
||||
|
||||
// 2) addConversationListener
|
||||
let add_listener_id = mcp
|
||||
.send_add_conversation_listener_request(AddConversationListenerParams { conversation_id })
|
||||
.await
|
||||
.expect("send addConversationListener");
|
||||
let _: AddConversationSubscriptionResponse =
|
||||
to_response::<AddConversationSubscriptionResponse>(
|
||||
timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(add_listener_id)),
|
||||
)
|
||||
.await
|
||||
.expect("addConversationListener timeout")
|
||||
.expect("addConversationListener resp"),
|
||||
)
|
||||
.expect("deserialize addConversationListener response");
|
||||
|
||||
// 3) sendUserMessage triggers a shell call; approval policy is Untrusted so we should get an elicitation
|
||||
let send_user_id = mcp
|
||||
.send_send_user_message_request(SendUserMessageParams {
|
||||
conversation_id,
|
||||
items: vec![codex_mcp_server::wire_format::InputItem::Text {
|
||||
text: "run python".to_string(),
|
||||
}],
|
||||
})
|
||||
.await
|
||||
.expect("send sendUserMessage");
|
||||
let _send_user_resp: SendUserMessageResponse = to_response::<SendUserMessageResponse>(
|
||||
timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(send_user_id)),
|
||||
)
|
||||
.await
|
||||
.expect("sendUserMessage timeout")
|
||||
.expect("sendUserMessage resp"),
|
||||
)
|
||||
.expect("deserialize sendUserMessage response");
|
||||
|
||||
// Expect an ExecCommandApproval request (elicitation)
|
||||
let request = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_request_message(),
|
||||
)
|
||||
.await
|
||||
.expect("waiting for exec approval request timeout")
|
||||
.expect("exec approval request");
|
||||
assert_eq!(request.method, EXEC_COMMAND_APPROVAL_METHOD);
|
||||
|
||||
// Approve so the first turn can complete
|
||||
mcp.send_response(
|
||||
request.id,
|
||||
serde_json::json!({ "decision": codex_core::protocol::ReviewDecision::Approved }),
|
||||
)
|
||||
.await
|
||||
.expect("send approval response");
|
||||
|
||||
// Wait for first TaskComplete
|
||||
let _ = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_notification_message("codex/event/task_complete"),
|
||||
)
|
||||
.await
|
||||
.expect("task_complete 1 timeout")
|
||||
.expect("task_complete 1 notification");
|
||||
|
||||
// 4) sendUserTurn with approval_policy=never should run without elicitation
|
||||
let send_turn_id = mcp
|
||||
.send_send_user_turn_request(SendUserTurnParams {
|
||||
conversation_id,
|
||||
items: vec![codex_mcp_server::wire_format::InputItem::Text {
|
||||
text: "run python again".to_string(),
|
||||
}],
|
||||
cwd: working_directory.clone(),
|
||||
approval_policy: AskForApproval::Never,
|
||||
sandbox_policy: SandboxPolicy::new_read_only_policy(),
|
||||
model: "mock-model".to_string(),
|
||||
effort: ReasoningEffort::Medium,
|
||||
summary: ReasoningSummary::Auto,
|
||||
})
|
||||
.await
|
||||
.expect("send sendUserTurn");
|
||||
// Acknowledge sendUserTurn
|
||||
let _send_turn_resp: SendUserTurnResponse = to_response::<SendUserTurnResponse>(
|
||||
timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(send_turn_id)),
|
||||
)
|
||||
.await
|
||||
.expect("sendUserTurn timeout")
|
||||
.expect("sendUserTurn resp"),
|
||||
)
|
||||
.expect("deserialize sendUserTurn response");
|
||||
|
||||
// Ensure we do NOT receive an ExecCommandApproval request before the task completes.
|
||||
// If any Request is seen while waiting for task_complete, the helper will error and the test fails.
|
||||
let _ = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_notification_message("codex/event/task_complete"),
|
||||
)
|
||||
.await
|
||||
.expect("task_complete 2 timeout")
|
||||
.expect("task_complete 2 notification");
|
||||
}
|
||||
|
||||
// Helper: minimal config.toml pointing at mock provider.
|
||||
fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> {
|
||||
let config_toml = codex_home.join("config.toml");
|
||||
@@ -175,7 +360,7 @@ fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()
|
||||
format!(
|
||||
r#"
|
||||
model = "mock-model"
|
||||
approval_policy = "never"
|
||||
approval_policy = "untrusted"
|
||||
|
||||
model_provider = "mock_provider"
|
||||
|
||||
|
||||
@@ -22,6 +22,7 @@ use codex_mcp_server::wire_format::AddConversationListenerParams;
|
||||
use codex_mcp_server::wire_format::NewConversationParams;
|
||||
use codex_mcp_server::wire_format::RemoveConversationListenerParams;
|
||||
use codex_mcp_server::wire_format::SendUserMessageParams;
|
||||
use codex_mcp_server::wire_format::SendUserTurnParams;
|
||||
|
||||
use mcp_types::CallToolRequestParams;
|
||||
use mcp_types::ClientCapabilities;
|
||||
@@ -281,6 +282,15 @@ impl McpProcess {
|
||||
.await
|
||||
}
|
||||
|
||||
/// Send a `sendUserTurn` JSON-RPC request.
|
||||
pub async fn send_send_user_turn_request(
|
||||
&mut self,
|
||||
params: SendUserTurnParams,
|
||||
) -> anyhow::Result<i64> {
|
||||
let params = Some(serde_json::to_value(params)?);
|
||||
self.send_request("sendUserTurn", params).await
|
||||
}
|
||||
|
||||
async fn send_request(
|
||||
&mut self,
|
||||
method: &str,
|
||||
|
||||
Reference in New Issue
Block a user