diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs
index ec053ce4..a7f8e7c3 100644
--- a/codex-rs/core/src/codex.rs
+++ b/codex-rs/core/src/codex.rs
@@ -147,6 +147,14 @@ pub struct CodexSpawnOk {
 }
 
 pub(crate) const INITIAL_SUBMIT_ID: &str = "";
+pub(crate) const SUBMISSION_CHANNEL_CAPACITY: usize = 64;
+
+// Model-formatting limits: clients get full streams; only content sent to the model is truncated.
+pub(crate) const MODEL_FORMAT_MAX_BYTES: usize = 10 * 1024; // 10 KiB
+pub(crate) const MODEL_FORMAT_MAX_LINES: usize = 256; // lines
+pub(crate) const MODEL_FORMAT_HEAD_LINES: usize = MODEL_FORMAT_MAX_LINES / 2;
+pub(crate) const MODEL_FORMAT_TAIL_LINES: usize = MODEL_FORMAT_MAX_LINES - MODEL_FORMAT_HEAD_LINES; // 128
+pub(crate) const MODEL_FORMAT_HEAD_BYTES: usize = MODEL_FORMAT_MAX_BYTES / 2;
 
 impl Codex {
     /// Spawn a new [`Codex`] and initialize the session.
@@ -155,7 +163,7 @@ impl Codex {
         auth_manager: Arc<AuthManager>,
         initial_history: Option<Vec<ResponseItem>>,
     ) -> CodexResult<CodexSpawnOk> {
-        let (tx_sub, rx_sub) = async_channel::bounded(64);
+        let (tx_sub, rx_sub) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY);
         let (tx_event, rx_event) = async_channel::unbounded();
 
         let user_instructions = get_user_instructions(&config).await;
@@ -728,15 +736,15 @@ impl Session {
         let ExecToolCallOutput {
             stdout,
             stderr,
+            aggregated_output,
             duration,
             exit_code,
         } = output;
-        // Because stdout and stderr could each be up to 100 KiB, we send
-        // truncated versions.
-        const MAX_STREAM_OUTPUT: usize = 5 * 1024; // 5KiB
-        let stdout = stdout.text.chars().take(MAX_STREAM_OUTPUT).collect();
-        let stderr = stderr.text.chars().take(MAX_STREAM_OUTPUT).collect();
+        // Send full stdout/stderr to clients; do not truncate.
+        let stdout = stdout.text.clone();
+        let stderr = stderr.text.clone();
         let formatted_output = format_exec_output_str(output);
+        let aggregated_output: String = aggregated_output.text.clone();
 
         let msg = if is_apply_patch {
             EventMsg::PatchApplyEnd(PatchApplyEndEvent {
@@ -750,9 +758,10 @@ impl Session {
                 call_id: call_id.to_string(),
                 stdout,
                 stderr,
-                formatted_output,
-                duration: *duration,
+                aggregated_output,
                 exit_code: *exit_code,
+                duration: *duration,
+                formatted_output,
             })
         };
 
@@ -810,6 +819,7 @@ impl Session {
                 exit_code: -1,
                 stdout: StreamOutput::new(String::new()),
                 stderr: StreamOutput::new(get_error_message_ui(e)),
+                aggregated_output: StreamOutput::new(get_error_message_ui(e)),
                 duration: Duration::default(),
             };
             &output_stderr
@@ -2604,23 +2614,103 @@ async fn handle_sandbox_error(
 fn format_exec_output_str(exec_output: &ExecToolCallOutput) -> String {
     let ExecToolCallOutput {
-        exit_code,
-        stdout,
-        stderr,
-        ..
+        aggregated_output, ..
     } = exec_output;
 
-    let is_success = *exit_code == 0;
-    let output = if is_success { stdout } else { stderr };
+    // Head+tail truncation for the model: show the beginning and end with an elision.
+    // Clients still receive full streams; only this formatted summary is capped.
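+    // For example, a 400-line result keeps the first 128 and last 128 lines and
+    // notes that 144 of the 400 lines were omitted (see the tests added below).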
 
-    let mut formatted_output = output.text.clone();
-    if let Some(truncated_after_lines) = output.truncated_after_lines {
-        formatted_output.push_str(&format!(
-            "\n\n[Output truncated after {truncated_after_lines} lines: too many lines or bytes.]",
-        ));
+    let s = aggregated_output.text.as_str();
+    let total_lines = s.lines().count();
+    if s.len() <= MODEL_FORMAT_MAX_BYTES && total_lines <= MODEL_FORMAT_MAX_LINES {
+        return s.to_string();
     }
 
-    formatted_output
+    let lines: Vec<&str> = s.lines().collect();
+    let head_take = MODEL_FORMAT_HEAD_LINES.min(lines.len());
+    let tail_take = MODEL_FORMAT_TAIL_LINES.min(lines.len().saturating_sub(head_take));
+    let omitted = lines.len().saturating_sub(head_take + tail_take);
+
+    // Join head and tail blocks (lines() strips newlines; reinsert them)
+    let head_block = lines
+        .iter()
+        .take(head_take)
+        .cloned()
+        .collect::<Vec<&str>>()
+        .join("\n");
+    let tail_block = if tail_take > 0 {
+        lines[lines.len() - tail_take..].join("\n")
+    } else {
+        String::new()
+    };
+    let marker = format!("\n[... omitted {omitted} of {total_lines} lines ...]\n\n");
+
+    // Byte budgets for head/tail around the marker
+    let mut head_budget = MODEL_FORMAT_HEAD_BYTES.min(MODEL_FORMAT_MAX_BYTES);
+    let tail_budget = MODEL_FORMAT_MAX_BYTES.saturating_sub(head_budget + marker.len());
+    if tail_budget == 0 && marker.len() >= MODEL_FORMAT_MAX_BYTES {
+        // Degenerate case: marker alone exceeds budget; return a clipped marker
+        return take_bytes_at_char_boundary(&marker, MODEL_FORMAT_MAX_BYTES).to_string();
+    }
+    if tail_budget == 0 {
+        // Make room for the marker by shrinking head
+        head_budget = MODEL_FORMAT_MAX_BYTES.saturating_sub(marker.len());
+    }
+
+    // The head and tail blocks already respect the line-count cap.
+    let head_lines_text = head_block;
+    let tail_lines_text = tail_block;
+    // Build final string respecting byte budgets
+    let head_part = take_bytes_at_char_boundary(&head_lines_text, head_budget);
+    let mut result = String::with_capacity(MODEL_FORMAT_MAX_BYTES.min(s.len()));
+    result.push_str(head_part);
+    result.push_str(&marker);
+
+    let remaining = MODEL_FORMAT_MAX_BYTES.saturating_sub(result.len());
+    let tail_budget_final = remaining;
+    let tail_part = take_last_bytes_at_char_boundary(&tail_lines_text, tail_budget_final);
+    result.push_str(tail_part);
+
+    result
+}
+
+// Truncate a &str to a byte budget at a char boundary (prefix)
+#[inline]
+fn take_bytes_at_char_boundary(s: &str, maxb: usize) -> &str {
+    if s.len() <= maxb {
+        return s;
+    }
+    let mut last_ok = 0;
+    for (i, ch) in s.char_indices() {
+        let nb = i + ch.len_utf8();
+        if nb > maxb {
+            break;
+        }
+        last_ok = nb;
+    }
+    &s[..last_ok]
+}
+
+// Take a suffix of a &str within a byte budget at a char boundary
+#[inline]
+fn take_last_bytes_at_char_boundary(s: &str, maxb: usize) -> &str {
+    if s.len() <= maxb {
+        return s;
+    }
+    let mut start = s.len();
+    let mut used = 0usize;
+    for (i, ch) in s.char_indices().rev() {
+        let nb = ch.len_utf8();
+        if used + nb > maxb {
+            break;
+        }
+        start = i;
+        used += nb;
+        if start == 0 {
+            break;
+        }
+    }
+    &s[start..]
 }
 
 /// Exec output is a pre-serialized JSON payload
@@ -2771,6 +2861,7 @@ mod tests {
     use mcp_types::TextContent;
     use pretty_assertions::assert_eq;
    use serde_json::json;
+    use std::time::Duration as StdDuration;
 
     fn text_block(s: &str) -> ContentBlock {
         ContentBlock::TextContent(TextContent {
@@ -2805,6 +2896,82 @@ mod tests {
         assert_eq!(expected, got);
     }
 
+    #[test]
+    fn model_truncation_head_tail_by_lines() {
+        // Build 400 short lines so line-count limit, not byte budget, triggers truncation
+        let lines: Vec<String> = (1..=400).map(|i| format!("line{i}")).collect();
+        let full = lines.join("\n");
+
+        let exec = ExecToolCallOutput {
+            exit_code: 0,
+            stdout: StreamOutput::new(String::new()),
+            stderr: StreamOutput::new(String::new()),
+            aggregated_output: StreamOutput::new(full.clone()),
+            duration: StdDuration::from_secs(1),
+        };
+
+        let out = format_exec_output_str(&exec);
+
+        // Expect elision marker with correct counts
+        let omitted = 400 - MODEL_FORMAT_MAX_LINES; // 144
+        let marker = format!("\n[... omitted {omitted} of 400 lines ...]\n\n");
+        assert!(out.contains(&marker), "missing marker: {out}");
+
+        // Validate head and tail
+        let parts: Vec<&str> = out.split(&marker).collect();
+        assert_eq!(parts.len(), 2, "expected one marker split");
+        let head = parts[0];
+        let tail = parts[1];
+
+        let expected_head: String = (1..=MODEL_FORMAT_HEAD_LINES)
+            .map(|i| format!("line{i}"))
+            .collect::<Vec<String>>()
+            .join("\n");
+        assert!(head.starts_with(&expected_head), "head mismatch");
+
+        let expected_tail: String = ((400 - MODEL_FORMAT_TAIL_LINES + 1)..=400)
+            .map(|i| format!("line{i}"))
+            .collect::<Vec<String>>()
+            .join("\n");
+        assert!(tail.ends_with(&expected_tail), "tail mismatch");
+    }
+
+    #[test]
+    fn model_truncation_respects_byte_budget() {
+        // Construct a large output (about 100kB) so byte budget dominates
+        let big_line = "x".repeat(100);
+        let full = std::iter::repeat_n(big_line.clone(), 1000)
+            .collect::<Vec<String>>()
+            .join("\n");
+
+        let exec = ExecToolCallOutput {
+            exit_code: 0,
+            stdout: StreamOutput::new(String::new()),
+            stderr: StreamOutput::new(String::new()),
+            aggregated_output: StreamOutput::new(full.clone()),
+            duration: StdDuration::from_secs(1),
+        };
+
+        let out = format_exec_output_str(&exec);
+        assert!(out.len() <= MODEL_FORMAT_MAX_BYTES, "exceeds byte budget");
+        assert!(out.contains("omitted"), "should contain elision marker");
+
+        // Ensure head and tail are drawn from the original
+        assert!(full.starts_with(out.chars().take(8).collect::<String>().as_str()));
+        assert!(
+            full.ends_with(
+                out.chars()
+                    .rev()
+                    .take(8)
+                    .collect::<String>()
+                    .chars()
+                    .rev()
+                    .collect::<String>()
+                    .as_str()
+            )
+        );
+    }
+
     #[test]
     fn falls_back_to_content_when_structured_is_null() {
         let ctr = CallToolResult {
diff --git a/codex-rs/core/src/exec.rs b/codex-rs/core/src/exec.rs
index 9430433c..d74ec9fc 100644
--- a/codex-rs/core/src/exec.rs
+++ b/codex-rs/core/src/exec.rs
@@ -28,18 +28,17 @@ use crate::spawn::StdioPolicy;
 use crate::spawn::spawn_child_async;
 use serde_bytes::ByteBuf;
 
-// Maximum we send for each stream, which is either:
-// - 10KiB OR
-// - 256 lines
-const MAX_STREAM_OUTPUT: usize = 10 * 1024;
-const MAX_STREAM_OUTPUT_LINES: usize = 256;
-
 const DEFAULT_TIMEOUT_MS: u64 = 10_000;
 
 // Hardcode these since it does not seem worth including the libc crate just
 // for these.
 const SIGKILL_CODE: i32 = 9;
 const TIMEOUT_CODE: i32 = 64;
+const EXIT_CODE_SIGNAL_BASE: i32 = 128; // conventional shell: 128 + signal
+
+// I/O buffer sizing
+const READ_CHUNK_SIZE: usize = 8192; // bytes per read
+const AGGREGATE_BUFFER_INITIAL_CAPACITY: usize = 8 * 1024; // 8 KiB
 
 #[derive(Debug, Clone)]
 pub struct ExecParams {
@@ -153,6 +152,7 @@ pub async fn process_exec_tool_call(
         exit_code,
         stdout,
         stderr,
+        aggregated_output: raw_output.aggregated_output.from_utf8_lossy(),
         duration,
     })
 }
@@ -189,10 +189,11 @@ pub struct StreamOutput<T> {
     pub truncated_after_lines: Option<u32>,
 }
 
 #[derive(Debug)]
-pub struct RawExecToolCallOutput {
+struct RawExecToolCallOutput {
     pub exit_status: ExitStatus,
     pub stdout: StreamOutput<Vec<u8>>,
     pub stderr: StreamOutput<Vec<u8>>,
+    pub aggregated_output: StreamOutput<Vec<u8>>,
 }
 
 impl<T> StreamOutput<T> {
@@ -213,11 +214,17 @@ impl StreamOutput<Vec<u8>> {
     }
 }
 
+#[inline]
+fn append_all(dst: &mut Vec<u8>, src: &[u8]) {
+    dst.extend_from_slice(src);
+}
+
 #[derive(Debug)]
 pub struct ExecToolCallOutput {
     pub exit_code: i32,
     pub stdout: StreamOutput<String>,
     pub stderr: StreamOutput<String>,
+    pub aggregated_output: StreamOutput<String>,
     pub duration: Duration,
 }
 
@@ -253,7 +260,7 @@ async fn exec(
 /// Consumes the output of a child process, truncating it so it is suitable for
 /// use as the output of a `shell` tool call. Also enforces specified timeout.
-pub(crate) async fn consume_truncated_output(
+async fn consume_truncated_output(
     mut child: Child,
     timeout: Duration,
     stdout_stream: Option<StdoutStream>,
@@ -273,19 +280,19 @@ pub(crate) async fn consume_truncated_output(
         ))
     })?;
 
+    let (agg_tx, agg_rx) = async_channel::unbounded::<Vec<u8>>();
+
     let stdout_handle = tokio::spawn(read_capped(
         BufReader::new(stdout_reader),
-        MAX_STREAM_OUTPUT,
-        MAX_STREAM_OUTPUT_LINES,
         stdout_stream.clone(),
         false,
+        Some(agg_tx.clone()),
     ));
     let stderr_handle = tokio::spawn(read_capped(
         BufReader::new(stderr_reader),
-        MAX_STREAM_OUTPUT,
-        MAX_STREAM_OUTPUT_LINES,
         stdout_stream.clone(),
         true,
+        Some(agg_tx.clone()),
     ));
 
     let exit_status = tokio::select! {
@@ -297,38 +304,48 @@ pub(crate) async fn consume_truncated_output(
             // timeout
             child.start_kill()?;
             // Debatable whether `child.wait().await` should be called here.
-            synthetic_exit_status(128 + TIMEOUT_CODE)
+            synthetic_exit_status(EXIT_CODE_SIGNAL_BASE + TIMEOUT_CODE)
                }
            }
        }
        _ = tokio::signal::ctrl_c() => {
            child.start_kill()?;
-            synthetic_exit_status(128 + SIGKILL_CODE)
+            synthetic_exit_status(EXIT_CODE_SIGNAL_BASE + SIGKILL_CODE)
        }
    };
 
     let stdout = stdout_handle.await??;
     let stderr = stderr_handle.await??;
 
+    drop(agg_tx);
+
+    let mut combined_buf = Vec::with_capacity(AGGREGATE_BUFFER_INITIAL_CAPACITY);
+    while let Ok(chunk) = agg_rx.recv().await {
+        append_all(&mut combined_buf, &chunk);
+    }
+    let aggregated_output = StreamOutput {
+        text: combined_buf,
+        truncated_after_lines: None,
+    };
+
     Ok(RawExecToolCallOutput {
         exit_status,
         stdout,
         stderr,
+        aggregated_output,
     })
 }
 
 async fn read_capped<R: AsyncRead + Unpin>(
     mut reader: R,
-    max_output: usize,
-    max_lines: usize,
     stream: Option<StdoutStream>,
     is_stderr: bool,
+    aggregate_tx: Option<async_channel::Sender<Vec<u8>>>,
 ) -> io::Result<StreamOutput<Vec<u8>>> {
-    let mut buf = Vec::with_capacity(max_output.min(8 * 1024));
-    let mut tmp = [0u8; 8192];
+    let mut buf = Vec::with_capacity(AGGREGATE_BUFFER_INITIAL_CAPACITY);
+    let mut tmp = [0u8; READ_CHUNK_SIZE];
 
-    let mut remaining_bytes = max_output;
-    let mut remaining_lines = max_lines;
+    // No caps: append all bytes
 
     loop {
         let n = reader.read(&mut tmp).await?;
@@ -355,33 +372,17 @@ async fn read_capped<R: AsyncRead + Unpin>(
             let _ = stream.tx_event.send(event).await;
         }
 
-        // Copy into the buffer only while we still have byte and line budget.
-        if remaining_bytes > 0 && remaining_lines > 0 {
-            let mut copy_len = 0;
-            for &b in &tmp[..n] {
-                if remaining_bytes == 0 || remaining_lines == 0 {
-                    break;
-                }
-                copy_len += 1;
-                remaining_bytes -= 1;
-                if b == b'\n' {
-                    remaining_lines -= 1;
-                }
-            }
-            buf.extend_from_slice(&tmp[..copy_len]);
+        if let Some(tx) = &aggregate_tx {
+            let _ = tx.send(tmp[..n].to_vec()).await;
         }
-        // Continue reading to EOF to avoid back-pressure, but discard once caps are hit.
-    }
-
-    let truncated = remaining_lines == 0 || remaining_bytes == 0;
+
+        append_all(&mut buf, &tmp[..n]);
+        // Continue reading to EOF to avoid back-pressure
+    }
 
     Ok(StreamOutput {
         text: buf,
-        truncated_after_lines: if truncated {
-            Some((max_lines - remaining_lines) as u32)
-        } else {
-            None
-        },
+        truncated_after_lines: None,
     })
 }
diff --git a/codex-rs/core/tests/exec.rs b/codex-rs/core/tests/exec.rs
index f011aa7a..9e0cffe6 100644
--- a/codex-rs/core/tests/exec.rs
+++ b/codex-rs/core/tests/exec.rs
@@ -70,12 +70,12 @@ async fn truncates_output_lines() {
     let output = run_test_cmd(tmp, cmd).await.unwrap();
 
-    let expected_output = (1..=256)
+    let expected_output = (1..=300)
         .map(|i| format!("{i}\n"))
         .collect::<Vec<String>>()
         .join("");
     assert_eq!(output.stdout.text, expected_output);
-    assert_eq!(output.stdout.truncated_after_lines, Some(256));
+    assert_eq!(output.stdout.truncated_after_lines, None);
 }
 
 /// Command succeeds with exit code 0 normally
@@ -91,8 +91,8 @@ async fn truncates_output_bytes() {
     let output = run_test_cmd(tmp, cmd).await.unwrap();
 
-    assert_eq!(output.stdout.text.len(), 10240);
-    assert_eq!(output.stdout.truncated_after_lines, Some(10));
+    assert!(output.stdout.text.len() >= 15000);
+    assert_eq!(output.stdout.truncated_after_lines, None);
 }
 
 /// Command not found returns exit code 127, this is not considered a sandbox error
diff --git a/codex-rs/core/tests/exec_stream_events.rs b/codex-rs/core/tests/exec_stream_events.rs
index 85d4eb37..521823d2 100644
--- a/codex-rs/core/tests/exec_stream_events.rs
+++ b/codex-rs/core/tests/exec_stream_events.rs
@@ -139,3 +139,34 @@ async fn test_exec_stderr_stream_events_echo() {
     }
     assert_eq!(String::from_utf8_lossy(&err), "oops\n");
 }
+
+#[tokio::test]
+async fn test_aggregated_output_interleaves_in_order() {
+    // Spawn a shell that alternates stdout and stderr with sleeps to enforce order.
+    let cmd = vec![
+        "/bin/sh".to_string(),
+        "-c".to_string(),
+        "printf 'O1\\n'; sleep 0.01; printf 'E1\\n' 1>&2; sleep 0.01; printf 'O2\\n'; sleep 0.01; printf 'E2\\n' 1>&2".to_string(),
+    ];
+
+    let params = ExecParams {
+        command: cmd,
+        cwd: std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")),
+        timeout_ms: Some(5_000),
+        env: HashMap::new(),
+        with_escalated_permissions: None,
+        justification: None,
+    };
+
+    let policy = SandboxPolicy::new_read_only_policy();
+
+    let result = process_exec_tool_call(params, SandboxType::None, &policy, &None, None)
+        .await
+        .expect("process_exec_tool_call");
+
+    assert_eq!(result.exit_code, 0);
+    assert_eq!(result.stdout.text, "O1\nO2\n");
+    assert_eq!(result.stderr.text, "E1\nE2\n");
+    assert_eq!(result.aggregated_output.text, "O1\nE1\nO2\nE2\n");
+    assert_eq!(result.aggregated_output.truncated_after_lines, None);
+}
diff --git a/codex-rs/exec/src/event_processor_with_human_output.rs b/codex-rs/exec/src/event_processor_with_human_output.rs
index 0f3b56b4..0f7e14ea 100644
--- a/codex-rs/exec/src/event_processor_with_human_output.rs
+++ b/codex-rs/exec/src/event_processor_with_human_output.rs
@@ -287,8 +287,7 @@ impl EventProcessor for EventProcessorWithHumanOutput {
             EventMsg::ExecCommandOutputDelta(_) => {}
             EventMsg::ExecCommandEnd(ExecCommandEndEvent {
                 call_id,
-                stdout,
-                stderr,
+                aggregated_output,
                 duration,
                 exit_code,
                 ..
@@ -304,8 +303,7 @@ impl EventProcessor for EventProcessorWithHumanOutput {
                     ("".to_string(), format!("exec('{call_id}')"))
                 };
 
-                let output = if exit_code == 0 { stdout } else { stderr };
-                let truncated_output = output
+                let truncated_output = aggregated_output
                     .lines()
                     .take(MAX_OUTPUT_LINES_FOR_EXEC_TOOL_CALL)
                     .collect::<Vec<_>>()
diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs
index e803324a..7e7708b2 100644
--- a/codex-rs/protocol/src/protocol.rs
+++ b/codex-rs/protocol/src/protocol.rs
@@ -685,6 +685,9 @@ pub struct ExecCommandEndEvent {
     pub stdout: String,
     /// Captured stderr
     pub stderr: String,
+    /// Captured aggregated output
+    #[serde(default)]
+    pub aggregated_output: String,
     /// The command's exit code.
     pub exit_code: i32,
     /// The duration of the command execution.
diff --git a/codex-rs/tui/src/chatwidget/tests.rs b/codex-rs/tui/src/chatwidget/tests.rs
index 04dd3ad3..dfb80a69 100644
--- a/codex-rs/tui/src/chatwidget/tests.rs
+++ b/codex-rs/tui/src/chatwidget/tests.rs
@@ -263,6 +263,7 @@ fn exec_history_cell_shows_working_then_completed() {
            call_id: "call-1".into(),
            stdout: "done".into(),
            stderr: String::new(),
+            aggregated_output: "done".into(),
            exit_code: 0,
            duration: std::time::Duration::from_millis(5),
            formatted_output: "done".into(),
@@ -313,6 +314,7 @@ fn exec_history_cell_shows_working_then_failed() {
            call_id: "call-2".into(),
            stdout: String::new(),
            stderr: "error".into(),
+            aggregated_output: "error".into(),
            exit_code: 2,
            duration: std::time::Duration::from_millis(7),
            formatted_output: "".into(),
@@ -361,6 +363,7 @@ fn exec_history_extends_previous_when_consecutive() {
            call_id: "call-a".into(),
            stdout: "one".into(),
            stderr: String::new(),
+            aggregated_output: "one".into(),
            exit_code: 0,
            duration: std::time::Duration::from_millis(5),
            formatted_output: "one".into(),
@@ -390,6 +393,7 @@ fn exec_history_extends_previous_when_consecutive() {
            call_id: "call-b".into(),
            stdout: "two".into(),
            stderr: String::new(),
+            aggregated_output: "two".into(),
            exit_code: 0,
            duration: std::time::Duration::from_millis(5),
            formatted_output: "two".into(),
diff --git a/codex-rs/tui/tests/fixtures/ideal-binary-response.txt b/codex-rs/tui/tests/fixtures/ideal-binary-response.txt
index 62d6af57..56ed4a46 100644
--- a/codex-rs/tui/tests/fixtures/ideal-binary-response.txt
+++ b/codex-rs/tui/tests/fixtures/ideal-binary-response.txt
@@ -9,7 +9,7 @@ codex
 I’m going to scan the workspace and Cargo manifests to see build profiles and
 dependencies that impact binary size. Then I’ll summarize the main causes.
 
->_
+_
 ✓ ls -la
   └ total 6696
 drwxr-xr-x@ 39 easong staff 1248 Aug 9 08:49 .
@@ -205,4 +205,4 @@ assertions—outputs are much larger than cargo build --release.
 
 If you want, I can outline targeted trims (e.g., strip = "debuginfo", opt-level
 = "z", panic abort, tighter tokio/reqwest features) and estimate impact per
-binary.
+binary.
\ No newline at end of file