diff --git a/codex-rs/app-server/tests/common/mcp_process.rs b/codex-rs/app-server/tests/common/mcp_process.rs index 29768df8..bdc96cad 100644 --- a/codex-rs/app-server/tests/common/mcp_process.rs +++ b/codex-rs/app-server/tests/common/mcp_process.rs @@ -1,4 +1,3 @@ -use std::collections::VecDeque; use std::path::Path; use std::process::Stdio; use std::sync::atomic::AtomicI64; @@ -48,7 +47,6 @@ pub struct McpProcess { process: Child, stdin: ChildStdin, stdout: BufReader, - pending_user_messages: VecDeque, } impl McpProcess { @@ -119,7 +117,6 @@ impl McpProcess { process, stdin, stdout, - pending_user_messages: VecDeque::new(), }) } @@ -378,9 +375,8 @@ impl McpProcess { let message = self.read_jsonrpc_message().await?; match message { - JSONRPCMessage::Notification(notification) => { - eprintln!("notification: {notification:?}"); - self.enqueue_user_message(notification); + JSONRPCMessage::Notification(_) => { + eprintln!("notification: {message:?}"); } JSONRPCMessage::Request(jsonrpc_request) => { return jsonrpc_request.try_into().with_context( @@ -406,9 +402,8 @@ impl McpProcess { loop { let message = self.read_jsonrpc_message().await?; match message { - JSONRPCMessage::Notification(notification) => { - eprintln!("notification: {notification:?}"); - self.enqueue_user_message(notification); + JSONRPCMessage::Notification(_) => { + eprintln!("notification: {message:?}"); } JSONRPCMessage::Request(_) => { anyhow::bail!("unexpected JSONRPCMessage::Request: {message:?}"); @@ -432,9 +427,8 @@ impl McpProcess { loop { let message = self.read_jsonrpc_message().await?; match message { - JSONRPCMessage::Notification(notification) => { - eprintln!("notification: {notification:?}"); - self.enqueue_user_message(notification); + JSONRPCMessage::Notification(_) => { + eprintln!("notification: {message:?}"); } JSONRPCMessage::Request(_) => { anyhow::bail!("unexpected JSONRPCMessage::Request: {message:?}"); @@ -457,10 +451,6 @@ impl McpProcess { ) -> anyhow::Result { eprintln!("in read_stream_until_notification_message({method})"); - if let Some(notification) = self.take_pending_notification_by_method(method) { - return Ok(notification); - } - loop { let message = self.read_jsonrpc_message().await?; match message { @@ -468,7 +458,6 @@ impl McpProcess { if notification.method == method { return Ok(notification); } - self.enqueue_user_message(notification); } JSONRPCMessage::Request(_) => { anyhow::bail!("unexpected JSONRPCMessage::Request: {message:?}"); @@ -482,21 +471,4 @@ impl McpProcess { } } } - - fn take_pending_notification_by_method(&mut self, method: &str) -> Option { - if let Some(pos) = self - .pending_user_messages - .iter() - .position(|notification| notification.method == method) - { - return self.pending_user_messages.remove(pos); - } - None - } - - fn enqueue_user_message(&mut self, notification: JSONRPCNotification) { - if notification.method == "codex/event/user_message" { - self.pending_user_messages.push_back(notification); - } - } } diff --git a/codex-rs/app-server/tests/suite/codex_message_processor_flow.rs b/codex-rs/app-server/tests/suite/codex_message_processor_flow.rs index 4dff2a15..f1f34f95 100644 --- a/codex-rs/app-server/tests/suite/codex_message_processor_flow.rs +++ b/codex-rs/app-server/tests/suite/codex_message_processor_flow.rs @@ -8,7 +8,6 @@ use app_test_support::to_response; use codex_app_server_protocol::AddConversationListenerParams; use codex_app_server_protocol::AddConversationSubscriptionResponse; use codex_app_server_protocol::ExecCommandApprovalParams; -use codex_app_server_protocol::InputItem; use codex_app_server_protocol::JSONRPCNotification; use codex_app_server_protocol::JSONRPCResponse; use codex_app_server_protocol::NewConversationParams; @@ -26,10 +25,6 @@ use codex_core::protocol::SandboxPolicy; use codex_core::protocol_config_types::ReasoningEffort; use codex_core::protocol_config_types::ReasoningSummary; use codex_core::spawn::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR; -use codex_protocol::config_types::SandboxMode; -use codex_protocol::protocol::Event; -use codex_protocol::protocol::EventMsg; -use codex_protocol::protocol::InputMessageKind; use pretty_assertions::assert_eq; use std::env; use tempfile::TempDir; @@ -372,234 +367,6 @@ async fn test_send_user_turn_changes_approval_policy_behavior() { } // Helper: minimal config.toml pointing at mock provider. - -#[tokio::test(flavor = "multi_thread", worker_threads = 4)] -async fn test_send_user_turn_updates_sandbox_and_cwd_between_turns() { - if env::var(CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR).is_ok() { - println!( - "Skipping test because it cannot execute when network is disabled in a Codex sandbox." - ); - return; - } - - let tmp = TempDir::new().expect("tmp dir"); - let codex_home = tmp.path().join("codex_home"); - std::fs::create_dir(&codex_home).expect("create codex home dir"); - let workspace_root = tmp.path().join("workspace"); - std::fs::create_dir(&workspace_root).expect("create workspace root"); - let first_cwd = workspace_root.join("turn1"); - let second_cwd = workspace_root.join("turn2"); - std::fs::create_dir(&first_cwd).expect("create first cwd"); - std::fs::create_dir(&second_cwd).expect("create second cwd"); - - let responses = vec![ - create_shell_sse_response( - vec![ - "bash".to_string(), - "-lc".to_string(), - "echo first turn".to_string(), - ], - None, - Some(5000), - "call-first", - ) - .expect("create first shell response"), - create_final_assistant_message_sse_response("done first") - .expect("create first final assistant message"), - create_shell_sse_response( - vec![ - "bash".to_string(), - "-lc".to_string(), - "echo second turn".to_string(), - ], - None, - Some(5000), - "call-second", - ) - .expect("create second shell response"), - create_final_assistant_message_sse_response("done second") - .expect("create second final assistant message"), - ]; - let server = create_mock_chat_completions_server(responses).await; - create_config_toml(&codex_home, &server.uri()).expect("write config"); - - let mut mcp = McpProcess::new(&codex_home) - .await - .expect("spawn mcp process"); - timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()) - .await - .expect("init timeout") - .expect("init failed"); - - let new_conv_id = mcp - .send_new_conversation_request(NewConversationParams { - cwd: Some(first_cwd.to_string_lossy().into_owned()), - approval_policy: Some(AskForApproval::Never), - sandbox: Some(SandboxMode::WorkspaceWrite), - ..Default::default() - }) - .await - .expect("send newConversation"); - let new_conv_resp: JSONRPCResponse = timeout( - DEFAULT_READ_TIMEOUT, - mcp.read_stream_until_response_message(RequestId::Integer(new_conv_id)), - ) - .await - .expect("newConversation timeout") - .expect("newConversation resp"); - let NewConversationResponse { - conversation_id, - model, - .. - } = to_response::(new_conv_resp) - .expect("deserialize newConversation response"); - - let add_listener_id = mcp - .send_add_conversation_listener_request(AddConversationListenerParams { conversation_id }) - .await - .expect("send addConversationListener"); - timeout( - DEFAULT_READ_TIMEOUT, - mcp.read_stream_until_response_message(RequestId::Integer(add_listener_id)), - ) - .await - .expect("addConversationListener timeout") - .expect("addConversationListener resp"); - - let first_turn_id = mcp - .send_send_user_turn_request(SendUserTurnParams { - conversation_id, - items: vec![InputItem::Text { - text: "first turn".to_string(), - }], - cwd: first_cwd.clone(), - approval_policy: AskForApproval::Never, - sandbox_policy: SandboxPolicy::WorkspaceWrite { - writable_roots: vec![first_cwd.clone()], - network_access: false, - exclude_tmpdir_env_var: false, - exclude_slash_tmp: false, - }, - model: model.clone(), - effort: Some(ReasoningEffort::Medium), - summary: ReasoningSummary::Auto, - }) - .await - .expect("send first sendUserTurn"); - timeout( - DEFAULT_READ_TIMEOUT, - mcp.read_stream_until_response_message(RequestId::Integer(first_turn_id)), - ) - .await - .expect("sendUserTurn 1 timeout") - .expect("sendUserTurn 1 resp"); - timeout( - DEFAULT_READ_TIMEOUT, - mcp.read_stream_until_notification_message("codex/event/task_complete"), - ) - .await - .expect("task_complete 1 timeout") - .expect("task_complete 1 notification"); - - let second_turn_id = mcp - .send_send_user_turn_request(SendUserTurnParams { - conversation_id, - items: vec![InputItem::Text { - text: "second turn".to_string(), - }], - cwd: second_cwd.clone(), - approval_policy: AskForApproval::Never, - sandbox_policy: SandboxPolicy::DangerFullAccess, - model: model.clone(), - effort: Some(ReasoningEffort::Medium), - summary: ReasoningSummary::Auto, - }) - .await - .expect("send second sendUserTurn"); - timeout( - DEFAULT_READ_TIMEOUT, - mcp.read_stream_until_response_message(RequestId::Integer(second_turn_id)), - ) - .await - .expect("sendUserTurn 2 timeout") - .expect("sendUserTurn 2 resp"); - - let mut env_message: Option = None; - let second_cwd_str = second_cwd.to_string_lossy().into_owned(); - for _ in 0..10 { - let notification = timeout( - DEFAULT_READ_TIMEOUT, - mcp.read_stream_until_notification_message("codex/event/user_message"), - ) - .await - .expect("user_message timeout") - .expect("user_message notification"); - let params = notification - .params - .clone() - .expect("user_message should include params"); - let event: Event = serde_json::from_value(params).expect("deserialize user_message event"); - if let EventMsg::UserMessage(user) = event.msg - && matches!(user.kind, Some(InputMessageKind::EnvironmentContext)) - && user.message.contains(&second_cwd_str) - { - env_message = Some(user.message); - break; - } - } - let env_message = env_message.expect("expected environment context update"); - assert!( - env_message.contains("danger-full-access"), - "env context should reflect new sandbox mode: {env_message}" - ); - assert!( - env_message.contains("enabled"), - "env context should enable network access for danger-full-access policy: {env_message}" - ); - assert!( - env_message.contains(&second_cwd_str), - "env context should include updated cwd: {env_message}" - ); - - let exec_begin_notification = timeout( - DEFAULT_READ_TIMEOUT, - mcp.read_stream_until_notification_message("codex/event/exec_command_begin"), - ) - .await - .expect("exec_command_begin timeout") - .expect("exec_command_begin notification"); - let params = exec_begin_notification - .params - .clone() - .expect("exec_command_begin params"); - let event: Event = serde_json::from_value(params).expect("deserialize exec begin event"); - let exec_begin = match event.msg { - EventMsg::ExecCommandBegin(exec_begin) => exec_begin, - other => panic!("expected ExecCommandBegin event, got {other:?}"), - }; - assert_eq!( - exec_begin.cwd, second_cwd, - "exec turn should run from updated cwd" - ); - assert_eq!( - exec_begin.command, - vec![ - "bash".to_string(), - "-lc".to_string(), - "echo second turn".to_string() - ], - "exec turn should run expected command" - ); - - timeout( - DEFAULT_READ_TIMEOUT, - mcp.read_stream_until_notification_message("codex/event/task_complete"), - ) - .await - .expect("task_complete 2 timeout") - .expect("task_complete 2 notification"); -} - fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> { let config_toml = codex_home.join("config.toml"); std::fs::write( diff --git a/codex-rs/core/src/apply_patch.rs b/codex-rs/core/src/apply_patch.rs index 5b6728ad..836b8596 100644 --- a/codex-rs/core/src/apply_patch.rs +++ b/codex-rs/core/src/apply_patch.rs @@ -27,7 +27,6 @@ pub(crate) enum InternalApplyPatchInvocation { DelegateToExec(ApplyPatchExec), } -#[derive(Debug)] pub(crate) struct ApplyPatchExec { pub(crate) action: ApplyPatchAction, pub(crate) user_explicitly_approved_this_action: bool, @@ -110,28 +109,3 @@ pub(crate) fn convert_apply_patch_to_protocol( } result } - -#[cfg(test)] -mod tests { - use super::*; - use pretty_assertions::assert_eq; - - use tempfile::tempdir; - - #[test] - fn convert_apply_patch_maps_add_variant() { - let tmp = tempdir().expect("tmp"); - let p = tmp.path().join("a.txt"); - // Create an action with a single Add change - let action = ApplyPatchAction::new_add_for_test(&p, "hello".to_string()); - - let got = convert_apply_patch_to_protocol(&action); - - assert_eq!( - got.get(&p), - Some(&FileChange::Add { - content: "hello".to_string() - }) - ); - } -} diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 2bba2b42..4baa63ec 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -1,9 +1,11 @@ use std::borrow::Cow; use std::collections::HashMap; use std::fmt::Debug; +use std::path::Path; use std::path::PathBuf; use std::sync::Arc; use std::sync::atomic::AtomicU64; +use std::time::Duration; use crate::AuthManager; use crate::client_common::REVIEW_PROMPT; @@ -43,6 +45,7 @@ use tracing::warn; use crate::ModelProviderInfo; use crate::apply_patch; use crate::apply_patch::ApplyPatchExec; +use crate::apply_patch::CODEX_APPLY_PATCH_ARG1; use crate::apply_patch::InternalApplyPatchInvocation; use crate::apply_patch::convert_apply_patch_to_protocol; use crate::client::ModelClient; @@ -55,21 +58,19 @@ use crate::environment_context::EnvironmentContext; use crate::error::CodexErr; use crate::error::Result as CodexResult; use crate::error::SandboxErr; +use crate::error::get_error_message_ui; use crate::exec::ExecParams; use crate::exec::ExecToolCallOutput; +use crate::exec::SandboxType; use crate::exec::StdoutStream; -#[cfg(test)] use crate::exec::StreamOutput; +use crate::exec::process_exec_tool_call; use crate::exec_command::EXEC_COMMAND_TOOL_NAME; use crate::exec_command::ExecCommandParams; use crate::exec_command::ExecSessionManager; use crate::exec_command::WRITE_STDIN_TOOL_NAME; use crate::exec_command::WriteStdinParams; use crate::exec_env::create_env; -use crate::executor::ExecutionMode; -use crate::executor::Executor; -use crate::executor::ExecutorConfig; -use crate::executor::normalize_exec_result; use crate::mcp_connection_manager::McpConnectionManager; use crate::mcp_tool_call::handle_mcp_tool_call; use crate::model_family::find_family_for_model; @@ -114,6 +115,9 @@ use crate::protocol::ViewImageToolCallEvent; use crate::protocol::WebSearchBeginEvent; use crate::rollout::RolloutRecorder; use crate::rollout::RolloutRecorderParams; +use crate::safety::SafetyCheck; +use crate::safety::assess_command_safety; +use crate::safety::assess_safety_for_untrusted_command; use crate::shell; use crate::state::ActiveTurn; use crate::state::SessionServices; @@ -126,6 +130,7 @@ use crate::user_instructions::UserInstructions; use crate::user_notification::UserNotification; use crate::util::backoff; use codex_otel::otel_event_manager::OtelEventManager; +use codex_otel::otel_event_manager::ToolDecisionSource; use codex_protocol::config_types::ReasoningEffort as ReasoningEffortConfig; use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig; use codex_protocol::custom_prompts::CustomPrompt; @@ -490,13 +495,9 @@ impl Session { unified_exec_manager: UnifiedExecSessionManager::default(), notifier: notify, rollout: Mutex::new(Some(rollout_recorder)), + codex_linux_sandbox_exe: config.codex_linux_sandbox_exe.clone(), user_shell: default_shell, show_raw_agent_reasoning: config.show_raw_agent_reasoning, - executor: Executor::new(ExecutorConfig::new( - turn_context.sandbox_policy.clone(), - turn_context.cwd.clone(), - config.codex_linux_sandbox_exe.clone(), - )), }; let sess = Arc::new(Session { @@ -581,11 +582,6 @@ impl Session { } } - /// Emit an exec approval request event and await the user's decision. - /// - /// The request is keyed by `sub_id`/`call_id` so matching responses are delivered - /// to the correct in-flight turn. If the task is aborted, this returns the - /// default `ReviewDecision` (`Denied`). pub async fn request_command_approval( &self, sub_id: String, @@ -683,6 +679,11 @@ impl Session { } } + pub async fn add_approved_command(&self, cmd: Vec) { + let mut state = self.state.lock().await; + state.add_approved_command(cmd); + } + /// Records input items: always append to conversation history and /// persist these response items to rollout. async fn record_conversation_items(&self, items: &[ResponseItem]) { @@ -840,7 +841,6 @@ impl Session { command_for_display, cwd, apply_patch, - .. } = exec_command_context; let msg = match apply_patch { Some(ApplyPatchCommandContext { @@ -937,29 +937,45 @@ impl Session { /// command even on error. /// /// Returns the output of the exec tool call. - async fn run_exec_with_events( + async fn run_exec_with_events<'a>( &self, turn_diff_tracker: &mut TurnDiffTracker, - prepared: PreparedExec, - approval_policy: AskForApproval, - ) -> Result { - let PreparedExec { context, request } = prepared; - let is_apply_patch = context.apply_patch.is_some(); - let sub_id = context.sub_id.clone(); - let call_id = context.call_id.clone(); + begin_ctx: ExecCommandContext, + exec_args: ExecInvokeArgs<'a>, + ) -> crate::error::Result { + let is_apply_patch = begin_ctx.apply_patch.is_some(); + let sub_id = begin_ctx.sub_id.clone(); + let call_id = begin_ctx.call_id.clone(); - self.on_exec_command_begin(turn_diff_tracker, context.clone()) + self.on_exec_command_begin(turn_diff_tracker, begin_ctx.clone()) .await; - let result = self - .services - .executor - .run(request, self, approval_policy, &context) - .await; - - let normalized = normalize_exec_result(&result); - let borrowed = normalized.event_output(); + let result = process_exec_tool_call( + exec_args.params, + exec_args.sandbox_type, + exec_args.sandbox_policy, + exec_args.sandbox_cwd, + exec_args.codex_linux_sandbox_exe, + exec_args.stdout_stream, + ) + .await; + let output_stderr; + let borrowed: &ExecToolCallOutput = match &result { + Ok(output) => output, + Err(CodexErr::Sandbox(SandboxErr::Timeout { output })) => output, + Err(e) => { + output_stderr = ExecToolCallOutput { + exit_code: -1, + stdout: StreamOutput::new(String::new()), + stderr: StreamOutput::new(get_error_message_ui(e)), + aggregated_output: StreamOutput::new(get_error_message_ui(e)), + duration: Duration::default(), + timed_out: false, + }; + &output_stderr + } + }; self.on_exec_command_end( turn_diff_tracker, &sub_id, @@ -969,15 +985,13 @@ impl Session { ) .await; - drop(normalized); - result } /// Helper that emits a BackgroundEvent with the given message. This keeps /// the call‑sites terse so adding more diagnostics does not clutter the /// core agent logic. - pub(crate) async fn notify_background_event(&self, sub_id: &str, message: impl Into) { + async fn notify_background_event(&self, sub_id: &str, message: impl Into) { let event = Event { id: sub_id.to_string(), msg: EventMsg::BackgroundEvent(BackgroundEventEvent { @@ -1065,7 +1079,7 @@ impl Session { &self.services.notifier } - pub(crate) fn user_shell(&self) -> &shell::Shell { + fn user_shell(&self) -> &shell::Shell { &self.services.user_shell } @@ -1087,8 +1101,6 @@ pub(crate) struct ExecCommandContext { pub(crate) command_for_display: Vec, pub(crate) cwd: PathBuf, pub(crate) apply_patch: Option, - pub(crate) tool_name: String, - pub(crate) otel_event_manager: OtelEventManager, } #[derive(Clone, Debug)] @@ -1295,19 +1307,8 @@ async fn submission_loop( let previous_env_context = EnvironmentContext::from(turn_context.as_ref()); let new_env_context = EnvironmentContext::from(&fresh_turn_context); if !new_env_context.equals_except_shell(&previous_env_context) { - let env_response_item = ResponseItem::from(new_env_context); - sess.record_conversation_items(std::slice::from_ref(&env_response_item)) + sess.record_conversation_items(&[ResponseItem::from(new_env_context)]) .await; - for msg in map_response_item_to_event_messages( - &env_response_item, - sess.show_raw_agent_reasoning(), - ) { - let event = Event { - id: sub.id.clone(), - msg, - }; - sess.send_event(event).await; - } } // Install the new persistent context for subsequent tasks/turns. @@ -2626,6 +2627,33 @@ fn parse_container_exec_arguments( }) } +pub struct ExecInvokeArgs<'a> { + pub params: ExecParams, + pub sandbox_type: SandboxType, + pub sandbox_policy: &'a SandboxPolicy, + pub sandbox_cwd: &'a Path, + pub codex_linux_sandbox_exe: &'a Option, + pub stdout_stream: Option, +} + +fn maybe_translate_shell_command( + params: ExecParams, + sess: &Session, + turn_context: &TurnContext, +) -> ExecParams { + let should_translate = matches!(sess.user_shell(), crate::shell::Shell::PowerShell(_)) + || turn_context.shell_environment_policy.use_profile; + + if should_translate + && let Some(command) = sess + .user_shell() + .format_default_shell_invocation(params.command.clone()) + { + return ExecParams { command, ..params }; + } + params +} + async fn handle_container_exec_with_params( tool_name: &str, params: ExecParams, @@ -2671,10 +2699,152 @@ async fn handle_container_exec_with_params( MaybeApplyPatchVerified::NotApplyPatch => None, }; - let command_for_display = if let Some(exec) = apply_patch_exec.as_ref() { - vec!["apply_patch".to_string(), exec.action.patch.clone()] - } else { - params.command.clone() + let (params, safety, command_for_display) = match &apply_patch_exec { + Some(ApplyPatchExec { + action: ApplyPatchAction { patch, cwd, .. }, + user_explicitly_approved_this_action, + }) => { + let path_to_codex = std::env::current_exe() + .ok() + .map(|p| p.to_string_lossy().to_string()); + let Some(path_to_codex) = path_to_codex else { + return Err(FunctionCallError::RespondToModel( + "failed to determine path to codex executable".to_string(), + )); + }; + + let params = ExecParams { + command: vec![ + path_to_codex, + CODEX_APPLY_PATCH_ARG1.to_string(), + patch.clone(), + ], + cwd: cwd.clone(), + timeout_ms: params.timeout_ms, + env: HashMap::new(), + with_escalated_permissions: params.with_escalated_permissions, + justification: params.justification.clone(), + }; + let safety = if *user_explicitly_approved_this_action { + SafetyCheck::AutoApprove { + sandbox_type: SandboxType::None, + user_explicitly_approved: true, + } + } else { + assess_safety_for_untrusted_command( + turn_context.approval_policy, + &turn_context.sandbox_policy, + params.with_escalated_permissions.unwrap_or(false), + ) + }; + ( + params, + safety, + vec!["apply_patch".to_string(), patch.clone()], + ) + } + None => { + let safety = { + let state = sess.state.lock().await; + assess_command_safety( + ¶ms.command, + turn_context.approval_policy, + &turn_context.sandbox_policy, + state.approved_commands_ref(), + params.with_escalated_permissions.unwrap_or(false), + ) + }; + let command_for_display = params.command.clone(); + (params, safety, command_for_display) + } + }; + + let sandbox_type = match safety { + SafetyCheck::AutoApprove { + sandbox_type, + user_explicitly_approved, + } => { + otel_event_manager.tool_decision( + tool_name, + call_id.as_str(), + ReviewDecision::Approved, + if user_explicitly_approved { + ToolDecisionSource::User + } else { + ToolDecisionSource::Config + }, + ); + + sandbox_type + } + SafetyCheck::AskUser => { + let decision = sess + .request_command_approval( + sub_id.clone(), + call_id.clone(), + params.command.clone(), + params.cwd.clone(), + params.justification.clone(), + ) + .await; + match decision { + ReviewDecision::Approved => { + otel_event_manager.tool_decision( + tool_name, + call_id.as_str(), + ReviewDecision::Approved, + ToolDecisionSource::User, + ); + } + ReviewDecision::ApprovedForSession => { + otel_event_manager.tool_decision( + tool_name, + call_id.as_str(), + ReviewDecision::ApprovedForSession, + ToolDecisionSource::User, + ); + sess.add_approved_command(params.command.clone()).await; + } + ReviewDecision::Denied => { + otel_event_manager.tool_decision( + tool_name, + call_id.as_str(), + ReviewDecision::Denied, + ToolDecisionSource::User, + ); + return Err(FunctionCallError::RespondToModel( + "exec command rejected by user".to_string(), + )); + } + ReviewDecision::Abort => { + otel_event_manager.tool_decision( + tool_name, + call_id.as_str(), + ReviewDecision::Abort, + ToolDecisionSource::User, + ); + return Err(FunctionCallError::RespondToModel( + "exec command aborted by user".to_string(), + )); + } + } + // No sandboxing is applied because the user has given + // explicit approval. Often, we end up in this case because + // the command cannot be run in a sandbox, such as + // installing a new dependency that requires network access. + SandboxType::None + } + SafetyCheck::Reject { reason } => { + otel_event_manager.tool_decision( + tool_name, + call_id.as_str(), + ReviewDecision::Denied, + ToolDecisionSource::Config, + ); + return Err(FunctionCallError::RespondToModel(format!( + "exec command rejected: {reason:?}" + ))); + } }; let exec_command_context = ExecCommandContext { @@ -2682,47 +2852,38 @@ async fn handle_container_exec_with_params( call_id: call_id.clone(), command_for_display: command_for_display.clone(), cwd: params.cwd.clone(), - apply_patch: apply_patch_exec.as_ref().map( + apply_patch: apply_patch_exec.map( |ApplyPatchExec { action, user_explicitly_approved_this_action, }| ApplyPatchCommandContext { - user_explicitly_approved_this_action: *user_explicitly_approved_this_action, - changes: convert_apply_patch_to_protocol(action), + user_explicitly_approved_this_action, + changes: convert_apply_patch_to_protocol(&action), }, ), - tool_name: tool_name.to_string(), - otel_event_manager, }; - let mode = match apply_patch_exec { - Some(exec) => ExecutionMode::ApplyPatch(exec), - None => ExecutionMode::Shell, - }; - - sess.services.executor.update_environment( - turn_context.sandbox_policy.clone(), - turn_context.cwd.clone(), - ); - - let prepared_exec = PreparedExec::new( - exec_command_context, - params, - command_for_display, - mode, - Some(StdoutStream { - sub_id: sub_id.clone(), - call_id: call_id.clone(), - tx_event: sess.tx_event.clone(), - }), - turn_context.shell_environment_policy.use_profile, - ); - + let params = maybe_translate_shell_command(params, sess, turn_context); let output_result = sess .run_exec_with_events( turn_diff_tracker, - prepared_exec, - turn_context.approval_policy, + exec_command_context.clone(), + ExecInvokeArgs { + params: params.clone(), + sandbox_type, + sandbox_policy: &turn_context.sandbox_policy, + sandbox_cwd: &turn_context.cwd, + codex_linux_sandbox_exe: &sess.services.codex_linux_sandbox_exe, + stdout_stream: if exec_command_context.apply_patch.is_some() { + None + } else { + Some(StdoutStream { + sub_id: sub_id.clone(), + call_id: call_id.clone(), + tx_event: sess.tx_event.clone(), + }) + }, + }, ) .await; @@ -2736,16 +2897,154 @@ async fn handle_container_exec_with_params( Err(FunctionCallError::RespondToModel(content)) } } - Err(ExecError::Function(err)) => Err(err), - Err(ExecError::Codex(CodexErr::Sandbox(SandboxErr::Timeout { output }))) => Err( - FunctionCallError::RespondToModel(format_exec_output(&output)), - ), - Err(ExecError::Codex(err)) => Err(FunctionCallError::RespondToModel(format!( - "execution error: {err:?}" + Err(CodexErr::Sandbox(error)) => { + handle_sandbox_error( + tool_name, + turn_diff_tracker, + params, + exec_command_context, + error, + sandbox_type, + sess, + turn_context, + &otel_event_manager, + ) + .await + } + Err(e) => Err(FunctionCallError::RespondToModel(format!( + "execution error: {e:?}" ))), } } +#[allow(clippy::too_many_arguments)] +async fn handle_sandbox_error( + tool_name: &str, + turn_diff_tracker: &mut TurnDiffTracker, + params: ExecParams, + exec_command_context: ExecCommandContext, + error: SandboxErr, + sandbox_type: SandboxType, + sess: &Session, + turn_context: &TurnContext, + otel_event_manager: &OtelEventManager, +) -> Result { + let call_id = exec_command_context.call_id.clone(); + let sub_id = exec_command_context.sub_id.clone(); + let cwd = exec_command_context.cwd.clone(); + + if let SandboxErr::Timeout { output } = &error { + let content = format_exec_output(output); + return Err(FunctionCallError::RespondToModel(content)); + } + + // Early out if either the user never wants to be asked for approval, or + // we're letting the model manage escalation requests. Otherwise, continue + match turn_context.approval_policy { + AskForApproval::Never | AskForApproval::OnRequest => { + return Err(FunctionCallError::RespondToModel(format!( + "failed in sandbox {sandbox_type:?} with execution error: {error:?}" + ))); + } + AskForApproval::UnlessTrusted | AskForApproval::OnFailure => (), + } + + // Note that when `error` is `SandboxErr::Denied`, it could be a false + // positive. That is, it may have exited with a non-zero exit code, not + // because the sandbox denied it, but because that is its expected behavior, + // i.e., a grep command that did not match anything. Ideally we would + // include additional metadata on the command to indicate whether non-zero + // exit codes merit a retry. + + // For now, we categorically ask the user to retry without sandbox and + // emit the raw error as a background event. + sess.notify_background_event(&sub_id, format!("Execution failed: {error}")) + .await; + + let decision = sess + .request_command_approval( + sub_id.clone(), + call_id.clone(), + params.command.clone(), + cwd.clone(), + Some("command failed; retry without sandbox?".to_string()), + ) + .await; + + match decision { + ReviewDecision::Approved | ReviewDecision::ApprovedForSession => { + // Persist this command as pre‑approved for the + // remainder of the session so future + // executions skip the sandbox directly. + // TODO(ragona): Isn't this a bug? It always saves the command in an | fork? + sess.add_approved_command(params.command.clone()).await; + // Inform UI we are retrying without sandbox. + sess.notify_background_event(&sub_id, "retrying command without sandbox") + .await; + + otel_event_manager.tool_decision( + tool_name, + call_id.as_str(), + decision, + ToolDecisionSource::User, + ); + + // This is an escalated retry; the policy will not be + // examined and the sandbox has been set to `None`. + let retry_output_result = sess + .run_exec_with_events( + turn_diff_tracker, + exec_command_context.clone(), + ExecInvokeArgs { + params, + sandbox_type: SandboxType::None, + sandbox_policy: &turn_context.sandbox_policy, + sandbox_cwd: &turn_context.cwd, + codex_linux_sandbox_exe: &sess.services.codex_linux_sandbox_exe, + stdout_stream: if exec_command_context.apply_patch.is_some() { + None + } else { + Some(StdoutStream { + sub_id: sub_id.clone(), + call_id: call_id.clone(), + tx_event: sess.tx_event.clone(), + }) + }, + }, + ) + .await; + + match retry_output_result { + Ok(retry_output) => { + let ExecToolCallOutput { exit_code, .. } = &retry_output; + let content = format_exec_output(&retry_output); + if *exit_code == 0 { + Ok(content) + } else { + Err(FunctionCallError::RespondToModel(content)) + } + } + Err(e) => Err(FunctionCallError::RespondToModel(format!( + "retry failed: {e}" + ))), + } + } + decision @ (ReviewDecision::Denied | ReviewDecision::Abort) => { + otel_event_manager.tool_decision( + tool_name, + call_id.as_str(), + decision, + ToolDecisionSource::User, + ); + + // Fall through to original failure handling. + Err(FunctionCallError::RespondToModel( + "exec command rejected by user".to_string(), + )) + } + } +} + fn format_exec_output_str(exec_output: &ExecToolCallOutput) -> String { let ExecToolCallOutput { aggregated_output, .. @@ -3004,8 +3303,6 @@ pub(crate) async fn exit_review_mode( .await; } -use crate::executor::errors::ExecError; -use crate::executor::linkers::PreparedExec; #[cfg(test)] pub(crate) use tests::make_session_and_context; @@ -3319,13 +3616,9 @@ mod tests { unified_exec_manager: UnifiedExecSessionManager::default(), notifier: UserNotifier::default(), rollout: Mutex::new(None), + codex_linux_sandbox_exe: None, user_shell: shell::Shell::Unknown, show_raw_agent_reasoning: config.show_raw_agent_reasoning, - executor: Executor::new(ExecutorConfig::new( - turn_context.sandbox_policy.clone(), - turn_context.cwd.clone(), - None, - )), }; let session = Session { conversation_id, @@ -3392,13 +3685,9 @@ mod tests { unified_exec_manager: UnifiedExecSessionManager::default(), notifier: UserNotifier::default(), rollout: Mutex::new(None), + codex_linux_sandbox_exe: None, user_shell: shell::Shell::Unknown, show_raw_agent_reasoning: config.show_raw_agent_reasoning, - executor: Executor::new(ExecutorConfig::new( - config.sandbox_policy.clone(), - config.cwd.clone(), - None, - )), }; let session = Arc::new(Session { conversation_id, diff --git a/codex-rs/core/src/executor/backends.rs b/codex-rs/core/src/executor/backends.rs deleted file mode 100644 index 95cdb3ca..00000000 --- a/codex-rs/core/src/executor/backends.rs +++ /dev/null @@ -1,101 +0,0 @@ -use std::collections::HashMap; -use std::env; - -use async_trait::async_trait; - -use crate::CODEX_APPLY_PATCH_ARG1; -use crate::apply_patch::ApplyPatchExec; -use crate::exec::ExecParams; -use crate::function_tool::FunctionCallError; - -pub(crate) enum ExecutionMode { - Shell, - ApplyPatch(ApplyPatchExec), -} - -#[async_trait] -/// Backend-specific hooks that prepare and post-process execution requests for a -/// given [`ExecutionMode`]. -pub(crate) trait ExecutionBackend: Send + Sync { - fn prepare( - &self, - params: ExecParams, - // Required for downcasting the apply_patch. - mode: &ExecutionMode, - ) -> Result; - - fn stream_stdout(&self, _mode: &ExecutionMode) -> bool { - true - } -} - -static SHELL_BACKEND: ShellBackend = ShellBackend; -static APPLY_PATCH_BACKEND: ApplyPatchBackend = ApplyPatchBackend; - -pub(crate) fn backend_for_mode(mode: &ExecutionMode) -> &'static dyn ExecutionBackend { - match mode { - ExecutionMode::Shell => &SHELL_BACKEND, - ExecutionMode::ApplyPatch(_) => &APPLY_PATCH_BACKEND, - } -} - -struct ShellBackend; - -#[async_trait] -impl ExecutionBackend for ShellBackend { - fn prepare( - &self, - params: ExecParams, - mode: &ExecutionMode, - ) -> Result { - match mode { - ExecutionMode::Shell => Ok(params), - _ => Err(FunctionCallError::RespondToModel( - "shell backend invoked with non-shell mode".to_string(), - )), - } - } -} - -struct ApplyPatchBackend; - -#[async_trait] -impl ExecutionBackend for ApplyPatchBackend { - fn prepare( - &self, - params: ExecParams, - mode: &ExecutionMode, - ) -> Result { - match mode { - ExecutionMode::ApplyPatch(exec) => { - let path_to_codex = env::current_exe() - .ok() - .map(|p| p.to_string_lossy().to_string()) - .ok_or_else(|| { - FunctionCallError::RespondToModel( - "failed to determine path to codex executable".to_string(), - ) - })?; - - let patch = exec.action.patch.clone(); - Ok(ExecParams { - command: vec![path_to_codex, CODEX_APPLY_PATCH_ARG1.to_string(), patch], - cwd: exec.action.cwd.clone(), - timeout_ms: params.timeout_ms, - // Run apply_patch with a minimal environment for determinism and to - // avoid leaking host environment variables into the patch process. - env: HashMap::new(), - with_escalated_permissions: params.with_escalated_permissions, - justification: params.justification, - }) - } - ExecutionMode::Shell => Err(FunctionCallError::RespondToModel( - "apply_patch backend invoked without patch context".to_string(), - )), - } - } - - fn stream_stdout(&self, _mode: &ExecutionMode) -> bool { - false - } -} diff --git a/codex-rs/core/src/executor/cache.rs b/codex-rs/core/src/executor/cache.rs deleted file mode 100644 index 737ecb92..00000000 --- a/codex-rs/core/src/executor/cache.rs +++ /dev/null @@ -1,51 +0,0 @@ -use std::collections::HashSet; -use std::sync::Arc; -use std::sync::Mutex; - -#[derive(Clone, Debug, Default)] -/// Thread-safe store of user approvals so repeated commands can reuse -/// previously granted trust. -pub(crate) struct ApprovalCache { - inner: Arc>>>, -} - -impl ApprovalCache { - pub(crate) fn insert(&self, command: Vec) { - if command.is_empty() { - return; - } - if let Ok(mut guard) = self.inner.lock() { - guard.insert(command); - } - } - - pub(crate) fn snapshot(&self) -> HashSet> { - self.inner.lock().map(|g| g.clone()).unwrap_or_default() - } -} - -#[cfg(test)] -mod tests { - use super::*; - use pretty_assertions::assert_eq; - - #[test] - fn insert_ignores_empty_and_dedupes() { - let cache = ApprovalCache::default(); - - // Empty should be ignored - cache.insert(vec![]); - assert!(cache.snapshot().is_empty()); - - // Insert a command and verify snapshot contains it - let cmd = vec!["foo".to_string(), "bar".to_string()]; - cache.insert(cmd.clone()); - let snap1 = cache.snapshot(); - assert!(snap1.contains(&cmd)); - - // Reinserting should not create duplicates - cache.insert(cmd); - let snap2 = cache.snapshot(); - assert_eq!(snap1, snap2); - } -} diff --git a/codex-rs/core/src/executor/mod.rs b/codex-rs/core/src/executor/mod.rs deleted file mode 100644 index a5a305c6..00000000 --- a/codex-rs/core/src/executor/mod.rs +++ /dev/null @@ -1,64 +0,0 @@ -mod backends; -mod cache; -mod runner; -mod sandbox; - -pub(crate) use backends::ExecutionMode; -pub(crate) use runner::ExecutionRequest; -pub(crate) use runner::Executor; -pub(crate) use runner::ExecutorConfig; -pub(crate) use runner::normalize_exec_result; - -pub(crate) mod linkers { - use crate::codex::ExecCommandContext; - use crate::exec::ExecParams; - use crate::exec::StdoutStream; - use crate::executor::backends::ExecutionMode; - use crate::executor::runner::ExecutionRequest; - - pub struct PreparedExec { - pub(crate) context: ExecCommandContext, - pub(crate) request: ExecutionRequest, - } - - impl PreparedExec { - pub fn new( - context: ExecCommandContext, - params: ExecParams, - approval_command: Vec, - mode: ExecutionMode, - stdout_stream: Option, - use_shell_profile: bool, - ) -> Self { - let request = ExecutionRequest { - params, - approval_command, - mode, - stdout_stream, - use_shell_profile, - }; - - Self { context, request } - } - } -} - -pub mod errors { - use crate::error::CodexErr; - use crate::function_tool::FunctionCallError; - use thiserror::Error; - - #[derive(Debug, Error)] - pub enum ExecError { - #[error(transparent)] - Function(#[from] FunctionCallError), - #[error(transparent)] - Codex(#[from] CodexErr), - } - - impl ExecError { - pub(crate) fn rejection(msg: impl Into) -> Self { - FunctionCallError::RespondToModel(msg.into()).into() - } - } -} diff --git a/codex-rs/core/src/executor/runner.rs b/codex-rs/core/src/executor/runner.rs deleted file mode 100644 index befa8336..00000000 --- a/codex-rs/core/src/executor/runner.rs +++ /dev/null @@ -1,387 +0,0 @@ -use std::path::PathBuf; -use std::sync::Arc; -use std::sync::RwLock; -use std::time::Duration; - -use super::backends::ExecutionMode; -use super::backends::backend_for_mode; -use super::cache::ApprovalCache; -use crate::codex::ExecCommandContext; -use crate::codex::Session; -use crate::error::CodexErr; -use crate::error::SandboxErr; -use crate::error::get_error_message_ui; -use crate::exec::ExecParams; -use crate::exec::ExecToolCallOutput; -use crate::exec::SandboxType; -use crate::exec::StdoutStream; -use crate::exec::StreamOutput; -use crate::exec::process_exec_tool_call; -use crate::executor::errors::ExecError; -use crate::executor::sandbox::select_sandbox; -use crate::function_tool::FunctionCallError; -use crate::protocol::AskForApproval; -use crate::protocol::ReviewDecision; -use crate::protocol::SandboxPolicy; -use crate::shell; -use codex_otel::otel_event_manager::ToolDecisionSource; - -#[derive(Clone, Debug)] -pub(crate) struct ExecutorConfig { - pub(crate) sandbox_policy: SandboxPolicy, - pub(crate) sandbox_cwd: PathBuf, - codex_linux_sandbox_exe: Option, -} - -impl ExecutorConfig { - pub(crate) fn new( - sandbox_policy: SandboxPolicy, - sandbox_cwd: PathBuf, - codex_linux_sandbox_exe: Option, - ) -> Self { - Self { - sandbox_policy, - sandbox_cwd, - codex_linux_sandbox_exe, - } - } -} - -/// Coordinates sandbox selection, backend-specific preparation, and command -/// execution for tool calls requested by the model. -pub(crate) struct Executor { - approval_cache: ApprovalCache, - config: Arc>, -} - -impl Executor { - pub(crate) fn new(config: ExecutorConfig) -> Self { - Self { - approval_cache: ApprovalCache::default(), - config: Arc::new(RwLock::new(config)), - } - } - - /// Updates the sandbox policy and working directory used for future - /// executions without recreating the executor. - pub(crate) fn update_environment(&self, sandbox_policy: SandboxPolicy, sandbox_cwd: PathBuf) { - if let Ok(mut cfg) = self.config.write() { - cfg.sandbox_policy = sandbox_policy; - cfg.sandbox_cwd = sandbox_cwd; - } - } - - /// Runs a prepared execution request end-to-end: prepares parameters, decides on - /// sandbox placement (prompting the user when necessary), launches the command, - /// and lets the backend post-process the final output. - pub(crate) async fn run( - &self, - mut request: ExecutionRequest, - session: &Session, - approval_policy: AskForApproval, - context: &ExecCommandContext, - ) -> Result { - if matches!(request.mode, ExecutionMode::Shell) { - request.params = - maybe_translate_shell_command(request.params, session, request.use_shell_profile); - } - - // Step 1: Normalise parameters via the selected backend. - let backend = backend_for_mode(&request.mode); - let stdout_stream = if backend.stream_stdout(&request.mode) { - request.stdout_stream.clone() - } else { - None - }; - request.params = backend - .prepare(request.params, &request.mode) - .map_err(ExecError::from)?; - - // Step 2: Snapshot sandbox configuration so it stays stable for this run. - let config = self - .config - .read() - .map_err(|_| ExecError::rejection("executor config poisoned"))? - .clone(); - - // Step 3: Decide sandbox placement, prompting for approval when needed. - let sandbox_decision = select_sandbox( - &request, - approval_policy, - self.approval_cache.snapshot(), - &config, - session, - &context.sub_id, - &context.call_id, - &context.otel_event_manager, - ) - .await?; - if sandbox_decision.record_session_approval { - self.approval_cache.insert(request.approval_command.clone()); - } - - // Step 4: Launch the command within the chosen sandbox. - let first_attempt = self - .spawn( - request.params.clone(), - sandbox_decision.initial_sandbox, - &config, - stdout_stream.clone(), - ) - .await; - - // Step 5: Handle sandbox outcomes, optionally escalating to an unsandboxed retry. - match first_attempt { - Ok(output) => Ok(output), - Err(CodexErr::Sandbox(SandboxErr::Timeout { output })) => { - Err(CodexErr::Sandbox(SandboxErr::Timeout { output }).into()) - } - Err(CodexErr::Sandbox(error)) => { - if sandbox_decision.escalate_on_failure { - self.retry_without_sandbox( - &request, - &config, - session, - context, - stdout_stream, - error, - ) - .await - } else { - Err(ExecError::rejection(format!( - "failed in sandbox {:?} with execution error: {error:?}", - sandbox_decision.initial_sandbox - ))) - } - } - Err(err) => Err(err.into()), - } - } - - /// Fallback path invoked when a sandboxed run is denied so the user can - /// approve rerunning without isolation. - async fn retry_without_sandbox( - &self, - request: &ExecutionRequest, - config: &ExecutorConfig, - session: &Session, - context: &ExecCommandContext, - stdout_stream: Option, - sandbox_error: SandboxErr, - ) -> Result { - session - .notify_background_event( - &context.sub_id, - format!("Execution failed: {sandbox_error}"), - ) - .await; - let decision = session - .request_command_approval( - context.sub_id.to_string(), - context.call_id.to_string(), - request.approval_command.clone(), - request.params.cwd.clone(), - Some("command failed; retry without sandbox?".to_string()), - ) - .await; - - context.otel_event_manager.tool_decision( - &context.tool_name, - &context.call_id, - decision, - ToolDecisionSource::User, - ); - match decision { - ReviewDecision::Approved | ReviewDecision::ApprovedForSession => { - if matches!(decision, ReviewDecision::ApprovedForSession) { - self.approval_cache.insert(request.approval_command.clone()); - } - session - .notify_background_event(&context.sub_id, "retrying command without sandbox") - .await; - - let retry_output = self - .spawn( - request.params.clone(), - SandboxType::None, - config, - stdout_stream, - ) - .await?; - - Ok(retry_output) - } - ReviewDecision::Denied | ReviewDecision::Abort => { - Err(ExecError::rejection("exec command rejected by user")) - } - } - } - - async fn spawn( - &self, - params: ExecParams, - sandbox: SandboxType, - config: &ExecutorConfig, - stdout_stream: Option, - ) -> Result { - process_exec_tool_call( - params, - sandbox, - &config.sandbox_policy, - &config.sandbox_cwd, - &config.codex_linux_sandbox_exe, - stdout_stream, - ) - .await - } -} - -fn maybe_translate_shell_command( - params: ExecParams, - session: &Session, - use_shell_profile: bool, -) -> ExecParams { - let should_translate = - matches!(session.user_shell(), shell::Shell::PowerShell(_)) || use_shell_profile; - - if should_translate - && let Some(command) = session - .user_shell() - .format_default_shell_invocation(params.command.clone()) - { - return ExecParams { command, ..params }; - } - - params -} - -pub(crate) struct ExecutionRequest { - pub params: ExecParams, - pub approval_command: Vec, - pub mode: ExecutionMode, - pub stdout_stream: Option, - pub use_shell_profile: bool, -} - -pub(crate) struct NormalizedExecOutput<'a> { - borrowed: Option<&'a ExecToolCallOutput>, - synthetic: Option, -} - -impl<'a> NormalizedExecOutput<'a> { - pub(crate) fn event_output(&'a self) -> &'a ExecToolCallOutput { - match (self.borrowed, self.synthetic.as_ref()) { - (Some(output), _) => output, - (None, Some(output)) => output, - (None, None) => unreachable!("normalized exec output missing data"), - } - } -} - -/// Converts a raw execution result into a uniform view that always exposes an -/// [`ExecToolCallOutput`], synthesizing error output when the command fails -/// before producing a response. -pub(crate) fn normalize_exec_result( - result: &Result, -) -> NormalizedExecOutput<'_> { - match result { - Ok(output) => NormalizedExecOutput { - borrowed: Some(output), - synthetic: None, - }, - Err(ExecError::Codex(CodexErr::Sandbox(SandboxErr::Timeout { output }))) => { - NormalizedExecOutput { - borrowed: Some(output.as_ref()), - synthetic: None, - } - } - Err(err) => { - let message = match err { - ExecError::Function(FunctionCallError::RespondToModel(msg)) => msg.clone(), - ExecError::Codex(e) => get_error_message_ui(e), - }; - let synthetic = ExecToolCallOutput { - exit_code: -1, - stdout: StreamOutput::new(String::new()), - stderr: StreamOutput::new(message.clone()), - aggregated_output: StreamOutput::new(message), - duration: Duration::default(), - timed_out: false, - }; - NormalizedExecOutput { - borrowed: None, - synthetic: Some(synthetic), - } - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::error::CodexErr; - use crate::error::EnvVarError; - use crate::error::SandboxErr; - use crate::exec::StreamOutput; - use pretty_assertions::assert_eq; - - fn make_output(text: &str) -> ExecToolCallOutput { - ExecToolCallOutput { - exit_code: 1, - stdout: StreamOutput::new(String::new()), - stderr: StreamOutput::new(String::new()), - aggregated_output: StreamOutput::new(text.to_string()), - duration: Duration::from_millis(123), - timed_out: false, - } - } - - #[test] - fn normalize_success_borrows() { - let out = make_output("ok"); - let result: Result = Ok(out); - let normalized = normalize_exec_result(&result); - assert_eq!(normalized.event_output().aggregated_output.text, "ok"); - } - - #[test] - fn normalize_timeout_borrows_embedded_output() { - let out = make_output("timed out payload"); - let err = CodexErr::Sandbox(SandboxErr::Timeout { - output: Box::new(out), - }); - let result: Result = Err(ExecError::Codex(err)); - let normalized = normalize_exec_result(&result); - assert_eq!( - normalized.event_output().aggregated_output.text, - "timed out payload" - ); - } - - #[test] - fn normalize_function_error_synthesizes_payload() { - let err = FunctionCallError::RespondToModel("boom".to_string()); - let result: Result = Err(ExecError::Function(err)); - let normalized = normalize_exec_result(&result); - assert_eq!(normalized.event_output().aggregated_output.text, "boom"); - } - - #[test] - fn normalize_codex_error_synthesizes_user_message() { - // Use a simple EnvVar error which formats to a clear message - let e = CodexErr::EnvVar(EnvVarError { - var: "FOO".to_string(), - instructions: Some("set it".to_string()), - }); - let result: Result = Err(ExecError::Codex(e)); - let normalized = normalize_exec_result(&result); - assert!( - normalized - .event_output() - .aggregated_output - .text - .contains("Missing environment variable: `FOO`"), - "expected synthesized user-friendly message" - ); - } -} diff --git a/codex-rs/core/src/executor/sandbox.rs b/codex-rs/core/src/executor/sandbox.rs deleted file mode 100644 index 5c01ff69..00000000 --- a/codex-rs/core/src/executor/sandbox.rs +++ /dev/null @@ -1,405 +0,0 @@ -use crate::apply_patch::ApplyPatchExec; -use crate::codex::Session; -use crate::exec::SandboxType; -use crate::executor::ExecutionMode; -use crate::executor::ExecutionRequest; -use crate::executor::ExecutorConfig; -use crate::executor::errors::ExecError; -use crate::safety::SafetyCheck; -use crate::safety::assess_command_safety; -use crate::safety::assess_patch_safety; -use codex_otel::otel_event_manager::OtelEventManager; -use codex_otel::otel_event_manager::ToolDecisionSource; -use codex_protocol::protocol::AskForApproval; -use codex_protocol::protocol::ReviewDecision; -use std::collections::HashSet; - -/// Sandbox placement options selected for an execution run, including whether -/// to escalate after failures and whether approvals should persist. -pub(crate) struct SandboxDecision { - pub(crate) initial_sandbox: SandboxType, - pub(crate) escalate_on_failure: bool, - pub(crate) record_session_approval: bool, -} - -impl SandboxDecision { - fn auto(sandbox: SandboxType, escalate_on_failure: bool) -> Self { - Self { - initial_sandbox: sandbox, - escalate_on_failure, - record_session_approval: false, - } - } - - fn user_override(record_session_approval: bool) -> Self { - Self { - initial_sandbox: SandboxType::None, - escalate_on_failure: false, - record_session_approval, - } - } -} - -fn should_escalate_on_failure(approval: AskForApproval, sandbox: SandboxType) -> bool { - matches!( - (approval, sandbox), - ( - AskForApproval::UnlessTrusted | AskForApproval::OnFailure, - SandboxType::MacosSeatbelt | SandboxType::LinuxSeccomp - ) - ) -} - -/// Determines how a command should be sandboxed, prompting the user when -/// policy requires explicit approval. -#[allow(clippy::too_many_arguments)] -pub async fn select_sandbox( - request: &ExecutionRequest, - approval_policy: AskForApproval, - approval_cache: HashSet>, - config: &ExecutorConfig, - session: &Session, - sub_id: &str, - call_id: &str, - otel_event_manager: &OtelEventManager, -) -> Result { - match &request.mode { - ExecutionMode::Shell => { - select_shell_sandbox( - request, - approval_policy, - approval_cache, - config, - session, - sub_id, - call_id, - otel_event_manager, - ) - .await - } - ExecutionMode::ApplyPatch(exec) => { - select_apply_patch_sandbox(exec, approval_policy, config) - } - } -} - -#[allow(clippy::too_many_arguments)] -async fn select_shell_sandbox( - request: &ExecutionRequest, - approval_policy: AskForApproval, - approved_snapshot: HashSet>, - config: &ExecutorConfig, - session: &Session, - sub_id: &str, - call_id: &str, - otel_event_manager: &OtelEventManager, -) -> Result { - let command_for_safety = if request.approval_command.is_empty() { - request.params.command.clone() - } else { - request.approval_command.clone() - }; - - let safety = assess_command_safety( - &command_for_safety, - approval_policy, - &config.sandbox_policy, - &approved_snapshot, - request.params.with_escalated_permissions.unwrap_or(false), - ); - - match safety { - SafetyCheck::AutoApprove { - sandbox_type, - user_explicitly_approved, - } => { - let mut decision = SandboxDecision::auto( - sandbox_type, - should_escalate_on_failure(approval_policy, sandbox_type), - ); - if user_explicitly_approved { - decision.record_session_approval = true; - } - let (decision_for_event, source) = if user_explicitly_approved { - (ReviewDecision::ApprovedForSession, ToolDecisionSource::User) - } else { - (ReviewDecision::Approved, ToolDecisionSource::Config) - }; - otel_event_manager.tool_decision("local_shell", call_id, decision_for_event, source); - Ok(decision) - } - SafetyCheck::AskUser => { - let decision = session - .request_command_approval( - sub_id.to_string(), - call_id.to_string(), - request.approval_command.clone(), - request.params.cwd.clone(), - request.params.justification.clone(), - ) - .await; - - otel_event_manager.tool_decision( - "local_shell", - call_id, - decision, - ToolDecisionSource::User, - ); - match decision { - ReviewDecision::Approved => Ok(SandboxDecision::user_override(false)), - ReviewDecision::ApprovedForSession => Ok(SandboxDecision::user_override(true)), - ReviewDecision::Denied | ReviewDecision::Abort => { - Err(ExecError::rejection("exec command rejected by user")) - } - } - } - SafetyCheck::Reject { reason } => Err(ExecError::rejection(format!( - "exec command rejected: {reason}" - ))), - } -} - -fn select_apply_patch_sandbox( - exec: &ApplyPatchExec, - approval_policy: AskForApproval, - config: &ExecutorConfig, -) -> Result { - if exec.user_explicitly_approved_this_action { - return Ok(SandboxDecision::user_override(false)); - } - - match assess_patch_safety( - &exec.action, - approval_policy, - &config.sandbox_policy, - &config.sandbox_cwd, - ) { - SafetyCheck::AutoApprove { sandbox_type, .. } => Ok(SandboxDecision::auto( - sandbox_type, - should_escalate_on_failure(approval_policy, sandbox_type), - )), - SafetyCheck::AskUser => Err(ExecError::rejection( - "patch requires approval but none was recorded", - )), - SafetyCheck::Reject { reason } => { - Err(ExecError::rejection(format!("patch rejected: {reason}"))) - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::codex::make_session_and_context; - use crate::exec::ExecParams; - use crate::function_tool::FunctionCallError; - use crate::protocol::SandboxPolicy; - use codex_apply_patch::ApplyPatchAction; - use pretty_assertions::assert_eq; - - #[tokio::test] - async fn select_apply_patch_user_override_when_explicit() { - let (session, ctx) = make_session_and_context(); - let tmp = tempfile::tempdir().expect("tmp"); - let p = tmp.path().join("a.txt"); - let action = ApplyPatchAction::new_add_for_test(&p, "hello".to_string()); - let exec = ApplyPatchExec { - action, - user_explicitly_approved_this_action: true, - }; - let cfg = ExecutorConfig::new(SandboxPolicy::ReadOnly, std::env::temp_dir(), None); - let request = ExecutionRequest { - params: ExecParams { - command: vec!["apply_patch".into()], - cwd: std::env::temp_dir(), - timeout_ms: None, - env: std::collections::HashMap::new(), - with_escalated_permissions: None, - justification: None, - }, - approval_command: vec!["apply_patch".into()], - mode: ExecutionMode::ApplyPatch(exec), - stdout_stream: None, - use_shell_profile: false, - }; - let otel_event_manager = ctx.client.get_otel_event_manager(); - let decision = select_sandbox( - &request, - AskForApproval::OnRequest, - Default::default(), - &cfg, - &session, - "sub", - "call", - &otel_event_manager, - ) - .await - .expect("ok"); - // Explicit user override runs without sandbox - assert_eq!(decision.initial_sandbox, SandboxType::None); - assert_eq!(decision.escalate_on_failure, false); - } - - #[tokio::test] - async fn select_apply_patch_autoapprove_in_danger() { - let (session, ctx) = make_session_and_context(); - let tmp = tempfile::tempdir().expect("tmp"); - let p = tmp.path().join("a.txt"); - let action = ApplyPatchAction::new_add_for_test(&p, "hello".to_string()); - let exec = ApplyPatchExec { - action, - user_explicitly_approved_this_action: false, - }; - let cfg = ExecutorConfig::new(SandboxPolicy::DangerFullAccess, std::env::temp_dir(), None); - let request = ExecutionRequest { - params: ExecParams { - command: vec!["apply_patch".into()], - cwd: std::env::temp_dir(), - timeout_ms: None, - env: std::collections::HashMap::new(), - with_escalated_permissions: None, - justification: None, - }, - approval_command: vec!["apply_patch".into()], - mode: ExecutionMode::ApplyPatch(exec), - stdout_stream: None, - use_shell_profile: false, - }; - let otel_event_manager = ctx.client.get_otel_event_manager(); - let decision = select_sandbox( - &request, - AskForApproval::OnRequest, - Default::default(), - &cfg, - &session, - "sub", - "call", - &otel_event_manager, - ) - .await - .expect("ok"); - // On platforms with a sandbox, DangerFullAccess still prefers it - let expected = crate::safety::get_platform_sandbox().unwrap_or(SandboxType::None); - assert_eq!(decision.initial_sandbox, expected); - assert_eq!(decision.escalate_on_failure, false); - } - - #[tokio::test] - async fn select_apply_patch_requires_approval_on_unless_trusted() { - let (session, ctx) = make_session_and_context(); - let tempdir = tempfile::tempdir().expect("tmpdir"); - let p = tempdir.path().join("a.txt"); - let action = ApplyPatchAction::new_add_for_test(&p, "hello".to_string()); - let exec = ApplyPatchExec { - action, - user_explicitly_approved_this_action: false, - }; - let cfg = ExecutorConfig::new(SandboxPolicy::ReadOnly, std::env::temp_dir(), None); - let request = ExecutionRequest { - params: ExecParams { - command: vec!["apply_patch".into()], - cwd: std::env::temp_dir(), - timeout_ms: None, - env: std::collections::HashMap::new(), - with_escalated_permissions: None, - justification: None, - }, - approval_command: vec!["apply_patch".into()], - mode: ExecutionMode::ApplyPatch(exec), - stdout_stream: None, - use_shell_profile: false, - }; - let otel_event_manager = ctx.client.get_otel_event_manager(); - let result = select_sandbox( - &request, - AskForApproval::UnlessTrusted, - Default::default(), - &cfg, - &session, - "sub", - "call", - &otel_event_manager, - ) - .await; - match result { - Ok(_) => panic!("expected error"), - Err(ExecError::Function(FunctionCallError::RespondToModel(msg))) => { - assert!(msg.contains("requires approval")) - } - Err(other) => panic!("unexpected error: {other:?}"), - } - } - - #[tokio::test] - async fn select_shell_autoapprove_in_danger_mode() { - let (session, ctx) = make_session_and_context(); - let cfg = ExecutorConfig::new(SandboxPolicy::DangerFullAccess, std::env::temp_dir(), None); - let request = ExecutionRequest { - params: ExecParams { - command: vec!["some-unknown".into()], - cwd: std::env::temp_dir(), - timeout_ms: None, - env: std::collections::HashMap::new(), - with_escalated_permissions: None, - justification: None, - }, - approval_command: vec!["some-unknown".into()], - mode: ExecutionMode::Shell, - stdout_stream: None, - use_shell_profile: false, - }; - let otel_event_manager = ctx.client.get_otel_event_manager(); - let decision = select_sandbox( - &request, - AskForApproval::OnRequest, - Default::default(), - &cfg, - &session, - "sub", - "call", - &otel_event_manager, - ) - .await - .expect("ok"); - assert_eq!(decision.initial_sandbox, SandboxType::None); - assert_eq!(decision.escalate_on_failure, false); - } - - #[cfg(any(target_os = "macos", target_os = "linux"))] - #[tokio::test] - async fn select_shell_escalates_on_failure_with_platform_sandbox() { - let (session, ctx) = make_session_and_context(); - let cfg = ExecutorConfig::new(SandboxPolicy::ReadOnly, std::env::temp_dir(), None); - let request = ExecutionRequest { - params: ExecParams { - // Unknown command => untrusted but not flagged dangerous - command: vec!["some-unknown".into()], - cwd: std::env::temp_dir(), - timeout_ms: None, - env: std::collections::HashMap::new(), - with_escalated_permissions: None, - justification: None, - }, - approval_command: vec!["some-unknown".into()], - mode: ExecutionMode::Shell, - stdout_stream: None, - use_shell_profile: false, - }; - let otel_event_manager = ctx.client.get_otel_event_manager(); - let decision = select_sandbox( - &request, - AskForApproval::OnFailure, - Default::default(), - &cfg, - &session, - "sub", - "call", - &otel_event_manager, - ) - .await - .expect("ok"); - // On macOS/Linux we should have a platform sandbox and escalate on failure - assert_ne!(decision.initial_sandbox, SandboxType::None); - assert_eq!(decision.escalate_on_failure, true); - } -} diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs index 94350a44..c3f61642 100644 --- a/codex-rs/core/src/lib.rs +++ b/codex-rs/core/src/lib.rs @@ -27,7 +27,6 @@ pub mod error; pub mod exec; mod exec_command; pub mod exec_env; -pub mod executor; mod flags; pub mod git_info; pub mod landlock; diff --git a/codex-rs/core/src/safety.rs b/codex-rs/core/src/safety.rs index 0ed0f929..b976ae4a 100644 --- a/codex-rs/core/src/safety.rs +++ b/codex-rs/core/src/safety.rs @@ -125,10 +125,9 @@ pub fn assess_command_safety( // the session _because_ they know it needs to run outside a sandbox. if is_known_safe_command(command) || approved.contains(command) { - let user_explicitly_approved = approved.contains(command); return SafetyCheck::AutoApprove { sandbox_type: SandboxType::None, - user_explicitly_approved, + user_explicitly_approved: false, }; } @@ -381,7 +380,7 @@ mod tests { safety_check, SafetyCheck::AutoApprove { sandbox_type: SandboxType::None, - user_explicitly_approved: true, + user_explicitly_approved: false, } ); } diff --git a/codex-rs/core/src/state/service.rs b/codex-rs/core/src/state/service.rs index 994352ed..a67b9dda 100644 --- a/codex-rs/core/src/state/service.rs +++ b/codex-rs/core/src/state/service.rs @@ -1,9 +1,9 @@ use crate::RolloutRecorder; use crate::exec_command::ExecSessionManager; -use crate::executor::Executor; use crate::mcp_connection_manager::McpConnectionManager; use crate::unified_exec::UnifiedExecSessionManager; use crate::user_notification::UserNotifier; +use std::path::PathBuf; use tokio::sync::Mutex; pub(crate) struct SessionServices { @@ -12,7 +12,7 @@ pub(crate) struct SessionServices { pub(crate) unified_exec_manager: UnifiedExecSessionManager, pub(crate) notifier: UserNotifier, pub(crate) rollout: Mutex>, + pub(crate) codex_linux_sandbox_exe: Option, pub(crate) user_shell: crate::shell::Shell, pub(crate) show_raw_agent_reasoning: bool, - pub(crate) executor: Executor, } diff --git a/codex-rs/core/src/state/session.rs b/codex-rs/core/src/state/session.rs index f170a10c..ee0c5fc9 100644 --- a/codex-rs/core/src/state/session.rs +++ b/codex-rs/core/src/state/session.rs @@ -1,5 +1,7 @@ //! Session-wide mutable state. +use std::collections::HashSet; + use codex_protocol::models::ResponseItem; use crate::conversation_history::ConversationHistory; @@ -10,6 +12,7 @@ use crate::protocol::TokenUsageInfo; /// Persistent, session-scoped state previously stored directly on `Session`. #[derive(Default)] pub(crate) struct SessionState { + pub(crate) approved_commands: HashSet>, pub(crate) history: ConversationHistory, pub(crate) token_info: Option, pub(crate) latest_rate_limits: Option, @@ -41,6 +44,15 @@ impl SessionState { self.history.replace(items); } + // Approved command helpers + pub(crate) fn add_approved_command(&mut self, cmd: Vec) { + self.approved_commands.insert(cmd); + } + + pub(crate) fn approved_commands_ref(&self) -> &HashSet> { + &self.approved_commands + } + // Token/rate limit helpers pub(crate) fn update_token_info_from_usage( &mut self, diff --git a/codex-rs/core/tests/suite/seatbelt.rs b/codex-rs/core/tests/suite/seatbelt.rs index a879d3e9..78f599d4 100644 --- a/codex-rs/core/tests/suite/seatbelt.rs +++ b/codex-rs/core/tests/suite/seatbelt.rs @@ -169,12 +169,6 @@ async fn python_getpwuid_works_under_seatbelt() { return; } - // For local dev. - if which::which("python3").is_err() { - eprintln!("python3 not found in PATH, skipping test."); - return; - } - // ReadOnly is sufficient here since we are only exercising user lookup. let policy = SandboxPolicy::ReadOnly; let command_cwd = std::env::current_dir().expect("getcwd");