From 8ad56be06e8ddbc97edf5e7d16456dd2a6f8f589 Mon Sep 17 00:00:00 2001 From: easong-openai Date: Thu, 21 Aug 2025 01:15:24 -0700 Subject: [PATCH] Parse and expose stream errors (#2540) --- codex-rs/core/src/codex.rs | 15 ++++++++++++-- .../src/event_processor_with_human_output.rs | 4 ++++ codex-rs/mcp-server/src/codex_tool_runner.rs | 1 + codex-rs/protocol/src/protocol.rs | 9 +++++++++ codex-rs/tui/src/chatwidget.rs | 8 ++++++++ codex-rs/tui/src/chatwidget/tests.rs | 20 +++++++++++++++++++ codex-rs/tui/src/history_cell.rs | 6 ++++++ 7 files changed, 61 insertions(+), 2 deletions(-) diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index a692e6fa..7d616f96 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -94,6 +94,7 @@ use crate::protocol::PatchApplyEndEvent; use crate::protocol::ReviewDecision; use crate::protocol::SandboxPolicy; use crate::protocol::SessionConfiguredEvent; +use crate::protocol::StreamErrorEvent; use crate::protocol::Submission; use crate::protocol::TaskCompleteEvent; use crate::protocol::TurnDiffEvent; @@ -815,6 +816,16 @@ impl Session { let _ = self.tx_event.send(event).await; } + async fn notify_stream_error(&self, sub_id: &str, message: impl Into) { + let event = Event { + id: sub_id.to_string(), + msg: EventMsg::StreamError(StreamErrorEvent { + message: message.into(), + }), + }; + let _ = self.tx_event.send(event).await; + } + /// Build the full turn input by concatenating the current conversation /// history with additional items for this turn. pub fn turn_input_with_history(&self, extra: Vec) -> Vec { @@ -1523,7 +1534,7 @@ async fn run_turn( // Surface retry information to any UI/front‑end so the // user understands what is happening instead of staring // at a seemingly frozen screen. - sess.notify_background_event( + sess.notify_stream_error( &sub_id, format!( "stream error: {e}; retrying {retries}/{max_retries} in {delay:?}…" @@ -1758,7 +1769,7 @@ async fn run_compact_task( if retries < max_retries { retries += 1; let delay = backoff(retries); - sess.notify_background_event( + sess.notify_stream_error( &sub_id, format!( "stream error: {e}; retrying {retries}/{max_retries} in {delay:?}…" diff --git a/codex-rs/exec/src/event_processor_with_human_output.rs b/codex-rs/exec/src/event_processor_with_human_output.rs index 498b1d29..9a562cbd 100644 --- a/codex-rs/exec/src/event_processor_with_human_output.rs +++ b/codex-rs/exec/src/event_processor_with_human_output.rs @@ -20,6 +20,7 @@ use codex_core::protocol::McpToolCallEndEvent; use codex_core::protocol::PatchApplyBeginEvent; use codex_core::protocol::PatchApplyEndEvent; use codex_core::protocol::SessionConfiguredEvent; +use codex_core::protocol::StreamErrorEvent; use codex_core::protocol::TaskCompleteEvent; use codex_core::protocol::TurnAbortReason; use codex_core::protocol::TurnDiffEvent; @@ -174,6 +175,9 @@ impl EventProcessor for EventProcessorWithHumanOutput { EventMsg::BackgroundEvent(BackgroundEventEvent { message }) => { ts_println!(self, "{}", message.style(self.dimmed)); } + EventMsg::StreamError(StreamErrorEvent { message }) => { + ts_println!(self, "{}", message.style(self.dimmed)); + } EventMsg::TaskStarted => { // Ignore. } diff --git a/codex-rs/mcp-server/src/codex_tool_runner.rs b/codex-rs/mcp-server/src/codex_tool_runner.rs index e1dfd08c..36845d89 100644 --- a/codex-rs/mcp-server/src/codex_tool_runner.rs +++ b/codex-rs/mcp-server/src/codex_tool_runner.rs @@ -268,6 +268,7 @@ async fn run_codex_tool_session_inner( | EventMsg::ExecCommandOutputDelta(_) | EventMsg::ExecCommandEnd(_) | EventMsg::BackgroundEvent(_) + | EventMsg::StreamError(_) | EventMsg::PatchApplyBegin(_) | EventMsg::PatchApplyEnd(_) | EventMsg::TurnDiff(_) diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index 65ef30aa..fbe052bf 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -446,6 +446,10 @@ pub enum EventMsg { BackgroundEvent(BackgroundEventEvent), + /// Notification that a model stream experienced an error or disconnect + /// and the system is handling it (e.g., retrying with backoff). + StreamError(StreamErrorEvent), + /// Notification that the agent is about to apply a code patch. Mirrors /// `ExecCommandBegin` so front‑ends can show progress indicators. PatchApplyBegin(PatchApplyBeginEvent), @@ -721,6 +725,11 @@ pub struct BackgroundEventEvent { pub message: String, } +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct StreamErrorEvent { + pub message: String, +} + #[derive(Debug, Clone, Deserialize, Serialize)] pub struct PatchApplyBeginEvent { /// Identifier so this can be paired with the PatchApplyEnd event. diff --git a/codex-rs/tui/src/chatwidget.rs b/codex-rs/tui/src/chatwidget.rs index b39cf7aa..3386133f 100644 --- a/codex-rs/tui/src/chatwidget.rs +++ b/codex-rs/tui/src/chatwidget.rs @@ -23,6 +23,7 @@ use codex_core::protocol::McpToolCallBeginEvent; use codex_core::protocol::McpToolCallEndEvent; use codex_core::protocol::Op; use codex_core::protocol::PatchApplyBeginEvent; +use codex_core::protocol::StreamErrorEvent; use codex_core::protocol::TaskCompleteEvent; use codex_core::protocol::TokenUsage; use codex_core::protocol::TurnDiffEvent; @@ -327,6 +328,12 @@ impl ChatWidget { fn on_background_event(&mut self, message: String) { debug!("BackgroundEvent: {message}"); } + + fn on_stream_error(&mut self, message: String) { + // Show stream errors in the transcript so users see retry/backoff info. + self.add_to_history(history_cell::new_stream_error_event(message)); + self.mark_needs_redraw(); + } /// Periodic tick to commit at most one queued line to history with a small delay, /// animating the output. pub(crate) fn on_commit_tick(&mut self) { @@ -690,6 +697,7 @@ impl ChatWidget { EventMsg::BackgroundEvent(BackgroundEventEvent { message }) => { self.on_background_event(message) } + EventMsg::StreamError(StreamErrorEvent { message }) => self.on_stream_error(message), } // Coalesce redraws: issue at most one after handling the event if self.needs_redraw { diff --git a/codex-rs/tui/src/chatwidget/tests.rs b/codex-rs/tui/src/chatwidget/tests.rs index 42dcddeb..d981f06d 100644 --- a/codex-rs/tui/src/chatwidget/tests.rs +++ b/codex-rs/tui/src/chatwidget/tests.rs @@ -19,6 +19,7 @@ use codex_core::protocol::ExecCommandEndEvent; use codex_core::protocol::FileChange; use codex_core::protocol::PatchApplyBeginEvent; use codex_core::protocol::PatchApplyEndEvent; +use codex_core::protocol::StreamErrorEvent; use codex_core::protocol::TaskCompleteEvent; use crossterm::event::KeyCode; use crossterm::event::KeyEvent; @@ -823,6 +824,25 @@ fn plan_update_renders_history_cell() { assert!(blob.contains("Write tests")); } +#[test] +fn stream_error_is_rendered_to_history() { + let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(); + let msg = "stream error: stream disconnected before completion: idle timeout waiting for SSE; retrying 1/5 in 211ms…"; + chat.handle_codex_event(Event { + id: "sub-1".into(), + msg: EventMsg::StreamError(StreamErrorEvent { + message: msg.to_string(), + }), + }); + + let cells = drain_insert_history(&mut rx); + assert!(!cells.is_empty(), "expected a history cell for StreamError"); + let blob = lines_to_single_string(cells.last().unwrap()); + assert!(blob.contains("⚠ ")); + assert!(blob.contains("stream error:")); + assert!(blob.contains("idle timeout waiting for SSE")); +} + #[test] fn headers_emitted_on_stream_begin_for_answer_and_not_for_reasoning() { let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(); diff --git a/codex-rs/tui/src/history_cell.rs b/codex-rs/tui/src/history_cell.rs index 8e3a5fb9..4948d31d 100644 --- a/codex-rs/tui/src/history_cell.rs +++ b/codex-rs/tui/src/history_cell.rs @@ -751,6 +751,12 @@ pub(crate) fn new_error_event(message: String) -> PlainHistoryCell { PlainHistoryCell { lines } } +pub(crate) fn new_stream_error_event(message: String) -> PlainHistoryCell { + let lines: Vec> = + vec![vec!["⚠ ".magenta().bold(), message.dim()].into(), "".into()]; + PlainHistoryCell { lines } +} + /// Render a user‑friendly plan update styled like a checkbox todo list. pub(crate) fn new_plan_update(update: UpdatePlanArgs) -> PlainHistoryCell { let UpdatePlanArgs { explanation, plan } = update;