simplify StreamController (#3928)

no intended functional change, just simplifying the code.
This commit is contained in:
Jeremy Rose
2025-09-22 11:14:04 -07:00
committed by GitHub
parent 434eb4fd49
commit fa80bbb587
6 changed files with 86 additions and 220 deletions

View File

@@ -171,7 +171,7 @@ pub(crate) struct ChatWidget {
rate_limit_snapshot: Option<RateLimitSnapshotEvent>,
rate_limit_warnings: RateLimitWarningState,
// Stream lifecycle controller
stream: StreamController,
stream_controller: Option<StreamController>,
running_commands: HashMap<String, RunningCommand>,
task_complete_pending: bool,
// Queue of interruptive UI events deferred during an active write cycle
@@ -219,8 +219,10 @@ fn create_initial_user_message(text: String, image_paths: Vec<PathBuf>) -> Optio
impl ChatWidget {
fn flush_answer_stream_with_separator(&mut self) {
let sink = AppEventHistorySink(self.app_event_tx.clone());
let _ = self.stream.finalize(true, &sink);
if let Some(mut controller) = self.stream_controller.take() {
let sink = AppEventHistorySink(self.app_event_tx.clone());
controller.finalize(&sink);
}
}
// --- Small event handlers ---
fn on_session_configured(&mut self, event: codex_core::protocol::SessionConfiguredEvent) {
@@ -249,9 +251,13 @@ impl ChatWidget {
}
fn on_agent_message(&mut self, message: String) {
let sink = AppEventHistorySink(self.app_event_tx.clone());
let finished = self.stream.apply_final_answer(&message, &sink);
self.handle_if_stream_finished(finished);
// If we have a stream_controller, then the final agent message is redundant and will be a
// duplicate of what has already been streamed.
if self.stream_controller.is_none() {
self.handle_streaming_delta(message);
}
self.flush_answer_stream_with_separator();
self.handle_stream_finished();
self.request_redraw();
}
@@ -301,7 +307,6 @@ impl ChatWidget {
fn on_task_started(&mut self) {
self.bottom_pane.clear_ctrl_c_quit_hint();
self.bottom_pane.set_task_running(true);
self.stream.reset_headers_for_new_turn();
self.full_reasoning_buffer.clear();
self.reasoning_buffer.clear();
self.request_redraw();
@@ -310,9 +315,9 @@ impl ChatWidget {
fn on_task_complete(&mut self, last_agent_message: Option<String>) {
// If a stream is currently active, finalize only that stream to flush any tail
// without emitting stray headers for other streams.
if self.stream.is_write_cycle_active() {
if let Some(mut controller) = self.stream_controller.take() {
let sink = AppEventHistorySink(self.app_event_tx.clone());
let _ = self.stream.finalize(true, &sink);
controller.finalize(&sink);
}
// Mark task stopped and request redraw now that all content is in history.
self.bottom_pane.set_task_running(false);
@@ -353,7 +358,7 @@ impl ChatWidget {
// Reset running state and clear streaming buffers.
self.bottom_pane.set_task_running(false);
self.running_commands.clear();
self.stream.clear_all();
self.stream_controller = None;
}
fn on_error(&mut self, message: String) {
@@ -508,12 +513,13 @@ impl ChatWidget {
/// Periodic tick to commit at most one queued line to history with a small delay,
/// animating the output.
pub(crate) fn on_commit_tick(&mut self) {
let sink = AppEventHistorySink(self.app_event_tx.clone());
let finished = self.stream.on_commit_tick(&sink);
self.handle_if_stream_finished(finished);
}
fn is_write_cycle_active(&self) -> bool {
self.stream.is_write_cycle_active()
if let Some(controller) = self.stream_controller.as_mut() {
let sink = AppEventHistorySink(self.app_event_tx.clone());
let finished = controller.on_commit_tick(&sink);
if finished {
self.handle_stream_finished();
}
}
}
fn flush_interrupt_queue(&mut self) {
@@ -531,32 +537,34 @@ impl ChatWidget {
// Preserve deterministic FIFO across queued interrupts: once anything
// is queued due to an active write cycle, continue queueing until the
// queue is flushed to avoid reordering (e.g., ExecEnd before ExecBegin).
if self.is_write_cycle_active() || !self.interrupts.is_empty() {
if self.stream_controller.is_some() || !self.interrupts.is_empty() {
push(&mut self.interrupts);
} else {
handle(self);
}
}
#[inline]
fn handle_if_stream_finished(&mut self, finished: bool) {
if finished {
if self.task_complete_pending {
self.bottom_pane.set_task_running(false);
self.task_complete_pending = false;
}
// A completed stream indicates non-exec content was just inserted.
self.flush_interrupt_queue();
fn handle_stream_finished(&mut self) {
if self.task_complete_pending {
self.bottom_pane.set_task_running(false);
self.task_complete_pending = false;
}
// A completed stream indicates non-exec content was just inserted.
self.flush_interrupt_queue();
}
#[inline]
fn handle_streaming_delta(&mut self, delta: String) {
// Before streaming agent content, flush any active exec cell group.
self.flush_active_exec_cell();
let sink = AppEventHistorySink(self.app_event_tx.clone());
self.stream.begin(&sink);
self.stream.push_and_maybe_commit(&delta, &sink);
if self.stream_controller.is_none() {
self.stream_controller = Some(StreamController::new(self.config.clone()));
}
if let Some(controller) = self.stream_controller.as_mut() {
let sink = AppEventHistorySink(self.app_event_tx.clone());
controller.push_and_maybe_commit(&delta, &sink);
}
self.request_redraw();
}
@@ -754,7 +762,7 @@ impl ChatWidget {
active_exec_cell: None,
config: config.clone(),
auth_manager,
session_header: SessionHeader::new(config.model.clone()),
session_header: SessionHeader::new(config.model),
initial_user_message: create_initial_user_message(
initial_prompt.unwrap_or_default(),
initial_images,
@@ -762,7 +770,7 @@ impl ChatWidget {
token_info: None,
rate_limit_snapshot: None,
rate_limit_warnings: RateLimitWarningState::default(),
stream: StreamController::new(config),
stream_controller: None,
running_commands: HashMap::new(),
task_complete_pending: false,
interrupts: InterruptManager::new(),
@@ -813,7 +821,7 @@ impl ChatWidget {
active_exec_cell: None,
config: config.clone(),
auth_manager,
session_header: SessionHeader::new(config.model.clone()),
session_header: SessionHeader::new(config.model),
initial_user_message: create_initial_user_message(
initial_prompt.unwrap_or_default(),
initial_images,
@@ -821,7 +829,7 @@ impl ChatWidget {
token_info: None,
rate_limit_snapshot: None,
rate_limit_warnings: RateLimitWarningState::default(),
stream: StreamController::new(config),
stream_controller: None,
running_commands: HashMap::new(),
task_complete_pending: false,
interrupts: InterruptManager::new(),