From fa80bbb587d5914022157f6528d4a7b18c1a67ed Mon Sep 17 00:00:00 2001 From: Jeremy Rose <172423086+nornagon-openai@users.noreply.github.com> Date: Mon, 22 Sep 2025 11:14:04 -0700 Subject: [PATCH] simplify StreamController (#3928) no intended functional change, just simplifying the code. --- codex-rs/tui/src/chatwidget.rs | 74 +++++++----- codex-rs/tui/src/chatwidget/tests.rs | 14 +-- codex-rs/tui/src/lib.rs | 18 ++- codex-rs/tui/src/markdown_stream.rs | 14 --- codex-rs/tui/src/streaming/controller.rs | 148 ++++------------------- codex-rs/tui/src/streaming/mod.rs | 38 ------ 6 files changed, 86 insertions(+), 220 deletions(-) diff --git a/codex-rs/tui/src/chatwidget.rs b/codex-rs/tui/src/chatwidget.rs index be27fbec..0702eff5 100644 --- a/codex-rs/tui/src/chatwidget.rs +++ b/codex-rs/tui/src/chatwidget.rs @@ -171,7 +171,7 @@ pub(crate) struct ChatWidget { rate_limit_snapshot: Option, rate_limit_warnings: RateLimitWarningState, // Stream lifecycle controller - stream: StreamController, + stream_controller: Option, running_commands: HashMap, task_complete_pending: bool, // Queue of interruptive UI events deferred during an active write cycle @@ -219,8 +219,10 @@ fn create_initial_user_message(text: String, image_paths: Vec) -> Optio impl ChatWidget { fn flush_answer_stream_with_separator(&mut self) { - let sink = AppEventHistorySink(self.app_event_tx.clone()); - let _ = self.stream.finalize(true, &sink); + if let Some(mut controller) = self.stream_controller.take() { + let sink = AppEventHistorySink(self.app_event_tx.clone()); + controller.finalize(&sink); + } } // --- Small event handlers --- fn on_session_configured(&mut self, event: codex_core::protocol::SessionConfiguredEvent) { @@ -249,9 +251,13 @@ impl ChatWidget { } fn on_agent_message(&mut self, message: String) { - let sink = AppEventHistorySink(self.app_event_tx.clone()); - let finished = self.stream.apply_final_answer(&message, &sink); - self.handle_if_stream_finished(finished); + // If we have a stream_controller, then the final agent message is redundant and will be a + // duplicate of what has already been streamed. + if self.stream_controller.is_none() { + self.handle_streaming_delta(message); + } + self.flush_answer_stream_with_separator(); + self.handle_stream_finished(); self.request_redraw(); } @@ -301,7 +307,6 @@ impl ChatWidget { fn on_task_started(&mut self) { self.bottom_pane.clear_ctrl_c_quit_hint(); self.bottom_pane.set_task_running(true); - self.stream.reset_headers_for_new_turn(); self.full_reasoning_buffer.clear(); self.reasoning_buffer.clear(); self.request_redraw(); @@ -310,9 +315,9 @@ impl ChatWidget { fn on_task_complete(&mut self, last_agent_message: Option) { // If a stream is currently active, finalize only that stream to flush any tail // without emitting stray headers for other streams. - if self.stream.is_write_cycle_active() { + if let Some(mut controller) = self.stream_controller.take() { let sink = AppEventHistorySink(self.app_event_tx.clone()); - let _ = self.stream.finalize(true, &sink); + controller.finalize(&sink); } // Mark task stopped and request redraw now that all content is in history. self.bottom_pane.set_task_running(false); @@ -353,7 +358,7 @@ impl ChatWidget { // Reset running state and clear streaming buffers. self.bottom_pane.set_task_running(false); self.running_commands.clear(); - self.stream.clear_all(); + self.stream_controller = None; } fn on_error(&mut self, message: String) { @@ -508,12 +513,13 @@ impl ChatWidget { /// Periodic tick to commit at most one queued line to history with a small delay, /// animating the output. pub(crate) fn on_commit_tick(&mut self) { - let sink = AppEventHistorySink(self.app_event_tx.clone()); - let finished = self.stream.on_commit_tick(&sink); - self.handle_if_stream_finished(finished); - } - fn is_write_cycle_active(&self) -> bool { - self.stream.is_write_cycle_active() + if let Some(controller) = self.stream_controller.as_mut() { + let sink = AppEventHistorySink(self.app_event_tx.clone()); + let finished = controller.on_commit_tick(&sink); + if finished { + self.handle_stream_finished(); + } + } } fn flush_interrupt_queue(&mut self) { @@ -531,32 +537,34 @@ impl ChatWidget { // Preserve deterministic FIFO across queued interrupts: once anything // is queued due to an active write cycle, continue queueing until the // queue is flushed to avoid reordering (e.g., ExecEnd before ExecBegin). - if self.is_write_cycle_active() || !self.interrupts.is_empty() { + if self.stream_controller.is_some() || !self.interrupts.is_empty() { push(&mut self.interrupts); } else { handle(self); } } - #[inline] - fn handle_if_stream_finished(&mut self, finished: bool) { - if finished { - if self.task_complete_pending { - self.bottom_pane.set_task_running(false); - self.task_complete_pending = false; - } - // A completed stream indicates non-exec content was just inserted. - self.flush_interrupt_queue(); + fn handle_stream_finished(&mut self) { + if self.task_complete_pending { + self.bottom_pane.set_task_running(false); + self.task_complete_pending = false; } + // A completed stream indicates non-exec content was just inserted. + self.flush_interrupt_queue(); } #[inline] fn handle_streaming_delta(&mut self, delta: String) { // Before streaming agent content, flush any active exec cell group. self.flush_active_exec_cell(); - let sink = AppEventHistorySink(self.app_event_tx.clone()); - self.stream.begin(&sink); - self.stream.push_and_maybe_commit(&delta, &sink); + + if self.stream_controller.is_none() { + self.stream_controller = Some(StreamController::new(self.config.clone())); + } + if let Some(controller) = self.stream_controller.as_mut() { + let sink = AppEventHistorySink(self.app_event_tx.clone()); + controller.push_and_maybe_commit(&delta, &sink); + } self.request_redraw(); } @@ -754,7 +762,7 @@ impl ChatWidget { active_exec_cell: None, config: config.clone(), auth_manager, - session_header: SessionHeader::new(config.model.clone()), + session_header: SessionHeader::new(config.model), initial_user_message: create_initial_user_message( initial_prompt.unwrap_or_default(), initial_images, @@ -762,7 +770,7 @@ impl ChatWidget { token_info: None, rate_limit_snapshot: None, rate_limit_warnings: RateLimitWarningState::default(), - stream: StreamController::new(config), + stream_controller: None, running_commands: HashMap::new(), task_complete_pending: false, interrupts: InterruptManager::new(), @@ -813,7 +821,7 @@ impl ChatWidget { active_exec_cell: None, config: config.clone(), auth_manager, - session_header: SessionHeader::new(config.model.clone()), + session_header: SessionHeader::new(config.model), initial_user_message: create_initial_user_message( initial_prompt.unwrap_or_default(), initial_images, @@ -821,7 +829,7 @@ impl ChatWidget { token_info: None, rate_limit_snapshot: None, rate_limit_warnings: RateLimitWarningState::default(), - stream: StreamController::new(config), + stream_controller: None, running_commands: HashMap::new(), task_complete_pending: false, interrupts: InterruptManager::new(), diff --git a/codex-rs/tui/src/chatwidget/tests.rs b/codex-rs/tui/src/chatwidget/tests.rs index c009cabd..c867155b 100644 --- a/codex-rs/tui/src/chatwidget/tests.rs +++ b/codex-rs/tui/src/chatwidget/tests.rs @@ -320,12 +320,12 @@ fn make_chatwidget_manual() -> ( active_exec_cell: None, config: cfg.clone(), auth_manager, - session_header: SessionHeader::new(cfg.model.clone()), + session_header: SessionHeader::new(cfg.model), initial_user_message: None, token_info: None, rate_limit_snapshot: None, rate_limit_warnings: RateLimitWarningState::default(), - stream: StreamController::new(cfg), + stream_controller: None, running_commands: HashMap::new(), task_complete_pending: false, interrupts: InterruptManager::new(), @@ -2133,8 +2133,12 @@ fn deltas_then_same_final_message_are_rendered_snapshot() { // then the exec block, another blank line, the status line, a blank line, and the composer. #[test] fn chatwidget_exec_and_status_layout_vt100_snapshot() { - // Setup identical scenario let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(); + chat.handle_codex_event(Event { + id: "t1".into(), + msg: EventMsg::AgentMessage(AgentMessageEvent { message: "I’m going to search the repo for where “Change Approved” is rendered to update that view.".into() }), + }); + chat.handle_codex_event(Event { id: "c1".into(), msg: EventMsg::ExecCommandBegin(ExecCommandBeginEvent { @@ -2182,10 +2186,6 @@ fn chatwidget_exec_and_status_layout_vt100_snapshot() { }); chat.bottom_pane .set_composer_text("Summarize recent commits".to_string()); - chat.handle_codex_event(Event { - id: "t1".into(), - msg: EventMsg::AgentMessage(AgentMessageEvent { message: "I’m going to search the repo for where “Change Approved” is rendered to update that view.".into() }), - }); // Dimensions let width: u16 = 80; diff --git a/codex-rs/tui/src/lib.rs b/codex-rs/tui/src/lib.rs index 92b42732..42295782 100644 --- a/codex-rs/tui/src/lib.rs +++ b/codex-rs/tui/src/lib.rs @@ -552,18 +552,24 @@ mod tests { use codex_core::auth::write_auth_json; use codex_core::token_data::IdTokenInfo; use codex_core::token_data::TokenData; - fn make_config() -> Config { - // Create a unique CODEX_HOME per test to isolate auth.json writes. + use std::sync::atomic::AtomicUsize; + use std::sync::atomic::Ordering; + + fn get_next_codex_home() -> PathBuf { + static NEXT_CODEX_HOME_ID: AtomicUsize = AtomicUsize::new(0); let mut codex_home = std::env::temp_dir(); let unique_suffix = format!( "codex_tui_test_{}_{}", std::process::id(), - std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap() - .as_nanos() + NEXT_CODEX_HOME_ID.fetch_add(1, Ordering::Relaxed) ); codex_home.push(unique_suffix); + codex_home + } + + fn make_config() -> Config { + // Create a unique CODEX_HOME per test to isolate auth.json writes. + let codex_home = get_next_codex_home(); std::fs::create_dir_all(&codex_home).expect("create unique CODEX_HOME"); Config::load_from_base_config_with_overrides( diff --git a/codex-rs/tui/src/markdown_stream.rs b/codex-rs/tui/src/markdown_stream.rs index a9428b89..51331887 100644 --- a/codex-rs/tui/src/markdown_stream.rs +++ b/codex-rs/tui/src/markdown_stream.rs @@ -20,25 +20,11 @@ impl MarkdownStreamCollector { } } - /// Returns the number of logical lines that have already been committed - /// (i.e., previously returned from `commit_complete_lines`). - pub fn committed_count(&self) -> usize { - self.committed_line_count - } - pub fn clear(&mut self) { self.buffer.clear(); self.committed_line_count = 0; } - /// Replace the buffered content and mark that the first `committed_count` - /// logical lines are already committed. - pub fn replace_with_and_mark_committed(&mut self, s: &str, committed_count: usize) { - self.buffer.clear(); - self.buffer.push_str(s); - self.committed_line_count = committed_count; - } - pub fn push_delta(&mut self, delta: &str) { tracing::trace!("push_delta: {delta:?}"); self.buffer.push_str(delta); diff --git a/codex-rs/tui/src/streaming/controller.rs b/codex-rs/tui/src/streaming/controller.rs index 490d1f6d..89b7ab95 100644 --- a/codex-rs/tui/src/streaming/controller.rs +++ b/codex-rs/tui/src/streaming/controller.rs @@ -3,7 +3,6 @@ use crate::history_cell::HistoryCell; use codex_core::config::Config; use ratatui::text::Line; -use super::HeaderEmitter; use super::StreamState; /// Sink for history insertions and animation control. @@ -36,56 +35,25 @@ type Lines = Vec>; /// commit animation across streams. pub(crate) struct StreamController { config: Config, - header: HeaderEmitter, state: StreamState, - active: bool, finishing_after_drain: bool, + header_emitted: bool, } impl StreamController { pub(crate) fn new(config: Config) -> Self { Self { config, - header: HeaderEmitter::new(), state: StreamState::new(), - active: false, finishing_after_drain: false, + header_emitted: false, } } - pub(crate) fn reset_headers_for_new_turn(&mut self) { - self.header.reset_for_new_turn(); - } - - pub(crate) fn is_write_cycle_active(&self) -> bool { - self.active - } - - pub(crate) fn clear_all(&mut self) { - self.state.clear(); - self.active = false; - self.finishing_after_drain = false; - // leave header state unchanged; caller decides when to reset - } - - /// Begin an answer stream. Does not emit header yet; it is emitted on first commit. - pub(crate) fn begin(&mut self, _sink: &impl HistorySink) { - // Starting a new stream cancels any pending finish-from-previous-stream animation. - if !self.active { - self.header.reset_for_stream(); - } - self.finishing_after_drain = false; - self.active = true; - } - /// Push a delta; if it contains a newline, commit completed lines and start animation. pub(crate) fn push_and_maybe_commit(&mut self, delta: &str, sink: &impl HistorySink) { - if !self.active { - return; - } let cfg = self.config.clone(); let state = &mut self.state; - // Record that at least one delta was received for this stream if !delta.is_empty() { state.has_seen_delta = true; } @@ -99,117 +67,54 @@ impl StreamController { } } - /// Finalize the active stream. If `flush_immediately` is true, drain and emit now. - pub(crate) fn finalize(&mut self, flush_immediately: bool, sink: &impl HistorySink) -> bool { - if !self.active { - return false; - } + /// Finalize the active stream. Drain and emit now. + pub(crate) fn finalize(&mut self, sink: &impl HistorySink) { let cfg = self.config.clone(); // Finalize collector first. let remaining = { let state = &mut self.state; state.collector.finalize_and_drain(&cfg) }; - if flush_immediately { - // Collect all output first to avoid emitting headers when there is no content. - let mut out_lines: Lines = Vec::new(); - { - let state = &mut self.state; - if !remaining.is_empty() { - state.enqueue(remaining); - } - let step = state.drain_all(); - out_lines.extend(step.history); - } - if !out_lines.is_empty() { - // Insert as a HistoryCell so display drops the header while transcript keeps it. - sink.insert_history_cell(Box::new(history_cell::AgentMessageCell::new( - out_lines, - self.header.maybe_emit_header(), - ))); - } - - // Cleanup - self.state.clear(); - // Allow a subsequent block in this turn to emit its header. - self.header.allow_reemit_in_turn(); - // Also clear the per-stream emitted flag so the header can render again. - self.header.reset_for_stream(); - self.active = false; - self.finishing_after_drain = false; - true - } else { + // Collect all output first to avoid emitting headers when there is no content. + let mut out_lines: Lines = Vec::new(); + { + let state = &mut self.state; if !remaining.is_empty() { - let state = &mut self.state; state.enqueue(remaining); } - // Spacer animated out - self.state.enqueue(vec![Line::from("")]); - self.finishing_after_drain = true; - sink.start_commit_animation(); - false + let step = state.drain_all(); + out_lines.extend(step.history); } + if !out_lines.is_empty() { + // Insert as a HistoryCell so display drops the header while transcript keeps it. + self.emit(sink, out_lines); + } + + // Cleanup + self.state.clear(); + self.finishing_after_drain = false; } /// Step animation: commit at most one queued line and handle end-of-drain cleanup. pub(crate) fn on_commit_tick(&mut self, sink: &impl HistorySink) -> bool { - if !self.active { - return false; - } let step = { self.state.step() }; if !step.history.is_empty() { - sink.insert_history_cell(Box::new(history_cell::AgentMessageCell::new( - step.history, - self.header.maybe_emit_header(), - ))); + self.emit(sink, step.history); } let is_idle = self.state.is_idle(); if is_idle { sink.stop_commit_animation(); - if self.finishing_after_drain { - // Reset and notify - self.state.clear(); - // Allow a subsequent block in this turn to emit its header. - self.header.allow_reemit_in_turn(); - // Also clear the per-stream emitted flag so the header can render again. - self.header.reset_for_stream(); - self.active = false; - self.finishing_after_drain = false; - return true; - } } false } - /// Apply a full final answer: replace queued content with only the remaining tail, - /// then finalize immediately and notify completion. - pub(crate) fn apply_final_answer(&mut self, message: &str, sink: &impl HistorySink) -> bool { - self.apply_full_final(message, sink) - } - - fn apply_full_final(&mut self, message: &str, sink: &impl HistorySink) -> bool { - self.begin(sink); - - { - let state = &mut self.state; - // Only inject the final full message if we have not seen any deltas for this stream. - // If deltas were received, rely on the collector's existing buffer to avoid duplication. - if !state.has_seen_delta && !message.is_empty() { - // normalize to end with newline - let mut msg = message.to_owned(); - if !msg.ends_with('\n') { - msg.push('\n'); - } - - // replace while preserving already committed count - let committed = state.collector.committed_count(); - state - .collector - .replace_with_and_mark_committed(&msg, committed); - } - } - self.finalize(true, sink) + fn emit(&mut self, sink: &impl HistorySink, lines: Vec>) { + sink.insert_history_cell(Box::new(history_cell::AgentMessageCell::new( + lines, + !self.header_emitted, + ))); + self.header_emitted = true; } } @@ -268,7 +173,6 @@ mod tests { let cfg = test_config(); let mut ctrl = StreamController::new(cfg.clone()); let sink = TestSink::new(); - ctrl.begin(&sink); // Exact deltas from the session log (section: Loose vs. tight list items) let deltas = vec![ @@ -347,7 +251,7 @@ mod tests { let _ = ctrl.on_commit_tick(&sink); } // Finalize and flush remaining lines now. - let _ = ctrl.finalize(true, &sink); + ctrl.finalize(&sink); // Flatten sink output and strip the header that the controller inserts (blank + "codex"). let mut flat: Vec> = Vec::new(); diff --git a/codex-rs/tui/src/streaming/mod.rs b/codex-rs/tui/src/streaming/mod.rs index 3fc81360..dbb260bc 100644 --- a/codex-rs/tui/src/streaming/mod.rs +++ b/codex-rs/tui/src/streaming/mod.rs @@ -34,41 +34,3 @@ impl StreamState { self.streamer.enqueue(lines) } } - -pub(crate) struct HeaderEmitter { - emitted_this_turn: bool, - emitted_in_stream: bool, -} - -impl HeaderEmitter { - pub(crate) fn new() -> Self { - Self { - emitted_this_turn: false, - emitted_in_stream: false, - } - } - - pub(crate) fn reset_for_new_turn(&mut self) { - self.emitted_this_turn = false; - self.emitted_in_stream = false; - } - - pub(crate) fn reset_for_stream(&mut self) { - self.emitted_in_stream = false; - } - - /// Allow emitting the header again within the current turn after a finalize. - pub(crate) fn allow_reemit_in_turn(&mut self) { - self.emitted_this_turn = false; - } - - pub(crate) fn maybe_emit_header(&mut self) -> bool { - if !self.emitted_in_stream && !self.emitted_this_turn { - self.emitted_in_stream = true; - self.emitted_this_turn = true; - true - } else { - false - } - } -}