Flaky CI fix (#1647)

Flushing before sending `TaskCompleteEvent` and ending the submission
loop to avoid race conditions.
This commit is contained in:
aibrahim-oai
2025-07-23 15:03:26 -07:00
committed by GitHub
parent 084236f717
commit b4ab7c1b73
10 changed files with 153 additions and 47 deletions

View File

@@ -812,6 +812,37 @@ async fn submission_loop(
} }
}); });
} }
Op::Shutdown => {
info!("Shutting down Codex instance");
// Gracefully flush and shutdown rollout recorder on session end so tests
// that inspect the rollout file do not race with the background writer.
if let Some(sess_arc) = sess {
let recorder_opt = sess_arc.rollout.lock().unwrap().take();
if let Some(rec) = recorder_opt {
if let Err(e) = rec.shutdown().await {
warn!("failed to shutdown rollout recorder: {e}");
let event = Event {
id: sub.id.clone(),
msg: EventMsg::Error(ErrorEvent {
message: "Failed to shutdown rollout recorder".to_string(),
}),
};
if let Err(e) = tx_event.send(event).await {
warn!("failed to send error message: {e:?}");
}
}
}
}
let event = Event {
id: sub.id.clone(),
msg: EventMsg::ShutdownComplete,
};
if let Err(e) = tx_event.send(event).await {
warn!("failed to send Shutdown event: {e}");
}
break;
}
} }
} }
debug!("Agent loop exited"); debug!("Agent loop exited");

View File

@@ -116,6 +116,9 @@ pub enum Op {
/// Request a single history entry identified by `log_id` + `offset`. /// Request a single history entry identified by `log_id` + `offset`.
GetHistoryEntryRequest { offset: usize, log_id: u64 }, GetHistoryEntryRequest { offset: usize, log_id: u64 },
/// Request to shut down codex instance.
Shutdown,
} }
/// Determines the conditions under which the user is consulted to approve /// Determines the conditions under which the user is consulted to approve
@@ -326,6 +329,9 @@ pub enum EventMsg {
/// Response to GetHistoryEntryRequest. /// Response to GetHistoryEntryRequest.
GetHistoryEntryResponse(GetHistoryEntryResponseEvent), GetHistoryEntryResponse(GetHistoryEntryResponseEvent),
/// Notification that the agent is shutting down.
ShutdownComplete,
} }
// Individual event payload types matching each `EventMsg` variant. // Individual event payload types matching each `EventMsg` variant.

View File

@@ -14,6 +14,7 @@ use time::macros::format_description;
use tokio::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;
use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::Sender;
use tokio::sync::mpsc::{self}; use tokio::sync::mpsc::{self};
use tokio::sync::oneshot;
use tracing::info; use tracing::info;
use tracing::warn; use tracing::warn;
use uuid::Uuid; use uuid::Uuid;
@@ -57,10 +58,10 @@ pub(crate) struct RolloutRecorder {
tx: Sender<RolloutCmd>, tx: Sender<RolloutCmd>,
} }
#[derive(Clone)]
enum RolloutCmd { enum RolloutCmd {
AddItems(Vec<ResponseItem>), AddItems(Vec<ResponseItem>),
UpdateState(SessionStateSnapshot), UpdateState(SessionStateSnapshot),
Shutdown { ack: oneshot::Sender<()> },
} }
impl RolloutRecorder { impl RolloutRecorder {
@@ -204,6 +205,21 @@ impl RolloutRecorder {
info!("Resumed rollout successfully from {path:?}"); info!("Resumed rollout successfully from {path:?}");
Ok((Self { tx }, saved)) Ok((Self { tx }, saved))
} }
pub async fn shutdown(&self) -> std::io::Result<()> {
let (tx_done, rx_done) = oneshot::channel();
match self.tx.send(RolloutCmd::Shutdown { ack: tx_done }).await {
Ok(_) => rx_done
.await
.map_err(|e| IoError::other(format!("failed waiting for rollout shutdown: {e}"))),
Err(e) => {
warn!("failed to send rollout shutdown command: {e}");
Err(IoError::other(format!(
"failed to send rollout shutdown command: {e}"
)))
}
}
}
} }
struct LogFileInfo { struct LogFileInfo {
@@ -299,6 +315,9 @@ async fn rollout_writer(
let _ = file.flush().await; let _ = file.flush().await;
} }
} }
RolloutCmd::Shutdown { ack } => {
let _ = ack.send(());
}
} }
} }
} }

View File

