#![cfg(not(target_os = "windows"))] #![allow(clippy::unwrap_used, clippy::expect_used)] use anyhow::Context; use anyhow::Result; use codex_core::features::Feature; use codex_core::model_family::find_family_for_model; use codex_core::protocol::AskForApproval; use codex_core::protocol::EventMsg; use codex_core::protocol::Op; use codex_core::protocol::SandboxPolicy; use codex_protocol::config_types::ReasoningSummary; use codex_protocol::user_input::UserInput; use core_test_support::assert_regex_match; use core_test_support::responses::ev_assistant_message; use core_test_support::responses::ev_completed; use core_test_support::responses::ev_custom_tool_call; use core_test_support::responses::ev_function_call; use core_test_support::responses::ev_response_created; use core_test_support::responses::mount_sse_once; use core_test_support::responses::mount_sse_sequence; use core_test_support::responses::sse; use core_test_support::responses::start_mock_server; use core_test_support::skip_if_no_network; use core_test_support::test_codex::TestCodex; use core_test_support::test_codex::test_codex; use core_test_support::wait_for_event; use regex_lite::Regex; use serde_json::Value; use serde_json::json; async fn submit_turn( test: &TestCodex, prompt: &str, approval_policy: AskForApproval, sandbox_policy: SandboxPolicy, ) -> Result<()> { let session_model = test.session_configured.model.clone(); test.codex .submit(Op::UserTurn { items: vec![UserInput::Text { text: prompt.into(), }], final_output_json_schema: None, cwd: test.cwd.path().to_path_buf(), approval_policy, sandbox_policy, model: session_model, effort: None, summary: ReasoningSummary::Auto, }) .await?; wait_for_event(&test.codex, |event| { matches!(event, EventMsg::TaskComplete(_)) }) .await; Ok(()) } fn tool_names(body: &Value) -> Vec { body.get("tools") .and_then(Value::as_array) .map(|tools| { tools .iter() .filter_map(|tool| { tool.get("name") .or_else(|| tool.get("type")) .and_then(Value::as_str) .map(str::to_string) }) .collect() }) .unwrap_or_default() } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn custom_tool_unknown_returns_custom_output_error() -> Result<()> { skip_if_no_network!(Ok(())); let server = start_mock_server().await; let mut builder = test_codex(); let test = builder.build(&server).await?; let call_id = "custom-unsupported"; let tool_name = "unsupported_tool"; mount_sse_once( &server, sse(vec![ ev_response_created("resp-1"), ev_custom_tool_call(call_id, tool_name, "\"payload\""), ev_completed("resp-1"), ]), ) .await; let mock = mount_sse_once( &server, sse(vec![ ev_assistant_message("msg-1", "done"), ev_completed("resp-2"), ]), ) .await; submit_turn( &test, "invoke custom tool", AskForApproval::Never, SandboxPolicy::DangerFullAccess, ) .await?; let item = mock.single_request().custom_tool_call_output(call_id); let output = item .get("output") .and_then(Value::as_str) .unwrap_or_default(); let expected = format!("unsupported custom tool call: {tool_name}"); assert_eq!(output, expected); Ok(()) } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn shell_escalated_permissions_rejected_then_ok() -> Result<()> { skip_if_no_network!(Ok(())); let server = start_mock_server().await; let mut builder = test_codex().with_config(|config| { config.model = "gpt-5".to_string(); config.model_family = find_family_for_model("gpt-5").expect("gpt-5 is a valid model"); }); let test = builder.build(&server).await?; let command = ["/bin/echo", "shell ok"]; let call_id_blocked = "shell-blocked"; let call_id_success = "shell-success"; let first_args = json!({ "command": command, "timeout_ms": 1_000, "with_escalated_permissions": true, }); let second_args = json!({ "command": command, "timeout_ms": 1_000, }); mount_sse_once( &server, sse(vec![ ev_response_created("resp-1"), ev_function_call( call_id_blocked, "shell", &serde_json::to_string(&first_args)?, ), ev_completed("resp-1"), ]), ) .await; let second_mock = mount_sse_once( &server, sse(vec![ ev_response_created("resp-2"), ev_function_call( call_id_success, "shell", &serde_json::to_string(&second_args)?, ), ev_completed("resp-2"), ]), ) .await; let third_mock = mount_sse_once( &server, sse(vec![ ev_assistant_message("msg-1", "done"), ev_completed("resp-3"), ]), ) .await; submit_turn( &test, "run the shell command", AskForApproval::Never, SandboxPolicy::DangerFullAccess, ) .await?; let policy = AskForApproval::Never; let expected_message = format!( "approval policy is {policy:?}; reject command — you should not ask for escalated permissions if the approval policy is {policy:?}" ); let blocked_item = second_mock .single_request() .function_call_output(call_id_blocked); assert_eq!( blocked_item.get("output").and_then(Value::as_str), Some(expected_message.as_str()), "unexpected rejection message" ); let success_item = third_mock .single_request() .function_call_output(call_id_success); let output_json: Value = serde_json::from_str( success_item .get("output") .and_then(Value::as_str) .expect("success output string"), )?; assert_eq!( output_json["metadata"]["exit_code"].as_i64(), Some(0), "expected exit code 0 after rerunning without escalation", ); let stdout = output_json["output"].as_str().unwrap_or_default(); let stdout_pattern = r"(?s)^shell ok\n?$"; assert_regex_match(stdout_pattern, stdout); Ok(()) } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn sandbox_denied_shell_returns_original_output() -> Result<()> { skip_if_no_network!(Ok(())); let server = start_mock_server().await; let mut builder = test_codex().with_config(|config| { config.model = "gpt-5-codex".to_string(); config.model_family = find_family_for_model("gpt-5-codex").expect("gpt-5-codex model family"); }); let fixture = builder.build(&server).await?; let call_id = "sandbox-denied-shell"; let target_path = fixture.workspace_path("sandbox-denied.txt"); let sentinel = "sandbox-denied sentinel output"; let command = vec![ "/bin/sh".to_string(), "-c".to_string(), format!( "printf {sentinel:?}; printf {content:?} > {path:?}", sentinel = format!("{sentinel}\n"), content = "sandbox denied", path = &target_path ), ]; let args = json!({ "command": command, "timeout_ms": 1_000, }); let responses = vec![ sse(vec![ ev_response_created("resp-1"), ev_function_call(call_id, "shell", &serde_json::to_string(&args)?), ev_completed("resp-1"), ]), sse(vec![ ev_assistant_message("msg-1", "done"), ev_completed("resp-2"), ]), ]; let mock = mount_sse_sequence(&server, responses).await; fixture .submit_turn_with_policy( "run a command that should be denied by the read-only sandbox", SandboxPolicy::ReadOnly, ) .await?; let output_text = mock .function_call_output_text(call_id) .context("shell output present")?; let exit_code_line = output_text .lines() .next() .context("exit code line present")?; let exit_code = exit_code_line .strip_prefix("Exit code: ") .context("exit code prefix present")? .trim() .parse::() .context("exit code is integer")?; let body = output_text; let body_lower = body.to_lowercase(); // Required for multi-OS. let has_denial = body_lower.contains("permission denied") || body_lower.contains("operation not permitted") || body_lower.contains("read-only file system"); assert!( has_denial, "expected sandbox denial details in tool output: {body}" ); assert!( body.contains(sentinel), "expected sentinel output from command to reach the model: {body}" ); let target_path_str = target_path .to_str() .context("target path string representation")?; assert!( body.contains(target_path_str), "expected sandbox error to mention denied path: {body}" ); assert!( !body_lower.contains("failed in sandbox"), "expected original tool output, found fallback message: {body}" ); assert_ne!( exit_code, 0, "sandbox denial should surface a non-zero exit code" ); Ok(()) } async fn collect_tools(use_unified_exec: bool) -> Result> { let server = start_mock_server().await; let responses = vec![sse(vec![ ev_response_created("resp-1"), ev_assistant_message("msg-1", "done"), ev_completed("resp-1"), ])]; let mock = mount_sse_sequence(&server, responses).await; let mut builder = test_codex().with_config(move |config| { if use_unified_exec { config.features.enable(Feature::UnifiedExec); } else { config.features.disable(Feature::UnifiedExec); } }); let test = builder.build(&server).await?; submit_turn( &test, "list tools", AskForApproval::Never, SandboxPolicy::DangerFullAccess, ) .await?; let first_body = mock.single_request().body_json(); Ok(tool_names(&first_body)) } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn unified_exec_spec_toggle_end_to_end() -> Result<()> { skip_if_no_network!(Ok(())); let tools_disabled = collect_tools(false).await?; assert!( !tools_disabled.iter().any(|name| name == "exec_command"), "tools list should not include exec_command when disabled: {tools_disabled:?}" ); assert!( !tools_disabled.iter().any(|name| name == "write_stdin"), "tools list should not include write_stdin when disabled: {tools_disabled:?}" ); let tools_enabled = collect_tools(true).await?; assert!( tools_enabled.iter().any(|name| name == "exec_command"), "tools list should include exec_command when enabled: {tools_enabled:?}" ); assert!( tools_enabled.iter().any(|name| name == "write_stdin"), "tools list should include write_stdin when enabled: {tools_enabled:?}" ); Ok(()) } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn shell_timeout_includes_timeout_prefix_and_metadata() -> Result<()> { skip_if_no_network!(Ok(())); let server = start_mock_server().await; let mut builder = test_codex().with_config(|config| { config.model = "gpt-5".to_string(); config.model_family = find_family_for_model("gpt-5").expect("gpt-5 is a valid model"); }); let test = builder.build(&server).await?; let call_id = "shell-timeout"; let timeout_ms = 50u64; let args = json!({ "command": ["/bin/sh", "-c", "yes line | head -n 400; sleep 1"], "timeout_ms": timeout_ms, }); mount_sse_once( &server, sse(vec![ ev_response_created("resp-1"), ev_function_call(call_id, "shell", &serde_json::to_string(&args)?), ev_completed("resp-1"), ]), ) .await; let second_mock = mount_sse_once( &server, sse(vec![ ev_assistant_message("msg-1", "done"), ev_completed("resp-2"), ]), ) .await; submit_turn( &test, "run a long command", AskForApproval::Never, SandboxPolicy::DangerFullAccess, ) .await?; let timeout_item = second_mock.single_request().function_call_output(call_id); let output_str = timeout_item .get("output") .and_then(Value::as_str) .expect("timeout output string"); // The exec path can report a timeout in two ways depending on timing: // 1) Structured JSON with exit_code 124 and a timeout prefix (preferred), or // 2) A plain error string if the child is observed as killed by a signal first. if let Ok(output_json) = serde_json::from_str::(output_str) { assert_eq!( output_json["metadata"]["exit_code"].as_i64(), Some(124), "expected timeout exit code 124", ); let stdout = output_json["output"].as_str().unwrap_or_default(); assert!( stdout.contains("command timed out"), "timeout output missing `command timed out`: {stdout}" ); } else { // Fallback: accept the signal classification path to deflake the test. let signal_pattern = r"(?is)^execution error:.*signal.*$"; assert_regex_match(signal_pattern, output_str); } Ok(()) } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn shell_spawn_failure_truncates_exec_error() -> Result<()> { skip_if_no_network!(Ok(())); let server = start_mock_server().await; let mut builder = test_codex().with_config(|cfg| { cfg.sandbox_policy = SandboxPolicy::DangerFullAccess; }); let test = builder.build(&server).await?; let call_id = "shell-spawn-failure"; let bogus_component = "missing-bin-".repeat(700); let bogus_exe = test .cwd .path() .join(bogus_component) .to_string_lossy() .into_owned(); let args = json!({ "command": [bogus_exe], "timeout_ms": 1_000, }); mount_sse_once( &server, sse(vec![ ev_response_created("resp-1"), ev_function_call(call_id, "shell", &serde_json::to_string(&args)?), ev_completed("resp-1"), ]), ) .await; let second_mock = mount_sse_once( &server, sse(vec![ ev_assistant_message("msg-1", "done"), ev_completed("resp-2"), ]), ) .await; submit_turn( &test, "spawn a missing binary", AskForApproval::Never, SandboxPolicy::DangerFullAccess, ) .await?; let failure_item = second_mock.single_request().function_call_output(call_id); let output = failure_item .get("output") .and_then(Value::as_str) .expect("spawn failure output string"); let spawn_error_pattern = r#"(?s)^Exit code: -?\d+ Wall time: [0-9]+(?:\.[0-9]+)? seconds Output: execution error: .*$"#; let spawn_truncated_pattern = r#"(?s)^Exit code: -?\d+ Wall time: [0-9]+(?:\.[0-9]+)? seconds Total output lines: \d+ Output: execution error: .*$"#; let spawn_error_regex = Regex::new(spawn_error_pattern)?; let spawn_truncated_regex = Regex::new(spawn_truncated_pattern)?; if !spawn_error_regex.is_match(output) && !spawn_truncated_regex.is_match(output) { let fallback_pattern = r"(?s)^execution error: .*$"; assert_regex_match(fallback_pattern, output); } assert!(output.len() <= 10 * 1024); Ok(()) }