From 643ab1f582a248a9f995bf94110d28fe9677d387 Mon Sep 17 00:00:00 2001 From: aibrahim-oai Date: Wed, 16 Jul 2025 22:26:31 -0700 Subject: [PATCH] Add streaming to exec and tui (#1594) Added support for streaming in `tui` Added support for streaming in `exec` https://github.com/user-attachments/assets/4215892e-d940-452c-a1d0-416ed0cf14eb --- codex-rs/exec/src/event_processor.rs | 77 ++++++++++++++----- codex-rs/exec/src/lib.rs | 3 +- codex-rs/tui/src/app.rs | 2 + codex-rs/tui/src/chatwidget.rs | 53 ++++++++++--- .../tui/src/conversation_history_widget.rs | 46 ++++++++++- 5 files changed, 149 insertions(+), 32 deletions(-) diff --git a/codex-rs/exec/src/event_processor.rs b/codex-rs/exec/src/event_processor.rs index 2a7c4c62..5ab09994 100644 --- a/codex-rs/exec/src/event_processor.rs +++ b/codex-rs/exec/src/event_processor.rs @@ -23,6 +23,7 @@ use owo_colors::OwoColorize; use owo_colors::Style; use shlex::try_join; use std::collections::HashMap; +use std::io::Write; use std::time::Instant; /// This should be configurable. When used in CI, users may not want to impose @@ -52,10 +53,12 @@ pub(crate) struct EventProcessor { /// Whether to include `AgentReasoning` events in the output. show_agent_reasoning: bool, + answer_started: bool, + reasoning_started: bool, } impl EventProcessor { - pub(crate) fn create_with_ansi(with_ansi: bool, show_agent_reasoning: bool) -> Self { + pub(crate) fn create_with_ansi(with_ansi: bool, config: &Config) -> Self { let call_id_to_command = HashMap::new(); let call_id_to_patch = HashMap::new(); let call_id_to_tool_call = HashMap::new(); @@ -72,7 +75,9 @@ impl EventProcessor { green: Style::new().green(), cyan: Style::new().cyan(), call_id_to_tool_call, - show_agent_reasoning, + show_agent_reasoning: !config.hide_agent_reasoning, + answer_started: false, + reasoning_started: false, } } else { Self { @@ -86,7 +91,9 @@ impl EventProcessor { green: Style::new(), cyan: Style::new(), call_id_to_tool_call, - show_agent_reasoning, + show_agent_reasoning: !config.hide_agent_reasoning, + answer_started: false, + reasoning_started: false, } } } @@ -186,18 +193,45 @@ impl EventProcessor { EventMsg::TokenCount(TokenUsage { total_tokens, .. }) => { ts_println!(self, "tokens used: {total_tokens}"); } - EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta: _ }) => { - // TODO: think how we want to support this in the CLI + EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta }) => { + if !self.answer_started { + ts_println!(self, "{}\n", "codex".style(self.italic).style(self.magenta)); + self.answer_started = true; + } + print!("{delta}"); + #[allow(clippy::expect_used)] + std::io::stdout().flush().expect("could not flush stdout"); } - EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta: _ }) => { - // TODO: think how we want to support this in the CLI + EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta }) => { + if !self.show_agent_reasoning { + return; + } + if !self.reasoning_started { + ts_println!( + self, + "{}\n", + "thinking".style(self.italic).style(self.magenta), + ); + self.reasoning_started = true; + } + print!("{delta}"); + #[allow(clippy::expect_used)] + std::io::stdout().flush().expect("could not flush stdout"); } EventMsg::AgentMessage(AgentMessageEvent { message }) => { - ts_println!( - self, - "{}\n{message}", - "codex".style(self.bold).style(self.magenta) - ); + // if answer_started is false, this means we haven't received any + // delta. Thus, we need to print the message as a new answer. + if !self.answer_started { + ts_println!( + self, + "{}\n{}", + "codex".style(self.italic).style(self.magenta), + message, + ); + } else { + println!(); + self.answer_started = false; + } } EventMsg::ExecCommandBegin(ExecCommandBeginEvent { call_id, @@ -351,7 +385,7 @@ impl EventProcessor { ); // Pretty-print the patch summary with colored diff markers so - // it’s easy to scan in the terminal output. + // it's easy to scan in the terminal output. for (path, change) in changes.iter() { match change { FileChange::Add { content } => { @@ -449,12 +483,17 @@ impl EventProcessor { } EventMsg::AgentReasoning(agent_reasoning_event) => { if self.show_agent_reasoning { - ts_println!( - self, - "{}\n{}", - "thinking".style(self.italic).style(self.magenta), - agent_reasoning_event.text - ); + if !self.reasoning_started { + ts_println!( + self, + "{}\n{}", + "codex".style(self.italic).style(self.magenta), + agent_reasoning_event.text, + ); + } else { + println!(); + self.reasoning_started = false; + } } } EventMsg::SessionConfigured(session_configured_event) => { diff --git a/codex-rs/exec/src/lib.rs b/codex-rs/exec/src/lib.rs index 44dddd4d..afefed1a 100644 --- a/codex-rs/exec/src/lib.rs +++ b/codex-rs/exec/src/lib.rs @@ -115,8 +115,7 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option) -> any }; let config = Config::load_with_cli_overrides(cli_kv_overrides, overrides)?; - let mut event_processor = - EventProcessor::create_with_ansi(stdout_with_ansi, !config.hide_agent_reasoning); + let mut event_processor = EventProcessor::create_with_ansi(stdout_with_ansi, &config); // Print the effective configuration and prompt so users can see what Codex // is using. event_processor.print_config_summary(&config, &prompt); diff --git a/codex-rs/tui/src/app.rs b/codex-rs/tui/src/app.rs index 33297ad3..88325040 100644 --- a/codex-rs/tui/src/app.rs +++ b/codex-rs/tui/src/app.rs @@ -297,6 +297,8 @@ impl<'a> App<'a> { } fn draw_next_frame(&mut self, terminal: &mut tui::Tui) -> Result<()> { + // TODO: add a throttle to avoid redrawing too often + match &mut self.app_state { AppState::Chat { widget } => { terminal.draw(|frame| frame.render_widget_ref(&**widget, frame.area()))?; diff --git a/codex-rs/tui/src/chatwidget.rs b/codex-rs/tui/src/chatwidget.rs index 28014c6e..860439ff 100644 --- a/codex-rs/tui/src/chatwidget.rs +++ b/codex-rs/tui/src/chatwidget.rs @@ -51,6 +51,8 @@ pub(crate) struct ChatWidget<'a> { config: Config, initial_user_message: Option, token_usage: TokenUsage, + reasoning_buffer: String, + answer_buffer: String, } #[derive(Clone, Copy, Eq, PartialEq)] @@ -137,6 +139,8 @@ impl ChatWidget<'_> { initial_images, ), token_usage: TokenUsage::default(), + reasoning_buffer: String::new(), + answer_buffer: String::new(), } } @@ -242,16 +246,51 @@ impl ChatWidget<'_> { self.request_redraw(); } EventMsg::AgentMessage(AgentMessageEvent { message }) => { + // if the answer buffer is empty, this means we haven't received any + // delta. Thus, we need to print the message as a new answer. + if self.answer_buffer.is_empty() { + self.conversation_history + .add_agent_message(&self.config, message); + } else { + self.conversation_history + .replace_prev_agent_message(&self.config, message); + } + self.answer_buffer.clear(); + self.request_redraw(); + } + EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta }) => { + if self.answer_buffer.is_empty() { + self.conversation_history + .add_agent_message(&self.config, "".to_string()); + } + self.answer_buffer.push_str(&delta.clone()); self.conversation_history - .add_agent_message(&self.config, message); + .replace_prev_agent_message(&self.config, self.answer_buffer.clone()); + self.request_redraw(); + } + EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta }) => { + if self.reasoning_buffer.is_empty() { + self.conversation_history + .add_agent_reasoning(&self.config, "".to_string()); + } + self.reasoning_buffer.push_str(&delta.clone()); + self.conversation_history + .replace_prev_agent_reasoning(&self.config, self.reasoning_buffer.clone()); self.request_redraw(); } EventMsg::AgentReasoning(AgentReasoningEvent { text }) => { - if !self.config.hide_agent_reasoning { + // if the reasoning buffer is empty, this means we haven't received any + // delta. Thus, we need to print the message as a new reasoning. + if self.reasoning_buffer.is_empty() { self.conversation_history - .add_agent_reasoning(&self.config, text); - self.request_redraw(); + .add_agent_reasoning(&self.config, "".to_string()); + } else { + // else, we rerender one last time. + self.conversation_history + .replace_prev_agent_reasoning(&self.config, text); } + self.reasoning_buffer.clear(); + self.request_redraw(); } EventMsg::TaskStarted => { self.bottom_pane.clear_ctrl_c_quit_hint(); @@ -377,12 +416,6 @@ impl ChatWidget<'_> { self.bottom_pane .on_history_entry_response(log_id, offset, entry.map(|e| e.text)); } - EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta: _ }) => { - // TODO: think how we want to support this in the TUI - } - EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta: _ }) => { - // TODO: think how we want to support this in the TUI - } event => { self.conversation_history .add_background_event(format!("{event:?}")); diff --git a/codex-rs/tui/src/conversation_history_widget.rs b/codex-rs/tui/src/conversation_history_widget.rs index c0e5031d..01a8dc68 100644 --- a/codex-rs/tui/src/conversation_history_widget.rs +++ b/codex-rs/tui/src/conversation_history_widget.rs @@ -202,6 +202,14 @@ impl ConversationHistoryWidget { self.add_to_history(HistoryCell::new_agent_reasoning(config, text)); } + pub fn replace_prev_agent_reasoning(&mut self, config: &Config, text: String) { + self.replace_last_agent_reasoning(config, text); + } + + pub fn replace_prev_agent_message(&mut self, config: &Config, text: String) { + self.replace_last_agent_message(config, text); + } + pub fn add_background_event(&mut self, message: String) { self.add_to_history(HistoryCell::new_background_event(message)); } @@ -249,6 +257,42 @@ impl ConversationHistoryWidget { }); } + pub fn replace_last_agent_reasoning(&mut self, config: &Config, text: String) { + if let Some(idx) = self + .entries + .iter() + .rposition(|entry| matches!(entry.cell, HistoryCell::AgentReasoning { .. })) + { + let width = self.cached_width.get(); + let entry = &mut self.entries[idx]; + entry.cell = HistoryCell::new_agent_reasoning(config, text); + let height = if width > 0 { + entry.cell.height(width) + } else { + 0 + }; + entry.line_count.set(height); + } + } + + pub fn replace_last_agent_message(&mut self, config: &Config, text: String) { + if let Some(idx) = self + .entries + .iter() + .rposition(|entry| matches!(entry.cell, HistoryCell::AgentMessage { .. })) + { + let width = self.cached_width.get(); + let entry = &mut self.entries[idx]; + entry.cell = HistoryCell::new_agent_message(config, text); + let height = if width > 0 { + entry.cell.height(width) + } else { + 0 + }; + entry.line_count.set(height); + } + } + pub fn record_completed_exec_command( &mut self, call_id: String, @@ -454,7 +498,7 @@ impl WidgetRef for ConversationHistoryWidget { { // Choose a thumb color that stands out only when this pane has focus so that the - // user’s attention is naturally drawn to the active viewport. When unfocused we show + // user's attention is naturally drawn to the active viewport. When unfocused we show // a low-contrast thumb so the scrollbar fades into the background without becoming // invisible. let thumb_style = if self.has_input_focus {