diff --git a/codex-rs/tui/src/chatwidget.rs b/codex-rs/tui/src/chatwidget.rs index 857030e3..3ac33534 100644 --- a/codex-rs/tui/src/chatwidget.rs +++ b/codex-rs/tui/src/chatwidget.rs @@ -91,7 +91,6 @@ use self::agent::spawn_agent; use self::agent::spawn_agent_from_existing; mod session_header; use self::session_header::SessionHeader; -use crate::streaming::controller::AppEventHistorySink; use crate::streaming::controller::StreamController; use std::path::Path; @@ -252,11 +251,13 @@ fn create_initial_user_message(text: String, image_paths: Vec) -> Optio impl ChatWidget { fn flush_answer_stream_with_separator(&mut self) { - if let Some(mut controller) = self.stream_controller.take() { - let sink = AppEventHistorySink(self.app_event_tx.clone()); - controller.finalize(&sink); + if let Some(mut controller) = self.stream_controller.take() + && let Some(cell) = controller.finalize() + { + self.add_boxed_history(cell); } } + // --- Small event handlers --- fn on_session_configured(&mut self, event: codex_core::protocol::SessionConfiguredEvent) { self.bottom_pane @@ -346,12 +347,8 @@ impl ChatWidget { } fn on_task_complete(&mut self, last_agent_message: Option) { - // If a stream is currently active, finalize only that stream to flush any tail - // without emitting stray headers for other streams. - if let Some(mut controller) = self.stream_controller.take() { - let sink = AppEventHistorySink(self.app_event_tx.clone()); - controller.finalize(&sink); - } + // If a stream is currently active, finalize it. + self.flush_answer_stream_with_separator(); // Mark task stopped and request redraw now that all content is in history. self.bottom_pane.set_task_running(false); self.running_commands.clear(); @@ -554,14 +551,18 @@ impl ChatWidget { self.add_to_history(history_cell::new_stream_error_event(message)); self.request_redraw(); } + /// Periodic tick to commit at most one queued line to history with a small delay, /// animating the output. pub(crate) fn on_commit_tick(&mut self) { if let Some(controller) = self.stream_controller.as_mut() { - let sink = AppEventHistorySink(self.app_event_tx.clone()); - let finished = controller.on_commit_tick(&sink); - if finished { - self.handle_stream_finished(); + let (cell, is_idle) = controller.on_commit_tick(); + if let Some(cell) = cell { + self.bottom_pane.set_task_running(false); + self.add_boxed_history(cell); + } + if is_idle { + self.app_event_tx.send(AppEvent::StopCommitAnimation); } } } @@ -605,9 +606,10 @@ impl ChatWidget { if self.stream_controller.is_none() { self.stream_controller = Some(StreamController::new(self.config.clone())); } - if let Some(controller) = self.stream_controller.as_mut() { - let sink = AppEventHistorySink(self.app_event_tx.clone()); - controller.push_and_maybe_commit(&delta, &sink); + if let Some(controller) = self.stream_controller.as_mut() + && controller.push(&delta) + { + self.app_event_tx.send(AppEvent::StartCommitAnimation); } self.request_redraw(); } diff --git a/codex-rs/tui/src/markdown_stream.rs b/codex-rs/tui/src/markdown_stream.rs index 51331887..7c439337 100644 --- a/codex-rs/tui/src/markdown_stream.rs +++ b/codex-rs/tui/src/markdown_stream.rs @@ -1,5 +1,3 @@ -use std::collections::VecDeque; - use codex_core::config::Config; use ratatui::text::Line; @@ -97,59 +95,6 @@ impl MarkdownStreamCollector { } } -pub(crate) struct StepResult { - pub history: Vec>, // lines to insert into history this step -} - -/// Streams already-rendered rows into history while computing the newest K -/// rows to show in a live overlay. -pub(crate) struct AnimatedLineStreamer { - queue: VecDeque>, -} - -impl AnimatedLineStreamer { - pub fn new() -> Self { - Self { - queue: VecDeque::new(), - } - } - - pub fn clear(&mut self) { - self.queue.clear(); - } - - pub fn enqueue(&mut self, lines: Vec>) { - for l in lines { - self.queue.push_back(l); - } - } - - pub fn step(&mut self) -> StepResult { - let mut history = Vec::new(); - // Move exactly one per tick to animate gradual insertion. - let burst = if self.queue.is_empty() { 0 } else { 1 }; - for _ in 0..burst { - if let Some(l) = self.queue.pop_front() { - history.push(l); - } - } - - StepResult { history } - } - - pub fn drain_all(&mut self) -> StepResult { - let mut history = Vec::new(); - while let Some(l) = self.queue.pop_front() { - history.push(l); - } - StepResult { history } - } - - pub fn is_idle(&self) -> bool { - self.queue.is_empty() - } -} - #[cfg(test)] pub(crate) fn simulate_stream_markdown_for_tests( deltas: &[&str], diff --git a/codex-rs/tui/src/streaming/controller.rs b/codex-rs/tui/src/streaming/controller.rs index 89b7ab95..ca22bed5 100644 --- a/codex-rs/tui/src/streaming/controller.rs +++ b/codex-rs/tui/src/streaming/controller.rs @@ -1,36 +1,10 @@ -use crate::history_cell; use crate::history_cell::HistoryCell; +use crate::history_cell::{self}; use codex_core::config::Config; use ratatui::text::Line; use super::StreamState; -/// Sink for history insertions and animation control. -pub(crate) trait HistorySink { - fn insert_history_cell(&self, cell: Box); - fn start_commit_animation(&self); - fn stop_commit_animation(&self); -} - -/// Concrete sink backed by `AppEventSender`. -pub(crate) struct AppEventHistorySink(pub(crate) crate::app_event_sender::AppEventSender); - -impl HistorySink for AppEventHistorySink { - fn insert_history_cell(&self, cell: Box) { - self.0 - .send(crate::app_event::AppEvent::InsertHistoryCell(cell)) - } - fn start_commit_animation(&self) { - self.0 - .send(crate::app_event::AppEvent::StartCommitAnimation) - } - fn stop_commit_animation(&self) { - self.0.send(crate::app_event::AppEvent::StopCommitAnimation) - } -} - -type Lines = Vec>; - /// Controller that manages newline-gated streaming, header emission, and /// commit animation across streams. pub(crate) struct StreamController { @@ -51,7 +25,7 @@ impl StreamController { } /// Push a delta; if it contains a newline, commit completed lines and start animation. - pub(crate) fn push_and_maybe_commit(&mut self, delta: &str, sink: &impl HistorySink) { + pub(crate) fn push(&mut self, delta: &str) -> bool { let cfg = self.config.clone(); let state = &mut self.state; if !delta.is_empty() { @@ -62,13 +36,14 @@ impl StreamController { let newly_completed = state.collector.commit_complete_lines(&cfg); if !newly_completed.is_empty() { state.enqueue(newly_completed); - sink.start_commit_animation(); + return true; } } + false } /// Finalize the active stream. Drain and emit now. - pub(crate) fn finalize(&mut self, sink: &impl HistorySink) { + pub(crate) fn finalize(&mut self) -> Option> { let cfg = self.config.clone(); // Finalize collector first. let remaining = { @@ -76,45 +51,37 @@ impl StreamController { state.collector.finalize_and_drain(&cfg) }; // Collect all output first to avoid emitting headers when there is no content. - let mut out_lines: Lines = Vec::new(); + let mut out_lines = Vec::new(); { let state = &mut self.state; if !remaining.is_empty() { state.enqueue(remaining); } let step = state.drain_all(); - out_lines.extend(step.history); - } - if !out_lines.is_empty() { - // Insert as a HistoryCell so display drops the header while transcript keeps it. - self.emit(sink, out_lines); + out_lines.extend(step); } // Cleanup self.state.clear(); self.finishing_after_drain = false; + self.emit(out_lines) } /// Step animation: commit at most one queued line and handle end-of-drain cleanup. - pub(crate) fn on_commit_tick(&mut self, sink: &impl HistorySink) -> bool { - let step = { self.state.step() }; - if !step.history.is_empty() { - self.emit(sink, step.history); - } - - let is_idle = self.state.is_idle(); - if is_idle { - sink.stop_commit_animation(); - } - false + pub(crate) fn on_commit_tick(&mut self) -> (Option>, bool) { + let step = self.state.step(); + (self.emit(step), self.state.is_idle()) } - fn emit(&mut self, sink: &impl HistorySink, lines: Vec>) { - sink.insert_history_cell(Box::new(history_cell::AgentMessageCell::new( - lines, - !self.header_emitted, - ))); - self.header_emitted = true; + fn emit(&mut self, lines: Vec>) -> Option> { + if lines.is_empty() { + return None; + } + Some(Box::new(history_cell::AgentMessageCell::new(lines, { + let header_emitted = self.header_emitted; + self.header_emitted = true; + !header_emitted + }))) } } @@ -123,7 +90,6 @@ mod tests { use super::*; use codex_core::config::Config; use codex_core::config::ConfigOverrides; - use std::cell::RefCell; fn test_config() -> Config { let overrides = ConfigOverrides { @@ -136,25 +102,6 @@ mod tests { } } - struct TestSink { - pub lines: RefCell>>>, - } - impl TestSink { - fn new() -> Self { - Self { - lines: RefCell::new(Vec::new()), - } - } - } - impl HistorySink for TestSink { - fn insert_history_cell(&self, cell: Box) { - // For tests, store the transcript representation of the cell. - self.lines.borrow_mut().push(cell.transcript_lines()); - } - fn start_commit_animation(&self) {} - fn stop_commit_animation(&self) {} - } - fn lines_to_plain_strings(lines: &[ratatui::text::Line<'_>]) -> Vec { lines .iter() @@ -172,7 +119,7 @@ mod tests { fn controller_loose_vs_tight_with_commit_ticks_matches_full() { let cfg = test_config(); let mut ctrl = StreamController::new(cfg.clone()); - let sink = TestSink::new(); + let mut lines = Vec::new(); // Exact deltas from the session log (section: Loose vs. tight list items) let deltas = vec![ @@ -246,20 +193,21 @@ mod tests { ]; // Simulate streaming with a commit tick attempt after each delta. - for d in &deltas { - ctrl.push_and_maybe_commit(d, &sink); - let _ = ctrl.on_commit_tick(&sink); - } - // Finalize and flush remaining lines now. - ctrl.finalize(&sink); - - // Flatten sink output and strip the header that the controller inserts (blank + "codex"). - let mut flat: Vec> = Vec::new(); - for batch in sink.lines.borrow().iter() { - for l in batch { - flat.push(l.clone()); + for d in deltas.iter() { + ctrl.push(d); + while let (Some(cell), idle) = ctrl.on_commit_tick() { + lines.extend(cell.transcript_lines()); + if idle { + break; + } } } + // Finalize and flush remaining lines now. + if let Some(cell) = ctrl.finalize() { + lines.extend(cell.transcript_lines()); + } + + let mut flat = lines; // Drop leading blank and header line if present. if !flat.is_empty() && lines_to_plain_strings(&[flat[0].clone()])[0].is_empty() { flat.remove(0); diff --git a/codex-rs/tui/src/streaming/mod.rs b/codex-rs/tui/src/streaming/mod.rs index dbb260bc..e048b399 100644 --- a/codex-rs/tui/src/streaming/mod.rs +++ b/codex-rs/tui/src/streaming/mod.rs @@ -1,10 +1,13 @@ -use crate::markdown_stream::AnimatedLineStreamer; +use std::collections::VecDeque; + +use ratatui::text::Line; + use crate::markdown_stream::MarkdownStreamCollector; pub(crate) mod controller; pub(crate) struct StreamState { pub(crate) collector: MarkdownStreamCollector, - pub(crate) streamer: AnimatedLineStreamer, + queued_lines: VecDeque>, pub(crate) has_seen_delta: bool, } @@ -12,25 +15,25 @@ impl StreamState { pub(crate) fn new() -> Self { Self { collector: MarkdownStreamCollector::new(), - streamer: AnimatedLineStreamer::new(), + queued_lines: VecDeque::new(), has_seen_delta: false, } } pub(crate) fn clear(&mut self) { self.collector.clear(); - self.streamer.clear(); + self.queued_lines.clear(); self.has_seen_delta = false; } - pub(crate) fn step(&mut self) -> crate::markdown_stream::StepResult { - self.streamer.step() + pub(crate) fn step(&mut self) -> Vec> { + self.queued_lines.pop_front().into_iter().collect() } - pub(crate) fn drain_all(&mut self) -> crate::markdown_stream::StepResult { - self.streamer.drain_all() + pub(crate) fn drain_all(&mut self) -> Vec> { + self.queued_lines.drain(..).collect() } pub(crate) fn is_idle(&self) -> bool { - self.streamer.is_idle() + self.queued_lines.is_empty() } - pub(crate) fn enqueue(&mut self, lines: Vec>) { - self.streamer.enqueue(lines) + pub(crate) fn enqueue(&mut self, lines: Vec>) { + self.queued_lines.extend(lines); } }