From d766e845b391c4fa4c4c1f1bb0a2c49c6a21e4e1 Mon Sep 17 00:00:00 2001 From: Michael Bolin Date: Mon, 19 May 2025 16:08:18 -0700 Subject: [PATCH] feat: experimental --output-last-message flag to exec subcommand (#1037) This introduces an experimental `--output-last-message` flag that can be used to specify a file where the final message from the agent will be written. Two use cases: - Ultimately, we will likely add a `--quiet` option to `exec`, but even if the user does not want any output written to the terminal, they probably want to know what the agent did. Writing the output to a file makes it possible to get that information in a clean way. - Relatedly, when using `exec` in CI, it is easier to review the transcript written "normally" (i.e., not as JSON or something with extra escapes), but getting programmatic access to the last message is likely helpful, so writing the last message to a file gets the best of both worlds. I am calling this "experimental" because it is possible that we are overfitting and will want a more general solution to this problem that would justify removing this flag. 
--- codex-rs/core/src/codex.rs | 12 +++--- codex-rs/core/src/conversation_history.rs | 3 +- codex-rs/core/src/protocol.rs | 7 ++- codex-rs/core/tests/live_agent.rs | 6 +-- codex-rs/core/tests/previous_response_id.rs | 4 +- codex-rs/core/tests/stream_no_completed.rs | 3 +- codex-rs/exec/src/cli.rs | 4 ++ codex-rs/exec/src/event_processor.rs | 5 ++- codex-rs/exec/src/lib.rs | 45 ++++++++++++++++++-- codex-rs/mcp-server/src/codex_tool_runner.rs | 5 ++- codex-rs/tui/src/chatwidget.rs | 5 ++- 11 files changed, 79 insertions(+), 20 deletions(-) diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 705b8260..0f914727 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -77,6 +77,7 @@ use crate::protocol::ReviewDecision; use crate::protocol::SandboxPolicy; use crate::protocol::SessionConfiguredEvent; use crate::protocol::Submission; +use crate::protocol::TaskCompleteEvent; use crate::rollout::RolloutRecorder; use crate::safety::SafetyCheck; use crate::safety::assess_command_safety; @@ -766,6 +767,7 @@ async fn run_task(sess: Arc, sub_id: String, input: Vec) { } let mut pending_response_input: Vec = vec![ResponseInputItem::from(input)]; + let last_agent_message: Option; loop { let mut net_new_turn_input = pending_response_input .drain(..) @@ -795,7 +797,7 @@ async fn run_task(sess: Arc, sub_id: String, input: Vec) { // 2. Update the in-memory transcript so that future turns // include these items as part of the history. - transcript.record_items(net_new_turn_input); + transcript.record_items(&net_new_turn_input); // Note that `transcript.record_items()` does some filtering // such that `full_transcript` may include items that were @@ -830,7 +832,6 @@ async fn run_task(sess: Arc, sub_id: String, input: Vec) { .into_iter() .flatten() .collect::>(); - let last_assistant_message = get_last_assistant_message_from_turn(&items); // Only attempt to take the lock if there is something to record. 
if !items.is_empty() { @@ -839,16 +840,17 @@ async fn run_task(sess: Arc, sub_id: String, input: Vec) { // For ZDR we also need to keep a transcript clone. if let Some(transcript) = sess.state.lock().unwrap().zdr_transcript.as_mut() { - transcript.record_items(items); + transcript.record_items(&items); } } if responses.is_empty() { debug!("Turn completed"); + last_agent_message = get_last_assistant_message_from_turn(&items); sess.maybe_notify(UserNotification::AgentTurnComplete { turn_id: sub_id.clone(), input_messages: turn_input_messages, - last_assistant_message, + last_assistant_message: last_agent_message.clone(), }); break; } @@ -871,7 +873,7 @@ async fn run_task(sess: Arc, sub_id: String, input: Vec) { sess.remove_task(&sub_id); let event = Event { id: sub_id, - msg: EventMsg::TaskComplete, + msg: EventMsg::TaskComplete(TaskCompleteEvent { last_agent_message }), }; sess.tx_event.send(event).await.ok(); } diff --git a/codex-rs/core/src/conversation_history.rs b/codex-rs/core/src/conversation_history.rs index fdaf8397..52fb1ec4 100644 --- a/codex-rs/core/src/conversation_history.rs +++ b/codex-rs/core/src/conversation_history.rs @@ -25,7 +25,8 @@ impl ConversationHistory { /// `items` is ordered from oldest to newest. 
pub(crate) fn record_items(&mut self, items: I) where - I: IntoIterator, + I: IntoIterator, + I::Item: std::ops::Deref, { for item in items { if is_api_message(&item) { diff --git a/codex-rs/core/src/protocol.rs b/codex-rs/core/src/protocol.rs index 658b9a73..2a922cba 100644 --- a/codex-rs/core/src/protocol.rs +++ b/codex-rs/core/src/protocol.rs @@ -321,7 +321,7 @@ pub enum EventMsg { TaskStarted, /// Agent has completed all actions - TaskComplete, + TaskComplete(TaskCompleteEvent), /// Agent text output message AgentMessage(AgentMessageEvent), @@ -365,6 +365,11 @@ pub struct ErrorEvent { pub message: String, } +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct TaskCompleteEvent { + pub last_agent_message: Option, +} + #[derive(Debug, Clone, Deserialize, Serialize)] pub struct AgentMessageEvent { pub message: String, diff --git a/codex-rs/core/tests/live_agent.rs b/codex-rs/core/tests/live_agent.rs index bc5a1105..c21f9d00 100644 --- a/codex-rs/core/tests/live_agent.rs +++ b/codex-rs/core/tests/live_agent.rs @@ -98,7 +98,7 @@ async fn live_streaming_and_prev_id_reset() { match ev.msg { EventMsg::AgentMessage(_) => saw_message_before_complete = true, - EventMsg::TaskComplete => break, + EventMsg::TaskComplete(_) => break, EventMsg::Error(ErrorEvent { message }) => { panic!("agent reported error in task1: {message}") } @@ -136,7 +136,7 @@ async fn live_streaming_and_prev_id_reset() { { got_expected = true; } - EventMsg::TaskComplete => break, + EventMsg::TaskComplete(_) => break, EventMsg::Error(ErrorEvent { message }) => { panic!("agent reported error in task2: {message}") } @@ -204,7 +204,7 @@ async fn live_shell_function_call() { assert!(stdout.contains(MARKER)); saw_end_with_output = true; } - EventMsg::TaskComplete => break, + EventMsg::TaskComplete(_) => break, EventMsg::Error(codex_core::protocol::ErrorEvent { message }) => { panic!("agent error during shell test: {message}") } diff --git a/codex-rs/core/tests/previous_response_id.rs 
b/codex-rs/core/tests/previous_response_id.rs index c3697a0e..b9c89f35 100644 --- a/codex-rs/core/tests/previous_response_id.rs +++ b/codex-rs/core/tests/previous_response_id.rs @@ -132,7 +132,7 @@ async fn keeps_previous_response_id_between_tasks() { .await .unwrap() .unwrap(); - if matches!(ev.msg, EventMsg::TaskComplete) { + if matches!(ev.msg, EventMsg::TaskComplete(_)) { break; } } @@ -154,7 +154,7 @@ async fn keeps_previous_response_id_between_tasks() { .unwrap() .unwrap(); match ev.msg { - EventMsg::TaskComplete => break, + EventMsg::TaskComplete(_) => break, EventMsg::Error(ErrorEvent { message }) => { panic!("unexpected error: {message}") } diff --git a/codex-rs/core/tests/stream_no_completed.rs b/codex-rs/core/tests/stream_no_completed.rs index 247464f7..02c03681 100644 --- a/codex-rs/core/tests/stream_no_completed.rs +++ b/codex-rs/core/tests/stream_no_completed.rs @@ -6,6 +6,7 @@ use std::time::Duration; use codex_core::Codex; use codex_core::ModelProviderInfo; use codex_core::exec::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR; +use codex_core::protocol::EventMsg; use codex_core::protocol::InputItem; use codex_core::protocol::Op; mod test_support; @@ -118,7 +119,7 @@ async fn retries_on_early_close() { .await .unwrap() .unwrap(); - if matches!(ev.msg, codex_core::protocol::EventMsg::TaskComplete) { + if matches!(ev.msg, EventMsg::TaskComplete(_)) { break; } } diff --git a/codex-rs/exec/src/cli.rs b/codex-rs/exec/src/cli.rs index dd72b3e9..4a3d493a 100644 --- a/codex-rs/exec/src/cli.rs +++ b/codex-rs/exec/src/cli.rs @@ -41,6 +41,10 @@ pub struct Cli { #[arg(long = "color", value_enum, default_value_t = Color::Auto)] pub color: Color, + /// Specifies file where the last message from the agent should be written. + #[arg(long = "output-last-message")] + pub last_message_file: Option, + /// Initial instructions for the agent. 
pub prompt: String, } diff --git a/codex-rs/exec/src/event_processor.rs b/codex-rs/exec/src/event_processor.rs index 4c8278cc..65f2204d 100644 --- a/codex-rs/exec/src/event_processor.rs +++ b/codex-rs/exec/src/event_processor.rs @@ -13,6 +13,7 @@ use codex_core::protocol::McpToolCallEndEvent; use codex_core::protocol::PatchApplyBeginEvent; use codex_core::protocol::PatchApplyEndEvent; use codex_core::protocol::SessionConfiguredEvent; +use codex_core::protocol::TaskCompleteEvent; use owo_colors::OwoColorize; use owo_colors::Style; use shlex::try_join; @@ -117,7 +118,9 @@ impl EventProcessor { let msg = format!("Task started: {id}"); ts_println!("{}", msg.style(self.dimmed)); } - EventMsg::TaskComplete => { + EventMsg::TaskComplete(TaskCompleteEvent { + last_agent_message: _, + }) => { let msg = format!("Task complete: {id}"); ts_println!("{}", msg.style(self.bold)); } diff --git a/codex-rs/exec/src/lib.rs b/codex-rs/exec/src/lib.rs index 348bff08..d405a2d2 100644 --- a/codex-rs/exec/src/lib.rs +++ b/codex-rs/exec/src/lib.rs @@ -2,6 +2,7 @@ mod cli; mod event_processor; use std::io::IsTerminal; +use std::path::Path; use std::sync::Arc; pub use cli::Cli; @@ -14,6 +15,7 @@ use codex_core::protocol::EventMsg; use codex_core::protocol::InputItem; use codex_core::protocol::Op; use codex_core::protocol::SandboxPolicy; +use codex_core::protocol::TaskCompleteEvent; use codex_core::util::is_inside_git_repo; use event_processor::EventProcessor; use tracing::debug; @@ -32,6 +34,7 @@ pub async fn run_main(cli: Cli) -> anyhow::Result<()> { skip_git_repo_check, disable_response_storage, color, + last_message_file, prompt, } = cli; @@ -137,7 +140,14 @@ pub async fn run_main(cli: Cli) -> anyhow::Result<()> { let initial_images_event_id = codex.submit(Op::UserInput { items }).await?; info!("Sent images with event ID: {initial_images_event_id}"); while let Ok(event) = codex.next_event().await { - if event.id == initial_images_event_id && matches!(event.msg, EventMsg::TaskComplete) { + 
if event.id == initial_images_event_id + && matches!( + event.msg, + EventMsg::TaskComplete(TaskCompleteEvent { + last_agent_message: _, + }) + ) + { break; } } @@ -151,13 +161,40 @@ pub async fn run_main(cli: Cli) -> anyhow::Result<()> { // Run the loop until the task is complete. let mut event_processor = EventProcessor::create_with_ansi(stdout_with_ansi); while let Some(event) = rx.recv().await { - let last_event = - event.id == initial_prompt_task_id && matches!(event.msg, EventMsg::TaskComplete); + let (is_last_event, last_assistant_message) = match &event.msg { + EventMsg::TaskComplete(TaskCompleteEvent { last_agent_message }) => { + (true, last_agent_message.clone()) + } + _ => (false, None), + }; event_processor.process_event(event); - if last_event { + if is_last_event { + handle_last_message(last_assistant_message, last_message_file.as_deref())?; break; } } Ok(()) } + +fn handle_last_message( + last_agent_message: Option, + last_message_file: Option<&Path>, +) -> std::io::Result<()> { + match (last_agent_message, last_message_file) { + (Some(last_agent_message), Some(last_message_file)) => { + // Last message and a file to write to. + std::fs::write(last_message_file, last_agent_message)?; + } + (None, Some(last_message_file)) => { + eprintln!( + "Warning: No last message to write to file: {}", + last_message_file.to_string_lossy() + ); + } + (_, None) => { + // No last message and no file to write to. 
+ } + } + Ok(()) +} diff --git a/codex-rs/mcp-server/src/codex_tool_runner.rs b/codex-rs/mcp-server/src/codex_tool_runner.rs index f6f6798c..67c990b0 100644 --- a/codex-rs/mcp-server/src/codex_tool_runner.rs +++ b/codex-rs/mcp-server/src/codex_tool_runner.rs @@ -9,6 +9,7 @@ use codex_core::protocol::Event; use codex_core::protocol::EventMsg; use codex_core::protocol::InputItem; use codex_core::protocol::Op; +use codex_core::protocol::TaskCompleteEvent; use mcp_types::CallToolResult; use mcp_types::CallToolResultContent; use mcp_types::JSONRPC_VERSION; @@ -125,7 +126,9 @@ pub async fn run_codex_tool_session( .await; break; } - EventMsg::TaskComplete => { + EventMsg::TaskComplete(TaskCompleteEvent { + last_agent_message: _, + }) => { let result = if let Some(msg) = last_agent_message { CallToolResult { content: vec![CallToolResultContent::TextContent(TextContent { diff --git a/codex-rs/tui/src/chatwidget.rs b/codex-rs/tui/src/chatwidget.rs index 8ceef95b..189f3994 100644 --- a/codex-rs/tui/src/chatwidget.rs +++ b/codex-rs/tui/src/chatwidget.rs @@ -17,6 +17,7 @@ use codex_core::protocol::McpToolCallBeginEvent; use codex_core::protocol::McpToolCallEndEvent; use codex_core::protocol::Op; use codex_core::protocol::PatchApplyBeginEvent; +use codex_core::protocol::TaskCompleteEvent; use crossterm::event::KeyEvent; use ratatui::buffer::Buffer; use ratatui::layout::Constraint; @@ -246,7 +247,9 @@ impl ChatWidget<'_> { self.bottom_pane.set_task_running(true); self.request_redraw(); } - EventMsg::TaskComplete => { + EventMsg::TaskComplete(TaskCompleteEvent { + last_agent_message: _, + }) => { self.bottom_pane.set_task_running(false); self.request_redraw(); }