@@ -1,15 +1,23 @@
use std::path::Path;
use codex_common::summarize_sandbox_policy; use codex_common::summarize_sandbox_policy;
use codex_core::WireApi; use codex_core::WireApi;
use codex_core::config::Config; use codex_core::config::Config;
use codex_core::model_supports_reasoning_summaries; use codex_core::model_supports_reasoning_summaries;
use codex_core::protocol::Event; use codex_core::protocol::Event;
pub(crate) enum CodexStatus {
Running,
InitiateShutdown,
Shutdown,
}
pub(crate) trait EventProcessor { pub(crate) trait EventProcessor {
/// Print summary of effective configuration and user prompt. /// Print summary of effective configuration and user prompt.
fn print_config_summary(&mut self, config: &Config, prompt: &str); fn print_config_summary(&mut self, config: &Config, prompt: &str);
/// Handle a single event emitted by the agent. /// Handle a single event emitted by the agent.
fn process_event(&mut self, event: Event); fn process_event(&mut self, event: Event) -> CodexStatus;
} }
pub(crate) fn create_config_summary_entries(config: &Config) -> Vec<(&'static str, String)> { pub(crate) fn create_config_summary_entries(config: &Config) -> Vec<(&'static str, String)> {
@@ -35,3 +43,28 @@ pub(crate) fn create_config_summary_entries(config: &Config) -> Vec<(&'static st
entries entries
} }
pub(crate) fn handle_last_message(
last_agent_message: Option<&str>,
last_message_path: Option<&Path>,
) {
match (last_message_path, last_agent_message) {
(Some(path), Some(msg)) => write_last_message_file(msg, Some(path)),
(Some(path), None) => {
write_last_message_file("", Some(path));
eprintln!(
"Warning: no last agent message; wrote empty content to {}",
path.display()
);
}
(None, _) => eprintln!("Warning: no file to write last message to."),
}
}
fn write_last_message_file(contents: &str, last_message_path: Option<&Path>) {
if let Some(path) = last_message_path {
if let Err(e) = std::fs::write(path, contents) {
eprintln!("Failed to write last message file {path:?}: {e}");
}
}
}

View File

