diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index 9123bd63..460a440e 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -695,6 +695,7 @@ dependencies = [ "reqwest", "seccompiler", "serde", + "serde_bytes", "serde_json", "sha1", "shlex", @@ -3951,6 +3952,15 @@ dependencies = [ "serde_derive", ] +[[package]] +name = "serde_bytes" +version = "0.11.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8437fd221bde2d4ca316d61b90e337e9e702b3820b87d63caa9ba6c02bd06d96" +dependencies = [ + "serde", +] + [[package]] name = "serde_derive" version = "1.0.219" diff --git a/codex-rs/core/Cargo.toml b/codex-rs/core/Cargo.toml index ecc904cd..db3fd4f8 100644 --- a/codex-rs/core/Cargo.toml +++ b/codex-rs/core/Cargo.toml @@ -31,6 +31,7 @@ rand = "0.9" reqwest = { version = "0.12", features = ["json", "stream"] } serde = { version = "1", features = ["derive"] } serde_json = "1" +serde_bytes = "0.11" sha1 = "0.10.6" shlex = "1.3.0" strum_macros = "0.27.2" diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 5d48cd45..6e21642d 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -48,6 +48,7 @@ use crate::error::SandboxErr; use crate::exec::ExecParams; use crate::exec::ExecToolCallOutput; use crate::exec::SandboxType; +use crate::exec::StdoutStream; use crate::exec::process_exec_tool_call; use crate::exec_env::create_env; use crate::mcp_connection_manager::McpConnectionManager; @@ -1759,6 +1760,11 @@ async fn handle_container_exec_with_params( sess.ctrl_c.clone(), &sess.sandbox_policy, &sess.codex_linux_sandbox_exe, + Some(StdoutStream { + sub_id: sub_id.clone(), + call_id: call_id.clone(), + tx_event: sess.tx_event.clone(), + }), ) .await; @@ -1879,6 +1885,11 @@ async fn handle_sandbox_error( sess.ctrl_c.clone(), &sess.sandbox_policy, &sess.codex_linux_sandbox_exe, + Some(StdoutStream { + sub_id: sub_id.clone(), + call_id: call_id.clone(), + tx_event: sess.tx_event.clone(), + }), ) .await; diff --git a/codex-rs/core/src/exec.rs b/codex-rs/core/src/exec.rs index 06416e67..a5a88ee9 100644 --- a/codex-rs/core/src/exec.rs +++ b/codex-rs/core/src/exec.rs @@ -10,6 +10,7 @@ use std::sync::Arc; use std::time::Duration; use std::time::Instant; +use async_channel::Sender; use tokio::io::AsyncRead; use tokio::io::AsyncReadExt; use tokio::io::BufReader; @@ -19,10 +20,15 @@ use tokio::sync::Notify; use crate::error::CodexErr; use crate::error::Result; use crate::error::SandboxErr; +use crate::protocol::Event; +use crate::protocol::EventMsg; +use crate::protocol::ExecCommandStderrDeltaEvent; +use crate::protocol::ExecCommandStdoutDeltaEvent; use crate::protocol::SandboxPolicy; use crate::seatbelt::spawn_command_under_seatbelt; use crate::spawn::StdioPolicy; use crate::spawn::spawn_child_async; +use serde_bytes::ByteBuf; // Maximum we send for each stream, which is either: // - 10KiB OR @@ -56,18 +62,26 @@ pub enum SandboxType { LinuxSeccomp, } +#[derive(Clone)] +pub struct StdoutStream { + pub sub_id: String, + pub call_id: String, + pub tx_event: Sender, +} + pub async fn process_exec_tool_call( params: ExecParams, sandbox_type: SandboxType, ctrl_c: Arc, sandbox_policy: &SandboxPolicy, codex_linux_sandbox_exe: &Option, + stdout_stream: Option, ) -> Result { let start = Instant::now(); let raw_output_result: std::result::Result = match sandbox_type { - SandboxType::None => exec(params, sandbox_policy, ctrl_c).await, + SandboxType::None => exec(params, sandbox_policy, ctrl_c, stdout_stream.clone()).await, SandboxType::MacosSeatbelt => { let ExecParams { command, @@ -83,7 +97,7 @@ pub async fn process_exec_tool_call( env, ) .await?; - consume_truncated_output(child, ctrl_c, timeout_ms).await + consume_truncated_output(child, ctrl_c, timeout_ms, stdout_stream.clone()).await } SandboxType::LinuxSeccomp => { let ExecParams { @@ -106,7 +120,7 @@ pub async fn process_exec_tool_call( ) .await?; - consume_truncated_output(child, ctrl_c, timeout_ms).await + consume_truncated_output(child, ctrl_c, timeout_ms, stdout_stream).await } }; let duration = start.elapsed(); @@ -233,6 +247,7 @@ async fn exec( }: ExecParams, sandbox_policy: &SandboxPolicy, ctrl_c: Arc, + stdout_stream: Option, ) -> Result { let (program, args) = command.split_first().ok_or_else(|| { CodexErr::Io(io::Error::new( @@ -251,7 +266,7 @@ async fn exec( env, ) .await?; - consume_truncated_output(child, ctrl_c, timeout_ms).await + consume_truncated_output(child, ctrl_c, timeout_ms, stdout_stream).await } /// Consumes the output of a child process, truncating it so it is suitable for @@ -260,6 +275,7 @@ pub(crate) async fn consume_truncated_output( mut child: Child, ctrl_c: Arc, timeout_ms: Option, + stdout_stream: Option, ) -> Result { // Both stdout and stderr were configured with `Stdio::piped()` // above, therefore `take()` should normally return `Some`. If it doesn't @@ -280,11 +296,15 @@ pub(crate) async fn consume_truncated_output( BufReader::new(stdout_reader), MAX_STREAM_OUTPUT, MAX_STREAM_OUTPUT_LINES, + stdout_stream.clone(), + false, )); let stderr_handle = tokio::spawn(read_capped( BufReader::new(stderr_reader), MAX_STREAM_OUTPUT, MAX_STREAM_OUTPUT_LINES, + stdout_stream.clone(), + true, )); let interrupted = ctrl_c.notified(); @@ -318,10 +338,12 @@ pub(crate) async fn consume_truncated_output( }) } -async fn read_capped( +async fn read_capped( mut reader: R, max_output: usize, max_lines: usize, + stream: Option, + is_stderr: bool, ) -> io::Result> { let mut buf = Vec::with_capacity(max_output.min(8 * 1024)); let mut tmp = [0u8; 8192]; @@ -335,6 +357,27 @@ async fn read_capped( break; } + if let Some(stream) = &stream { + let chunk = tmp[..n].to_vec(); + let msg = if is_stderr { + EventMsg::ExecCommandStderrDelta(ExecCommandStderrDeltaEvent { + call_id: stream.call_id.clone(), + chunk: ByteBuf::from(chunk), + }) + } else { + EventMsg::ExecCommandStdoutDelta(ExecCommandStdoutDeltaEvent { + call_id: stream.call_id.clone(), + chunk: ByteBuf::from(chunk), + }) + }; + let event = Event { + id: stream.sub_id.clone(), + msg, + }; + #[allow(clippy::let_unit_value)] + let _ = stream.tx_event.send(event).await; + } + // Copy into the buffer only while we still have byte and line budget. if remaining_bytes > 0 && remaining_lines > 0 { let mut copy_len = 0; diff --git a/codex-rs/core/src/protocol.rs b/codex-rs/core/src/protocol.rs index 1b4832a2..8d26eda9 100644 --- a/codex-rs/core/src/protocol.rs +++ b/codex-rs/core/src/protocol.rs @@ -13,6 +13,7 @@ use std::time::Duration; use mcp_types::CallToolResult; use serde::Deserialize; use serde::Serialize; +use serde_bytes::ByteBuf; use strum_macros::Display; use uuid::Uuid; @@ -323,6 +324,12 @@ pub enum EventMsg { /// Notification that the server is about to execute a command. ExecCommandBegin(ExecCommandBeginEvent), + /// Incremental chunk of stdout from a running command. + ExecCommandStdoutDelta(ExecCommandStdoutDeltaEvent), + + /// Incremental chunk of stderr from a running command. + ExecCommandStderrDelta(ExecCommandStderrDeltaEvent), + ExecCommandEnd(ExecCommandEndEvent), ExecApprovalRequest(ExecApprovalRequestEvent), @@ -476,6 +483,24 @@ pub struct ExecCommandEndEvent { pub exit_code: i32, } +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct ExecCommandStdoutDeltaEvent { + /// Identifier for the ExecCommandBegin that produced this chunk. + pub call_id: String, + /// Raw stdout bytes (may not be valid UTF-8). + #[serde(with = "serde_bytes")] + pub chunk: ByteBuf, +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct ExecCommandStderrDeltaEvent { + /// Identifier for the ExecCommandBegin that produced this chunk. + pub call_id: String, + /// Raw stderr bytes (may not be valid UTF-8). + #[serde(with = "serde_bytes")] + pub chunk: ByteBuf, +} + #[derive(Debug, Clone, Deserialize, Serialize)] pub struct ExecApprovalRequestEvent { /// Identifier for the associated exec call, if available. diff --git a/codex-rs/core/src/shell.rs b/codex-rs/core/src/shell.rs index 98addffc..1e895a37 100644 --- a/codex-rs/core/src/shell.rs +++ b/codex-rs/core/src/shell.rs @@ -220,6 +220,7 @@ mod tests { Arc::new(Notify::new()), &SandboxPolicy::DangerFullAccess, &None, + None, ) .await .unwrap(); diff --git a/codex-rs/core/tests/exec_stream_events.rs b/codex-rs/core/tests/exec_stream_events.rs new file mode 100644 index 00000000..059b69a0 --- /dev/null +++ b/codex-rs/core/tests/exec_stream_events.rs @@ -0,0 +1,135 @@ +#![cfg(unix)] + +use std::collections::HashMap; +use std::path::PathBuf; +use std::sync::Arc; + +use async_channel::Receiver; +use codex_core::exec::ExecParams; +use codex_core::exec::SandboxType; +use codex_core::exec::StdoutStream; +use codex_core::exec::process_exec_tool_call; +use codex_core::protocol::Event; +use codex_core::protocol::EventMsg; +use codex_core::protocol::ExecCommandStderrDeltaEvent; +use codex_core::protocol::ExecCommandStdoutDeltaEvent; +use codex_core::protocol::SandboxPolicy; +use tokio::sync::Notify; + +fn collect_stdout_events(rx: Receiver) -> Vec { + let mut out = Vec::new(); + while let Ok(ev) = rx.try_recv() { + if let EventMsg::ExecCommandStdoutDelta(ExecCommandStdoutDeltaEvent { chunk, .. }) = ev.msg + { + out.extend_from_slice(&chunk); + } + } + out +} + +#[tokio::test] +async fn test_exec_stdout_stream_events_echo() { + let (tx, rx) = async_channel::unbounded::(); + + let stdout_stream = StdoutStream { + sub_id: "test-sub".to_string(), + call_id: "call-1".to_string(), + tx_event: tx, + }; + + let cmd = vec![ + "/bin/sh".to_string(), + "-c".to_string(), + // Use printf for predictable behavior across shells + "printf 'hello-world\n'".to_string(), + ]; + + let params = ExecParams { + command: cmd, + cwd: std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")), + timeout_ms: Some(5_000), + env: HashMap::new(), + }; + + let ctrl_c = Arc::new(Notify::new()); + let policy = SandboxPolicy::new_read_only_policy(); + + let result = process_exec_tool_call( + params, + SandboxType::None, + ctrl_c, + &policy, + &None, + Some(stdout_stream), + ) + .await; + + let result = match result { + Ok(r) => r, + Err(e) => panic!("process_exec_tool_call failed: {e}"), + }; + + assert_eq!(result.exit_code, 0); + assert_eq!(result.stdout, "hello-world\n"); + + let streamed = collect_stdout_events(rx); + // We should have received at least the same contents (possibly in one chunk) + assert_eq!(String::from_utf8_lossy(&streamed), "hello-world\n"); +} + +#[tokio::test] +async fn test_exec_stderr_stream_events_echo() { + let (tx, rx) = async_channel::unbounded::(); + + let stdout_stream = StdoutStream { + sub_id: "test-sub".to_string(), + call_id: "call-2".to_string(), + tx_event: tx, + }; + + let cmd = vec![ + "/bin/sh".to_string(), + "-c".to_string(), + // Write to stderr explicitly + "printf 'oops\n' 1>&2".to_string(), + ]; + + let params = ExecParams { + command: cmd, + cwd: std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")), + timeout_ms: Some(5_000), + env: HashMap::new(), + }; + + let ctrl_c = Arc::new(Notify::new()); + let policy = SandboxPolicy::new_read_only_policy(); + + let result = process_exec_tool_call( + params, + SandboxType::None, + ctrl_c, + &policy, + &None, + Some(stdout_stream), + ) + .await; + + let result = match result { + Ok(r) => r, + Err(e) => panic!("process_exec_tool_call failed: {e}"), + }; + + assert_eq!(result.exit_code, 0); + assert_eq!(result.stdout, ""); + assert_eq!(result.stderr, "oops\n"); + + // Collect only stderr delta events + let mut err = Vec::new(); + while let Ok(ev) = rx.try_recv() { + if let EventMsg::ExecCommandStderrDelta(ExecCommandStderrDeltaEvent { chunk, .. }) = ev.msg + { + err.extend_from_slice(&chunk); + } + } + assert_eq!(String::from_utf8_lossy(&err), "oops\n"); +} diff --git a/codex-rs/exec/src/event_processor_with_human_output.rs b/codex-rs/exec/src/event_processor_with_human_output.rs index 54604d53..05643c62 100644 --- a/codex-rs/exec/src/event_processor_with_human_output.rs +++ b/codex-rs/exec/src/event_processor_with_human_output.rs @@ -239,6 +239,8 @@ impl EventProcessor for EventProcessorWithHumanOutput { cwd.to_string_lossy(), ); } + EventMsg::ExecCommandStdoutDelta(_) => {} + EventMsg::ExecCommandStderrDelta(_) => {} EventMsg::ExecCommandEnd(ExecCommandEndEvent { call_id, stdout, diff --git a/codex-rs/linux-sandbox/tests/landlock.rs b/codex-rs/linux-sandbox/tests/landlock.rs index 7eacda46..c16e9227 100644 --- a/codex-rs/linux-sandbox/tests/landlock.rs +++ b/codex-rs/linux-sandbox/tests/landlock.rs @@ -59,6 +59,7 @@ async fn run_cmd(cmd: &[&str], writable_roots: &[PathBuf], timeout_ms: u64) { ctrl_c, &sandbox_policy, &codex_linux_sandbox_exe, + None, ) .await .unwrap(); @@ -149,6 +150,7 @@ async fn assert_network_blocked(cmd: &[&str]) { ctrl_c, &sandbox_policy, &codex_linux_sandbox_exe, + None, ) .await; diff --git a/codex-rs/mcp-server/src/codex_tool_runner.rs b/codex-rs/mcp-server/src/codex_tool_runner.rs index f25659b2..94d47f74 100644 --- a/codex-rs/mcp-server/src/codex_tool_runner.rs +++ b/codex-rs/mcp-server/src/codex_tool_runner.rs @@ -258,6 +258,8 @@ async fn run_codex_tool_session_inner( | EventMsg::McpToolCallBegin(_) | EventMsg::McpToolCallEnd(_) | EventMsg::ExecCommandBegin(_) + | EventMsg::ExecCommandStdoutDelta(_) + | EventMsg::ExecCommandStderrDelta(_) | EventMsg::ExecCommandEnd(_) | EventMsg::BackgroundEvent(_) | EventMsg::PatchApplyBegin(_) diff --git a/codex-rs/tui/src/chatwidget.rs b/codex-rs/tui/src/chatwidget.rs index c777af52..d6454c82 100644 --- a/codex-rs/tui/src/chatwidget.rs +++ b/codex-rs/tui/src/chatwidget.rs @@ -374,6 +374,7 @@ impl ChatWidget<'_> { ); self.add_to_history(HistoryCell::new_active_exec_command(command)); } + EventMsg::ExecCommandStdoutDelta(_) => {} EventMsg::PatchApplyBegin(PatchApplyBeginEvent { call_id: _, auto_approved,