#[cfg(unix)] use std::os::unix::process::ExitStatusExt; use std::collections::HashMap; use std::io; use std::path::Path; use std::path::PathBuf; use std::process::ExitStatus; use std::time::Duration; use std::time::Instant; use async_channel::Sender; use tokio::io::AsyncRead; use tokio::io::AsyncReadExt; use tokio::io::BufReader; use tokio::process::Child; use crate::error::CodexErr; use crate::error::Result; use crate::error::SandboxErr; use crate::landlock::spawn_command_under_linux_sandbox; use crate::protocol::Event; use crate::protocol::EventMsg; use crate::protocol::ExecCommandOutputDeltaEvent; use crate::protocol::ExecOutputStream; use crate::protocol::SandboxPolicy; use crate::seatbelt::spawn_command_under_seatbelt; use crate::spawn::StdioPolicy; use crate::spawn::spawn_child_async; const DEFAULT_TIMEOUT_MS: u64 = 10_000; // Hardcode these since it does not seem worth including the libc crate just // for these. const SIGKILL_CODE: i32 = 9; const TIMEOUT_CODE: i32 = 64; const EXIT_CODE_SIGNAL_BASE: i32 = 128; // conventional shell: 128 + signal const EXEC_TIMEOUT_EXIT_CODE: i32 = 124; // conventional timeout exit code // I/O buffer sizing const READ_CHUNK_SIZE: usize = 8192; // bytes per read const AGGREGATE_BUFFER_INITIAL_CAPACITY: usize = 8 * 1024; // 8 KiB /// Limit the number of ExecCommandOutputDelta events emitted per exec call. /// Aggregation still collects full output; only the live event stream is capped. pub(crate) const MAX_EXEC_OUTPUT_DELTAS_PER_CALL: usize = 10_000; #[derive(Clone, Debug)] pub struct ExecParams { pub command: Vec, pub cwd: PathBuf, pub timeout_ms: Option, pub env: HashMap, pub with_escalated_permissions: Option, pub justification: Option, } impl ExecParams { pub fn timeout_duration(&self) -> Duration { Duration::from_millis(self.timeout_ms.unwrap_or(DEFAULT_TIMEOUT_MS)) } } #[derive(Clone, Copy, Debug, PartialEq)] pub enum SandboxType { None, /// Only available on macOS. MacosSeatbelt, /// Only available on Linux. LinuxSeccomp, } #[derive(Clone)] pub struct StdoutStream { pub sub_id: String, pub call_id: String, pub tx_event: Sender, } pub async fn process_exec_tool_call( params: ExecParams, sandbox_type: SandboxType, sandbox_policy: &SandboxPolicy, sandbox_cwd: &Path, codex_linux_sandbox_exe: &Option, stdout_stream: Option, ) -> Result { let start = Instant::now(); let timeout_duration = params.timeout_duration(); let raw_output_result: std::result::Result = match sandbox_type { SandboxType::None => exec(params, sandbox_policy, stdout_stream.clone()).await, SandboxType::MacosSeatbelt => { let ExecParams { command, cwd: command_cwd, env, .. } = params; let child = spawn_command_under_seatbelt( command, command_cwd, sandbox_policy, sandbox_cwd, StdioPolicy::RedirectForShellTool, env, ) .await?; consume_truncated_output(child, timeout_duration, stdout_stream.clone()).await } SandboxType::LinuxSeccomp => { let ExecParams { command, cwd: command_cwd, env, .. } = params; let codex_linux_sandbox_exe = codex_linux_sandbox_exe .as_ref() .ok_or(CodexErr::LandlockSandboxExecutableNotProvided)?; let child = spawn_command_under_linux_sandbox( codex_linux_sandbox_exe, command, command_cwd, sandbox_policy, sandbox_cwd, StdioPolicy::RedirectForShellTool, env, ) .await?; consume_truncated_output(child, timeout_duration, stdout_stream).await } }; let duration = start.elapsed(); match raw_output_result { Ok(raw_output) => { #[allow(unused_mut)] let mut timed_out = raw_output.timed_out; #[cfg(target_family = "unix")] { if let Some(signal) = raw_output.exit_status.signal() { if signal == TIMEOUT_CODE { timed_out = true; } else { return Err(CodexErr::Sandbox(SandboxErr::Signal(signal))); } } } let mut exit_code = raw_output.exit_status.code().unwrap_or(-1); if timed_out { exit_code = EXEC_TIMEOUT_EXIT_CODE; } let stdout = raw_output.stdout.from_utf8_lossy(); let stderr = raw_output.stderr.from_utf8_lossy(); let aggregated_output = raw_output.aggregated_output.from_utf8_lossy(); let exec_output = ExecToolCallOutput { exit_code, stdout, stderr, aggregated_output, duration, timed_out, }; if timed_out { return Err(CodexErr::Sandbox(SandboxErr::Timeout { output: Box::new(exec_output), })); } if is_likely_sandbox_denied(sandbox_type, &exec_output) { return Err(CodexErr::Sandbox(SandboxErr::Denied { output: Box::new(exec_output), })); } Ok(exec_output) } Err(err) => { tracing::error!("exec error: {err}"); Err(err) } } } /// We don't have a fully deterministic way to tell if our command failed /// because of the sandbox - a command in the user's zshrc file might hit an /// error, but the command itself might fail or succeed for other reasons. /// For now, we conservatively check for well known command failure exit codes and /// also look for common sandbox denial keywords in the command output. fn is_likely_sandbox_denied(sandbox_type: SandboxType, exec_output: &ExecToolCallOutput) -> bool { if sandbox_type == SandboxType::None || exec_output.exit_code == 0 { return false; } // Quick rejects: well-known non-sandbox shell exit codes // 2: misuse of shell builtins // 126: permission denied // 127: command not found const QUICK_REJECT_EXIT_CODES: [i32; 3] = [2, 126, 127]; if QUICK_REJECT_EXIT_CODES.contains(&exec_output.exit_code) { return false; } const SANDBOX_DENIED_KEYWORDS: [&str; 6] = [ "operation not permitted", "permission denied", "read-only file system", "seccomp", "sandbox", "landlock", ]; if [ &exec_output.stderr.text, &exec_output.stdout.text, &exec_output.aggregated_output.text, ] .into_iter() .any(|section| { let lower = section.to_lowercase(); SANDBOX_DENIED_KEYWORDS .iter() .any(|needle| lower.contains(needle)) }) { return true; } #[cfg(unix)] { const SIGSYS_CODE: i32 = libc::SIGSYS; if sandbox_type == SandboxType::LinuxSeccomp && exec_output.exit_code == EXIT_CODE_SIGNAL_BASE + SIGSYS_CODE { return true; } } false } #[derive(Debug)] pub struct StreamOutput { pub text: T, pub truncated_after_lines: Option, } #[derive(Debug)] struct RawExecToolCallOutput { pub exit_status: ExitStatus, pub stdout: StreamOutput>, pub stderr: StreamOutput>, pub aggregated_output: StreamOutput>, pub timed_out: bool, } impl StreamOutput { pub fn new(text: String) -> Self { Self { text, truncated_after_lines: None, } } } impl StreamOutput> { pub fn from_utf8_lossy(&self) -> StreamOutput { StreamOutput { text: String::from_utf8_lossy(&self.text).to_string(), truncated_after_lines: self.truncated_after_lines, } } } #[inline] fn append_all(dst: &mut Vec, src: &[u8]) { dst.extend_from_slice(src); } #[derive(Debug)] pub struct ExecToolCallOutput { pub exit_code: i32, pub stdout: StreamOutput, pub stderr: StreamOutput, pub aggregated_output: StreamOutput, pub duration: Duration, pub timed_out: bool, } async fn exec( params: ExecParams, sandbox_policy: &SandboxPolicy, stdout_stream: Option, ) -> Result { let timeout = params.timeout_duration(); let ExecParams { command, cwd, env, .. } = params; let (program, args) = command.split_first().ok_or_else(|| { CodexErr::Io(io::Error::new( io::ErrorKind::InvalidInput, "command args are empty", )) })?; let arg0 = None; let child = spawn_child_async( PathBuf::from(program), args.into(), arg0, cwd, sandbox_policy, StdioPolicy::RedirectForShellTool, env, ) .await?; consume_truncated_output(child, timeout, stdout_stream).await } /// Consumes the output of a child process, truncating it so it is suitable for /// use as the output of a `shell` tool call. Also enforces specified timeout. async fn consume_truncated_output( mut child: Child, timeout: Duration, stdout_stream: Option, ) -> Result { // Both stdout and stderr were configured with `Stdio::piped()` // above, therefore `take()` should normally return `Some`. If it doesn't // we treat it as an exceptional I/O error let stdout_reader = child.stdout.take().ok_or_else(|| { CodexErr::Io(io::Error::other( "stdout pipe was unexpectedly not available", )) })?; let stderr_reader = child.stderr.take().ok_or_else(|| { CodexErr::Io(io::Error::other( "stderr pipe was unexpectedly not available", )) })?; let (agg_tx, agg_rx) = async_channel::unbounded::>(); let stdout_handle = tokio::spawn(read_capped( BufReader::new(stdout_reader), stdout_stream.clone(), false, Some(agg_tx.clone()), )); let stderr_handle = tokio::spawn(read_capped( BufReader::new(stderr_reader), stdout_stream.clone(), true, Some(agg_tx.clone()), )); let (exit_status, timed_out) = tokio::select! { result = tokio::time::timeout(timeout, child.wait()) => { match result { Ok(status_result) => { let exit_status = status_result?; (exit_status, false) } Err(_) => { // timeout child.start_kill()?; // Debatable whether `child.wait().await` should be called here. (synthetic_exit_status(EXIT_CODE_SIGNAL_BASE + TIMEOUT_CODE), true) } } } _ = tokio::signal::ctrl_c() => { child.start_kill()?; (synthetic_exit_status(EXIT_CODE_SIGNAL_BASE + SIGKILL_CODE), false) } }; let stdout = stdout_handle.await??; let stderr = stderr_handle.await??; drop(agg_tx); let mut combined_buf = Vec::with_capacity(AGGREGATE_BUFFER_INITIAL_CAPACITY); while let Ok(chunk) = agg_rx.recv().await { append_all(&mut combined_buf, &chunk); } let aggregated_output = StreamOutput { text: combined_buf, truncated_after_lines: None, }; Ok(RawExecToolCallOutput { exit_status, stdout, stderr, aggregated_output, timed_out, }) } async fn read_capped( mut reader: R, stream: Option, is_stderr: bool, aggregate_tx: Option>>, ) -> io::Result>> { let mut buf = Vec::with_capacity(AGGREGATE_BUFFER_INITIAL_CAPACITY); let mut tmp = [0u8; READ_CHUNK_SIZE]; let mut emitted_deltas: usize = 0; // No caps: append all bytes loop { let n = reader.read(&mut tmp).await?; if n == 0 { break; } if let Some(stream) = &stream && emitted_deltas < MAX_EXEC_OUTPUT_DELTAS_PER_CALL { let chunk = tmp[..n].to_vec(); let msg = EventMsg::ExecCommandOutputDelta(ExecCommandOutputDeltaEvent { call_id: stream.call_id.clone(), stream: if is_stderr { ExecOutputStream::Stderr } else { ExecOutputStream::Stdout }, chunk, }); let event = Event { id: stream.sub_id.clone(), msg, }; #[allow(clippy::let_unit_value)] let _ = stream.tx_event.send(event).await; emitted_deltas += 1; } if let Some(tx) = &aggregate_tx { let _ = tx.send(tmp[..n].to_vec()).await; } append_all(&mut buf, &tmp[..n]); // Continue reading to EOF to avoid back-pressure } Ok(StreamOutput { text: buf, truncated_after_lines: None, }) } #[cfg(unix)] fn synthetic_exit_status(code: i32) -> ExitStatus { use std::os::unix::process::ExitStatusExt; std::process::ExitStatus::from_raw(code) } #[cfg(windows)] fn synthetic_exit_status(code: i32) -> ExitStatus { use std::os::windows::process::ExitStatusExt; #[expect(clippy::unwrap_used)] std::process::ExitStatus::from_raw(code.try_into().unwrap()) } #[cfg(test)] mod tests { use super::*; use std::time::Duration; fn make_exec_output( exit_code: i32, stdout: &str, stderr: &str, aggregated: &str, ) -> ExecToolCallOutput { ExecToolCallOutput { exit_code, stdout: StreamOutput::new(stdout.to_string()), stderr: StreamOutput::new(stderr.to_string()), aggregated_output: StreamOutput::new(aggregated.to_string()), duration: Duration::from_millis(1), timed_out: false, } } #[test] fn sandbox_detection_requires_keywords() { let output = make_exec_output(1, "", "", ""); assert!(!is_likely_sandbox_denied( SandboxType::LinuxSeccomp, &output )); } #[test] fn sandbox_detection_identifies_keyword_in_stderr() { let output = make_exec_output(1, "", "Operation not permitted", ""); assert!(is_likely_sandbox_denied(SandboxType::LinuxSeccomp, &output)); } #[test] fn sandbox_detection_respects_quick_reject_exit_codes() { let output = make_exec_output(127, "", "command not found", ""); assert!(!is_likely_sandbox_denied( SandboxType::LinuxSeccomp, &output )); } #[test] fn sandbox_detection_ignores_non_sandbox_mode() { let output = make_exec_output(1, "", "Operation not permitted", ""); assert!(!is_likely_sandbox_denied(SandboxType::None, &output)); } #[test] fn sandbox_detection_uses_aggregated_output() { let output = make_exec_output( 101, "", "", "cargo failed: Read-only file system when writing target", ); assert!(is_likely_sandbox_denied( SandboxType::MacosSeatbelt, &output )); } #[cfg(unix)] #[test] fn sandbox_detection_flags_sigsys_exit_code() { let exit_code = EXIT_CODE_SIGNAL_BASE + libc::SIGSYS; let output = make_exec_output(exit_code, "", "", ""); assert!(is_likely_sandbox_denied(SandboxType::LinuxSeccomp, &output)); } }