#[cfg(unix)] use std::os::unix::process::ExitStatusExt; use std::collections::HashMap; use std::io; use std::path::PathBuf; use std::process::ExitStatus; use std::time::Duration; use std::time::Instant; use async_channel::Sender; use tokio::io::AsyncRead; use tokio::io::AsyncReadExt; use tokio::io::BufReader; use tokio::process::Child; use crate::error::CodexErr; use crate::error::Result; use crate::error::SandboxErr; use crate::landlock::spawn_command_under_linux_sandbox; use crate::protocol::Event; use crate::protocol::EventMsg; use crate::protocol::ExecCommandOutputDeltaEvent; use crate::protocol::ExecOutputStream; use crate::protocol::SandboxPolicy; use crate::seatbelt::spawn_command_under_seatbelt; use crate::spawn::StdioPolicy; use crate::spawn::spawn_child_async; const DEFAULT_TIMEOUT_MS: u64 = 10_000; // Hardcode these since it does not seem worth including the libc crate just // for these. const SIGKILL_CODE: i32 = 9; const TIMEOUT_CODE: i32 = 64; const EXIT_CODE_SIGNAL_BASE: i32 = 128; // conventional shell: 128 + signal const EXEC_TIMEOUT_EXIT_CODE: i32 = 124; // conventional timeout exit code // I/O buffer sizing const READ_CHUNK_SIZE: usize = 8192; // bytes per read const AGGREGATE_BUFFER_INITIAL_CAPACITY: usize = 8 * 1024; // 8 KiB /// Limit the number of ExecCommandOutputDelta events emitted per exec call. /// Aggregation still collects full output; only the live event stream is capped. pub(crate) const MAX_EXEC_OUTPUT_DELTAS_PER_CALL: usize = 10_000; #[derive(Debug, Clone)] pub struct ExecParams { pub command: Vec, pub cwd: PathBuf, pub timeout_ms: Option, pub env: HashMap, pub with_escalated_permissions: Option, pub justification: Option, } impl ExecParams { pub fn timeout_duration(&self) -> Duration { Duration::from_millis(self.timeout_ms.unwrap_or(DEFAULT_TIMEOUT_MS)) } } #[derive(Clone, Copy, Debug, PartialEq)] pub enum SandboxType { None, /// Only available on macOS. MacosSeatbelt, /// Only available on Linux. LinuxSeccomp, } #[derive(Clone)] pub struct StdoutStream { pub sub_id: String, pub call_id: String, pub tx_event: Sender, } pub async fn process_exec_tool_call( params: ExecParams, sandbox_type: SandboxType, sandbox_policy: &SandboxPolicy, codex_linux_sandbox_exe: &Option, stdout_stream: Option, ) -> Result { let start = Instant::now(); let timeout_duration = params.timeout_duration(); let raw_output_result: std::result::Result = match sandbox_type { SandboxType::None => exec(params, sandbox_policy, stdout_stream.clone()).await, SandboxType::MacosSeatbelt => { let ExecParams { command, cwd, env, .. } = params; let child = spawn_command_under_seatbelt( command, sandbox_policy, cwd, StdioPolicy::RedirectForShellTool, env, ) .await?; consume_truncated_output(child, timeout_duration, stdout_stream.clone()).await } SandboxType::LinuxSeccomp => { let ExecParams { command, cwd, env, .. } = params; let codex_linux_sandbox_exe = codex_linux_sandbox_exe .as_ref() .ok_or(CodexErr::LandlockSandboxExecutableNotProvided)?; let child = spawn_command_under_linux_sandbox( codex_linux_sandbox_exe, command, sandbox_policy, cwd, StdioPolicy::RedirectForShellTool, env, ) .await?; consume_truncated_output(child, timeout_duration, stdout_stream).await } }; let duration = start.elapsed(); match raw_output_result { Ok(raw_output) => { #[allow(unused_mut)] let mut timed_out = raw_output.timed_out; #[cfg(target_family = "unix")] { if let Some(signal) = raw_output.exit_status.signal() { if signal == TIMEOUT_CODE { timed_out = true; } else { return Err(CodexErr::Sandbox(SandboxErr::Signal(signal))); } } } let mut exit_code = raw_output.exit_status.code().unwrap_or(-1); if timed_out { exit_code = EXEC_TIMEOUT_EXIT_CODE; } let stdout = raw_output.stdout.from_utf8_lossy(); let stderr = raw_output.stderr.from_utf8_lossy(); let aggregated_output = raw_output.aggregated_output.from_utf8_lossy(); let exec_output = ExecToolCallOutput { exit_code, stdout, stderr, aggregated_output, duration, timed_out, }; if timed_out { return Err(CodexErr::Sandbox(SandboxErr::Timeout { output: Box::new(exec_output), })); } if exit_code != 0 && is_likely_sandbox_denied(sandbox_type, exit_code) { return Err(CodexErr::Sandbox(SandboxErr::Denied { output: Box::new(exec_output), })); } Ok(exec_output) } Err(err) => { tracing::error!("exec error: {err}"); Err(err) } } } /// We don't have a fully deterministic way to tell if our command failed /// because of the sandbox - a command in the user's zshrc file might hit an /// error, but the command itself might fail or succeed for other reasons. /// For now, we conservatively check for 'command not found' (exit code 127), /// and can add additional cases as necessary. fn is_likely_sandbox_denied(sandbox_type: SandboxType, exit_code: i32) -> bool { if sandbox_type == SandboxType::None { return false; } // Quick rejects: well-known non-sandbox shell exit codes // 127: command not found, 2: misuse of shell builtins if exit_code == 127 { return false; } // For all other cases, we assume the sandbox is the cause true } #[derive(Debug)] pub struct StreamOutput { pub text: T, pub truncated_after_lines: Option, } #[derive(Debug)] struct RawExecToolCallOutput { pub exit_status: ExitStatus, pub stdout: StreamOutput>, pub stderr: StreamOutput>, pub aggregated_output: StreamOutput>, pub timed_out: bool, } impl StreamOutput { pub fn new(text: String) -> Self { Self { text, truncated_after_lines: None, } } } impl StreamOutput> { pub fn from_utf8_lossy(&self) -> StreamOutput { StreamOutput { text: String::from_utf8_lossy(&self.text).to_string(), truncated_after_lines: self.truncated_after_lines, } } } #[inline] fn append_all(dst: &mut Vec, src: &[u8]) { dst.extend_from_slice(src); } #[derive(Debug)] pub struct ExecToolCallOutput { pub exit_code: i32, pub stdout: StreamOutput, pub stderr: StreamOutput, pub aggregated_output: StreamOutput, pub duration: Duration, pub timed_out: bool, } async fn exec( params: ExecParams, sandbox_policy: &SandboxPolicy, stdout_stream: Option, ) -> Result { let timeout = params.timeout_duration(); let ExecParams { command, cwd, env, .. } = params; let (program, args) = command.split_first().ok_or_else(|| { CodexErr::Io(io::Error::new( io::ErrorKind::InvalidInput, "command args are empty", )) })?; let arg0 = None; let child = spawn_child_async( PathBuf::from(program), args.into(), arg0, cwd, sandbox_policy, StdioPolicy::RedirectForShellTool, env, ) .await?; consume_truncated_output(child, timeout, stdout_stream).await } /// Consumes the output of a child process, truncating it so it is suitable for /// use as the output of a `shell` tool call. Also enforces specified timeout. async fn consume_truncated_output( mut child: Child, timeout: Duration, stdout_stream: Option, ) -> Result { // Both stdout and stderr were configured with `Stdio::piped()` // above, therefore `take()` should normally return `Some`. If it doesn't // we treat it as an exceptional I/O error let stdout_reader = child.stdout.take().ok_or_else(|| { CodexErr::Io(io::Error::other( "stdout pipe was unexpectedly not available", )) })?; let stderr_reader = child.stderr.take().ok_or_else(|| { CodexErr::Io(io::Error::other( "stderr pipe was unexpectedly not available", )) })?; let (agg_tx, agg_rx) = async_channel::unbounded::>(); let stdout_handle = tokio::spawn(read_capped( BufReader::new(stdout_reader), stdout_stream.clone(), false, Some(agg_tx.clone()), )); let stderr_handle = tokio::spawn(read_capped( BufReader::new(stderr_reader), stdout_stream.clone(), true, Some(agg_tx.clone()), )); let (exit_status, timed_out) = tokio::select! { result = tokio::time::timeout(timeout, child.wait()) => { match result { Ok(status_result) => { let exit_status = status_result?; (exit_status, false) } Err(_) => { // timeout child.start_kill()?; // Debatable whether `child.wait().await` should be called here. (synthetic_exit_status(EXIT_CODE_SIGNAL_BASE + TIMEOUT_CODE), true) } } } _ = tokio::signal::ctrl_c() => { child.start_kill()?; (synthetic_exit_status(EXIT_CODE_SIGNAL_BASE + SIGKILL_CODE), false) } }; let stdout = stdout_handle.await??; let stderr = stderr_handle.await??; drop(agg_tx); let mut combined_buf = Vec::with_capacity(AGGREGATE_BUFFER_INITIAL_CAPACITY); while let Ok(chunk) = agg_rx.recv().await { append_all(&mut combined_buf, &chunk); } let aggregated_output = StreamOutput { text: combined_buf, truncated_after_lines: None, }; Ok(RawExecToolCallOutput { exit_status, stdout, stderr, aggregated_output, timed_out, }) } async fn read_capped( mut reader: R, stream: Option, is_stderr: bool, aggregate_tx: Option>>, ) -> io::Result>> { let mut buf = Vec::with_capacity(AGGREGATE_BUFFER_INITIAL_CAPACITY); let mut tmp = [0u8; READ_CHUNK_SIZE]; let mut emitted_deltas: usize = 0; // No caps: append all bytes loop { let n = reader.read(&mut tmp).await?; if n == 0 { break; } if let Some(stream) = &stream && emitted_deltas < MAX_EXEC_OUTPUT_DELTAS_PER_CALL { let chunk = tmp[..n].to_vec(); let msg = EventMsg::ExecCommandOutputDelta(ExecCommandOutputDeltaEvent { call_id: stream.call_id.clone(), stream: if is_stderr { ExecOutputStream::Stderr } else { ExecOutputStream::Stdout }, chunk, }); let event = Event { id: stream.sub_id.clone(), msg, }; #[allow(clippy::let_unit_value)] let _ = stream.tx_event.send(event).await; emitted_deltas += 1; } if let Some(tx) = &aggregate_tx { let _ = tx.send(tmp[..n].to_vec()).await; } append_all(&mut buf, &tmp[..n]); // Continue reading to EOF to avoid back-pressure } Ok(StreamOutput { text: buf, truncated_after_lines: None, }) } #[cfg(unix)] fn synthetic_exit_status(code: i32) -> ExitStatus { use std::os::unix::process::ExitStatusExt; std::process::ExitStatus::from_raw(code) } #[cfg(windows)] fn synthetic_exit_status(code: i32) -> ExitStatus { use std::os::windows::process::ExitStatusExt; #[expect(clippy::unwrap_used)] std::process::ExitStatus::from_raw(code.try_into().unwrap()) }