feat: experimental --output-last-message flag to exec subcommand (#1037)

This introduces an experimental `--output-last-message` flag that can be
used to identify a file where the final message from the agent will be
written. Two use cases:

- Ultimately, we will likely add a `--quiet` option to `exec`, but even
if the user does not want any output written to the terminal, they
probably want to know what the agent did. Writing the output to a file
makes it possible to get that information in a clean way.
- Relatedly, when using `exec` in CI, it is easier to review the
transcript written "normally," (i.e., not as JSON or something with
extra escapes), but getting programmatic access to the last message is
likely helpful, so writing the last message to a file gets the best of
both worlds.

I am calling this "experimental" because it is possible that we are
overfitting and will want a more general solution to this problem that
would justify removing this flag.
This commit is contained in:
Michael Bolin
2025-05-19 16:08:18 -07:00
committed by GitHub
parent a4bfdf6779
commit d766e845b3
11 changed files with 79 additions and 20 deletions

View File

@@ -77,6 +77,7 @@ use crate::protocol::ReviewDecision;
use crate::protocol::SandboxPolicy;
use crate::protocol::SessionConfiguredEvent;
use crate::protocol::Submission;
use crate::protocol::TaskCompleteEvent;
use crate::rollout::RolloutRecorder;
use crate::safety::SafetyCheck;
use crate::safety::assess_command_safety;
@@ -766,6 +767,7 @@ async fn run_task(sess: Arc<Session>, sub_id: String, input: Vec<InputItem>) {
}
let mut pending_response_input: Vec<ResponseInputItem> = vec![ResponseInputItem::from(input)];
let last_agent_message: Option<String>;
loop {
let mut net_new_turn_input = pending_response_input
.drain(..)
@@ -795,7 +797,7 @@ async fn run_task(sess: Arc<Session>, sub_id: String, input: Vec<InputItem>) {
// 2. Update the in-memory transcript so that future turns
// include these items as part of the history.
transcript.record_items(net_new_turn_input);
transcript.record_items(&net_new_turn_input);
// Note that `transcript.record_items()` does some filtering
// such that `full_transcript` may include items that were
@@ -830,7 +832,6 @@ async fn run_task(sess: Arc<Session>, sub_id: String, input: Vec<InputItem>) {
.into_iter()
.flatten()
.collect::<Vec<ResponseInputItem>>();
let last_assistant_message = get_last_assistant_message_from_turn(&items);
// Only attempt to take the lock if there is something to record.
if !items.is_empty() {
@@ -839,16 +840,17 @@ async fn run_task(sess: Arc<Session>, sub_id: String, input: Vec<InputItem>) {
// For ZDR we also need to keep a transcript clone.
if let Some(transcript) = sess.state.lock().unwrap().zdr_transcript.as_mut() {
transcript.record_items(items);
transcript.record_items(&items);
}
}
if responses.is_empty() {
debug!("Turn completed");
last_agent_message = get_last_assistant_message_from_turn(&items);
sess.maybe_notify(UserNotification::AgentTurnComplete {
turn_id: sub_id.clone(),
input_messages: turn_input_messages,
last_assistant_message,
last_assistant_message: last_agent_message.clone(),
});
break;
}
@@ -871,7 +873,7 @@ async fn run_task(sess: Arc<Session>, sub_id: String, input: Vec<InputItem>) {
sess.remove_task(&sub_id);
let event = Event {
id: sub_id,
msg: EventMsg::TaskComplete,
msg: EventMsg::TaskComplete(TaskCompleteEvent { last_agent_message }),
};
sess.tx_event.send(event).await.ok();
}

View File

@@ -25,7 +25,8 @@ impl ConversationHistory {
/// `items` is ordered from oldest to newest.
pub(crate) fn record_items<I>(&mut self, items: I)
where
I: IntoIterator<Item = ResponseItem>,
I: IntoIterator,
I::Item: std::ops::Deref<Target = ResponseItem>,
{
for item in items {
if is_api_message(&item) {

View File

@@ -321,7 +321,7 @@ pub enum EventMsg {
TaskStarted,
/// Agent has completed all actions
TaskComplete,
TaskComplete(TaskCompleteEvent),
/// Agent text output message
AgentMessage(AgentMessageEvent),
@@ -365,6 +365,11 @@ pub struct ErrorEvent {
pub message: String,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct TaskCompleteEvent {
pub last_agent_message: Option<String>,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct AgentMessageEvent {
pub message: String,

View File

@@ -98,7 +98,7 @@ async fn live_streaming_and_prev_id_reset() {
match ev.msg {
EventMsg::AgentMessage(_) => saw_message_before_complete = true,
EventMsg::TaskComplete => break,
EventMsg::TaskComplete(_) => break,
EventMsg::Error(ErrorEvent { message }) => {
panic!("agent reported error in task1: {message}")
}
@@ -136,7 +136,7 @@ async fn live_streaming_and_prev_id_reset() {
{
got_expected = true;
}
EventMsg::TaskComplete => break,
EventMsg::TaskComplete(_) => break,
EventMsg::Error(ErrorEvent { message }) => {
panic!("agent reported error in task2: {message}")
}
@@ -204,7 +204,7 @@ async fn live_shell_function_call() {
assert!(stdout.contains(MARKER));
saw_end_with_output = true;
}
EventMsg::TaskComplete => break,
EventMsg::TaskComplete(_) => break,
EventMsg::Error(codex_core::protocol::ErrorEvent { message }) => {
panic!("agent error during shell test: {message}")
}

View File

@@ -132,7 +132,7 @@ async fn keeps_previous_response_id_between_tasks() {
.await
.unwrap()
.unwrap();
if matches!(ev.msg, EventMsg::TaskComplete) {
if matches!(ev.msg, EventMsg::TaskComplete(_)) {
break;
}
}
@@ -154,7 +154,7 @@ async fn keeps_previous_response_id_between_tasks() {
.unwrap()
.unwrap();
match ev.msg {
EventMsg::TaskComplete => break,
EventMsg::TaskComplete(_) => break,
EventMsg::Error(ErrorEvent { message }) => {
panic!("unexpected error: {message}")
}

View File

@@ -6,6 +6,7 @@ use std::time::Duration;
use codex_core::Codex;
use codex_core::ModelProviderInfo;
use codex_core::exec::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR;
use codex_core::protocol::EventMsg;
use codex_core::protocol::InputItem;
use codex_core::protocol::Op;
mod test_support;
@@ -118,7 +119,7 @@ async fn retries_on_early_close() {
.await
.unwrap()
.unwrap();
if matches!(ev.msg, codex_core::protocol::EventMsg::TaskComplete) {
if matches!(ev.msg, EventMsg::TaskComplete(_)) {
break;
}
}

View File

@@ -41,6 +41,10 @@ pub struct Cli {
#[arg(long = "color", value_enum, default_value_t = Color::Auto)]
pub color: Color,
/// Specifies file where the last message from the agent should be written.
#[arg(long = "output-last-message")]
pub last_message_file: Option<PathBuf>,
/// Initial instructions for the agent.
pub prompt: String,
}

View File

@@ -13,6 +13,7 @@ use codex_core::protocol::McpToolCallEndEvent;
use codex_core::protocol::PatchApplyBeginEvent;
use codex_core::protocol::PatchApplyEndEvent;
use codex_core::protocol::SessionConfiguredEvent;
use codex_core::protocol::TaskCompleteEvent;
use owo_colors::OwoColorize;
use owo_colors::Style;
use shlex::try_join;
@@ -117,7 +118,9 @@ impl EventProcessor {
let msg = format!("Task started: {id}");
ts_println!("{}", msg.style(self.dimmed));
}
EventMsg::TaskComplete => {
EventMsg::TaskComplete(TaskCompleteEvent {
last_agent_message: _,
}) => {
let msg = format!("Task complete: {id}");
ts_println!("{}", msg.style(self.bold));
}

View File

@@ -2,6 +2,7 @@ mod cli;
mod event_processor;
use std::io::IsTerminal;
use std::path::Path;
use std::sync::Arc;
pub use cli::Cli;
@@ -14,6 +15,7 @@ use codex_core::protocol::EventMsg;
use codex_core::protocol::InputItem;
use codex_core::protocol::Op;
use codex_core::protocol::SandboxPolicy;
use codex_core::protocol::TaskCompleteEvent;
use codex_core::util::is_inside_git_repo;
use event_processor::EventProcessor;
use tracing::debug;
@@ -32,6 +34,7 @@ pub async fn run_main(cli: Cli) -> anyhow::Result<()> {
skip_git_repo_check,
disable_response_storage,
color,
last_message_file,
prompt,
} = cli;
@@ -137,7 +140,14 @@ pub async fn run_main(cli: Cli) -> anyhow::Result<()> {
let initial_images_event_id = codex.submit(Op::UserInput { items }).await?;
info!("Sent images with event ID: {initial_images_event_id}");
while let Ok(event) = codex.next_event().await {
if event.id == initial_images_event_id && matches!(event.msg, EventMsg::TaskComplete) {
if event.id == initial_images_event_id
&& matches!(
event.msg,
EventMsg::TaskComplete(TaskCompleteEvent {
last_agent_message: _,
})
)
{
break;
}
}
@@ -151,13 +161,40 @@ pub async fn run_main(cli: Cli) -> anyhow::Result<()> {
// Run the loop until the task is complete.
let mut event_processor = EventProcessor::create_with_ansi(stdout_with_ansi);
while let Some(event) = rx.recv().await {
let last_event =
event.id == initial_prompt_task_id && matches!(event.msg, EventMsg::TaskComplete);
let (is_last_event, last_assistant_message) = match &event.msg {
EventMsg::TaskComplete(TaskCompleteEvent { last_agent_message }) => {
(true, last_agent_message.clone())
}
_ => (false, None),
};
event_processor.process_event(event);
if last_event {
if is_last_event {
handle_last_message(last_assistant_message, last_message_file.as_deref())?;
break;
}
}
Ok(())
}
fn handle_last_message(
last_agent_message: Option<String>,
last_message_file: Option<&Path>,
) -> std::io::Result<()> {
match (last_agent_message, last_message_file) {
(Some(last_agent_message), Some(last_message_file)) => {
// Last message and a file to write to.
std::fs::write(last_message_file, last_agent_message)?;
}
(None, Some(last_message_file)) => {
eprintln!(
"Warning: No last message to write to file: {}",
last_message_file.to_string_lossy()
);
}
(_, None) => {
// No last message and no file to write to.
}
}
Ok(())
}

View File

@@ -9,6 +9,7 @@ use codex_core::protocol::Event;
use codex_core::protocol::EventMsg;
use codex_core::protocol::InputItem;
use codex_core::protocol::Op;
use codex_core::protocol::TaskCompleteEvent;
use mcp_types::CallToolResult;
use mcp_types::CallToolResultContent;
use mcp_types::JSONRPC_VERSION;
@@ -125,7 +126,9 @@ pub async fn run_codex_tool_session(
.await;
break;
}
EventMsg::TaskComplete => {
EventMsg::TaskComplete(TaskCompleteEvent {
last_agent_message: _,
}) => {
let result = if let Some(msg) = last_agent_message {
CallToolResult {
content: vec![CallToolResultContent::TextContent(TextContent {

View File

@@ -17,6 +17,7 @@ use codex_core::protocol::McpToolCallBeginEvent;
use codex_core::protocol::McpToolCallEndEvent;
use codex_core::protocol::Op;
use codex_core::protocol::PatchApplyBeginEvent;
use codex_core::protocol::TaskCompleteEvent;
use crossterm::event::KeyEvent;
use ratatui::buffer::Buffer;
use ratatui::layout::Constraint;
@@ -246,7 +247,9 @@ impl ChatWidget<'_> {
self.bottom_pane.set_task_running(true);
self.request_redraw();
}
EventMsg::TaskComplete => {
EventMsg::TaskComplete(TaskCompleteEvent {
last_agent_message: _,
}) => {
self.bottom_pane.set_task_running(false);
self.request_redraw();
}