hide the status indicator when the answer stream starts (#4101)

This eliminates a "bounce" at the end of streaming where we hide the
status indicator at the end of the turn and the composer moves up two
lines.

Also, simplify streaming further by removing the HistorySink and
inverting control, and collapsing a few single-element structures.
This commit is contained in:
Jeremy Rose
2025-09-24 11:51:48 -07:00
committed by GitHub
parent addc946d13
commit 7bff8df10e
4 changed files with 67 additions and 169 deletions

View File

@@ -91,7 +91,6 @@ use self::agent::spawn_agent;
use self::agent::spawn_agent_from_existing;
mod session_header;
use self::session_header::SessionHeader;
use crate::streaming::controller::AppEventHistorySink;
use crate::streaming::controller::StreamController;
use std::path::Path;
@@ -252,11 +251,13 @@ fn create_initial_user_message(text: String, image_paths: Vec<PathBuf>) -> Optio
impl ChatWidget {
fn flush_answer_stream_with_separator(&mut self) {
if let Some(mut controller) = self.stream_controller.take() {
let sink = AppEventHistorySink(self.app_event_tx.clone());
controller.finalize(&sink);
if let Some(mut controller) = self.stream_controller.take()
&& let Some(cell) = controller.finalize()
{
self.add_boxed_history(cell);
}
}
// --- Small event handlers ---
fn on_session_configured(&mut self, event: codex_core::protocol::SessionConfiguredEvent) {
self.bottom_pane
@@ -346,12 +347,8 @@ impl ChatWidget {
}
fn on_task_complete(&mut self, last_agent_message: Option<String>) {
// If a stream is currently active, finalize only that stream to flush any tail
// without emitting stray headers for other streams.
if let Some(mut controller) = self.stream_controller.take() {
let sink = AppEventHistorySink(self.app_event_tx.clone());
controller.finalize(&sink);
}
// If a stream is currently active, finalize it.
self.flush_answer_stream_with_separator();
// Mark task stopped and request redraw now that all content is in history.
self.bottom_pane.set_task_running(false);
self.running_commands.clear();
@@ -554,14 +551,18 @@ impl ChatWidget {
self.add_to_history(history_cell::new_stream_error_event(message));
self.request_redraw();
}
/// Periodic tick to commit at most one queued line to history with a small delay,
/// animating the output.
pub(crate) fn on_commit_tick(&mut self) {
if let Some(controller) = self.stream_controller.as_mut() {
let sink = AppEventHistorySink(self.app_event_tx.clone());
let finished = controller.on_commit_tick(&sink);
if finished {
self.handle_stream_finished();
let (cell, is_idle) = controller.on_commit_tick();
if let Some(cell) = cell {
self.bottom_pane.set_task_running(false);
self.add_boxed_history(cell);
}
if is_idle {
self.app_event_tx.send(AppEvent::StopCommitAnimation);
}
}
}
@@ -605,9 +606,10 @@ impl ChatWidget {
if self.stream_controller.is_none() {
self.stream_controller = Some(StreamController::new(self.config.clone()));
}
if let Some(controller) = self.stream_controller.as_mut() {
let sink = AppEventHistorySink(self.app_event_tx.clone());
controller.push_and_maybe_commit(&delta, &sink);
if let Some(controller) = self.stream_controller.as_mut()
&& controller.push(&delta)
{
self.app_event_tx.send(AppEvent::StartCommitAnimation);
}
self.request_redraw();
}

View File

@@ -1,5 +1,3 @@
use std::collections::VecDeque;
use codex_core::config::Config;
use ratatui::text::Line;
@@ -97,59 +95,6 @@ impl MarkdownStreamCollector {
}
}
pub(crate) struct StepResult {
pub history: Vec<Line<'static>>, // lines to insert into history this step
}
/// Streams already-rendered rows into history while computing the newest K
/// rows to show in a live overlay.
pub(crate) struct AnimatedLineStreamer {
queue: VecDeque<Line<'static>>,
}
impl AnimatedLineStreamer {
pub fn new() -> Self {
Self {
queue: VecDeque::new(),
}
}
pub fn clear(&mut self) {
self.queue.clear();
}
pub fn enqueue(&mut self, lines: Vec<Line<'static>>) {
for l in lines {
self.queue.push_back(l);
}
}
pub fn step(&mut self) -> StepResult {
let mut history = Vec::new();
// Move exactly one per tick to animate gradual insertion.
let burst = if self.queue.is_empty() { 0 } else { 1 };
for _ in 0..burst {
if let Some(l) = self.queue.pop_front() {
history.push(l);
}
}
StepResult { history }
}
pub fn drain_all(&mut self) -> StepResult {
let mut history = Vec::new();
while let Some(l) = self.queue.pop_front() {
history.push(l);
}
StepResult { history }
}
pub fn is_idle(&self) -> bool {
self.queue.is_empty()
}
}
#[cfg(test)]
pub(crate) fn simulate_stream_markdown_for_tests(
deltas: &[&str],

View File

@@ -1,36 +1,10 @@
use crate::history_cell;
use crate::history_cell::HistoryCell;
use crate::history_cell::{self};
use codex_core::config::Config;
use ratatui::text::Line;
use super::StreamState;
/// Sink for history insertions and animation control.
pub(crate) trait HistorySink {
fn insert_history_cell(&self, cell: Box<dyn HistoryCell>);
fn start_commit_animation(&self);
fn stop_commit_animation(&self);
}
/// Concrete sink backed by `AppEventSender`.
pub(crate) struct AppEventHistorySink(pub(crate) crate::app_event_sender::AppEventSender);
impl HistorySink for AppEventHistorySink {
fn insert_history_cell(&self, cell: Box<dyn crate::history_cell::HistoryCell>) {
self.0
.send(crate::app_event::AppEvent::InsertHistoryCell(cell))
}
fn start_commit_animation(&self) {
self.0
.send(crate::app_event::AppEvent::StartCommitAnimation)
}
fn stop_commit_animation(&self) {
self.0.send(crate::app_event::AppEvent::StopCommitAnimation)
}
}
type Lines = Vec<Line<'static>>;
/// Controller that manages newline-gated streaming, header emission, and
/// commit animation across streams.
pub(crate) struct StreamController {
@@ -51,7 +25,7 @@ impl StreamController {
}
/// Push a delta; if it contains a newline, commit completed lines and start animation.
pub(crate) fn push_and_maybe_commit(&mut self, delta: &str, sink: &impl HistorySink) {
pub(crate) fn push(&mut self, delta: &str) -> bool {
let cfg = self.config.clone();
let state = &mut self.state;
if !delta.is_empty() {
@@ -62,13 +36,14 @@ impl StreamController {
let newly_completed = state.collector.commit_complete_lines(&cfg);
if !newly_completed.is_empty() {
state.enqueue(newly_completed);
sink.start_commit_animation();
return true;
}
}
false
}
/// Finalize the active stream. Drain and emit now.
pub(crate) fn finalize(&mut self, sink: &impl HistorySink) {
pub(crate) fn finalize(&mut self) -> Option<Box<dyn HistoryCell>> {
let cfg = self.config.clone();
// Finalize collector first.
let remaining = {
@@ -76,45 +51,37 @@ impl StreamController {
state.collector.finalize_and_drain(&cfg)
};
// Collect all output first to avoid emitting headers when there is no content.
let mut out_lines: Lines = Vec::new();
let mut out_lines = Vec::new();
{
let state = &mut self.state;
if !remaining.is_empty() {
state.enqueue(remaining);
}
let step = state.drain_all();
out_lines.extend(step.history);
}
if !out_lines.is_empty() {
// Insert as a HistoryCell so display drops the header while transcript keeps it.
self.emit(sink, out_lines);
out_lines.extend(step);
}
// Cleanup
self.state.clear();
self.finishing_after_drain = false;
self.emit(out_lines)
}
/// Step animation: commit at most one queued line and handle end-of-drain cleanup.
pub(crate) fn on_commit_tick(&mut self, sink: &impl HistorySink) -> bool {
let step = { self.state.step() };
if !step.history.is_empty() {
self.emit(sink, step.history);
}
let is_idle = self.state.is_idle();
if is_idle {
sink.stop_commit_animation();
}
false
pub(crate) fn on_commit_tick(&mut self) -> (Option<Box<dyn HistoryCell>>, bool) {
let step = self.state.step();
(self.emit(step), self.state.is_idle())
}
fn emit(&mut self, sink: &impl HistorySink, lines: Vec<Line<'static>>) {
sink.insert_history_cell(Box::new(history_cell::AgentMessageCell::new(
lines,
!self.header_emitted,
)));
self.header_emitted = true;
fn emit(&mut self, lines: Vec<Line<'static>>) -> Option<Box<dyn HistoryCell>> {
if lines.is_empty() {
return None;
}
Some(Box::new(history_cell::AgentMessageCell::new(lines, {
let header_emitted = self.header_emitted;
self.header_emitted = true;
!header_emitted
})))
}
}
@@ -123,7 +90,6 @@ mod tests {
use super::*;
use codex_core::config::Config;
use codex_core::config::ConfigOverrides;
use std::cell::RefCell;
fn test_config() -> Config {
let overrides = ConfigOverrides {
@@ -136,25 +102,6 @@ mod tests {
}
}
struct TestSink {
pub lines: RefCell<Vec<Vec<Line<'static>>>>,
}
impl TestSink {
fn new() -> Self {
Self {
lines: RefCell::new(Vec::new()),
}
}
}
impl HistorySink for TestSink {
fn insert_history_cell(&self, cell: Box<dyn crate::history_cell::HistoryCell>) {
// For tests, store the transcript representation of the cell.
self.lines.borrow_mut().push(cell.transcript_lines());
}
fn start_commit_animation(&self) {}
fn stop_commit_animation(&self) {}
}
fn lines_to_plain_strings(lines: &[ratatui::text::Line<'_>]) -> Vec<String> {
lines
.iter()
@@ -172,7 +119,7 @@ mod tests {
fn controller_loose_vs_tight_with_commit_ticks_matches_full() {
let cfg = test_config();
let mut ctrl = StreamController::new(cfg.clone());
let sink = TestSink::new();
let mut lines = Vec::new();
// Exact deltas from the session log (section: Loose vs. tight list items)
let deltas = vec![
@@ -246,20 +193,21 @@ mod tests {
];
// Simulate streaming with a commit tick attempt after each delta.
for d in &deltas {
ctrl.push_and_maybe_commit(d, &sink);
let _ = ctrl.on_commit_tick(&sink);
}
// Finalize and flush remaining lines now.
ctrl.finalize(&sink);
// Flatten sink output and strip the header that the controller inserts (blank + "codex").
let mut flat: Vec<ratatui::text::Line<'static>> = Vec::new();
for batch in sink.lines.borrow().iter() {
for l in batch {
flat.push(l.clone());
for d in deltas.iter() {
ctrl.push(d);
while let (Some(cell), idle) = ctrl.on_commit_tick() {
lines.extend(cell.transcript_lines());
if idle {
break;
}
}
}
// Finalize and flush remaining lines now.
if let Some(cell) = ctrl.finalize() {
lines.extend(cell.transcript_lines());
}
let mut flat = lines;
// Drop leading blank and header line if present.
if !flat.is_empty() && lines_to_plain_strings(&[flat[0].clone()])[0].is_empty() {
flat.remove(0);

View File

@@ -1,10 +1,13 @@
use crate::markdown_stream::AnimatedLineStreamer;
use std::collections::VecDeque;
use ratatui::text::Line;
use crate::markdown_stream::MarkdownStreamCollector;
pub(crate) mod controller;
pub(crate) struct StreamState {
pub(crate) collector: MarkdownStreamCollector,
pub(crate) streamer: AnimatedLineStreamer,
queued_lines: VecDeque<Line<'static>>,
pub(crate) has_seen_delta: bool,
}
@@ -12,25 +15,25 @@ impl StreamState {
pub(crate) fn new() -> Self {
Self {
collector: MarkdownStreamCollector::new(),
streamer: AnimatedLineStreamer::new(),
queued_lines: VecDeque::new(),
has_seen_delta: false,
}
}
pub(crate) fn clear(&mut self) {
self.collector.clear();
self.streamer.clear();
self.queued_lines.clear();
self.has_seen_delta = false;
}
pub(crate) fn step(&mut self) -> crate::markdown_stream::StepResult {
self.streamer.step()
pub(crate) fn step(&mut self) -> Vec<Line<'static>> {
self.queued_lines.pop_front().into_iter().collect()
}
pub(crate) fn drain_all(&mut self) -> crate::markdown_stream::StepResult {
self.streamer.drain_all()
pub(crate) fn drain_all(&mut self) -> Vec<Line<'static>> {
self.queued_lines.drain(..).collect()
}
pub(crate) fn is_idle(&self) -> bool {
self.streamer.is_idle()
self.queued_lines.is_empty()
}
pub(crate) fn enqueue(&mut self, lines: Vec<ratatui::text::Line<'static>>) {
self.streamer.enqueue(lines)
pub(crate) fn enqueue(&mut self, lines: Vec<Line<'static>>) {
self.queued_lines.extend(lines);
}
}