diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 705b8260..0f914727 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -77,6 +77,7 @@ use crate::protocol::ReviewDecision; use crate::protocol::SandboxPolicy; use crate::protocol::SessionConfiguredEvent; use crate::protocol::Submission; +use crate::protocol::TaskCompleteEvent; use crate::rollout::RolloutRecorder; use crate::safety::SafetyCheck; use crate::safety::assess_command_safety; @@ -766,6 +767,7 @@ async fn run_task(sess: Arc, sub_id: String, input: Vec) { } let mut pending_response_input: Vec = vec![ResponseInputItem::from(input)]; + let last_agent_message: Option; loop { let mut net_new_turn_input = pending_response_input .drain(..) @@ -795,7 +797,7 @@ async fn run_task(sess: Arc, sub_id: String, input: Vec) { // 2. Update the in-memory transcript so that future turns // include these items as part of the history. - transcript.record_items(net_new_turn_input); + transcript.record_items(&net_new_turn_input); // Note that `transcript.record_items()` does some filtering // such that `full_transcript` may include items that were @@ -830,7 +832,6 @@ async fn run_task(sess: Arc, sub_id: String, input: Vec) { .into_iter() .flatten() .collect::>(); - let last_assistant_message = get_last_assistant_message_from_turn(&items); // Only attempt to take the lock if there is something to record. if !items.is_empty() { @@ -839,16 +840,17 @@ async fn run_task(sess: Arc, sub_id: String, input: Vec) { // For ZDR we also need to keep a transcript clone. if let Some(transcript) = sess.state.lock().unwrap().zdr_transcript.as_mut() { - transcript.record_items(items); + transcript.record_items(&items); } } if responses.is_empty() { debug!("Turn completed"); + last_agent_message = get_last_assistant_message_from_turn(&items); sess.maybe_notify(UserNotification::AgentTurnComplete { turn_id: sub_id.clone(), input_messages: turn_input_messages, - last_assistant_message, + last_assistant_message: last_agent_message.clone(), }); break; } @@ -871,7 +873,7 @@ async fn run_task(sess: Arc, sub_id: String, input: Vec) { sess.remove_task(&sub_id); let event = Event { id: sub_id, - msg: EventMsg::TaskComplete, + msg: EventMsg::TaskComplete(TaskCompleteEvent { last_agent_message }), }; sess.tx_event.send(event).await.ok(); } diff --git a/codex-rs/core/src/conversation_history.rs b/codex-rs/core/src/conversation_history.rs index fdaf8397..52fb1ec4 100644 --- a/codex-rs/core/src/conversation_history.rs +++ b/codex-rs/core/src/conversation_history.rs @@ -25,7 +25,8 @@ impl ConversationHistory { /// `items` is ordered from oldest to newest. pub(crate) fn record_items(&mut self, items: I) where - I: IntoIterator, + I: IntoIterator, + I::Item: std::ops::Deref, { for item in items { if is_api_message(&item) { diff --git a/codex-rs/core/src/protocol.rs b/codex-rs/core/src/protocol.rs index 658b9a73..2a922cba 100644 --- a/codex-rs/core/src/protocol.rs +++ b/codex-rs/core/src/protocol.rs @@ -321,7 +321,7 @@ pub enum EventMsg { TaskStarted, /// Agent has completed all actions - TaskComplete, + TaskComplete(TaskCompleteEvent), /// Agent text output message AgentMessage(AgentMessageEvent), @@ -365,6 +365,11 @@ pub struct ErrorEvent { pub message: String, } +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct TaskCompleteEvent { + pub last_agent_message: Option, +} + #[derive(Debug, Clone, Deserialize, Serialize)] pub struct AgentMessageEvent { pub message: String, diff --git a/codex-rs/core/tests/live_agent.rs b/codex-rs/core/tests/live_agent.rs index bc5a1105..c21f9d00 100644 --- a/codex-rs/core/tests/live_agent.rs +++ b/codex-rs/core/tests/live_agent.rs @@ -98,7 +98,7 @@ async fn live_streaming_and_prev_id_reset() { match ev.msg { EventMsg::AgentMessage(_) => saw_message_before_complete = true, - EventMsg::TaskComplete => break, + EventMsg::TaskComplete(_) => break, EventMsg::Error(ErrorEvent { message }) => { panic!("agent reported error in task1: {message}") } @@ -136,7 +136,7 @@ async fn live_streaming_and_prev_id_reset() { { got_expected = true; } - EventMsg::TaskComplete => break, + EventMsg::TaskComplete(_) => break, EventMsg::Error(ErrorEvent { message }) => { panic!("agent reported error in task2: {message}") } @@ -204,7 +204,7 @@ async fn live_shell_function_call() { assert!(stdout.contains(MARKER)); saw_end_with_output = true; } - EventMsg::TaskComplete => break, + EventMsg::TaskComplete(_) => break, EventMsg::Error(codex_core::protocol::ErrorEvent { message }) => { panic!("agent error during shell test: {message}") } diff --git a/codex-rs/core/tests/previous_response_id.rs b/codex-rs/core/tests/previous_response_id.rs index c3697a0e..b9c89f35 100644 --- a/codex-rs/core/tests/previous_response_id.rs +++ b/codex-rs/core/tests/previous_response_id.rs @@ -132,7 +132,7 @@ async fn keeps_previous_response_id_between_tasks() { .await .unwrap() .unwrap(); - if matches!(ev.msg, EventMsg::TaskComplete) { + if matches!(ev.msg, EventMsg::TaskComplete(_)) { break; } } @@ -154,7 +154,7 @@ async fn keeps_previous_response_id_between_tasks() { .unwrap() .unwrap(); match ev.msg { - EventMsg::TaskComplete => break, + EventMsg::TaskComplete(_) => break, EventMsg::Error(ErrorEvent { message }) => { panic!("unexpected error: {message}") } diff --git a/codex-rs/core/tests/stream_no_completed.rs b/codex-rs/core/tests/stream_no_completed.rs index 247464f7..02c03681 100644 --- a/codex-rs/core/tests/stream_no_completed.rs +++ b/codex-rs/core/tests/stream_no_completed.rs @@ -6,6 +6,7 @@ use std::time::Duration; use codex_core::Codex; use codex_core::ModelProviderInfo; use codex_core::exec::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR; +use codex_core::protocol::EventMsg; use codex_core::protocol::InputItem; use codex_core::protocol::Op; mod test_support; @@ -118,7 +119,7 @@ async fn retries_on_early_close() { .await .unwrap() .unwrap(); - if matches!(ev.msg, codex_core::protocol::EventMsg::TaskComplete) { + if matches!(ev.msg, EventMsg::TaskComplete(_)) { break; } } diff --git a/codex-rs/exec/src/cli.rs b/codex-rs/exec/src/cli.rs index dd72b3e9..4a3d493a 100644 --- a/codex-rs/exec/src/cli.rs +++ b/codex-rs/exec/src/cli.rs @@ -41,6 +41,10 @@ pub struct Cli { #[arg(long = "color", value_enum, default_value_t = Color::Auto)] pub color: Color, + /// Specifies file where the last message from the agent should be written. + #[arg(long = "output-last-message")] + pub last_message_file: Option, + /// Initial instructions for the agent. pub prompt: String, } diff --git a/codex-rs/exec/src/event_processor.rs b/codex-rs/exec/src/event_processor.rs index 4c8278cc..65f2204d 100644 --- a/codex-rs/exec/src/event_processor.rs +++ b/codex-rs/exec/src/event_processor.rs @@ -13,6 +13,7 @@ use codex_core::protocol::McpToolCallEndEvent; use codex_core::protocol::PatchApplyBeginEvent; use codex_core::protocol::PatchApplyEndEvent; use codex_core::protocol::SessionConfiguredEvent; +use codex_core::protocol::TaskCompleteEvent; use owo_colors::OwoColorize; use owo_colors::Style; use shlex::try_join; @@ -117,7 +118,9 @@ impl EventProcessor { let msg = format!("Task started: {id}"); ts_println!("{}", msg.style(self.dimmed)); } - EventMsg::TaskComplete => { + EventMsg::TaskComplete(TaskCompleteEvent { + last_agent_message: _, + }) => { let msg = format!("Task complete: {id}"); ts_println!("{}", msg.style(self.bold)); } diff --git a/codex-rs/exec/src/lib.rs b/codex-rs/exec/src/lib.rs index 348bff08..d405a2d2 100644 --- a/codex-rs/exec/src/lib.rs +++ b/codex-rs/exec/src/lib.rs @@ -2,6 +2,7 @@ mod cli; mod event_processor; use std::io::IsTerminal; +use std::path::Path; use std::sync::Arc; pub use cli::Cli; @@ -14,6 +15,7 @@ use codex_core::protocol::EventMsg; use codex_core::protocol::InputItem; use codex_core::protocol::Op; use codex_core::protocol::SandboxPolicy; +use codex_core::protocol::TaskCompleteEvent; use codex_core::util::is_inside_git_repo; use event_processor::EventProcessor; use tracing::debug; @@ -32,6 +34,7 @@ pub async fn run_main(cli: Cli) -> anyhow::Result<()> { skip_git_repo_check, disable_response_storage, color, + last_message_file, prompt, } = cli; @@ -137,7 +140,14 @@ pub async fn run_main(cli: Cli) -> anyhow::Result<()> { let initial_images_event_id = codex.submit(Op::UserInput { items }).await?; info!("Sent images with event ID: {initial_images_event_id}"); while let Ok(event) = codex.next_event().await { - if event.id == initial_images_event_id && matches!(event.msg, EventMsg::TaskComplete) { + if event.id == initial_images_event_id + && matches!( + event.msg, + EventMsg::TaskComplete(TaskCompleteEvent { + last_agent_message: _, + }) + ) + { break; } } @@ -151,13 +161,40 @@ pub async fn run_main(cli: Cli) -> anyhow::Result<()> { // Run the loop until the task is complete. let mut event_processor = EventProcessor::create_with_ansi(stdout_with_ansi); while let Some(event) = rx.recv().await { - let last_event = - event.id == initial_prompt_task_id && matches!(event.msg, EventMsg::TaskComplete); + let (is_last_event, last_assistant_message) = match &event.msg { + EventMsg::TaskComplete(TaskCompleteEvent { last_agent_message }) => { + (true, last_agent_message.clone()) + } + _ => (false, None), + }; event_processor.process_event(event); - if last_event { + if is_last_event { + handle_last_message(last_assistant_message, last_message_file.as_deref())?; break; } } Ok(()) } + +fn handle_last_message( + last_agent_message: Option, + last_message_file: Option<&Path>, +) -> std::io::Result<()> { + match (last_agent_message, last_message_file) { + (Some(last_agent_message), Some(last_message_file)) => { + // Last message and a file to write to. + std::fs::write(last_message_file, last_agent_message)?; + } + (None, Some(last_message_file)) => { + eprintln!( + "Warning: No last message to write to file: {}", + last_message_file.to_string_lossy() + ); + } + (_, None) => { + // No last message and no file to write to. + } + } + Ok(()) +} diff --git a/codex-rs/mcp-server/src/codex_tool_runner.rs b/codex-rs/mcp-server/src/codex_tool_runner.rs index f6f6798c..67c990b0 100644 --- a/codex-rs/mcp-server/src/codex_tool_runner.rs +++ b/codex-rs/mcp-server/src/codex_tool_runner.rs @@ -9,6 +9,7 @@ use codex_core::protocol::Event; use codex_core::protocol::EventMsg; use codex_core::protocol::InputItem; use codex_core::protocol::Op; +use codex_core::protocol::TaskCompleteEvent; use mcp_types::CallToolResult; use mcp_types::CallToolResultContent; use mcp_types::JSONRPC_VERSION; @@ -125,7 +126,9 @@ pub async fn run_codex_tool_session( .await; break; } - EventMsg::TaskComplete => { + EventMsg::TaskComplete(TaskCompleteEvent { + last_agent_message: _, + }) => { let result = if let Some(msg) = last_agent_message { CallToolResult { content: vec![CallToolResultContent::TextContent(TextContent { diff --git a/codex-rs/tui/src/chatwidget.rs b/codex-rs/tui/src/chatwidget.rs index 8ceef95b..189f3994 100644 --- a/codex-rs/tui/src/chatwidget.rs +++ b/codex-rs/tui/src/chatwidget.rs @@ -17,6 +17,7 @@ use codex_core::protocol::McpToolCallBeginEvent; use codex_core::protocol::McpToolCallEndEvent; use codex_core::protocol::Op; use codex_core::protocol::PatchApplyBeginEvent; +use codex_core::protocol::TaskCompleteEvent; use crossterm::event::KeyEvent; use ratatui::buffer::Buffer; use ratatui::layout::Constraint; @@ -246,7 +247,9 @@ impl ChatWidget<'_> { self.bottom_pane.set_task_running(true); self.request_redraw(); } - EventMsg::TaskComplete => { + EventMsg::TaskComplete(TaskCompleteEvent { + last_agent_message: _, + }) => { self.bottom_pane.set_task_running(false); self.request_redraw(); }