Parse and expose stream errors (#2540)

This commit is contained in:
easong-openai
2025-08-21 01:15:24 -07:00
committed by GitHub
parent d2b2a6d13a
commit 8ad56be06e
7 changed files with 61 additions and 2 deletions

View File

@@ -94,6 +94,7 @@ use crate::protocol::PatchApplyEndEvent;
use crate::protocol::ReviewDecision; use crate::protocol::ReviewDecision;
use crate::protocol::SandboxPolicy; use crate::protocol::SandboxPolicy;
use crate::protocol::SessionConfiguredEvent; use crate::protocol::SessionConfiguredEvent;
use crate::protocol::StreamErrorEvent;
use crate::protocol::Submission; use crate::protocol::Submission;
use crate::protocol::TaskCompleteEvent; use crate::protocol::TaskCompleteEvent;
use crate::protocol::TurnDiffEvent; use crate::protocol::TurnDiffEvent;
@@ -815,6 +816,16 @@ impl Session {
let _ = self.tx_event.send(event).await; let _ = self.tx_event.send(event).await;
} }
async fn notify_stream_error(&self, sub_id: &str, message: impl Into<String>) {
let event = Event {
id: sub_id.to_string(),
msg: EventMsg::StreamError(StreamErrorEvent {
message: message.into(),
}),
};
let _ = self.tx_event.send(event).await;
}
/// Build the full turn input by concatenating the current conversation /// Build the full turn input by concatenating the current conversation
/// history with additional items for this turn. /// history with additional items for this turn.
pub fn turn_input_with_history(&self, extra: Vec<ResponseItem>) -> Vec<ResponseItem> { pub fn turn_input_with_history(&self, extra: Vec<ResponseItem>) -> Vec<ResponseItem> {
@@ -1523,7 +1534,7 @@ async fn run_turn(
// Surface retry information to any UI/frontend so the // Surface retry information to any UI/frontend so the
// user understands what is happening instead of staring // user understands what is happening instead of staring
// at a seemingly frozen screen. // at a seemingly frozen screen.
sess.notify_background_event( sess.notify_stream_error(
&sub_id, &sub_id,
format!( format!(
"stream error: {e}; retrying {retries}/{max_retries} in {delay:?}" "stream error: {e}; retrying {retries}/{max_retries} in {delay:?}"
@@ -1758,7 +1769,7 @@ async fn run_compact_task(
if retries < max_retries { if retries < max_retries {
retries += 1; retries += 1;
let delay = backoff(retries); let delay = backoff(retries);
sess.notify_background_event( sess.notify_stream_error(
&sub_id, &sub_id,
format!( format!(
"stream error: {e}; retrying {retries}/{max_retries} in {delay:?}" "stream error: {e}; retrying {retries}/{max_retries} in {delay:?}"

View File

@@ -20,6 +20,7 @@ use codex_core::protocol::McpToolCallEndEvent;
use codex_core::protocol::PatchApplyBeginEvent; use codex_core::protocol::PatchApplyBeginEvent;
use codex_core::protocol::PatchApplyEndEvent; use codex_core::protocol::PatchApplyEndEvent;
use codex_core::protocol::SessionConfiguredEvent; use codex_core::protocol::SessionConfiguredEvent;
use codex_core::protocol::StreamErrorEvent;
use codex_core::protocol::TaskCompleteEvent; use codex_core::protocol::TaskCompleteEvent;
use codex_core::protocol::TurnAbortReason; use codex_core::protocol::TurnAbortReason;
use codex_core::protocol::TurnDiffEvent; use codex_core::protocol::TurnDiffEvent;
@@ -174,6 +175,9 @@ impl EventProcessor for EventProcessorWithHumanOutput {
EventMsg::BackgroundEvent(BackgroundEventEvent { message }) => { EventMsg::BackgroundEvent(BackgroundEventEvent { message }) => {
ts_println!(self, "{}", message.style(self.dimmed)); ts_println!(self, "{}", message.style(self.dimmed));
} }
EventMsg::StreamError(StreamErrorEvent { message }) => {
ts_println!(self, "{}", message.style(self.dimmed));
}
EventMsg::TaskStarted => { EventMsg::TaskStarted => {
// Ignore. // Ignore.
} }

View File

@@ -268,6 +268,7 @@ async fn run_codex_tool_session_inner(
| EventMsg::ExecCommandOutputDelta(_) | EventMsg::ExecCommandOutputDelta(_)
| EventMsg::ExecCommandEnd(_) | EventMsg::ExecCommandEnd(_)
| EventMsg::BackgroundEvent(_) | EventMsg::BackgroundEvent(_)
| EventMsg::StreamError(_)
| EventMsg::PatchApplyBegin(_) | EventMsg::PatchApplyBegin(_)
| EventMsg::PatchApplyEnd(_) | EventMsg::PatchApplyEnd(_)
| EventMsg::TurnDiff(_) | EventMsg::TurnDiff(_)

View File

@@ -446,6 +446,10 @@ pub enum EventMsg {
BackgroundEvent(BackgroundEventEvent), BackgroundEvent(BackgroundEventEvent),
/// Notification that a model stream experienced an error or disconnect
/// and the system is handling it (e.g., retrying with backoff).
StreamError(StreamErrorEvent),
/// Notification that the agent is about to apply a code patch. Mirrors /// Notification that the agent is about to apply a code patch. Mirrors
/// `ExecCommandBegin` so frontends can show progress indicators. /// `ExecCommandBegin` so frontends can show progress indicators.
PatchApplyBegin(PatchApplyBeginEvent), PatchApplyBegin(PatchApplyBeginEvent),
@@ -721,6 +725,11 @@ pub struct BackgroundEventEvent {
pub message: String, pub message: String,
} }
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct StreamErrorEvent {
pub message: String,
}
#[derive(Debug, Clone, Deserialize, Serialize)] #[derive(Debug, Clone, Deserialize, Serialize)]
pub struct PatchApplyBeginEvent { pub struct PatchApplyBeginEvent {
/// Identifier so this can be paired with the PatchApplyEnd event. /// Identifier so this can be paired with the PatchApplyEnd event.

View File

@@ -23,6 +23,7 @@ use codex_core::protocol::McpToolCallBeginEvent;
use codex_core::protocol::McpToolCallEndEvent; use codex_core::protocol::McpToolCallEndEvent;
use codex_core::protocol::Op; use codex_core::protocol::Op;
use codex_core::protocol::PatchApplyBeginEvent; use codex_core::protocol::PatchApplyBeginEvent;
use codex_core::protocol::StreamErrorEvent;
use codex_core::protocol::TaskCompleteEvent; use codex_core::protocol::TaskCompleteEvent;
use codex_core::protocol::TokenUsage; use codex_core::protocol::TokenUsage;
use codex_core::protocol::TurnDiffEvent; use codex_core::protocol::TurnDiffEvent;
@@ -327,6 +328,12 @@ impl ChatWidget {
fn on_background_event(&mut self, message: String) { fn on_background_event(&mut self, message: String) {
debug!("BackgroundEvent: {message}"); debug!("BackgroundEvent: {message}");
} }
fn on_stream_error(&mut self, message: String) {
// Show stream errors in the transcript so users see retry/backoff info.
self.add_to_history(history_cell::new_stream_error_event(message));
self.mark_needs_redraw();
}
/// Periodic tick to commit at most one queued line to history with a small delay, /// Periodic tick to commit at most one queued line to history with a small delay,
/// animating the output. /// animating the output.
pub(crate) fn on_commit_tick(&mut self) { pub(crate) fn on_commit_tick(&mut self) {
@@ -690,6 +697,7 @@ impl ChatWidget {
EventMsg::BackgroundEvent(BackgroundEventEvent { message }) => { EventMsg::BackgroundEvent(BackgroundEventEvent { message }) => {
self.on_background_event(message) self.on_background_event(message)
} }
EventMsg::StreamError(StreamErrorEvent { message }) => self.on_stream_error(message),
} }
// Coalesce redraws: issue at most one after handling the event // Coalesce redraws: issue at most one after handling the event
if self.needs_redraw { if self.needs_redraw {

View File

@@ -19,6 +19,7 @@ use codex_core::protocol::ExecCommandEndEvent;
use codex_core::protocol::FileChange; use codex_core::protocol::FileChange;
use codex_core::protocol::PatchApplyBeginEvent; use codex_core::protocol::PatchApplyBeginEvent;
use codex_core::protocol::PatchApplyEndEvent; use codex_core::protocol::PatchApplyEndEvent;
use codex_core::protocol::StreamErrorEvent;
use codex_core::protocol::TaskCompleteEvent; use codex_core::protocol::TaskCompleteEvent;
use crossterm::event::KeyCode; use crossterm::event::KeyCode;
use crossterm::event::KeyEvent; use crossterm::event::KeyEvent;
@@ -823,6 +824,25 @@ fn plan_update_renders_history_cell() {
assert!(blob.contains("Write tests")); assert!(blob.contains("Write tests"));
} }
#[test]
fn stream_error_is_rendered_to_history() {
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual();
let msg = "stream error: stream disconnected before completion: idle timeout waiting for SSE; retrying 1/5 in 211ms…";
chat.handle_codex_event(Event {
id: "sub-1".into(),
msg: EventMsg::StreamError(StreamErrorEvent {
message: msg.to_string(),
}),
});
let cells = drain_insert_history(&mut rx);
assert!(!cells.is_empty(), "expected a history cell for StreamError");
let blob = lines_to_single_string(cells.last().unwrap());
assert!(blob.contains(""));
assert!(blob.contains("stream error:"));
assert!(blob.contains("idle timeout waiting for SSE"));
}
#[test] #[test]
fn headers_emitted_on_stream_begin_for_answer_and_not_for_reasoning() { fn headers_emitted_on_stream_begin_for_answer_and_not_for_reasoning() {
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(); let (mut chat, mut rx, _op_rx) = make_chatwidget_manual();

View File

@@ -751,6 +751,12 @@ pub(crate) fn new_error_event(message: String) -> PlainHistoryCell {
PlainHistoryCell { lines } PlainHistoryCell { lines }
} }
pub(crate) fn new_stream_error_event(message: String) -> PlainHistoryCell {
let lines: Vec<Line<'static>> =
vec![vec!["".magenta().bold(), message.dim()].into(), "".into()];
PlainHistoryCell { lines }
}
/// Render a userfriendly plan update styled like a checkbox todo list. /// Render a userfriendly plan update styled like a checkbox todo list.
pub(crate) fn new_plan_update(update: UpdatePlanArgs) -> PlainHistoryCell { pub(crate) fn new_plan_update(update: UpdatePlanArgs) -> PlainHistoryCell {
let UpdatePlanArgs { explanation, plan } = update; let UpdatePlanArgs { explanation, plan } = update;