Streaming markdown (#1920)
We wait until we have an entire newline, then format it with markdown and stream in to the UI. This reduces time to first token but is the right thing to do with our current rendering model IMO. Also lets us add word wrapping!
This commit is contained in:
@@ -1,4 +1,5 @@
|
||||
use std::collections::HashMap;
|
||||
use std::collections::VecDeque;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
|
||||
@@ -45,13 +46,14 @@ use crate::bottom_pane::BottomPane;
|
||||
use crate::bottom_pane::BottomPaneParams;
|
||||
use crate::bottom_pane::CancellationEvent;
|
||||
use crate::bottom_pane::InputResult;
|
||||
use crate::exec_command::strip_bash_lc_and_escape;
|
||||
use crate::history_cell::CommandOutput;
|
||||
use crate::history_cell::HistoryCell;
|
||||
use crate::history_cell::PatchEventType;
|
||||
use crate::live_wrap::RowBuilder;
|
||||
use crate::markdown_stream::MarkdownNewlineCollector;
|
||||
use crate::markdown_stream::RenderedLineStreamer;
|
||||
use crate::user_approval_widget::ApprovalRequest;
|
||||
use codex_file_search::FileMatch;
|
||||
use ratatui::style::Stylize;
|
||||
|
||||
struct RunningCommand {
|
||||
command: Vec<String>,
|
||||
@@ -68,17 +70,21 @@ pub(crate) struct ChatWidget<'a> {
|
||||
initial_user_message: Option<UserMessage>,
|
||||
total_token_usage: TokenUsage,
|
||||
last_token_usage: TokenUsage,
|
||||
reasoning_buffer: String,
|
||||
content_buffer: String,
|
||||
// Buffer for streaming assistant answer text; we do not surface partial
|
||||
// We wait for the final AgentMessage event and then emit the full text
|
||||
// at once into scrollback so the history contains a single message.
|
||||
answer_buffer: String,
|
||||
// Newline-gated markdown streaming state
|
||||
reasoning_collector: MarkdownNewlineCollector,
|
||||
answer_collector: MarkdownNewlineCollector,
|
||||
reasoning_streamer: RenderedLineStreamer,
|
||||
answer_streamer: RenderedLineStreamer,
|
||||
running_commands: HashMap<String, RunningCommand>,
|
||||
live_builder: RowBuilder,
|
||||
current_stream: Option<StreamKind>,
|
||||
stream_header_emitted: bool,
|
||||
// Track header emission per stream kind to avoid cross-stream duplication
|
||||
answer_header_emitted: bool,
|
||||
reasoning_header_emitted: bool,
|
||||
live_max_rows: u16,
|
||||
task_complete_pending: bool,
|
||||
finishing_after_drain: bool,
|
||||
// Queue of interruptive UI events deferred during an active write cycle
|
||||
interrupt_queue: VecDeque<QueuedInterrupt>,
|
||||
}
|
||||
|
||||
struct UserMessage {
|
||||
@@ -92,6 +98,15 @@ enum StreamKind {
|
||||
Reasoning,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
enum QueuedInterrupt {
|
||||
ExecApproval(String, ExecApprovalRequestEvent),
|
||||
ApplyPatchApproval(String, ApplyPatchApprovalRequestEvent),
|
||||
ExecBegin(ExecCommandBeginEvent),
|
||||
McpBegin(McpToolCallBeginEvent),
|
||||
McpEnd(McpToolCallEndEvent),
|
||||
}
|
||||
|
||||
impl From<String> for UserMessage {
|
||||
fn from(text: String) -> Self {
|
||||
Self {
|
||||
@@ -110,19 +125,173 @@ fn create_initial_user_message(text: String, image_paths: Vec<PathBuf>) -> Optio
|
||||
}
|
||||
|
||||
impl ChatWidget<'_> {
|
||||
fn header_line(kind: StreamKind) -> ratatui::text::Line<'static> {
|
||||
use ratatui::style::Stylize;
|
||||
match kind {
|
||||
StreamKind::Reasoning => ratatui::text::Line::from("thinking".magenta().italic()),
|
||||
StreamKind::Answer => ratatui::text::Line::from("codex".magenta().bold()),
|
||||
}
|
||||
}
|
||||
fn line_is_blank(line: &ratatui::text::Line<'_>) -> bool {
|
||||
if line.spans.is_empty() {
|
||||
return true;
|
||||
}
|
||||
line.spans.iter().all(|s| s.content.trim().is_empty())
|
||||
}
|
||||
/// Periodic tick to commit at most one queued line to history with a small delay,
|
||||
/// animating the output.
|
||||
pub(crate) fn on_commit_tick(&mut self) {
|
||||
// Choose the active streamer
|
||||
let (streamer, kind_opt) = match self.current_stream {
|
||||
Some(StreamKind::Reasoning) => {
|
||||
(&mut self.reasoning_streamer, Some(StreamKind::Reasoning))
|
||||
}
|
||||
Some(StreamKind::Answer) => (&mut self.answer_streamer, Some(StreamKind::Answer)),
|
||||
None => {
|
||||
// No active stream. Nothing to animate.
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
// Prepare header if needed
|
||||
let mut lines: Vec<ratatui::text::Line<'static>> = Vec::new();
|
||||
if let Some(k) = kind_opt {
|
||||
let header_needed = match k {
|
||||
StreamKind::Reasoning => !self.reasoning_header_emitted,
|
||||
StreamKind::Answer => !self.answer_header_emitted,
|
||||
};
|
||||
if header_needed {
|
||||
lines.push(Self::header_line(k));
|
||||
match k {
|
||||
StreamKind::Reasoning => self.reasoning_header_emitted = true,
|
||||
StreamKind::Answer => self.answer_header_emitted = true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let step = streamer.step(self.live_max_rows as usize);
|
||||
if !step.history.is_empty() || !lines.is_empty() {
|
||||
lines.extend(step.history);
|
||||
self.app_event_tx.send(AppEvent::InsertHistory(lines));
|
||||
}
|
||||
|
||||
// If streamer is now idle and there is no more active stream data, finalize state.
|
||||
let is_idle = streamer.is_idle();
|
||||
if is_idle {
|
||||
// Stop animation ticks between bursts.
|
||||
self.app_event_tx.send(AppEvent::StopCommitAnimation);
|
||||
if self.finishing_after_drain {
|
||||
// Final cleanup once fully drained at end-of-stream.
|
||||
self.current_stream = None;
|
||||
self.finishing_after_drain = false;
|
||||
if self.task_complete_pending {
|
||||
self.bottom_pane.set_task_running(false);
|
||||
self.task_complete_pending = false;
|
||||
}
|
||||
// After the write cycle completes, release any queued interrupts.
|
||||
self.flush_interrupt_queue();
|
||||
}
|
||||
}
|
||||
}
|
||||
fn is_write_cycle_active(&self) -> bool {
|
||||
self.current_stream.is_some()
|
||||
}
|
||||
|
||||
fn flush_interrupt_queue(&mut self) {
|
||||
while let Some(q) = self.interrupt_queue.pop_front() {
|
||||
match q {
|
||||
QueuedInterrupt::ExecApproval(id, ev) => self.handle_exec_approval_now(id, ev),
|
||||
QueuedInterrupt::ApplyPatchApproval(id, ev) => {
|
||||
self.handle_apply_patch_approval_now(id, ev)
|
||||
}
|
||||
QueuedInterrupt::ExecBegin(ev) => self.handle_exec_begin_now(ev),
|
||||
QueuedInterrupt::McpBegin(ev) => self.handle_mcp_begin_now(ev),
|
||||
QueuedInterrupt::McpEnd(ev) => self.handle_mcp_end_now(ev),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_exec_approval_now(&mut self, id: String, ev: ExecApprovalRequestEvent) {
|
||||
// Log a background summary immediately so the history is chronological.
|
||||
let cmdline = strip_bash_lc_and_escape(&ev.command);
|
||||
let text = format!(
|
||||
"command requires approval:\n$ {cmdline}{reason}",
|
||||
reason = ev
|
||||
.reason
|
||||
.as_ref()
|
||||
.map(|r| format!("\n{r}"))
|
||||
.unwrap_or_default()
|
||||
);
|
||||
self.add_to_history(HistoryCell::new_background_event(text));
|
||||
|
||||
let request = ApprovalRequest::Exec {
|
||||
id,
|
||||
command: ev.command,
|
||||
cwd: ev.cwd,
|
||||
reason: ev.reason,
|
||||
};
|
||||
self.bottom_pane.push_approval_request(request);
|
||||
self.request_redraw();
|
||||
}
|
||||
|
||||
fn handle_apply_patch_approval_now(&mut self, id: String, ev: ApplyPatchApprovalRequestEvent) {
|
||||
self.add_to_history(HistoryCell::new_patch_event(
|
||||
PatchEventType::ApprovalRequest,
|
||||
ev.changes.clone(),
|
||||
));
|
||||
|
||||
let request = ApprovalRequest::ApplyPatch {
|
||||
id,
|
||||
reason: ev.reason,
|
||||
grant_root: ev.grant_root,
|
||||
};
|
||||
self.bottom_pane.push_approval_request(request);
|
||||
self.request_redraw();
|
||||
}
|
||||
|
||||
fn handle_exec_begin_now(&mut self, ev: ExecCommandBeginEvent) {
|
||||
// Ensure the status indicator is visible while the command runs.
|
||||
self.bottom_pane
|
||||
.update_status_text("running command".to_string());
|
||||
self.running_commands.insert(
|
||||
ev.call_id.clone(),
|
||||
RunningCommand {
|
||||
command: ev.command.clone(),
|
||||
cwd: ev.cwd.clone(),
|
||||
},
|
||||
);
|
||||
self.active_history_cell = Some(HistoryCell::new_active_exec_command(ev.command));
|
||||
}
|
||||
|
||||
fn handle_mcp_begin_now(&mut self, ev: McpToolCallBeginEvent) {
|
||||
self.add_to_history(HistoryCell::new_active_mcp_tool_call(ev.invocation));
|
||||
}
|
||||
|
||||
fn handle_mcp_end_now(&mut self, ev: McpToolCallEndEvent) {
|
||||
self.add_to_history(HistoryCell::new_completed_mcp_tool_call(
|
||||
80,
|
||||
ev.invocation,
|
||||
ev.duration,
|
||||
ev.result
|
||||
.as_ref()
|
||||
.map(|r| r.is_error.unwrap_or(false))
|
||||
.unwrap_or(false),
|
||||
ev.result,
|
||||
));
|
||||
}
|
||||
fn interrupt_running_task(&mut self) {
|
||||
if self.bottom_pane.is_task_running() {
|
||||
self.active_history_cell = None;
|
||||
self.bottom_pane.clear_ctrl_c_quit_hint();
|
||||
self.submit_op(Op::Interrupt);
|
||||
self.bottom_pane.set_task_running(false);
|
||||
self.bottom_pane.clear_live_ring();
|
||||
self.live_builder = RowBuilder::new(self.live_builder.width());
|
||||
self.reasoning_collector.clear();
|
||||
self.answer_collector.clear();
|
||||
self.reasoning_streamer.clear();
|
||||
self.answer_streamer.clear();
|
||||
self.current_stream = None;
|
||||
self.stream_header_emitted = false;
|
||||
self.answer_buffer.clear();
|
||||
self.reasoning_buffer.clear();
|
||||
self.content_buffer.clear();
|
||||
self.answer_header_emitted = false;
|
||||
self.reasoning_header_emitted = false;
|
||||
self.request_redraw();
|
||||
}
|
||||
}
|
||||
@@ -137,24 +306,7 @@ impl ChatWidget<'_> {
|
||||
])
|
||||
.areas(area)
|
||||
}
|
||||
fn emit_stream_header(&mut self, kind: StreamKind) {
|
||||
use ratatui::text::Line as RLine;
|
||||
if self.stream_header_emitted {
|
||||
return;
|
||||
}
|
||||
let header = match kind {
|
||||
StreamKind::Reasoning => RLine::from("thinking".magenta().italic()),
|
||||
StreamKind::Answer => RLine::from("codex".magenta().bold()),
|
||||
};
|
||||
self.app_event_tx
|
||||
.send(AppEvent::InsertHistory(vec![header]));
|
||||
self.stream_header_emitted = true;
|
||||
}
|
||||
fn finalize_active_stream(&mut self) {
|
||||
if let Some(kind) = self.current_stream {
|
||||
self.finalize_stream(kind);
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn new(
|
||||
config: Config,
|
||||
app_event_tx: AppEventSender,
|
||||
@@ -216,14 +368,18 @@ impl ChatWidget<'_> {
|
||||
),
|
||||
total_token_usage: TokenUsage::default(),
|
||||
last_token_usage: TokenUsage::default(),
|
||||
reasoning_buffer: String::new(),
|
||||
content_buffer: String::new(),
|
||||
answer_buffer: String::new(),
|
||||
reasoning_collector: MarkdownNewlineCollector::new(),
|
||||
answer_collector: MarkdownNewlineCollector::new(),
|
||||
reasoning_streamer: RenderedLineStreamer::new(),
|
||||
answer_streamer: RenderedLineStreamer::new(),
|
||||
running_commands: HashMap::new(),
|
||||
live_builder: RowBuilder::new(80),
|
||||
current_stream: None,
|
||||
stream_header_emitted: false,
|
||||
answer_header_emitted: false,
|
||||
reasoning_header_emitted: false,
|
||||
live_max_rows: 3,
|
||||
task_complete_pending: false,
|
||||
finishing_after_drain: false,
|
||||
interrupt_queue: VecDeque::new(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -320,7 +476,6 @@ impl ChatWidget<'_> {
|
||||
}
|
||||
EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta }) => {
|
||||
self.begin_stream(StreamKind::Answer);
|
||||
self.answer_buffer.push_str(&delta);
|
||||
self.stream_push_and_maybe_commit(&delta);
|
||||
self.request_redraw();
|
||||
}
|
||||
@@ -328,7 +483,6 @@ impl ChatWidget<'_> {
|
||||
// Stream CoT into the live pane; keep input visible and commit
|
||||
// overflow rows incrementally to scrollback.
|
||||
self.begin_stream(StreamKind::Reasoning);
|
||||
self.reasoning_buffer.push_str(&delta);
|
||||
self.stream_push_and_maybe_commit(&delta);
|
||||
self.request_redraw();
|
||||
}
|
||||
@@ -342,7 +496,6 @@ impl ChatWidget<'_> {
|
||||
}) => {
|
||||
// Treat raw reasoning content the same as summarized reasoning for UI flow.
|
||||
self.begin_stream(StreamKind::Reasoning);
|
||||
self.reasoning_buffer.push_str(&delta);
|
||||
self.stream_push_and_maybe_commit(&delta);
|
||||
self.request_redraw();
|
||||
}
|
||||
@@ -362,9 +515,18 @@ impl ChatWidget<'_> {
|
||||
EventMsg::TaskComplete(TaskCompleteEvent {
|
||||
last_agent_message: _,
|
||||
}) => {
|
||||
self.bottom_pane.set_task_running(false);
|
||||
self.bottom_pane.clear_live_ring();
|
||||
self.request_redraw();
|
||||
// Defer clearing status/live ring until streaming fully completes.
|
||||
let streaming_active = match self.current_stream {
|
||||
Some(StreamKind::Reasoning) => !self.reasoning_streamer.is_idle(),
|
||||
Some(StreamKind::Answer) => !self.answer_streamer.is_idle(),
|
||||
None => false,
|
||||
};
|
||||
if streaming_active {
|
||||
self.task_complete_pending = true;
|
||||
} else {
|
||||
self.bottom_pane.set_task_running(false);
|
||||
self.request_redraw();
|
||||
}
|
||||
}
|
||||
EventMsg::TokenCount(token_usage) => {
|
||||
self.total_token_usage = add_token_usage(&self.total_token_usage, &token_usage);
|
||||
@@ -378,83 +540,42 @@ impl ChatWidget<'_> {
|
||||
EventMsg::Error(ErrorEvent { message }) => {
|
||||
self.add_to_history(HistoryCell::new_error_event(message.clone()));
|
||||
self.bottom_pane.set_task_running(false);
|
||||
self.bottom_pane.clear_live_ring();
|
||||
self.live_builder = RowBuilder::new(self.live_builder.width());
|
||||
self.reasoning_collector.clear();
|
||||
self.answer_collector.clear();
|
||||
self.reasoning_streamer.clear();
|
||||
self.answer_streamer.clear();
|
||||
self.current_stream = None;
|
||||
self.stream_header_emitted = false;
|
||||
self.answer_buffer.clear();
|
||||
self.reasoning_buffer.clear();
|
||||
self.content_buffer.clear();
|
||||
self.answer_header_emitted = false;
|
||||
self.reasoning_header_emitted = false;
|
||||
self.request_redraw();
|
||||
}
|
||||
EventMsg::PlanUpdate(update) => {
|
||||
// Commit plan updates directly to history (no status-line preview).
|
||||
self.add_to_history(HistoryCell::new_plan_update(update));
|
||||
}
|
||||
EventMsg::ExecApprovalRequest(ExecApprovalRequestEvent {
|
||||
call_id: _,
|
||||
command,
|
||||
cwd,
|
||||
reason,
|
||||
}) => {
|
||||
self.finalize_active_stream();
|
||||
let request = ApprovalRequest::Exec {
|
||||
id,
|
||||
command,
|
||||
cwd,
|
||||
reason,
|
||||
};
|
||||
self.bottom_pane.push_approval_request(request);
|
||||
self.request_redraw();
|
||||
EventMsg::ExecApprovalRequest(ev) => {
|
||||
if self.is_write_cycle_active() {
|
||||
self.interrupt_queue
|
||||
.push_back(QueuedInterrupt::ExecApproval(id, ev));
|
||||
} else {
|
||||
self.handle_exec_approval_now(id, ev);
|
||||
}
|
||||
}
|
||||
EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
|
||||
call_id: _,
|
||||
changes,
|
||||
reason,
|
||||
grant_root,
|
||||
}) => {
|
||||
self.finalize_active_stream();
|
||||
// ------------------------------------------------------------------
|
||||
// Before we even prompt the user for approval we surface the patch
|
||||
// summary in the main conversation so that the dialog appears in a
|
||||
// sensible chronological order:
|
||||
// (1) codex → proposes patch (HistoryCell::PendingPatch)
|
||||
// (2) UI → asks for approval (BottomPane)
|
||||
// This mirrors how command execution is shown (command begins →
|
||||
// approval dialog) and avoids surprising the user with a modal
|
||||
// prompt before they have seen *what* is being requested.
|
||||
// ------------------------------------------------------------------
|
||||
self.add_to_history(HistoryCell::new_patch_event(
|
||||
PatchEventType::ApprovalRequest,
|
||||
changes,
|
||||
));
|
||||
|
||||
// Now surface the approval request in the BottomPane as before.
|
||||
let request = ApprovalRequest::ApplyPatch {
|
||||
id,
|
||||
reason,
|
||||
grant_root,
|
||||
};
|
||||
self.bottom_pane.push_approval_request(request);
|
||||
self.request_redraw();
|
||||
EventMsg::ApplyPatchApprovalRequest(ev) => {
|
||||
if self.is_write_cycle_active() {
|
||||
self.interrupt_queue
|
||||
.push_back(QueuedInterrupt::ApplyPatchApproval(id, ev));
|
||||
} else {
|
||||
self.handle_apply_patch_approval_now(id, ev);
|
||||
}
|
||||
}
|
||||
EventMsg::ExecCommandBegin(ExecCommandBeginEvent {
|
||||
call_id,
|
||||
command,
|
||||
cwd,
|
||||
}) => {
|
||||
self.finalize_active_stream();
|
||||
// Ensure the status indicator is visible while the command runs.
|
||||
self.bottom_pane
|
||||
.update_status_text("running command".to_string());
|
||||
self.running_commands.insert(
|
||||
call_id,
|
||||
RunningCommand {
|
||||
command: command.clone(),
|
||||
cwd: cwd.clone(),
|
||||
},
|
||||
);
|
||||
self.active_history_cell = Some(HistoryCell::new_active_exec_command(command));
|
||||
EventMsg::ExecCommandBegin(ev) => {
|
||||
if self.is_write_cycle_active() {
|
||||
self.interrupt_queue
|
||||
.push_back(QueuedInterrupt::ExecBegin(ev));
|
||||
} else {
|
||||
self.handle_exec_begin_now(ev);
|
||||
}
|
||||
}
|
||||
EventMsg::ExecCommandOutputDelta(_) => {
|
||||
// TODO
|
||||
@@ -493,29 +614,20 @@ impl ChatWidget<'_> {
|
||||
},
|
||||
));
|
||||
}
|
||||
EventMsg::McpToolCallBegin(McpToolCallBeginEvent {
|
||||
call_id: _,
|
||||
invocation,
|
||||
}) => {
|
||||
self.finalize_active_stream();
|
||||
self.add_to_history(HistoryCell::new_active_mcp_tool_call(invocation));
|
||||
EventMsg::McpToolCallBegin(ev) => {
|
||||
if self.is_write_cycle_active() {
|
||||
self.interrupt_queue
|
||||
.push_back(QueuedInterrupt::McpBegin(ev));
|
||||
} else {
|
||||
self.handle_mcp_begin_now(ev);
|
||||
}
|
||||
}
|
||||
EventMsg::McpToolCallEnd(McpToolCallEndEvent {
|
||||
call_id: _,
|
||||
duration,
|
||||
invocation,
|
||||
result,
|
||||
}) => {
|
||||
self.add_to_history(HistoryCell::new_completed_mcp_tool_call(
|
||||
80,
|
||||
invocation,
|
||||
duration,
|
||||
result
|
||||
.as_ref()
|
||||
.map(|r| r.is_error.unwrap_or(false))
|
||||
.unwrap_or(false),
|
||||
result,
|
||||
));
|
||||
EventMsg::McpToolCallEnd(ev) => {
|
||||
if self.is_write_cycle_active() {
|
||||
self.interrupt_queue.push_back(QueuedInterrupt::McpEnd(ev));
|
||||
} else {
|
||||
self.handle_mcp_end_now(ev);
|
||||
}
|
||||
}
|
||||
EventMsg::GetHistoryEntryResponse(event) => {
|
||||
let codex_core::protocol::GetHistoryEntryResponseEvent {
|
||||
@@ -635,62 +747,98 @@ impl ChatWidget<'_> {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
impl ChatWidget<'_> {
|
||||
/// Test-only control to tune the maximum rows shown in the live overlay.
|
||||
/// Useful for verifying queue-head behavior without changing production defaults.
|
||||
pub fn test_set_live_max_rows(&mut self, n: u16) {
|
||||
self.live_max_rows = n;
|
||||
}
|
||||
}
|
||||
|
||||
impl ChatWidget<'_> {
|
||||
fn begin_stream(&mut self, kind: StreamKind) {
|
||||
if let Some(current) = self.current_stream {
|
||||
if current != kind {
|
||||
self.finalize_stream(current);
|
||||
// Synchronously flush the previous stream to keep ordering sane.
|
||||
let (collector, streamer) = match current {
|
||||
StreamKind::Reasoning => {
|
||||
(&mut self.reasoning_collector, &mut self.reasoning_streamer)
|
||||
}
|
||||
StreamKind::Answer => (&mut self.answer_collector, &mut self.answer_streamer),
|
||||
};
|
||||
let remaining = collector.finalize_and_drain(&self.config);
|
||||
if !remaining.is_empty() {
|
||||
streamer.enqueue(remaining);
|
||||
}
|
||||
let step = streamer.drain_all(self.live_max_rows as usize);
|
||||
let prev_header_emitted = match current {
|
||||
StreamKind::Reasoning => self.reasoning_header_emitted,
|
||||
StreamKind::Answer => self.answer_header_emitted,
|
||||
};
|
||||
if !step.history.is_empty() || !prev_header_emitted {
|
||||
let mut lines: Vec<ratatui::text::Line<'static>> = Vec::new();
|
||||
if !prev_header_emitted {
|
||||
lines.push(Self::header_line(current));
|
||||
match current {
|
||||
StreamKind::Reasoning => self.reasoning_header_emitted = true,
|
||||
StreamKind::Answer => self.answer_header_emitted = true,
|
||||
}
|
||||
}
|
||||
lines.extend(step.history);
|
||||
// Ensure at most one blank separator after the flushed block.
|
||||
if let Some(last) = lines.last() {
|
||||
if !Self::line_is_blank(last) {
|
||||
lines.push(ratatui::text::Line::from(""));
|
||||
}
|
||||
} else {
|
||||
lines.push(ratatui::text::Line::from(""));
|
||||
}
|
||||
self.app_event_tx.send(AppEvent::InsertHistory(lines));
|
||||
}
|
||||
// Reset for new stream
|
||||
self.current_stream = None;
|
||||
}
|
||||
}
|
||||
|
||||
if self.current_stream != Some(kind) {
|
||||
// Only reset the header flag when switching FROM a different stream kind.
|
||||
// If current_stream is None (e.g., transient idle), preserve header flags
|
||||
// to avoid duplicate headers on re-entry into the same stream.
|
||||
let prev = self.current_stream;
|
||||
self.current_stream = Some(kind);
|
||||
self.stream_header_emitted = false;
|
||||
// Clear any previous live content; we're starting a new stream.
|
||||
self.live_builder = RowBuilder::new(self.live_builder.width());
|
||||
if prev.is_some() {
|
||||
match kind {
|
||||
StreamKind::Reasoning => self.reasoning_header_emitted = false,
|
||||
StreamKind::Answer => self.answer_header_emitted = false,
|
||||
}
|
||||
}
|
||||
// Ensure the waiting status is visible (composer replaced).
|
||||
self.bottom_pane
|
||||
.update_status_text("waiting for model".to_string());
|
||||
self.emit_stream_header(kind);
|
||||
// No live ring overlay; headers will be inserted with the first commit.
|
||||
}
|
||||
}
|
||||
|
||||
fn stream_push_and_maybe_commit(&mut self, delta: &str) {
|
||||
self.live_builder.push_fragment(delta);
|
||||
// Newline-gated: only consider committing when a newline is present.
|
||||
let (collector, streamer) = match self.current_stream {
|
||||
Some(StreamKind::Reasoning) => {
|
||||
(&mut self.reasoning_collector, &mut self.reasoning_streamer)
|
||||
}
|
||||
Some(StreamKind::Answer) => (&mut self.answer_collector, &mut self.answer_streamer),
|
||||
None => return,
|
||||
};
|
||||
|
||||
// Commit overflow rows (small batches) while keeping the last N rows visible.
|
||||
let drained = self
|
||||
.live_builder
|
||||
.drain_commit_ready(self.live_max_rows as usize);
|
||||
if !drained.is_empty() {
|
||||
let mut lines: Vec<ratatui::text::Line<'static>> = Vec::new();
|
||||
if !self.stream_header_emitted {
|
||||
match self.current_stream {
|
||||
Some(StreamKind::Reasoning) => {
|
||||
lines.push(ratatui::text::Line::from("thinking".magenta().italic()));
|
||||
}
|
||||
Some(StreamKind::Answer) => {
|
||||
lines.push(ratatui::text::Line::from("codex".magenta().bold()));
|
||||
}
|
||||
None => {}
|
||||
}
|
||||
self.stream_header_emitted = true;
|
||||
collector.push_delta(delta);
|
||||
if delta.contains('\n') {
|
||||
let newly_completed = collector.commit_complete_lines(&self.config);
|
||||
if !newly_completed.is_empty() {
|
||||
streamer.enqueue(newly_completed);
|
||||
// Start or continue commit animation.
|
||||
self.app_event_tx.send(AppEvent::StartCommitAnimation);
|
||||
}
|
||||
for r in drained {
|
||||
lines.push(ratatui::text::Line::from(r.text));
|
||||
}
|
||||
self.app_event_tx.send(AppEvent::InsertHistory(lines));
|
||||
}
|
||||
|
||||
// Update the live ring overlay lines (text-only, newest at bottom).
|
||||
let rows = self
|
||||
.live_builder
|
||||
.display_rows()
|
||||
.into_iter()
|
||||
.map(|r| ratatui::text::Line::from(r.text))
|
||||
.collect::<Vec<_>>();
|
||||
self.bottom_pane
|
||||
.set_live_ring_rows(self.live_max_rows, rows);
|
||||
}
|
||||
|
||||
fn finalize_stream(&mut self, kind: StreamKind) {
|
||||
@@ -698,38 +846,21 @@ impl ChatWidget<'_> {
|
||||
// Nothing to do; either already finalized or not the active stream.
|
||||
return;
|
||||
}
|
||||
// Flush any partial line as a full row, then drain all remaining rows.
|
||||
self.live_builder.end_line();
|
||||
let remaining = self.live_builder.drain_rows();
|
||||
// TODO: Re-add markdown rendering for assistant answers and reasoning.
|
||||
// When finalizing, pass the accumulated text through `markdown::append_markdown`
|
||||
// to build styled `Line<'static>` entries instead of raw plain text lines.
|
||||
if !remaining.is_empty() || !self.stream_header_emitted {
|
||||
let mut lines: Vec<ratatui::text::Line<'static>> = Vec::new();
|
||||
if !self.stream_header_emitted {
|
||||
match kind {
|
||||
StreamKind::Reasoning => {
|
||||
lines.push(ratatui::text::Line::from("thinking".magenta().italic()));
|
||||
}
|
||||
StreamKind::Answer => {
|
||||
lines.push(ratatui::text::Line::from("codex".magenta().bold()));
|
||||
}
|
||||
}
|
||||
self.stream_header_emitted = true;
|
||||
}
|
||||
for r in remaining {
|
||||
lines.push(ratatui::text::Line::from(r.text));
|
||||
}
|
||||
// Close the block with a blank line for readability.
|
||||
lines.push(ratatui::text::Line::from(""));
|
||||
self.app_event_tx.send(AppEvent::InsertHistory(lines));
|
||||
}
|
||||
let (collector, streamer) = match kind {
|
||||
StreamKind::Reasoning => (&mut self.reasoning_collector, &mut self.reasoning_streamer),
|
||||
StreamKind::Answer => (&mut self.answer_collector, &mut self.answer_streamer),
|
||||
};
|
||||
|
||||
// Clear the live overlay and reset state for the next stream.
|
||||
self.live_builder = RowBuilder::new(self.live_builder.width());
|
||||
self.bottom_pane.clear_live_ring();
|
||||
self.current_stream = None;
|
||||
self.stream_header_emitted = false;
|
||||
let remaining = collector.finalize_and_drain(&self.config);
|
||||
if !remaining.is_empty() {
|
||||
streamer.enqueue(remaining);
|
||||
}
|
||||
// Trailing blank spacer
|
||||
streamer.enqueue(vec![ratatui::text::Line::from("")]);
|
||||
// Mark that we should clear state after draining.
|
||||
self.finishing_after_drain = true;
|
||||
// Start animation to drain remaining lines. Final cleanup will occur when drained.
|
||||
self.app_event_tx.send(AppEvent::StartCommitAnimation);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -770,3 +901,34 @@ fn add_token_usage(current_usage: &TokenUsage, new_usage: &TokenUsage) -> TokenU
|
||||
total_tokens: current_usage.total_tokens + new_usage.total_tokens,
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod chatwidget_helper_tests {
|
||||
use super::*;
|
||||
use crate::app_event::AppEvent;
|
||||
use crate::app_event_sender::AppEventSender;
|
||||
use codex_core::config::ConfigOverrides;
|
||||
use std::sync::mpsc::channel;
|
||||
|
||||
fn test_config() -> Config {
|
||||
let overrides = ConfigOverrides {
|
||||
cwd: std::env::current_dir().ok(),
|
||||
..Default::default()
|
||||
};
|
||||
match Config::load_with_cli_overrides(vec![], overrides) {
|
||||
Ok(c) => c,
|
||||
Err(e) => panic!("load test config: {e}"),
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "current_thread")]
|
||||
async fn helpers_are_available_and_do_not_panic() {
|
||||
let (tx_raw, _rx) = channel::<AppEvent>();
|
||||
let tx = AppEventSender::new(tx_raw);
|
||||
let cfg = test_config();
|
||||
let mut w = ChatWidget::new(cfg, tx, None, Vec::new(), false);
|
||||
|
||||
// Adjust the live ring capacity (no-op for rendering) and ensure no panic.
|
||||
w.test_set_live_max_rows(4);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user