@@ -15,16 +15,20 @@ use codex_core::protocol::McpToolCallEndEvent;
use codex_core::protocol::PatchApplyBeginEvent; use codex_core::protocol::PatchApplyBeginEvent;
use codex_core::protocol::PatchApplyEndEvent; use codex_core::protocol::PatchApplyEndEvent;
use codex_core::protocol::SessionConfiguredEvent; use codex_core::protocol::SessionConfiguredEvent;
use codex_core::protocol::TaskCompleteEvent;
use codex_core::protocol::TokenUsage; use codex_core::protocol::TokenUsage;
use owo_colors::OwoColorize; use owo_colors::OwoColorize;
use owo_colors::Style; use owo_colors::Style;
use shlex::try_join; use shlex::try_join;
use std::collections::HashMap; use std::collections::HashMap;
use std::io::Write; use std::io::Write;
use std::path::PathBuf;
use std::time::Instant; use std::time::Instant;
use crate::event_processor::CodexStatus;
use crate::event_processor::EventProcessor; use crate::event_processor::EventProcessor;
use crate::event_processor::create_config_summary_entries; use crate::event_processor::create_config_summary_entries;
use crate::event_processor::handle_last_message;
/// This should be configurable. When used in CI, users may not want to impose /// This should be configurable. When used in CI, users may not want to impose
/// a limit so they can see the full transcript. /// a limit so they can see the full transcript.
@@ -54,10 +58,15 @@ pub(crate) struct EventProcessorWithHumanOutput {
show_agent_reasoning: bool, show_agent_reasoning: bool,
answer_started: bool, answer_started: bool,
reasoning_started: bool, reasoning_started: bool,
last_message_path: Option<PathBuf>,
} }
impl EventProcessorWithHumanOutput { impl EventProcessorWithHumanOutput {
pub(crate) fn create_with_ansi(with_ansi: bool, config: &Config) -> Self { pub(crate) fn create_with_ansi(
with_ansi: bool,
config: &Config,
last_message_path: Option<PathBuf>,
) -> Self {
let call_id_to_command = HashMap::new(); let call_id_to_command = HashMap::new();
let call_id_to_patch = HashMap::new(); let call_id_to_patch = HashMap::new();
let call_id_to_tool_call = HashMap::new(); let call_id_to_tool_call = HashMap::new();
@@ -77,6 +86,7 @@ impl EventProcessorWithHumanOutput {
show_agent_reasoning: !config.hide_agent_reasoning, show_agent_reasoning: !config.hide_agent_reasoning,
answer_started: false, answer_started: false,
reasoning_started: false, reasoning_started: false,
last_message_path,
} }
} else { } else {
Self { Self {
@@ -93,6 +103,7 @@ impl EventProcessorWithHumanOutput {
show_agent_reasoning: !config.hide_agent_reasoning, show_agent_reasoning: !config.hide_agent_reasoning,
answer_started: false, answer_started: false,
reasoning_started: false, reasoning_started: false,
last_message_path,
} }
} }
} }
@@ -158,7 +169,7 @@ impl EventProcessor for EventProcessorWithHumanOutput {
); );
} }
fn process_event(&mut self, event: Event) { fn process_event(&mut self, event: Event) -> CodexStatus {
let Event { id: _, msg } = event; let Event { id: _, msg } = event;
match msg { match msg {
EventMsg::Error(ErrorEvent { message }) => { EventMsg::Error(ErrorEvent { message }) => {
@@ -168,9 +179,16 @@ impl EventProcessor for EventProcessorWithHumanOutput {
EventMsg::BackgroundEvent(BackgroundEventEvent { message }) => { EventMsg::BackgroundEvent(BackgroundEventEvent { message }) => {
ts_println!(self, "{}", message.style(self.dimmed)); ts_println!(self, "{}", message.style(self.dimmed));
} }
EventMsg::TaskStarted | EventMsg::TaskComplete(_) => { EventMsg::TaskStarted => {
// Ignore. // Ignore.
} }
EventMsg::TaskComplete(TaskCompleteEvent { last_agent_message }) => {
handle_last_message(
last_agent_message.as_deref(),
self.last_message_path.as_deref(),
);
return CodexStatus::InitiateShutdown;
}
EventMsg::TokenCount(TokenUsage { total_tokens, .. }) => { EventMsg::TokenCount(TokenUsage { total_tokens, .. }) => {
ts_println!(self, "tokens used: {total_tokens}"); ts_println!(self, "tokens used: {total_tokens}");
} }
@@ -185,7 +203,7 @@ impl EventProcessor for EventProcessorWithHumanOutput {
} }
EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta }) => { EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta }) => {
if !self.show_agent_reasoning { if !self.show_agent_reasoning {
return; return CodexStatus::Running;
} }
if !self.reasoning_started { if !self.reasoning_started {
ts_println!( ts_println!(
@@ -498,7 +516,9 @@ impl EventProcessor for EventProcessorWithHumanOutput {
EventMsg::GetHistoryEntryResponse(_) => { EventMsg::GetHistoryEntryResponse(_) => {
// Currently ignored in exec output. // Currently ignored in exec output.
} }
EventMsg::ShutdownComplete => return CodexStatus::Shutdown,
} }
CodexStatus::Running
} }
} }

View File

@@ -1,18 +1,24 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::path::PathBuf;
use codex_core::config::Config; use codex_core::config::Config;
use codex_core::protocol::Event; use codex_core::protocol::Event;
use codex_core::protocol::EventMsg; use codex_core::protocol::EventMsg;
use codex_core::protocol::TaskCompleteEvent;
use serde_json::json; use serde_json::json;
use crate::event_processor::CodexStatus;
use crate::event_processor::EventProcessor; use crate::event_processor::EventProcessor;
use crate::event_processor::create_config_summary_entries; use crate::event_processor::create_config_summary_entries;
use crate::event_processor::handle_last_message;
pub(crate) struct EventProcessorWithJsonOutput; pub(crate) struct EventProcessorWithJsonOutput {
last_message_path: Option<PathBuf>,
}
impl EventProcessorWithJsonOutput { impl EventProcessorWithJsonOutput {
pub fn new() -> Self { pub fn new(last_message_path: Option<PathBuf>) -> Self {
Self {} Self { last_message_path }
} }
} }
@@ -33,15 +39,25 @@ impl EventProcessor for EventProcessorWithJsonOutput {
println!("{prompt_json}"); println!("{prompt_json}");
} }
fn process_event(&mut self, event: Event) { fn process_event(&mut self, event: Event) -> CodexStatus {
match event.msg { match event.msg {
EventMsg::AgentMessageDelta(_) | EventMsg::AgentReasoningDelta(_) => { EventMsg::AgentMessageDelta(_) | EventMsg::AgentReasoningDelta(_) => {
// Suppress streaming events in JSON mode. // Suppress streaming events in JSON mode.
CodexStatus::Running
} }
EventMsg::TaskComplete(TaskCompleteEvent { last_agent_message }) => {
handle_last_message(
last_agent_message.as_deref(),
self.last_message_path.as_deref(),
);
CodexStatus::InitiateShutdown
}
EventMsg::ShutdownComplete => CodexStatus::Shutdown,
_ => { _ => {
if let Ok(line) = serde_json::to_string(&event) { if let Ok(line) = serde_json::to_string(&event) {
println!("{line}"); println!("{line}");
} }
CodexStatus::Running
} }
} }
} }

View File

@@ -5,7 +5,6 @@ mod event_processor_with_json_output;
use std::io::IsTerminal; use std::io::IsTerminal;
use std::io::Read; use std::io::Read;
use std::path::Path;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::Arc; use std::sync::Arc;
@@ -28,6 +27,7 @@ use tracing::error;
use tracing::info; use tracing::info;
use tracing_subscriber::EnvFilter; use tracing_subscriber::EnvFilter;
use crate::event_processor::CodexStatus;
use crate::event_processor::EventProcessor; use crate::event_processor::EventProcessor;
pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> anyhow::Result<()> { pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> anyhow::Result<()> {
@@ -123,11 +123,12 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
let config = Config::load_with_cli_overrides(cli_kv_overrides, overrides)?; let config = Config::load_with_cli_overrides(cli_kv_overrides, overrides)?;
let mut event_processor: Box<dyn EventProcessor> = if json_mode { let mut event_processor: Box<dyn EventProcessor> = if json_mode {
Box::new(EventProcessorWithJsonOutput::new()) Box::new(EventProcessorWithJsonOutput::new(last_message_file.clone()))
} else { } else {
Box::new(EventProcessorWithHumanOutput::create_with_ansi( Box::new(EventProcessorWithHumanOutput::create_with_ansi(
stdout_with_ansi, stdout_with_ansi,
&config, &config,
last_message_file.clone(),
)) ))
}; };
@@ -224,40 +225,17 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
// Run the loop until the task is complete. // Run the loop until the task is complete.
while let Some(event) = rx.recv().await { while let Some(event) = rx.recv().await {
let (is_last_event, last_assistant_message) = match &event.msg { let shutdown: CodexStatus = event_processor.process_event(event);
EventMsg::TaskComplete(TaskCompleteEvent { last_agent_message }) => { match shutdown {
(true, last_agent_message.clone()) CodexStatus::Running => continue,
CodexStatus::InitiateShutdown => {
codex.submit(Op::Shutdown).await?;
}
CodexStatus::Shutdown => {
break;
} }
_ => (false, None),
};
event_processor.process_event(event);
if is_last_event {
handle_last_message(last_assistant_message, last_message_file.as_deref())?;
break;
} }
} }
Ok(()) Ok(())
} }
fn handle_last_message(
last_agent_message: Option<String>,
last_message_file: Option<&Path>,
) -> std::io::Result<()> {
match (last_agent_message, last_message_file) {
(Some(last_agent_message), Some(last_message_file)) => {
// Last message and a file to write to.
std::fs::write(last_message_file, last_agent_message)?;
}
(None, Some(last_message_file)) => {
eprintln!(
"Warning: No last message to write to file: {}",
last_message_file.to_string_lossy()
);
}
(_, None) => {
// No last message and no file to write to.
}
}
Ok(())
}

View File

@@ -246,7 +246,8 @@ async fn run_codex_tool_session_inner(
| EventMsg::BackgroundEvent(_) | EventMsg::BackgroundEvent(_)
| EventMsg::PatchApplyBegin(_) | EventMsg::PatchApplyBegin(_)
| EventMsg::PatchApplyEnd(_) | EventMsg::PatchApplyEnd(_)
| EventMsg::GetHistoryEntryResponse(_) => { | EventMsg::GetHistoryEntryResponse(_)
| EventMsg::ShutdownComplete => {
// For now, we do not do anything extra for these // For now, we do not do anything extra for these
// events. Note that // events. Note that
// send(codex_event_to_notification(&event)) above has // send(codex_event_to_notification(&event)) above has

View File

@@ -223,9 +223,7 @@ impl App<'_> {
} => { } => {
match &mut self.app_state { match &mut self.app_state {
AppState::Chat { widget } => { AppState::Chat { widget } => {
if widget.on_ctrl_c() { widget.on_ctrl_c();
self.app_event_tx.send(AppEvent::ExitRequest);
}
} }
AppState::Login { .. } | AppState::GitWarning { .. } => { AppState::Login { .. } | AppState::GitWarning { .. } => {
// No-op. // No-op.

View File

@@ -419,6 +419,9 @@ impl ChatWidget<'_> {
self.bottom_pane self.bottom_pane
.on_history_entry_response(log_id, offset, entry.map(|e| e.text)); .on_history_entry_response(log_id, offset, entry.map(|e| e.text));
} }
EventMsg::ShutdownComplete => {
self.app_event_tx.send(AppEvent::ExitRequest);
}
event => { event => {
self.conversation_history self.conversation_history
.add_background_event(format!("{event:?}")); .add_background_event(format!("{event:?}"));
@@ -471,6 +474,7 @@ impl ChatWidget<'_> {
self.reasoning_buffer.clear(); self.reasoning_buffer.clear();
false false
} else if self.bottom_pane.ctrl_c_quit_hint_visible() { } else if self.bottom_pane.ctrl_c_quit_hint_visible() {
self.submit_op(Op::Shutdown);
true true
} else { } else {
self.bottom_pane.show_ctrl_c_quit_hint(); self.bottom_pane.show_ctrl_c_quit_hint();