From b5349202e96d75b4b84d71e6953a14bcd95f2478 Mon Sep 17 00:00:00 2001 From: pakrym-oai Date: Thu, 6 Nov 2025 14:14:27 -0800 Subject: [PATCH] Freeform unified exec output formatting (#6233) --- .../core/src/tools/handlers/unified_exec.rs | 57 ++-- codex-rs/core/tests/suite/unified_exec.rs | 268 ++++++++++++++---- 2 files changed, 230 insertions(+), 95 deletions(-) diff --git a/codex-rs/core/src/tools/handlers/unified_exec.rs b/codex-rs/core/src/tools/handlers/unified_exec.rs index 32ace6c9..593925a7 100644 --- a/codex-rs/core/src/tools/handlers/unified_exec.rs +++ b/codex-rs/core/src/tools/handlers/unified_exec.rs @@ -1,8 +1,5 @@ -use std::time::Duration; - use async_trait::async_trait; use serde::Deserialize; -use serde::Serialize; use crate::function_tool::FunctionCallError; use crate::protocol::EventMsg; @@ -163,11 +160,7 @@ impl ToolHandler for UnifiedExecHandler { .await; } - let content = serialize_response(&response).map_err(|err| { - FunctionCallError::RespondToModel(format!( - "failed to serialize unified exec output: {err:?}" - )) - })?; + let content = format_response(&response); Ok(ToolOutput::Function { content, @@ -177,32 +170,30 @@ impl ToolHandler for UnifiedExecHandler { } } -#[derive(Serialize)] -struct SerializedUnifiedExecResponse<'a> { - chunk_id: &'a str, - wall_time_seconds: f64, - output: &'a str, - #[serde(skip_serializing_if = "Option::is_none")] - session_id: Option, - #[serde(skip_serializing_if = "Option::is_none")] - exit_code: Option, - #[serde(skip_serializing_if = "Option::is_none")] - original_token_count: Option, -} +fn format_response(response: &UnifiedExecResponse) -> String { + let mut sections = Vec::new(); -fn serialize_response(response: &UnifiedExecResponse) -> Result { - let payload = SerializedUnifiedExecResponse { - chunk_id: &response.chunk_id, - wall_time_seconds: duration_to_seconds(response.wall_time), - output: &response.output, - session_id: response.session_id, - exit_code: response.exit_code, - original_token_count: response.original_token_count, - }; + if !response.chunk_id.is_empty() { + sections.push(format!("Chunk ID: {}", response.chunk_id)); + } - serde_json::to_string(&payload) -} + let wall_time_seconds = response.wall_time.as_secs_f64(); + sections.push(format!("Wall time: {wall_time_seconds:.4} seconds")); -fn duration_to_seconds(duration: Duration) -> f64 { - duration.as_secs_f64() + if let Some(exit_code) = response.exit_code { + sections.push(format!("Process exited with code {exit_code}")); + } + + if let Some(session_id) = response.session_id { + sections.push(format!("Process running with session ID {session_id}")); + } + + if let Some(original_token_count) = response.original_token_count { + sections.push(format!("Original token count: {original_token_count}")); + } + + sections.push("Output:".to_string()); + sections.push(response.output.clone()); + + sections.join("\n") } diff --git a/codex-rs/core/tests/suite/unified_exec.rs b/codex-rs/core/tests/suite/unified_exec.rs index 0e68ec75..3db8050a 100644 --- a/codex-rs/core/tests/suite/unified_exec.rs +++ b/codex-rs/core/tests/suite/unified_exec.rs @@ -1,7 +1,8 @@ #![cfg(not(target_os = "windows"))] - use std::collections::HashMap; +use std::sync::OnceLock; +use anyhow::Context; use anyhow::Result; use codex_core::features::Feature; use codex_core::protocol::AskForApproval; @@ -10,6 +11,7 @@ use codex_core::protocol::Op; use codex_core::protocol::SandboxPolicy; use codex_protocol::config_types::ReasoningSummary; use codex_protocol::user_input::UserInput; +use core_test_support::assert_regex_match; use core_test_support::responses::ev_assistant_message; use core_test_support::responses::ev_completed; use core_test_support::responses::ev_function_call; @@ -24,6 +26,7 @@ use core_test_support::test_codex::test_codex; use core_test_support::wait_for_event; use core_test_support::wait_for_event_match; use core_test_support::wait_for_event_with_timeout; +use regex_lite::Regex; use serde_json::Value; use serde_json::json; @@ -35,7 +38,95 @@ fn extract_output_text(item: &Value) -> Option<&str> { }) } -fn collect_tool_outputs(bodies: &[Value]) -> Result> { +#[derive(Debug)] +struct ParsedUnifiedExecOutput { + chunk_id: Option, + wall_time_seconds: f64, + session_id: Option, + exit_code: Option, + original_token_count: Option, + output: String, +} + +#[allow(clippy::expect_used)] +fn parse_unified_exec_output(raw: &str) -> Result { + static OUTPUT_REGEX: OnceLock = OnceLock::new(); + let regex = OUTPUT_REGEX.get_or_init(|| { + Regex::new(concat!( + r#"(?s)^(?:Total output lines: \d+\n\n)?"#, + r#"(?:Chunk ID: (?P[^\n]+)\n)?"#, + r#"Wall time: (?P-?\d+(?:\.\d+)?) seconds\n"#, + r#"(?:Process exited with code (?P-?\d+)\n)?"#, + r#"(?:Process running with session ID (?P-?\d+)\n)?"#, + r#"(?:Original token count: (?P\d+)\n)?"#, + r#"Output:\n?(?P.*)$"#, + )) + .expect("valid unified exec output regex") + }); + + let cleaned = raw.trim_matches('\r'); + let captures = regex + .captures(cleaned) + .ok_or_else(|| anyhow::anyhow!("missing Output section in unified exec output"))?; + + let chunk_id = captures + .name("chunk_id") + .map(|value| value.as_str().to_string()); + + let wall_time_seconds = captures + .name("wall_time") + .expect("wall_time group present") + .as_str() + .parse::() + .context("failed to parse wall time seconds")?; + + let exit_code = captures + .name("exit_code") + .map(|value| { + value + .as_str() + .parse::() + .context("failed to parse exit code from unified exec output") + }) + .transpose()?; + + let session_id = captures + .name("session_id") + .map(|value| { + value + .as_str() + .parse::() + .context("failed to parse session id from unified exec output") + }) + .transpose()?; + + let original_token_count = captures + .name("original_token_count") + .map(|value| { + value + .as_str() + .parse::() + .context("failed to parse original token count from unified exec output") + }) + .transpose()?; + + let output = captures + .name("output") + .expect("output group present") + .as_str() + .to_string(); + + Ok(ParsedUnifiedExecOutput { + chunk_id, + wall_time_seconds, + session_id, + exit_code, + original_token_count, + output, + }) +} + +fn collect_tool_outputs(bodies: &[Value]) -> Result> { let mut outputs = HashMap::new(); for body in bodies { if let Some(items) = body.get("input").and_then(Value::as_array) { @@ -50,8 +141,8 @@ fn collect_tool_outputs(bodies: &[Value]) -> Result> { if trimmed.is_empty() { continue; } - let parsed: Value = serde_json::from_str(trimmed).map_err(|err| { - anyhow::anyhow!("failed to parse tool output content {trimmed:?}: {err}") + let parsed = parse_unified_exec_output(content).with_context(|| { + format!("failed to parse unified exec output for {call_id}") })?; outputs.insert(call_id.to_string(), parsed); } @@ -556,51 +647,38 @@ async fn exec_command_reports_chunk_and_exit_metadata() -> Result<()> { .get(call_id) .expect("missing exec_command metadata output"); - let chunk_id = metadata - .get("chunk_id") - .and_then(Value::as_str) - .expect("missing chunk_id"); + let chunk_id = metadata.chunk_id.as_ref().expect("missing chunk_id"); assert_eq!(chunk_id.len(), 6, "chunk id should be 6 hex characters"); assert!( chunk_id.chars().all(|c| c.is_ascii_hexdigit()), "chunk id should be hexadecimal: {chunk_id}" ); - let wall_time = metadata - .get("wall_time_seconds") - .and_then(Value::as_f64) - .unwrap_or_default(); + let wall_time = metadata.wall_time_seconds; assert!( wall_time >= 0.0, "wall_time_seconds should be non-negative, got {wall_time}" ); assert!( - metadata.get("session_id").is_none(), + metadata.session_id.is_none(), "exec_command for a completed process should not include session_id" ); - let exit_code = metadata - .get("exit_code") - .and_then(Value::as_i64) - .expect("expected exit_code"); + let exit_code = metadata.exit_code.expect("expected exit_code"); assert_eq!(exit_code, 0, "expected successful exit"); - let output_text = metadata - .get("output") - .and_then(Value::as_str) - .expect("missing output text"); + let output_text = &metadata.output; assert!( output_text.contains("tokens truncated"), "expected truncation notice in output: {output_text:?}" ); let original_tokens = metadata - .get("original_token_count") - .and_then(Value::as_u64) - .expect("missing original_token_count"); + .original_token_count + .expect("missing original_token_count") as usize; assert!( - original_tokens as usize > 6, + original_tokens > 6, "original token count should exceed max_output_tokens" ); @@ -711,39 +789,34 @@ async fn write_stdin_returns_exit_metadata_and_clears_session() -> Result<()> { .get(start_call_id) .expect("missing start output for exec_command"); let session_id = start_output - .get("session_id") - .and_then(Value::as_i64) + .session_id .expect("expected session id from exec_command"); assert!( session_id >= 0, "session_id should be non-negative, got {session_id}" ); assert!( - start_output.get("exit_code").is_none(), + start_output.exit_code.is_none(), "initial exec_command should not include exit_code while session is running" ); let send_output = outputs .get(send_call_id) .expect("missing write_stdin echo output"); - let echoed = send_output - .get("output") - .and_then(Value::as_str) - .unwrap_or_default(); + let echoed = send_output.output.as_str(); assert!( echoed.contains("hello unified exec"), "expected echoed output from cat, got {echoed:?}" ); let echoed_session = send_output - .get("session_id") - .and_then(Value::as_i64) + .session_id .expect("write_stdin should return session id while process is running"); assert_eq!( echoed_session, session_id, "write_stdin should reuse existing session id" ); assert!( - send_output.get("exit_code").is_none(), + send_output.exit_code.is_none(), "write_stdin should not include exit_code while process is running" ); @@ -751,18 +824,17 @@ async fn write_stdin_returns_exit_metadata_and_clears_session() -> Result<()> { .get(exit_call_id) .expect("missing exit metadata output"); assert!( - exit_output.get("session_id").is_none(), + exit_output.session_id.is_none(), "session_id should be omitted once the process exits" ); let exit_code = exit_output - .get("exit_code") - .and_then(Value::as_i64) + .exit_code .expect("expected exit_code after sending EOF"); assert_eq!(exit_code, 0, "cat should exit cleanly after EOF"); let exit_chunk = exit_output - .get("chunk_id") - .and_then(Value::as_str) + .chunk_id + .as_ref() .expect("missing chunk id for exit output"); assert!( exit_chunk.chars().all(|c| c.is_ascii_hexdigit()), @@ -964,26 +1036,18 @@ async fn unified_exec_reuses_session_via_stdin() -> Result<()> { let start_output = outputs .get(first_call_id) .expect("missing first unified_exec output"); - let session_id = start_output["session_id"].as_i64().unwrap_or_default(); + let session_id = start_output.session_id.unwrap_or_default(); assert!( session_id >= 0, "expected session id in first unified_exec response" ); - assert!( - start_output["output"] - .as_str() - .unwrap_or_default() - .is_empty() - ); + assert!(start_output.output.is_empty()); let reuse_output = outputs .get(second_call_id) .expect("missing reused unified_exec output"); - assert_eq!( - reuse_output["session_id"].as_i64().unwrap_or_default(), - session_id - ); - let echoed = reuse_output["output"].as_str().unwrap_or_default(); + assert_eq!(reuse_output.session_id.unwrap_or_default(), session_id); + let echoed = reuse_output.output.as_str(); assert!( echoed.contains("hello unified exec"), "expected echoed output, got {echoed:?}" @@ -1100,7 +1164,7 @@ PY let start_output = outputs .get(first_call_id) .expect("missing initial unified_exec output"); - let session_id = start_output["session_id"].as_i64().unwrap_or_default(); + let session_id = start_output.session_id.unwrap_or_default(); assert!( session_id >= 0, "expected session id from initial unified_exec response" @@ -1109,7 +1173,7 @@ PY let poll_output = outputs .get(second_call_id) .expect("missing poll unified_exec output"); - let poll_text = poll_output["output"].as_str().unwrap_or_default(); + let poll_text = poll_output.output.as_str(); assert!( poll_text.contains("TAIL-MARKER"), "expected poll output to contain tail marker, got {poll_text:?}" @@ -1209,16 +1273,11 @@ async fn unified_exec_timeout_and_followup_poll() -> Result<()> { let outputs = collect_tool_outputs(&bodies)?; let first_output = outputs.get(first_call_id).expect("missing timeout output"); - assert_eq!(first_output["session_id"], 0); - assert!( - first_output["output"] - .as_str() - .unwrap_or_default() - .is_empty() - ); + assert_eq!(first_output.session_id, Some(0)); + assert!(first_output.output.is_empty()); let poll_output = outputs.get(second_call_id).expect("missing poll output"); - let output_text = poll_output["output"].as_str().unwrap_or_default(); + let output_text = poll_output.output.as_str(); assert!( output_text.contains("ready"), "expected ready output, got {output_text:?}" @@ -1226,3 +1285,88 @@ async fn unified_exec_timeout_and_followup_poll() -> Result<()> { Ok(()) } + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn unified_exec_formats_large_output_summary() -> Result<()> { + skip_if_no_network!(Ok(())); + skip_if_sandbox!(Ok(())); + + let server = start_mock_server().await; + + let mut builder = test_codex().with_config(|config| { + config.features.enable(Feature::UnifiedExec); + }); + let TestCodex { + codex, + cwd, + session_configured, + .. + } = builder.build(&server).await?; + + let script = r#"python3 - <<'PY' +for i in range(300): + print(f"line-{i}") +PY +"#; + + let call_id = "uexec-large-output"; + let args = serde_json::json!({ + "cmd": script, + "yield_time_ms": 500, + }); + + let responses = vec![ + sse(vec![ + ev_response_created("resp-1"), + ev_function_call(call_id, "exec_command", &serde_json::to_string(&args)?), + ev_completed("resp-1"), + ]), + sse(vec![ + ev_assistant_message("msg-1", "done"), + ev_completed("resp-2"), + ]), + ]; + mount_sse_sequence(&server, responses).await; + + let session_model = session_configured.model.clone(); + + codex + .submit(Op::UserTurn { + items: vec![UserInput::Text { + text: "summarize large output".into(), + }], + final_output_json_schema: None, + cwd: cwd.path().to_path_buf(), + approval_policy: AskForApproval::Never, + sandbox_policy: SandboxPolicy::DangerFullAccess, + model: session_model, + effort: None, + summary: ReasoningSummary::Auto, + }) + .await?; + + wait_for_event(&codex, |event| matches!(event, EventMsg::TaskComplete(_))).await; + + let requests = server.received_requests().await.expect("recorded requests"); + assert!(!requests.is_empty(), "expected at least one POST request"); + + let bodies = requests + .iter() + .map(|req| req.body_json::().expect("request json")) + .collect::>(); + + let outputs = collect_tool_outputs(&bodies)?; + let large_output = outputs.get(call_id).expect("missing large output summary"); + + assert_regex_match( + concat!( + r"(?s)", + r"line-0.*?", + r"\[\.{3} omitted \d+ of \d+ lines \.{3}\].*?", + r"line-299", + ), + &large_output.output, + ); + + Ok(()) +}