Revert "Streaming markdown (#1920)" (#1981)

This reverts commit 2b7139859e.
This commit is contained in:
easong-openai
2025-08-07 18:38:39 -07:00
committed by GitHub
parent 2b7139859e
commit 52e12f2b6c
14 changed files with 481 additions and 1940 deletions

View File

@@ -1,5 +1,4 @@
use std::collections::HashMap;
use std::collections::VecDeque;
use std::path::PathBuf;
use std::sync::Arc;
@@ -46,14 +45,13 @@ use crate::bottom_pane::BottomPane;
use crate::bottom_pane::BottomPaneParams;
use crate::bottom_pane::CancellationEvent;
use crate::bottom_pane::InputResult;
use crate::exec_command::strip_bash_lc_and_escape;
use crate::history_cell::CommandOutput;
use crate::history_cell::HistoryCell;
use crate::history_cell::PatchEventType;
use crate::markdown_stream::MarkdownNewlineCollector;
use crate::markdown_stream::RenderedLineStreamer;
use crate::live_wrap::RowBuilder;
use crate::user_approval_widget::ApprovalRequest;
use codex_file_search::FileMatch;
use ratatui::style::Stylize;
struct RunningCommand {
command: Vec<String>,
@@ -70,21 +68,17 @@ pub(crate) struct ChatWidget<'a> {
initial_user_message: Option<UserMessage>,
total_token_usage: TokenUsage,
last_token_usage: TokenUsage,
// Newline-gated markdown streaming state
reasoning_collector: MarkdownNewlineCollector,
answer_collector: MarkdownNewlineCollector,
reasoning_streamer: RenderedLineStreamer,
answer_streamer: RenderedLineStreamer,
reasoning_buffer: String,
content_buffer: String,
// Buffer for streaming assistant answer text; we do not surface partial
// We wait for the final AgentMessage event and then emit the full text
// at once into scrollback so the history contains a single message.
answer_buffer: String,
running_commands: HashMap<String, RunningCommand>,
live_builder: RowBuilder,
current_stream: Option<StreamKind>,
// Track header emission per stream kind to avoid cross-stream duplication
answer_header_emitted: bool,
reasoning_header_emitted: bool,
stream_header_emitted: bool,
live_max_rows: u16,
task_complete_pending: bool,
finishing_after_drain: bool,
// Queue of interruptive UI events deferred during an active write cycle
interrupt_queue: VecDeque<QueuedInterrupt>,
}
struct UserMessage {
@@ -98,15 +92,6 @@ enum StreamKind {
Reasoning,
}
#[derive(Debug)]
enum QueuedInterrupt {
ExecApproval(String, ExecApprovalRequestEvent),
ApplyPatchApproval(String, ApplyPatchApprovalRequestEvent),
ExecBegin(ExecCommandBeginEvent),
McpBegin(McpToolCallBeginEvent),
McpEnd(McpToolCallEndEvent),
}
impl From<String> for UserMessage {
fn from(text: String) -> Self {
Self {
@@ -125,173 +110,19 @@ fn create_initial_user_message(text: String, image_paths: Vec<PathBuf>) -> Optio
}
impl ChatWidget<'_> {
fn header_line(kind: StreamKind) -> ratatui::text::Line<'static> {
use ratatui::style::Stylize;
match kind {
StreamKind::Reasoning => ratatui::text::Line::from("thinking".magenta().italic()),
StreamKind::Answer => ratatui::text::Line::from("codex".magenta().bold()),
}
}
fn line_is_blank(line: &ratatui::text::Line<'_>) -> bool {
if line.spans.is_empty() {
return true;
}
line.spans.iter().all(|s| s.content.trim().is_empty())
}
/// Periodic tick to commit at most one queued line to history with a small delay,
/// animating the output.
pub(crate) fn on_commit_tick(&mut self) {
// Choose the active streamer
let (streamer, kind_opt) = match self.current_stream {
Some(StreamKind::Reasoning) => {
(&mut self.reasoning_streamer, Some(StreamKind::Reasoning))
}
Some(StreamKind::Answer) => (&mut self.answer_streamer, Some(StreamKind::Answer)),
None => {
// No active stream. Nothing to animate.
return;
}
};
// Prepare header if needed
let mut lines: Vec<ratatui::text::Line<'static>> = Vec::new();
if let Some(k) = kind_opt {
let header_needed = match k {
StreamKind::Reasoning => !self.reasoning_header_emitted,
StreamKind::Answer => !self.answer_header_emitted,
};
if header_needed {
lines.push(Self::header_line(k));
match k {
StreamKind::Reasoning => self.reasoning_header_emitted = true,
StreamKind::Answer => self.answer_header_emitted = true,
}
}
}
let step = streamer.step(self.live_max_rows as usize);
if !step.history.is_empty() || !lines.is_empty() {
lines.extend(step.history);
self.app_event_tx.send(AppEvent::InsertHistory(lines));
}
// If streamer is now idle and there is no more active stream data, finalize state.
let is_idle = streamer.is_idle();
if is_idle {
// Stop animation ticks between bursts.
self.app_event_tx.send(AppEvent::StopCommitAnimation);
if self.finishing_after_drain {
// Final cleanup once fully drained at end-of-stream.
self.current_stream = None;
self.finishing_after_drain = false;
if self.task_complete_pending {
self.bottom_pane.set_task_running(false);
self.task_complete_pending = false;
}
// After the write cycle completes, release any queued interrupts.
self.flush_interrupt_queue();
}
}
}
fn is_write_cycle_active(&self) -> bool {
self.current_stream.is_some()
}
fn flush_interrupt_queue(&mut self) {
while let Some(q) = self.interrupt_queue.pop_front() {
match q {
QueuedInterrupt::ExecApproval(id, ev) => self.handle_exec_approval_now(id, ev),
QueuedInterrupt::ApplyPatchApproval(id, ev) => {
self.handle_apply_patch_approval_now(id, ev)
}
QueuedInterrupt::ExecBegin(ev) => self.handle_exec_begin_now(ev),
QueuedInterrupt::McpBegin(ev) => self.handle_mcp_begin_now(ev),
QueuedInterrupt::McpEnd(ev) => self.handle_mcp_end_now(ev),
}
}
}
fn handle_exec_approval_now(&mut self, id: String, ev: ExecApprovalRequestEvent) {
// Log a background summary immediately so the history is chronological.
let cmdline = strip_bash_lc_and_escape(&ev.command);
let text = format!(
"command requires approval:\n$ {cmdline}{reason}",
reason = ev
.reason
.as_ref()
.map(|r| format!("\n{r}"))
.unwrap_or_default()
);
self.add_to_history(HistoryCell::new_background_event(text));
let request = ApprovalRequest::Exec {
id,
command: ev.command,
cwd: ev.cwd,
reason: ev.reason,
};
self.bottom_pane.push_approval_request(request);
self.request_redraw();
}
fn handle_apply_patch_approval_now(&mut self, id: String, ev: ApplyPatchApprovalRequestEvent) {
self.add_to_history(HistoryCell::new_patch_event(
PatchEventType::ApprovalRequest,
ev.changes.clone(),
));
let request = ApprovalRequest::ApplyPatch {
id,
reason: ev.reason,
grant_root: ev.grant_root,
};
self.bottom_pane.push_approval_request(request);
self.request_redraw();
}
fn handle_exec_begin_now(&mut self, ev: ExecCommandBeginEvent) {
// Ensure the status indicator is visible while the command runs.
self.bottom_pane
.update_status_text("running command".to_string());
self.running_commands.insert(
ev.call_id.clone(),
RunningCommand {
command: ev.command.clone(),
cwd: ev.cwd.clone(),
},
);
self.active_history_cell = Some(HistoryCell::new_active_exec_command(ev.command));
}
fn handle_mcp_begin_now(&mut self, ev: McpToolCallBeginEvent) {
self.add_to_history(HistoryCell::new_active_mcp_tool_call(ev.invocation));
}
fn handle_mcp_end_now(&mut self, ev: McpToolCallEndEvent) {
self.add_to_history(HistoryCell::new_completed_mcp_tool_call(
80,
ev.invocation,
ev.duration,
ev.result
.as_ref()
.map(|r| r.is_error.unwrap_or(false))
.unwrap_or(false),
ev.result,
));
}
fn interrupt_running_task(&mut self) {
if self.bottom_pane.is_task_running() {
self.active_history_cell = None;
self.bottom_pane.clear_ctrl_c_quit_hint();
self.submit_op(Op::Interrupt);
self.bottom_pane.set_task_running(false);
self.reasoning_collector.clear();
self.answer_collector.clear();
self.reasoning_streamer.clear();
self.answer_streamer.clear();
self.bottom_pane.clear_live_ring();
self.live_builder = RowBuilder::new(self.live_builder.width());
self.current_stream = None;
self.answer_header_emitted = false;
self.reasoning_header_emitted = false;
self.stream_header_emitted = false;
self.answer_buffer.clear();
self.reasoning_buffer.clear();
self.content_buffer.clear();
self.request_redraw();
}
}
@@ -306,7 +137,24 @@ impl ChatWidget<'_> {
])
.areas(area)
}
fn emit_stream_header(&mut self, kind: StreamKind) {
use ratatui::text::Line as RLine;
if self.stream_header_emitted {
return;
}
let header = match kind {
StreamKind::Reasoning => RLine::from("thinking".magenta().italic()),
StreamKind::Answer => RLine::from("codex".magenta().bold()),
};
self.app_event_tx
.send(AppEvent::InsertHistory(vec![header]));
self.stream_header_emitted = true;
}
fn finalize_active_stream(&mut self) {
if let Some(kind) = self.current_stream {
self.finalize_stream(kind);
}
}
pub(crate) fn new(
config: Config,
app_event_tx: AppEventSender,
@@ -368,18 +216,14 @@ impl ChatWidget<'_> {
),
total_token_usage: TokenUsage::default(),
last_token_usage: TokenUsage::default(),
reasoning_collector: MarkdownNewlineCollector::new(),
answer_collector: MarkdownNewlineCollector::new(),
reasoning_streamer: RenderedLineStreamer::new(),
answer_streamer: RenderedLineStreamer::new(),
reasoning_buffer: String::new(),
content_buffer: String::new(),
answer_buffer: String::new(),
running_commands: HashMap::new(),
live_builder: RowBuilder::new(80),
current_stream: None,
answer_header_emitted: false,
reasoning_header_emitted: false,
stream_header_emitted: false,
live_max_rows: 3,
task_complete_pending: false,
finishing_after_drain: false,
interrupt_queue: VecDeque::new(),
}
}
@@ -476,6 +320,7 @@ impl ChatWidget<'_> {
}
EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta }) => {
self.begin_stream(StreamKind::Answer);
self.answer_buffer.push_str(&delta);
self.stream_push_and_maybe_commit(&delta);
self.request_redraw();
}
@@ -483,6 +328,7 @@ impl ChatWidget<'_> {
// Stream CoT into the live pane; keep input visible and commit
// overflow rows incrementally to scrollback.
self.begin_stream(StreamKind::Reasoning);
self.reasoning_buffer.push_str(&delta);
self.stream_push_and_maybe_commit(&delta);
self.request_redraw();
}
@@ -496,6 +342,7 @@ impl ChatWidget<'_> {
}) => {
// Treat raw reasoning content the same as summarized reasoning for UI flow.
self.begin_stream(StreamKind::Reasoning);
self.reasoning_buffer.push_str(&delta);
self.stream_push_and_maybe_commit(&delta);
self.request_redraw();
}
@@ -515,18 +362,9 @@ impl ChatWidget<'_> {
EventMsg::TaskComplete(TaskCompleteEvent {
last_agent_message: _,
}) => {
// Defer clearing status/live ring until streaming fully completes.
let streaming_active = match self.current_stream {
Some(StreamKind::Reasoning) => !self.reasoning_streamer.is_idle(),
Some(StreamKind::Answer) => !self.answer_streamer.is_idle(),
None => false,
};
if streaming_active {
self.task_complete_pending = true;
} else {
self.bottom_pane.set_task_running(false);
self.request_redraw();
}
self.bottom_pane.set_task_running(false);
self.bottom_pane.clear_live_ring();
self.request_redraw();
}
EventMsg::TokenCount(token_usage) => {
self.total_token_usage = add_token_usage(&self.total_token_usage, &token_usage);
@@ -540,42 +378,83 @@ impl ChatWidget<'_> {
EventMsg::Error(ErrorEvent { message }) => {
self.add_to_history(HistoryCell::new_error_event(message.clone()));
self.bottom_pane.set_task_running(false);
self.reasoning_collector.clear();
self.answer_collector.clear();
self.reasoning_streamer.clear();
self.answer_streamer.clear();
self.bottom_pane.clear_live_ring();
self.live_builder = RowBuilder::new(self.live_builder.width());
self.current_stream = None;
self.answer_header_emitted = false;
self.reasoning_header_emitted = false;
self.stream_header_emitted = false;
self.answer_buffer.clear();
self.reasoning_buffer.clear();
self.content_buffer.clear();
self.request_redraw();
}
EventMsg::PlanUpdate(update) => {
// Commit plan updates directly to history (no status-line preview).
self.add_to_history(HistoryCell::new_plan_update(update));
}
EventMsg::ExecApprovalRequest(ev) => {
if self.is_write_cycle_active() {
self.interrupt_queue
.push_back(QueuedInterrupt::ExecApproval(id, ev));
} else {
self.handle_exec_approval_now(id, ev);
}
EventMsg::ExecApprovalRequest(ExecApprovalRequestEvent {
call_id: _,
command,
cwd,
reason,
}) => {
self.finalize_active_stream();
let request = ApprovalRequest::Exec {
id,
command,
cwd,
reason,
};
self.bottom_pane.push_approval_request(request);
self.request_redraw();
}
EventMsg::ApplyPatchApprovalRequest(ev) => {
if self.is_write_cycle_active() {
self.interrupt_queue
.push_back(QueuedInterrupt::ApplyPatchApproval(id, ev));
} else {
self.handle_apply_patch_approval_now(id, ev);
}
EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
call_id: _,
changes,
reason,
grant_root,
}) => {
self.finalize_active_stream();
// ------------------------------------------------------------------
// Before we even prompt the user for approval we surface the patch
// summary in the main conversation so that the dialog appears in a
// sensible chronological order:
// (1) codex → proposes patch (HistoryCell::PendingPatch)
// (2) UI → asks for approval (BottomPane)
// This mirrors how command execution is shown (command begins →
// approval dialog) and avoids surprising the user with a modal
// prompt before they have seen *what* is being requested.
// ------------------------------------------------------------------
self.add_to_history(HistoryCell::new_patch_event(
PatchEventType::ApprovalRequest,
changes,
));
// Now surface the approval request in the BottomPane as before.
let request = ApprovalRequest::ApplyPatch {
id,
reason,
grant_root,
};
self.bottom_pane.push_approval_request(request);
self.request_redraw();
}
EventMsg::ExecCommandBegin(ev) => {
if self.is_write_cycle_active() {
self.interrupt_queue
.push_back(QueuedInterrupt::ExecBegin(ev));
} else {
self.handle_exec_begin_now(ev);
}
EventMsg::ExecCommandBegin(ExecCommandBeginEvent {
call_id,
command,
cwd,
}) => {
self.finalize_active_stream();
// Ensure the status indicator is visible while the command runs.
self.bottom_pane
.update_status_text("running command".to_string());
self.running_commands.insert(
call_id,
RunningCommand {
command: command.clone(),
cwd: cwd.clone(),
},
);
self.active_history_cell = Some(HistoryCell::new_active_exec_command(command));
}
EventMsg::ExecCommandOutputDelta(_) => {
// TODO
@@ -614,20 +493,29 @@ impl ChatWidget<'_> {
},
));
}
EventMsg::McpToolCallBegin(ev) => {
if self.is_write_cycle_active() {
self.interrupt_queue
.push_back(QueuedInterrupt::McpBegin(ev));
} else {
self.handle_mcp_begin_now(ev);
}
EventMsg::McpToolCallBegin(McpToolCallBeginEvent {
call_id: _,
invocation,
}) => {
self.finalize_active_stream();
self.add_to_history(HistoryCell::new_active_mcp_tool_call(invocation));
}
EventMsg::McpToolCallEnd(ev) => {
if self.is_write_cycle_active() {
self.interrupt_queue.push_back(QueuedInterrupt::McpEnd(ev));
} else {
self.handle_mcp_end_now(ev);
}
EventMsg::McpToolCallEnd(McpToolCallEndEvent {
call_id: _,
duration,
invocation,
result,
}) => {
self.add_to_history(HistoryCell::new_completed_mcp_tool_call(
80,
invocation,
duration,
result
.as_ref()
.map(|r| r.is_error.unwrap_or(false))
.unwrap_or(false),
result,
));
}
EventMsg::GetHistoryEntryResponse(event) => {
let codex_core::protocol::GetHistoryEntryResponseEvent {
@@ -747,98 +635,62 @@ impl ChatWidget<'_> {
}
}
#[cfg(test)]
impl ChatWidget<'_> {
/// Test-only control to tune the maximum rows shown in the live overlay.
/// Useful for verifying queue-head behavior without changing production defaults.
pub fn test_set_live_max_rows(&mut self, n: u16) {
self.live_max_rows = n;
}
}
impl ChatWidget<'_> {
fn begin_stream(&mut self, kind: StreamKind) {
if let Some(current) = self.current_stream {
if current != kind {
// Synchronously flush the previous stream to keep ordering sane.
let (collector, streamer) = match current {
StreamKind::Reasoning => {
(&mut self.reasoning_collector, &mut self.reasoning_streamer)
}
StreamKind::Answer => (&mut self.answer_collector, &mut self.answer_streamer),
};
let remaining = collector.finalize_and_drain(&self.config);
if !remaining.is_empty() {
streamer.enqueue(remaining);
}
let step = streamer.drain_all(self.live_max_rows as usize);
let prev_header_emitted = match current {
StreamKind::Reasoning => self.reasoning_header_emitted,
StreamKind::Answer => self.answer_header_emitted,
};
if !step.history.is_empty() || !prev_header_emitted {
let mut lines: Vec<ratatui::text::Line<'static>> = Vec::new();
if !prev_header_emitted {
lines.push(Self::header_line(current));
match current {
StreamKind::Reasoning => self.reasoning_header_emitted = true,
StreamKind::Answer => self.answer_header_emitted = true,
}
}
lines.extend(step.history);
// Ensure at most one blank separator after the flushed block.
if let Some(last) = lines.last() {
if !Self::line_is_blank(last) {
lines.push(ratatui::text::Line::from(""));
}
} else {
lines.push(ratatui::text::Line::from(""));
}
self.app_event_tx.send(AppEvent::InsertHistory(lines));
}
// Reset for new stream
self.current_stream = None;
self.finalize_stream(current);
}
}
if self.current_stream != Some(kind) {
// Only reset the header flag when switching FROM a different stream kind.
// If current_stream is None (e.g., transient idle), preserve header flags
// to avoid duplicate headers on re-entry into the same stream.
let prev = self.current_stream;
self.current_stream = Some(kind);
if prev.is_some() {
match kind {
StreamKind::Reasoning => self.reasoning_header_emitted = false,
StreamKind::Answer => self.answer_header_emitted = false,
}
}
self.stream_header_emitted = false;
// Clear any previous live content; we're starting a new stream.
self.live_builder = RowBuilder::new(self.live_builder.width());
// Ensure the waiting status is visible (composer replaced).
self.bottom_pane
.update_status_text("waiting for model".to_string());
// No live ring overlay; headers will be inserted with the first commit.
self.emit_stream_header(kind);
}
}
fn stream_push_and_maybe_commit(&mut self, delta: &str) {
// Newline-gated: only consider committing when a newline is present.
let (collector, streamer) = match self.current_stream {
Some(StreamKind::Reasoning) => {
(&mut self.reasoning_collector, &mut self.reasoning_streamer)
}
Some(StreamKind::Answer) => (&mut self.answer_collector, &mut self.answer_streamer),
None => return,
};
self.live_builder.push_fragment(delta);
collector.push_delta(delta);
if delta.contains('\n') {
let newly_completed = collector.commit_complete_lines(&self.config);
if !newly_completed.is_empty() {
streamer.enqueue(newly_completed);
// Start or continue commit animation.
self.app_event_tx.send(AppEvent::StartCommitAnimation);
// Commit overflow rows (small batches) while keeping the last N rows visible.
let drained = self
.live_builder
.drain_commit_ready(self.live_max_rows as usize);
if !drained.is_empty() {
let mut lines: Vec<ratatui::text::Line<'static>> = Vec::new();
if !self.stream_header_emitted {
match self.current_stream {
Some(StreamKind::Reasoning) => {
lines.push(ratatui::text::Line::from("thinking".magenta().italic()));
}
Some(StreamKind::Answer) => {
lines.push(ratatui::text::Line::from("codex".magenta().bold()));
}
None => {}
}
self.stream_header_emitted = true;
}
for r in drained {
lines.push(ratatui::text::Line::from(r.text));
}
self.app_event_tx.send(AppEvent::InsertHistory(lines));
}
// Update the live ring overlay lines (text-only, newest at bottom).
let rows = self
.live_builder
.display_rows()
.into_iter()
.map(|r| ratatui::text::Line::from(r.text))
.collect::<Vec<_>>();
self.bottom_pane
.set_live_ring_rows(self.live_max_rows, rows);
}
fn finalize_stream(&mut self, kind: StreamKind) {
@@ -846,21 +698,38 @@ impl ChatWidget<'_> {
// Nothing to do; either already finalized or not the active stream.
return;
}
let (collector, streamer) = match kind {
StreamKind::Reasoning => (&mut self.reasoning_collector, &mut self.reasoning_streamer),
StreamKind::Answer => (&mut self.answer_collector, &mut self.answer_streamer),
};
let remaining = collector.finalize_and_drain(&self.config);
if !remaining.is_empty() {
streamer.enqueue(remaining);
// Flush any partial line as a full row, then drain all remaining rows.
self.live_builder.end_line();
let remaining = self.live_builder.drain_rows();
// TODO: Re-add markdown rendering for assistant answers and reasoning.
// When finalizing, pass the accumulated text through `markdown::append_markdown`
// to build styled `Line<'static>` entries instead of raw plain text lines.
if !remaining.is_empty() || !self.stream_header_emitted {
let mut lines: Vec<ratatui::text::Line<'static>> = Vec::new();
if !self.stream_header_emitted {
match kind {
StreamKind::Reasoning => {
lines.push(ratatui::text::Line::from("thinking".magenta().italic()));
}
StreamKind::Answer => {
lines.push(ratatui::text::Line::from("codex".magenta().bold()));
}
}
self.stream_header_emitted = true;
}
for r in remaining {
lines.push(ratatui::text::Line::from(r.text));
}
// Close the block with a blank line for readability.
lines.push(ratatui::text::Line::from(""));
self.app_event_tx.send(AppEvent::InsertHistory(lines));
}
// Trailing blank spacer
streamer.enqueue(vec![ratatui::text::Line::from("")]);
// Mark that we should clear state after draining.
self.finishing_after_drain = true;
// Start animation to drain remaining lines. Final cleanup will occur when drained.
self.app_event_tx.send(AppEvent::StartCommitAnimation);
// Clear the live overlay and reset state for the next stream.
self.live_builder = RowBuilder::new(self.live_builder.width());
self.bottom_pane.clear_live_ring();
self.current_stream = None;
self.stream_header_emitted = false;
}
}
@@ -901,34 +770,3 @@ fn add_token_usage(current_usage: &TokenUsage, new_usage: &TokenUsage) -> TokenU
total_tokens: current_usage.total_tokens + new_usage.total_tokens,
}
}
#[cfg(test)]
mod chatwidget_helper_tests {
use super::*;
use crate::app_event::AppEvent;
use crate::app_event_sender::AppEventSender;
use codex_core::config::ConfigOverrides;
use std::sync::mpsc::channel;
fn test_config() -> Config {
let overrides = ConfigOverrides {
cwd: std::env::current_dir().ok(),
..Default::default()
};
match Config::load_with_cli_overrides(vec![], overrides) {
Ok(c) => c,
Err(e) => panic!("load test config: {e}"),
}
}
#[tokio::test(flavor = "current_thread")]
async fn helpers_are_available_and_do_not_panic() {
let (tx_raw, _rx) = channel::<AppEvent>();
let tx = AppEventSender::new(tx_raw);
let cfg = test_config();
let mut w = ChatWidget::new(cfg, tx, None, Vec::new(), false);
// Adjust the live ring capacity (no-op for rendering) and ensure no panic.
w.test_set_live_max_rows(4);
}
}