Freeform unified exec output formatting (#6233)
This commit is contained in:
@@ -1,8 +1,5 @@
|
||||
use std::time::Duration;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
|
||||
use crate::function_tool::FunctionCallError;
|
||||
use crate::protocol::EventMsg;
|
||||
@@ -163,11 +160,7 @@ impl ToolHandler for UnifiedExecHandler {
|
||||
.await;
|
||||
}
|
||||
|
||||
let content = serialize_response(&response).map_err(|err| {
|
||||
FunctionCallError::RespondToModel(format!(
|
||||
"failed to serialize unified exec output: {err:?}"
|
||||
))
|
||||
})?;
|
||||
let content = format_response(&response);
|
||||
|
||||
Ok(ToolOutput::Function {
|
||||
content,
|
||||
@@ -177,32 +170,30 @@ impl ToolHandler for UnifiedExecHandler {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct SerializedUnifiedExecResponse<'a> {
|
||||
chunk_id: &'a str,
|
||||
wall_time_seconds: f64,
|
||||
output: &'a str,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
session_id: Option<i32>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
exit_code: Option<i32>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
original_token_count: Option<usize>,
|
||||
}
|
||||
fn format_response(response: &UnifiedExecResponse) -> String {
|
||||
let mut sections = Vec::new();
|
||||
|
||||
fn serialize_response(response: &UnifiedExecResponse) -> Result<String, serde_json::Error> {
|
||||
let payload = SerializedUnifiedExecResponse {
|
||||
chunk_id: &response.chunk_id,
|
||||
wall_time_seconds: duration_to_seconds(response.wall_time),
|
||||
output: &response.output,
|
||||
session_id: response.session_id,
|
||||
exit_code: response.exit_code,
|
||||
original_token_count: response.original_token_count,
|
||||
};
|
||||
if !response.chunk_id.is_empty() {
|
||||
sections.push(format!("Chunk ID: {}", response.chunk_id));
|
||||
}
|
||||
|
||||
serde_json::to_string(&payload)
|
||||
}
|
||||
let wall_time_seconds = response.wall_time.as_secs_f64();
|
||||
sections.push(format!("Wall time: {wall_time_seconds:.4} seconds"));
|
||||
|
||||
fn duration_to_seconds(duration: Duration) -> f64 {
|
||||
duration.as_secs_f64()
|
||||
if let Some(exit_code) = response.exit_code {
|
||||
sections.push(format!("Process exited with code {exit_code}"));
|
||||
}
|
||||
|
||||
if let Some(session_id) = response.session_id {
|
||||
sections.push(format!("Process running with session ID {session_id}"));
|
||||
}
|
||||
|
||||
if let Some(original_token_count) = response.original_token_count {
|
||||
sections.push(format!("Original token count: {original_token_count}"));
|
||||
}
|
||||
|
||||
sections.push("Output:".to_string());
|
||||
sections.push(response.output.clone());
|
||||
|
||||
sections.join("\n")
|
||||
}
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
#![cfg(not(target_os = "windows"))]
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::OnceLock;
|
||||
|
||||
use anyhow::Context;
|
||||
use anyhow::Result;
|
||||
use codex_core::features::Feature;
|
||||
use codex_core::protocol::AskForApproval;
|
||||
@@ -10,6 +11,7 @@ use codex_core::protocol::Op;
|
||||
use codex_core::protocol::SandboxPolicy;
|
||||
use codex_protocol::config_types::ReasoningSummary;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use core_test_support::assert_regex_match;
|
||||
use core_test_support::responses::ev_assistant_message;
|
||||
use core_test_support::responses::ev_completed;
|
||||
use core_test_support::responses::ev_function_call;
|
||||
@@ -24,6 +26,7 @@ use core_test_support::test_codex::test_codex;
|
||||
use core_test_support::wait_for_event;
|
||||
use core_test_support::wait_for_event_match;
|
||||
use core_test_support::wait_for_event_with_timeout;
|
||||
use regex_lite::Regex;
|
||||
use serde_json::Value;
|
||||
use serde_json::json;
|
||||
|
||||
@@ -35,7 +38,95 @@ fn extract_output_text(item: &Value) -> Option<&str> {
|
||||
})
|
||||
}
|
||||
|
||||
fn collect_tool_outputs(bodies: &[Value]) -> Result<HashMap<String, Value>> {
|
||||
#[derive(Debug)]
|
||||
struct ParsedUnifiedExecOutput {
|
||||
chunk_id: Option<String>,
|
||||
wall_time_seconds: f64,
|
||||
session_id: Option<i32>,
|
||||
exit_code: Option<i32>,
|
||||
original_token_count: Option<usize>,
|
||||
output: String,
|
||||
}
|
||||
|
||||
#[allow(clippy::expect_used)]
|
||||
fn parse_unified_exec_output(raw: &str) -> Result<ParsedUnifiedExecOutput> {
|
||||
static OUTPUT_REGEX: OnceLock<Regex> = OnceLock::new();
|
||||
let regex = OUTPUT_REGEX.get_or_init(|| {
|
||||
Regex::new(concat!(
|
||||
r#"(?s)^(?:Total output lines: \d+\n\n)?"#,
|
||||
r#"(?:Chunk ID: (?P<chunk_id>[^\n]+)\n)?"#,
|
||||
r#"Wall time: (?P<wall_time>-?\d+(?:\.\d+)?) seconds\n"#,
|
||||
r#"(?:Process exited with code (?P<exit_code>-?\d+)\n)?"#,
|
||||
r#"(?:Process running with session ID (?P<session_id>-?\d+)\n)?"#,
|
||||
r#"(?:Original token count: (?P<original_token_count>\d+)\n)?"#,
|
||||
r#"Output:\n?(?P<output>.*)$"#,
|
||||
))
|
||||
.expect("valid unified exec output regex")
|
||||
});
|
||||
|
||||
let cleaned = raw.trim_matches('\r');
|
||||
let captures = regex
|
||||
.captures(cleaned)
|
||||
.ok_or_else(|| anyhow::anyhow!("missing Output section in unified exec output"))?;
|
||||
|
||||
let chunk_id = captures
|
||||
.name("chunk_id")
|
||||
.map(|value| value.as_str().to_string());
|
||||
|
||||
let wall_time_seconds = captures
|
||||
.name("wall_time")
|
||||
.expect("wall_time group present")
|
||||
.as_str()
|
||||
.parse::<f64>()
|
||||
.context("failed to parse wall time seconds")?;
|
||||
|
||||
let exit_code = captures
|
||||
.name("exit_code")
|
||||
.map(|value| {
|
||||
value
|
||||
.as_str()
|
||||
.parse::<i32>()
|
||||
.context("failed to parse exit code from unified exec output")
|
||||
})
|
||||
.transpose()?;
|
||||
|
||||
let session_id = captures
|
||||
.name("session_id")
|
||||
.map(|value| {
|
||||
value
|
||||
.as_str()
|
||||
.parse::<i32>()
|
||||
.context("failed to parse session id from unified exec output")
|
||||
})
|
||||
.transpose()?;
|
||||
|
||||
let original_token_count = captures
|
||||
.name("original_token_count")
|
||||
.map(|value| {
|
||||
value
|
||||
.as_str()
|
||||
.parse::<usize>()
|
||||
.context("failed to parse original token count from unified exec output")
|
||||
})
|
||||
.transpose()?;
|
||||
|
||||
let output = captures
|
||||
.name("output")
|
||||
.expect("output group present")
|
||||
.as_str()
|
||||
.to_string();
|
||||
|
||||
Ok(ParsedUnifiedExecOutput {
|
||||
chunk_id,
|
||||
wall_time_seconds,
|
||||
session_id,
|
||||
exit_code,
|
||||
original_token_count,
|
||||
output,
|
||||
})
|
||||
}
|
||||
|
||||
fn collect_tool_outputs(bodies: &[Value]) -> Result<HashMap<String, ParsedUnifiedExecOutput>> {
|
||||
let mut outputs = HashMap::new();
|
||||
for body in bodies {
|
||||
if let Some(items) = body.get("input").and_then(Value::as_array) {
|
||||
@@ -50,8 +141,8 @@ fn collect_tool_outputs(bodies: &[Value]) -> Result<HashMap<String, Value>> {
|
||||
if trimmed.is_empty() {
|
||||
continue;
|
||||
}
|
||||
let parsed: Value = serde_json::from_str(trimmed).map_err(|err| {
|
||||
anyhow::anyhow!("failed to parse tool output content {trimmed:?}: {err}")
|
||||
let parsed = parse_unified_exec_output(content).with_context(|| {
|
||||
format!("failed to parse unified exec output for {call_id}")
|
||||
})?;
|
||||
outputs.insert(call_id.to_string(), parsed);
|
||||
}
|
||||
@@ -556,51 +647,38 @@ async fn exec_command_reports_chunk_and_exit_metadata() -> Result<()> {
|
||||
.get(call_id)
|
||||
.expect("missing exec_command metadata output");
|
||||
|
||||
let chunk_id = metadata
|
||||
.get("chunk_id")
|
||||
.and_then(Value::as_str)
|
||||
.expect("missing chunk_id");
|
||||
let chunk_id = metadata.chunk_id.as_ref().expect("missing chunk_id");
|
||||
assert_eq!(chunk_id.len(), 6, "chunk id should be 6 hex characters");
|
||||
assert!(
|
||||
chunk_id.chars().all(|c| c.is_ascii_hexdigit()),
|
||||
"chunk id should be hexadecimal: {chunk_id}"
|
||||
);
|
||||
|
||||
let wall_time = metadata
|
||||
.get("wall_time_seconds")
|
||||
.and_then(Value::as_f64)
|
||||
.unwrap_or_default();
|
||||
let wall_time = metadata.wall_time_seconds;
|
||||
assert!(
|
||||
wall_time >= 0.0,
|
||||
"wall_time_seconds should be non-negative, got {wall_time}"
|
||||
);
|
||||
|
||||
assert!(
|
||||
metadata.get("session_id").is_none(),
|
||||
metadata.session_id.is_none(),
|
||||
"exec_command for a completed process should not include session_id"
|
||||
);
|
||||
|
||||
let exit_code = metadata
|
||||
.get("exit_code")
|
||||
.and_then(Value::as_i64)
|
||||
.expect("expected exit_code");
|
||||
let exit_code = metadata.exit_code.expect("expected exit_code");
|
||||
assert_eq!(exit_code, 0, "expected successful exit");
|
||||
|
||||
let output_text = metadata
|
||||
.get("output")
|
||||
.and_then(Value::as_str)
|
||||
.expect("missing output text");
|
||||
let output_text = &metadata.output;
|
||||
assert!(
|
||||
output_text.contains("tokens truncated"),
|
||||
"expected truncation notice in output: {output_text:?}"
|
||||
);
|
||||
|
||||
let original_tokens = metadata
|
||||
.get("original_token_count")
|
||||
.and_then(Value::as_u64)
|
||||
.expect("missing original_token_count");
|
||||
.original_token_count
|
||||
.expect("missing original_token_count") as usize;
|
||||
assert!(
|
||||
original_tokens as usize > 6,
|
||||
original_tokens > 6,
|
||||
"original token count should exceed max_output_tokens"
|
||||
);
|
||||
|
||||
@@ -711,39 +789,34 @@ async fn write_stdin_returns_exit_metadata_and_clears_session() -> Result<()> {
|
||||
.get(start_call_id)
|
||||
.expect("missing start output for exec_command");
|
||||
let session_id = start_output
|
||||
.get("session_id")
|
||||
.and_then(Value::as_i64)
|
||||
.session_id
|
||||
.expect("expected session id from exec_command");
|
||||
assert!(
|
||||
session_id >= 0,
|
||||
"session_id should be non-negative, got {session_id}"
|
||||
);
|
||||
assert!(
|
||||
start_output.get("exit_code").is_none(),
|
||||
start_output.exit_code.is_none(),
|
||||
"initial exec_command should not include exit_code while session is running"
|
||||
);
|
||||
|
||||
let send_output = outputs
|
||||
.get(send_call_id)
|
||||
.expect("missing write_stdin echo output");
|
||||
let echoed = send_output
|
||||
.get("output")
|
||||
.and_then(Value::as_str)
|
||||
.unwrap_or_default();
|
||||
let echoed = send_output.output.as_str();
|
||||
assert!(
|
||||
echoed.contains("hello unified exec"),
|
||||
"expected echoed output from cat, got {echoed:?}"
|
||||
);
|
||||
let echoed_session = send_output
|
||||
.get("session_id")
|
||||
.and_then(Value::as_i64)
|
||||
.session_id
|
||||
.expect("write_stdin should return session id while process is running");
|
||||
assert_eq!(
|
||||
echoed_session, session_id,
|
||||
"write_stdin should reuse existing session id"
|
||||
);
|
||||
assert!(
|
||||
send_output.get("exit_code").is_none(),
|
||||
send_output.exit_code.is_none(),
|
||||
"write_stdin should not include exit_code while process is running"
|
||||
);
|
||||
|
||||
@@ -751,18 +824,17 @@ async fn write_stdin_returns_exit_metadata_and_clears_session() -> Result<()> {
|
||||
.get(exit_call_id)
|
||||
.expect("missing exit metadata output");
|
||||
assert!(
|
||||
exit_output.get("session_id").is_none(),
|
||||
exit_output.session_id.is_none(),
|
||||
"session_id should be omitted once the process exits"
|
||||
);
|
||||
let exit_code = exit_output
|
||||
.get("exit_code")
|
||||
.and_then(Value::as_i64)
|
||||
.exit_code
|
||||
.expect("expected exit_code after sending EOF");
|
||||
assert_eq!(exit_code, 0, "cat should exit cleanly after EOF");
|
||||
|
||||
let exit_chunk = exit_output
|
||||
.get("chunk_id")
|
||||
.and_then(Value::as_str)
|
||||
.chunk_id
|
||||
.as_ref()
|
||||
.expect("missing chunk id for exit output");
|
||||
assert!(
|
||||
exit_chunk.chars().all(|c| c.is_ascii_hexdigit()),
|
||||
@@ -964,26 +1036,18 @@ async fn unified_exec_reuses_session_via_stdin() -> Result<()> {
|
||||
let start_output = outputs
|
||||
.get(first_call_id)
|
||||
.expect("missing first unified_exec output");
|
||||
let session_id = start_output["session_id"].as_i64().unwrap_or_default();
|
||||
let session_id = start_output.session_id.unwrap_or_default();
|
||||
assert!(
|
||||
session_id >= 0,
|
||||
"expected session id in first unified_exec response"
|
||||
);
|
||||
assert!(
|
||||
start_output["output"]
|
||||
.as_str()
|
||||
.unwrap_or_default()
|
||||
.is_empty()
|
||||
);
|
||||
assert!(start_output.output.is_empty());
|
||||
|
||||
let reuse_output = outputs
|
||||
.get(second_call_id)
|
||||
.expect("missing reused unified_exec output");
|
||||
assert_eq!(
|
||||
reuse_output["session_id"].as_i64().unwrap_or_default(),
|
||||
session_id
|
||||
);
|
||||
let echoed = reuse_output["output"].as_str().unwrap_or_default();
|
||||
assert_eq!(reuse_output.session_id.unwrap_or_default(), session_id);
|
||||
let echoed = reuse_output.output.as_str();
|
||||
assert!(
|
||||
echoed.contains("hello unified exec"),
|
||||
"expected echoed output, got {echoed:?}"
|
||||
@@ -1100,7 +1164,7 @@ PY
|
||||
let start_output = outputs
|
||||
.get(first_call_id)
|
||||
.expect("missing initial unified_exec output");
|
||||
let session_id = start_output["session_id"].as_i64().unwrap_or_default();
|
||||
let session_id = start_output.session_id.unwrap_or_default();
|
||||
assert!(
|
||||
session_id >= 0,
|
||||
"expected session id from initial unified_exec response"
|
||||
@@ -1109,7 +1173,7 @@ PY
|
||||
let poll_output = outputs
|
||||
.get(second_call_id)
|
||||
.expect("missing poll unified_exec output");
|
||||
let poll_text = poll_output["output"].as_str().unwrap_or_default();
|
||||
let poll_text = poll_output.output.as_str();
|
||||
assert!(
|
||||
poll_text.contains("TAIL-MARKER"),
|
||||
"expected poll output to contain tail marker, got {poll_text:?}"
|
||||
@@ -1209,16 +1273,11 @@ async fn unified_exec_timeout_and_followup_poll() -> Result<()> {
|
||||
let outputs = collect_tool_outputs(&bodies)?;
|
||||
|
||||
let first_output = outputs.get(first_call_id).expect("missing timeout output");
|
||||
assert_eq!(first_output["session_id"], 0);
|
||||
assert!(
|
||||
first_output["output"]
|
||||
.as_str()
|
||||
.unwrap_or_default()
|
||||
.is_empty()
|
||||
);
|
||||
assert_eq!(first_output.session_id, Some(0));
|
||||
assert!(first_output.output.is_empty());
|
||||
|
||||
let poll_output = outputs.get(second_call_id).expect("missing poll output");
|
||||
let output_text = poll_output["output"].as_str().unwrap_or_default();
|
||||
let output_text = poll_output.output.as_str();
|
||||
assert!(
|
||||
output_text.contains("ready"),
|
||||
"expected ready output, got {output_text:?}"
|
||||
@@ -1226,3 +1285,88 @@ async fn unified_exec_timeout_and_followup_poll() -> Result<()> {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn unified_exec_formats_large_output_summary() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
skip_if_sandbox!(Ok(()));
|
||||
|
||||
let server = start_mock_server().await;
|
||||
|
||||
let mut builder = test_codex().with_config(|config| {
|
||||
config.features.enable(Feature::UnifiedExec);
|
||||
});
|
||||
let TestCodex {
|
||||
codex,
|
||||
cwd,
|
||||
session_configured,
|
||||
..
|
||||
} = builder.build(&server).await?;
|
||||
|
||||
let script = r#"python3 - <<'PY'
|
||||
for i in range(300):
|
||||
print(f"line-{i}")
|
||||
PY
|
||||
"#;
|
||||
|
||||
let call_id = "uexec-large-output";
|
||||
let args = serde_json::json!({
|
||||
"cmd": script,
|
||||
"yield_time_ms": 500,
|
||||
});
|
||||
|
||||
let responses = vec![
|
||||
sse(vec![
|
||||
ev_response_created("resp-1"),
|
||||
ev_function_call(call_id, "exec_command", &serde_json::to_string(&args)?),
|
||||
ev_completed("resp-1"),
|
||||
]),
|
||||
sse(vec![
|
||||
ev_assistant_message("msg-1", "done"),
|
||||
ev_completed("resp-2"),
|
||||
]),
|
||||
];
|
||||
mount_sse_sequence(&server, responses).await;
|
||||
|
||||
let session_model = session_configured.model.clone();
|
||||
|
||||
codex
|
||||
.submit(Op::UserTurn {
|
||||
items: vec![UserInput::Text {
|
||||
text: "summarize large output".into(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
cwd: cwd.path().to_path_buf(),
|
||||
approval_policy: AskForApproval::Never,
|
||||
sandbox_policy: SandboxPolicy::DangerFullAccess,
|
||||
model: session_model,
|
||||
effort: None,
|
||||
summary: ReasoningSummary::Auto,
|
||||
})
|
||||
.await?;
|
||||
|
||||
wait_for_event(&codex, |event| matches!(event, EventMsg::TaskComplete(_))).await;
|
||||
|
||||
let requests = server.received_requests().await.expect("recorded requests");
|
||||
assert!(!requests.is_empty(), "expected at least one POST request");
|
||||
|
||||
let bodies = requests
|
||||
.iter()
|
||||
.map(|req| req.body_json::<Value>().expect("request json"))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let outputs = collect_tool_outputs(&bodies)?;
|
||||
let large_output = outputs.get(call_id).expect("missing large output summary");
|
||||
|
||||
assert_regex_match(
|
||||
concat!(
|
||||
r"(?s)",
|
||||
r"line-0.*?",
|
||||
r"\[\.{3} omitted \d+ of \d+ lines \.{3}\].*?",
|
||||
r"line-299",
|
||||
),
|
||||
&large_output.output,
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user