From 5185d69f134505797a497b01e3dbeb07c16a04cf Mon Sep 17 00:00:00 2001 From: Fouad Matin <169186268+fouad-openai@users.noreply.github.com> Date: Sun, 14 Sep 2025 18:04:05 -0700 Subject: [PATCH] fix(core): flaky test `completed_commands_do_not_persist_sessions` (#3596) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fix flaky test: ``` FAIL [ 2.641s] codex-core unified_exec::tests::completed_commands_do_not_persist_sessions stdout ─── running 1 test test unified_exec::tests::completed_commands_do_not_persist_sessions ... FAILED failures: failures: unified_exec::tests::completed_commands_do_not_persist_sessions test result: FAILED. 0 passed; 1 failed; 0 ignored; 0 measured; 235 filtered out; finished in 2.63s stderr ─── thread 'unified_exec::tests::completed_commands_do_not_persist_sessions' panicked at core/src/unified_exec/mod.rs:582:9: assertion failed: result.output.contains("codex") ``` --- .../src/exec_command/exec_command_session.rs | 20 ++++++++++++++++++- .../core/src/exec_command/session_manager.rs | 2 ++ codex-rs/core/src/unified_exec/mod.rs | 8 ++++++-- 3 files changed, 27 insertions(+), 3 deletions(-) diff --git a/codex-rs/core/src/exec_command/exec_command_session.rs b/codex-rs/core/src/exec_command/exec_command_session.rs index b524506e..3bf647ef 100644 --- a/codex-rs/core/src/exec_command/exec_command_session.rs +++ b/codex-rs/core/src/exec_command/exec_command_session.rs @@ -11,6 +11,9 @@ pub(crate) struct ExecCommandSession { /// Broadcast stream of output chunks read from the PTY. New subscribers /// receive only chunks emitted after they subscribe. output_tx: broadcast::Sender>, + /// Receiver subscribed before the child process starts emitting output so + /// the first caller can consume any early data without races. + initial_output_rx: StdMutex>>>, /// Child killer handle for termination on drop (can signal independently /// of a thread blocked in `.wait()`). @@ -42,6 +45,7 @@ impl ExecCommandSession { Self { writer_tx, output_tx, + initial_output_rx: StdMutex::new(None), killer: StdMutex::new(Some(killer)), reader_handle: StdMutex::new(Some(reader_handle)), writer_handle: StdMutex::new(Some(writer_handle)), @@ -50,12 +54,26 @@ impl ExecCommandSession { } } + pub(crate) fn set_initial_output_receiver(&self, receiver: broadcast::Receiver>) { + if let Ok(mut guard) = self.initial_output_rx.lock() + && guard.is_none() + { + *guard = Some(receiver); + } + } + pub(crate) fn writer_sender(&self) -> mpsc::Sender> { self.writer_tx.clone() } pub(crate) fn output_receiver(&self) -> broadcast::Receiver> { - self.output_tx.subscribe() + if let Ok(mut guard) = self.initial_output_rx.lock() + && let Some(receiver) = guard.take() + { + receiver + } else { + self.output_tx.subscribe() + } } pub(crate) fn has_exited(&self) -> bool { diff --git a/codex-rs/core/src/exec_command/session_manager.rs b/codex-rs/core/src/exec_command/session_manager.rs index 9578610c..26fb3f3e 100644 --- a/codex-rs/core/src/exec_command/session_manager.rs +++ b/codex-rs/core/src/exec_command/session_manager.rs @@ -279,6 +279,7 @@ async fn create_exec_command_session( let (writer_tx, mut writer_rx) = mpsc::channel::>(128); // Broadcast for streaming PTY output to readers: subscribers receive from subscription time. let (output_tx, _) = tokio::sync::broadcast::channel::>(256); + let initial_output_rx = output_tx.subscribe(); // Reader task: drain PTY and forward chunks to output channel. let mut reader = pair.master.try_clone_reader()?; @@ -350,6 +351,7 @@ async fn create_exec_command_session( wait_handle, exit_status, ); + session.set_initial_output_receiver(initial_output_rx); Ok((session, exit_rx)) } diff --git a/codex-rs/core/src/unified_exec/mod.rs b/codex-rs/core/src/unified_exec/mod.rs index 5d8efbe0..c6980c53 100644 --- a/codex-rs/core/src/unified_exec/mod.rs +++ b/codex-rs/core/src/unified_exec/mod.rs @@ -327,6 +327,7 @@ async fn create_unified_exec_session( let (writer_tx, mut writer_rx) = mpsc::channel::>(128); let (output_tx, _) = tokio::sync::broadcast::channel::>(256); + let initial_output_rx = output_tx.subscribe(); let mut reader = pair .master @@ -380,7 +381,7 @@ async fn create_unified_exec_session( wait_exit_status.store(true, Ordering::SeqCst); }); - Ok(ExecCommandSession::new( + let session = ExecCommandSession::new( writer_tx, output_tx, killer, @@ -388,7 +389,10 @@ async fn create_unified_exec_session( writer_handle, wait_handle, exit_status, - )) + ); + session.set_initial_output_receiver(initial_output_rx); + + Ok(session) } #[cfg(test)]