From e0303dbac06b6ead612107fa2144cf5b809bbe98 Mon Sep 17 00:00:00 2001 From: easong-openai Date: Tue, 5 Aug 2025 01:56:13 -0700 Subject: [PATCH] Rescue chat completion changes (#1846) https://github.com/openai/codex/pull/1835 has some messed up history. This adds support for streaming chat completions, which is useful for ollama. We should probably take a very skeptical eye to the code introduced in this PR. --------- Co-authored-by: Ahmed Ibrahim --- codex-rs/config.md | 13 + codex-rs/core/src/chat_completions.rs | 241 ++++++++++++++---- codex-rs/core/src/client.rs | 14 +- codex-rs/core/src/client_common.rs | 1 + codex-rs/core/src/codex.rs | 40 ++- codex-rs/core/src/config.rs | 12 + codex-rs/core/src/models.rs | 8 + codex-rs/core/src/protocol.rs | 16 ++ .../src/event_processor_with_human_output.rs | 34 +++ codex-rs/mcp-server/src/codex_tool_runner.rs | 4 +- codex-rs/mcp-server/src/conversation_loop.rs | 4 +- codex-rs/tui/src/bottom_pane/mod.rs | 148 ++++++++++- codex-rs/tui/src/chatwidget.rs | 69 ++++- 13 files changed, 547 insertions(+), 57 deletions(-) diff --git a/codex-rs/config.md b/codex-rs/config.md index c7dfe42a..992fe1aa 100644 --- a/codex-rs/config.md +++ b/codex-rs/config.md @@ -483,6 +483,19 @@ Setting `hide_agent_reasoning` to `true` suppresses these events in **both** the hide_agent_reasoning = true # defaults to false ``` +## show_raw_agent_reasoning + +Surfaces the model’s raw chain-of-thought ("raw reasoning content") when available. + +Notes: +- Only takes effect if the selected model/provider actually emits raw reasoning content. Many models do not. When unsupported, this option has no visible effect. +- Raw reasoning may include intermediate thoughts or sensitive context. Enable only if acceptable for your workflow. + +Example: +```toml +show_raw_agent_reasoning = true # defaults to false +``` + ## model_context_window The size of the context window for the model, in tokens. diff --git a/codex-rs/core/src/chat_completions.rs b/codex-rs/core/src/chat_completions.rs index 6aeccc5d..956dcebd 100644 --- a/codex-rs/core/src/chat_completions.rs +++ b/codex-rs/core/src/chat_completions.rs @@ -23,6 +23,7 @@ use crate::error::CodexErr; use crate::error::Result; use crate::model_family::ModelFamily; use crate::models::ContentItem; +use crate::models::ReasoningItemContent; use crate::models::ResponseItem; use crate::openai_tools::create_tools_json_for_chat_completions_api; use crate::util::backoff; @@ -209,6 +210,8 @@ async fn process_chat_sse( } let mut fn_call_state = FunctionCallState::default(); + let mut assistant_text = String::new(); + let mut reasoning_text = String::new(); loop { let sse = match timeout(idle_timeout, stream.next()).await { @@ -237,6 +240,31 @@ async fn process_chat_sse( // OpenAI Chat streaming sends a literal string "[DONE]" when finished. if sse.data.trim() == "[DONE]" { + // Emit any finalized items before closing so downstream consumers receive + // terminal events for both assistant content and raw reasoning. 
+ if !assistant_text.is_empty() { + let item = ResponseItem::Message { + role: "assistant".to_string(), + content: vec![ContentItem::OutputText { + text: std::mem::take(&mut assistant_text), + }], + id: None, + }; + let _ = tx_event.send(Ok(ResponseEvent::OutputItemDone(item))).await; + } + + if !reasoning_text.is_empty() { + let item = ResponseItem::Reasoning { + id: String::new(), + summary: Vec::new(), + content: Some(vec![ReasoningItemContent::ReasoningText { + text: std::mem::take(&mut reasoning_text), + }]), + encrypted_content: None, + }; + let _ = tx_event.send(Ok(ResponseEvent::OutputItemDone(item))).await; + } + let _ = tx_event .send(Ok(ResponseEvent::Completed { response_id: String::new(), @@ -256,26 +284,47 @@ async fn process_chat_sse( let choice_opt = chunk.get("choices").and_then(|c| c.get(0)); if let Some(choice) = choice_opt { - // Handle assistant content tokens. + // Handle assistant content tokens as streaming deltas. if let Some(content) = choice .get("delta") .and_then(|d| d.get("content")) .and_then(|c| c.as_str()) { - // Emit a delta so downstream consumers can stream text live. - let _ = tx_event - .send(Ok(ResponseEvent::OutputTextDelta(content.to_string()))) - .await; + if !content.is_empty() { + assistant_text.push_str(content); + let _ = tx_event + .send(Ok(ResponseEvent::OutputTextDelta(content.to_string()))) + .await; + } + } - let item = ResponseItem::Message { - role: "assistant".to_string(), - content: vec![ContentItem::OutputText { - text: content.to_string(), - }], - id: None, - }; + // Forward any reasoning/thinking deltas if present. + // Some providers stream `reasoning` as a plain string while others + // nest the text under an object (e.g. `{ "reasoning": { "text": "…" } }`). + if let Some(reasoning_val) = choice.get("delta").and_then(|d| d.get("reasoning")) { + let mut maybe_text = reasoning_val.as_str().map(|s| s.to_string()); - let _ = tx_event.send(Ok(ResponseEvent::OutputItemDone(item))).await; + if maybe_text.is_none() && reasoning_val.is_object() { + if let Some(s) = reasoning_val + .get("text") + .and_then(|t| t.as_str()) + .filter(|s| !s.is_empty()) + { + maybe_text = Some(s.to_string()); + } else if let Some(s) = reasoning_val + .get("content") + .and_then(|t| t.as_str()) + .filter(|s| !s.is_empty()) + { + maybe_text = Some(s.to_string()); + } + } + + if let Some(reasoning) = maybe_text { + let _ = tx_event + .send(Ok(ResponseEvent::ReasoningContentDelta(reasoning))) + .await; + } } // Handle streaming function / tool calls. @@ -312,7 +361,21 @@ async fn process_chat_sse( if let Some(finish_reason) = choice.get("finish_reason").and_then(|v| v.as_str()) { match finish_reason { "tool_calls" if fn_call_state.active => { - // Build the FunctionCall response item. + // First, flush the terminal raw reasoning so UIs can finalize + // the reasoning stream before any exec/tool events begin. + if !reasoning_text.is_empty() { + let item = ResponseItem::Reasoning { + id: String::new(), + summary: Vec::new(), + content: Some(vec![ReasoningItemContent::ReasoningText { + text: std::mem::take(&mut reasoning_text), + }]), + encrypted_content: None, + }; + let _ = tx_event.send(Ok(ResponseEvent::OutputItemDone(item))).await; + } + + // Then emit the FunctionCall response item. let item = ResponseItem::FunctionCall { id: None, name: fn_call_state.name.clone().unwrap_or_else(|| "".to_string()), @@ -320,11 +383,33 @@ async fn process_chat_sse( call_id: fn_call_state.call_id.clone().unwrap_or_else(String::new), }; - // Emit it downstream. 
let _ = tx_event.send(Ok(ResponseEvent::OutputItemDone(item))).await; } "stop" => { - // Regular turn without tool-call. + // Regular turn without tool-call. Emit the final assistant message + // as a single OutputItemDone so non-delta consumers see the result. + if !assistant_text.is_empty() { + let item = ResponseItem::Message { + role: "assistant".to_string(), + content: vec![ContentItem::OutputText { + text: std::mem::take(&mut assistant_text), + }], + id: None, + }; + let _ = tx_event.send(Ok(ResponseEvent::OutputItemDone(item))).await; + } + // Also emit a terminal Reasoning item so UIs can finalize raw reasoning. + if !reasoning_text.is_empty() { + let item = ResponseItem::Reasoning { + id: String::new(), + summary: Vec::new(), + content: Some(vec![ReasoningItemContent::ReasoningText { + text: std::mem::take(&mut reasoning_text), + }]), + encrypted_content: None, + }; + let _ = tx_event.send(Ok(ResponseEvent::OutputItemDone(item))).await; + } } _ => {} } @@ -362,10 +447,17 @@ async fn process_chat_sse( /// The adapter is intentionally *lossless*: callers who do **not** opt in via /// [`AggregateStreamExt::aggregate()`] keep receiving the original unmodified /// events. +#[derive(Copy, Clone, Eq, PartialEq)] +enum AggregateMode { + AggregatedOnly, + Streaming, +} pub(crate) struct AggregatedChatStream { inner: S, cumulative: String, - pending_completed: Option, + cumulative_reasoning: String, + pending: std::collections::VecDeque, + mode: AggregateMode, } impl Stream for AggregatedChatStream @@ -377,8 +469,8 @@ where fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.get_mut(); - // First, flush any buffered Completed event from the previous call. - if let Some(ev) = this.pending_completed.take() { + // First, flush any buffered events from the previous call. + if let Some(ev) = this.pending.pop_front() { return Poll::Ready(Some(Ok(ev))); } @@ -395,16 +487,21 @@ where let is_assistant_delta = matches!(&item, crate::models::ResponseItem::Message { role, .. } if role == "assistant"); if is_assistant_delta { - if let crate::models::ResponseItem::Message { content, .. } = &item { - if let Some(text) = content.iter().find_map(|c| match c { - crate::models::ContentItem::OutputText { text } => Some(text), - _ => None, - }) { - this.cumulative.push_str(text); + // Only use the final assistant message if we have not + // seen any deltas; otherwise, deltas already built the + // cumulative text and this would duplicate it. + if this.cumulative.is_empty() { + if let crate::models::ResponseItem::Message { content, .. } = &item { + if let Some(text) = content.iter().find_map(|c| match c { + crate::models::ContentItem::OutputText { text } => Some(text), + _ => None, + }) { + this.cumulative.push_str(text); + } } } - // Swallow partial assistant chunk; keep polling. + // Swallow assistant message here; emit on Completed. continue; } @@ -415,24 +512,50 @@ where response_id, token_usage, }))) => { + // Build any aggregated items in the correct order: Reasoning first, then Message. 
+ let mut emitted_any = false; + + if !this.cumulative_reasoning.is_empty() + && matches!(this.mode, AggregateMode::AggregatedOnly) + { + let aggregated_reasoning = crate::models::ResponseItem::Reasoning { + id: String::new(), + summary: Vec::new(), + content: Some(vec![ + crate::models::ReasoningItemContent::ReasoningText { + text: std::mem::take(&mut this.cumulative_reasoning), + }, + ]), + encrypted_content: None, + }; + this.pending + .push_back(ResponseEvent::OutputItemDone(aggregated_reasoning)); + emitted_any = true; + } + if !this.cumulative.is_empty() { - let aggregated_item = crate::models::ResponseItem::Message { + let aggregated_message = crate::models::ResponseItem::Message { id: None, role: "assistant".to_string(), content: vec![crate::models::ContentItem::OutputText { text: std::mem::take(&mut this.cumulative), }], }; + this.pending + .push_back(ResponseEvent::OutputItemDone(aggregated_message)); + emitted_any = true; + } - // Buffer Completed so it is returned *after* the aggregated message. - this.pending_completed = Some(ResponseEvent::Completed { - response_id, - token_usage, + // Always emit Completed last when anything was aggregated. + if emitted_any { + this.pending.push_back(ResponseEvent::Completed { + response_id: response_id.clone(), + token_usage: token_usage.clone(), }); - - return Poll::Ready(Some(Ok(ResponseEvent::OutputItemDone( - aggregated_item, - )))); + // Return the first pending event now. + if let Some(ev) = this.pending.pop_front() { + return Poll::Ready(Some(Ok(ev))); + } } // Nothing aggregated – forward Completed directly. @@ -447,13 +570,27 @@ where continue; } Poll::Ready(Some(Ok(ResponseEvent::OutputTextDelta(delta)))) => { - // Forward deltas unchanged so callers can stream text - // live while still receiving a single aggregated - // OutputItemDone at the end of the turn. - return Poll::Ready(Some(Ok(ResponseEvent::OutputTextDelta(delta)))); + // Always accumulate deltas so we can emit a final OutputItemDone at Completed. + this.cumulative.push_str(&delta); + if matches!(this.mode, AggregateMode::Streaming) { + // In streaming mode, also forward the delta immediately. + return Poll::Ready(Some(Ok(ResponseEvent::OutputTextDelta(delta)))); + } else { + continue; + } } - Poll::Ready(Some(Ok(ResponseEvent::ReasoningSummaryDelta(delta)))) => { - return Poll::Ready(Some(Ok(ResponseEvent::ReasoningSummaryDelta(delta)))); + Poll::Ready(Some(Ok(ResponseEvent::ReasoningContentDelta(delta)))) => { + // Always accumulate reasoning deltas so we can emit a final Reasoning item at Completed. + this.cumulative_reasoning.push_str(&delta); + if matches!(this.mode, AggregateMode::Streaming) { + // In streaming mode, also forward the delta immediately. 
+ return Poll::Ready(Some(Ok(ResponseEvent::ReasoningContentDelta(delta)))); + } else { + continue; + } + } + Poll::Ready(Some(Ok(ResponseEvent::ReasoningSummaryDelta(_)))) => { + continue; } } } @@ -482,12 +619,24 @@ pub(crate) trait AggregateStreamExt: Stream> + Size /// } /// ``` fn aggregate(self) -> AggregatedChatStream { - AggregatedChatStream { - inner: self, - cumulative: String::new(), - pending_completed: None, - } + AggregatedChatStream::new(self, AggregateMode::AggregatedOnly) } } impl AggregateStreamExt for T where T: Stream> + Sized {} + +impl AggregatedChatStream { + fn new(inner: S, mode: AggregateMode) -> Self { + AggregatedChatStream { + inner, + cumulative: String::new(), + cumulative_reasoning: String::new(), + pending: std::collections::VecDeque::new(), + mode, + } + } + + pub(crate) fn streaming_mode(inner: S) -> Self { + Self::new(inner, AggregateMode::Streaming) + } +} diff --git a/codex-rs/core/src/client.rs b/codex-rs/core/src/client.rs index 38f390cb..514e683e 100644 --- a/codex-rs/core/src/client.rs +++ b/codex-rs/core/src/client.rs @@ -92,7 +92,11 @@ impl ModelClient { // Wrap it with the aggregation adapter so callers see *only* // the final assistant message per turn (matching the // behaviour of the Responses API). - let mut aggregated = response_stream.aggregate(); + let mut aggregated = if self.config.show_raw_agent_reasoning { + crate::chat_completions::AggregatedChatStream::streaming_mode(response_stream) + } else { + response_stream.aggregate() + }; // Bridge the aggregated stream back into a standard // `ResponseStream` by forwarding events through a channel. @@ -437,6 +441,14 @@ async fn process_sse( } } } + "response.reasoning_text.delta" => { + if let Some(delta) = event.delta { + let event = ResponseEvent::ReasoningContentDelta(delta); + if tx_event.send(Ok(event)).await.is_err() { + return; + } + } + } "response.created" => { if event.response.is_some() { let _ = tx_event.send(Ok(ResponseEvent::Created {})).await; diff --git a/codex-rs/core/src/client_common.rs b/codex-rs/core/src/client_common.rs index 58ec1c3f..8b845a52 100644 --- a/codex-rs/core/src/client_common.rs +++ b/codex-rs/core/src/client_common.rs @@ -72,6 +72,7 @@ pub enum ResponseEvent { }, OutputTextDelta(String), ReasoningSummaryDelta(String), + ReasoningContentDelta(String), } #[derive(Debug, Serialize)] diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 8d243564..0ce0c4ea 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -56,6 +56,7 @@ use crate::mcp_tool_call::handle_mcp_tool_call; use crate::models::ContentItem; use crate::models::FunctionCallOutputPayload; use crate::models::LocalShellAction; +use crate::models::ReasoningItemContent; use crate::models::ReasoningItemReasoningSummary; use crate::models::ResponseInputItem; use crate::models::ResponseItem; @@ -66,6 +67,8 @@ use crate::protocol::AgentMessageDeltaEvent; use crate::protocol::AgentMessageEvent; use crate::protocol::AgentReasoningDeltaEvent; use crate::protocol::AgentReasoningEvent; +use crate::protocol::AgentReasoningRawContentDeltaEvent; +use crate::protocol::AgentReasoningRawContentEvent; use crate::protocol::ApplyPatchApprovalRequestEvent; use crate::protocol::AskForApproval; use crate::protocol::BackgroundEventEvent; @@ -227,6 +230,7 @@ pub(crate) struct Session { state: Mutex, codex_linux_sandbox_exe: Option, user_shell: shell::Shell, + show_raw_agent_reasoning: bool, } impl Session { @@ -822,6 +826,7 @@ async fn submission_loop( codex_linux_sandbox_exe: 
config.codex_linux_sandbox_exe.clone(), disable_response_storage, user_shell: default_shell, + show_raw_agent_reasoning: config.show_raw_agent_reasoning, })); // Patch restored state into the newly created session. @@ -1132,6 +1137,7 @@ async fn run_task(sess: Arc, sub_id: String, input: Vec) { ResponseItem::Reasoning { id, summary, + content, encrypted_content, }, None, @@ -1139,6 +1145,7 @@ async fn run_task(sess: Arc, sub_id: String, input: Vec) { items_to_record_in_conversation_history.push(ResponseItem::Reasoning { id: id.clone(), summary: summary.clone(), + content: content.clone(), encrypted_content: encrypted_content.clone(), }); } @@ -1392,6 +1399,17 @@ async fn try_run_turn( }; sess.tx_event.send(event).await.ok(); } + ResponseEvent::ReasoningContentDelta(delta) => { + if sess.show_raw_agent_reasoning { + let event = Event { + id: sub_id.to_string(), + msg: EventMsg::AgentReasoningRawContentDelta( + AgentReasoningRawContentDeltaEvent { delta }, + ), + }; + sess.tx_event.send(event).await.ok(); + } + } } } } @@ -1498,7 +1516,12 @@ async fn handle_response_item( } None } - ResponseItem::Reasoning { summary, .. } => { + ResponseItem::Reasoning { + id: _, + summary, + content, + encrypted_content: _, + } => { for item in summary { let text = match item { ReasoningItemReasoningSummary::SummaryText { text } => text, @@ -1509,6 +1532,21 @@ async fn handle_response_item( }; sess.tx_event.send(event).await.ok(); } + if sess.show_raw_agent_reasoning && content.is_some() { + let content = content.unwrap(); + for item in content { + let text = match item { + ReasoningItemContent::ReasoningText { text } => text, + }; + let event = Event { + id: sub_id.to_string(), + msg: EventMsg::AgentReasoningRawContent(AgentReasoningRawContentEvent { + text, + }), + }; + sess.tx_event.send(event).await.ok(); + } + } None } ResponseItem::FunctionCall { diff --git a/codex-rs/core/src/config.rs b/codex-rs/core/src/config.rs index a0f36f45..d97d5ec1 100644 --- a/codex-rs/core/src/config.rs +++ b/codex-rs/core/src/config.rs @@ -61,6 +61,10 @@ pub struct Config { /// users are only interested in the final agent responses. pub hide_agent_reasoning: bool, + /// When set to `true`, `AgentReasoningRawContentEvent` events will be shown in the UI/output. + /// Defaults to `false`. + pub show_raw_agent_reasoning: bool, + /// Disable server-side response storage (sends the full conversation /// context with every request). Currently necessary for OpenAI customers /// who have opted into Zero Data Retention (ZDR). @@ -325,6 +329,10 @@ pub struct ConfigToml { /// UI/output. Defaults to `false`. pub hide_agent_reasoning: Option, + /// When set to `true`, `AgentReasoningRawContentEvent` events will be shown in the UI/output. + /// Defaults to `false`. 
+ pub show_raw_agent_reasoning: Option, + pub model_reasoning_effort: Option, pub model_reasoning_summary: Option, @@ -531,6 +539,7 @@ impl Config { codex_linux_sandbox_exe, hide_agent_reasoning: cfg.hide_agent_reasoning.unwrap_or(false), + show_raw_agent_reasoning: cfg.show_raw_agent_reasoning.unwrap_or(false), model_reasoning_effort: config_profile .model_reasoning_effort .or(cfg.model_reasoning_effort) @@ -901,6 +910,7 @@ disable_response_storage = true tui: Tui::default(), codex_linux_sandbox_exe: None, hide_agent_reasoning: false, + show_raw_agent_reasoning: false, model_reasoning_effort: ReasoningEffort::High, model_reasoning_summary: ReasoningSummary::Detailed, chatgpt_base_url: "https://chatgpt.com/backend-api/".to_string(), @@ -951,6 +961,7 @@ disable_response_storage = true tui: Tui::default(), codex_linux_sandbox_exe: None, hide_agent_reasoning: false, + show_raw_agent_reasoning: false, model_reasoning_effort: ReasoningEffort::default(), model_reasoning_summary: ReasoningSummary::default(), chatgpt_base_url: "https://chatgpt.com/backend-api/".to_string(), @@ -1016,6 +1027,7 @@ disable_response_storage = true tui: Tui::default(), codex_linux_sandbox_exe: None, hide_agent_reasoning: false, + show_raw_agent_reasoning: false, model_reasoning_effort: ReasoningEffort::default(), model_reasoning_summary: ReasoningSummary::default(), chatgpt_base_url: "https://chatgpt.com/backend-api/".to_string(), diff --git a/codex-rs/core/src/models.rs b/codex-rs/core/src/models.rs index 91bfb3bc..fb48b530 100644 --- a/codex-rs/core/src/models.rs +++ b/codex-rs/core/src/models.rs @@ -45,6 +45,8 @@ pub enum ResponseItem { Reasoning { id: String, summary: Vec, + #[serde(default, skip_serializing_if = "Option::is_none")] + content: Option>, encrypted_content: Option, }, LocalShellCall { @@ -136,6 +138,12 @@ pub enum ReasoningItemReasoningSummary { SummaryText { text: String }, } +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum ReasoningItemContent { + ReasoningText { text: String }, +} + impl From> for ResponseInputItem { fn from(items: Vec) -> Self { Self::Message { diff --git a/codex-rs/core/src/protocol.rs b/codex-rs/core/src/protocol.rs index 82591a2c..aa330f6b 100644 --- a/codex-rs/core/src/protocol.rs +++ b/codex-rs/core/src/protocol.rs @@ -359,6 +359,12 @@ pub enum EventMsg { /// Agent reasoning delta event from agent. AgentReasoningDelta(AgentReasoningDeltaEvent), + /// Raw chain-of-thought from agent. + AgentReasoningRawContent(AgentReasoningRawContentEvent), + + /// Agent reasoning content delta event from agent. + AgentReasoningRawContentDelta(AgentReasoningRawContentDeltaEvent), + /// Ack the client's configure message. 
SessionConfigured(SessionConfiguredEvent), @@ -464,6 +470,16 @@ pub struct AgentReasoningEvent { pub text: String, } +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct AgentReasoningRawContentEvent { + pub text: String, +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct AgentReasoningRawContentDeltaEvent { + pub delta: String, +} + #[derive(Debug, Clone, Deserialize, Serialize)] pub struct AgentReasoningDeltaEvent { pub delta: String, diff --git a/codex-rs/exec/src/event_processor_with_human_output.rs b/codex-rs/exec/src/event_processor_with_human_output.rs index 7703c138..393ef4ab 100644 --- a/codex-rs/exec/src/event_processor_with_human_output.rs +++ b/codex-rs/exec/src/event_processor_with_human_output.rs @@ -5,6 +5,8 @@ use codex_core::plan_tool::UpdatePlanArgs; use codex_core::protocol::AgentMessageDeltaEvent; use codex_core::protocol::AgentMessageEvent; use codex_core::protocol::AgentReasoningDeltaEvent; +use codex_core::protocol::AgentReasoningRawContentDeltaEvent; +use codex_core::protocol::AgentReasoningRawContentEvent; use codex_core::protocol::BackgroundEventEvent; use codex_core::protocol::ErrorEvent; use codex_core::protocol::Event; @@ -55,8 +57,10 @@ pub(crate) struct EventProcessorWithHumanOutput { /// Whether to include `AgentReasoning` events in the output. show_agent_reasoning: bool, + show_raw_agent_reasoning: bool, answer_started: bool, reasoning_started: bool, + raw_reasoning_started: bool, last_message_path: Option, } @@ -81,8 +85,10 @@ impl EventProcessorWithHumanOutput { green: Style::new().green(), cyan: Style::new().cyan(), show_agent_reasoning: !config.hide_agent_reasoning, + show_raw_agent_reasoning: config.show_raw_agent_reasoning, answer_started: false, reasoning_started: false, + raw_reasoning_started: false, last_message_path, } } else { @@ -97,8 +103,10 @@ impl EventProcessorWithHumanOutput { green: Style::new(), cyan: Style::new(), show_agent_reasoning: !config.hide_agent_reasoning, + show_raw_agent_reasoning: config.show_raw_agent_reasoning, answer_started: false, reasoning_started: false, + raw_reasoning_started: false, last_message_path, } } @@ -203,6 +211,32 @@ impl EventProcessor for EventProcessorWithHumanOutput { #[allow(clippy::expect_used)] std::io::stdout().flush().expect("could not flush stdout"); } + EventMsg::AgentReasoningRawContent(AgentReasoningRawContentEvent { text }) => { + if !self.show_raw_agent_reasoning { + return CodexStatus::Running; + } + if !self.raw_reasoning_started { + print!("{text}"); + #[allow(clippy::expect_used)] + std::io::stdout().flush().expect("could not flush stdout"); + } else { + println!(); + self.raw_reasoning_started = false; + } + } + EventMsg::AgentReasoningRawContentDelta(AgentReasoningRawContentDeltaEvent { + delta, + }) => { + if !self.show_raw_agent_reasoning { + return CodexStatus::Running; + } + if !self.raw_reasoning_started { + self.raw_reasoning_started = true; + } + print!("{delta}"); + #[allow(clippy::expect_used)] + std::io::stdout().flush().expect("could not flush stdout"); + } EventMsg::AgentMessage(AgentMessageEvent { message }) => { // if answer_started is false, this means we haven't received any // delta. Thus, we need to print the message as a new answer. 
diff --git a/codex-rs/mcp-server/src/codex_tool_runner.rs b/codex-rs/mcp-server/src/codex_tool_runner.rs index 205dfa46..b91c4a76 100644 --- a/codex-rs/mcp-server/src/codex_tool_runner.rs +++ b/codex-rs/mcp-server/src/codex_tool_runner.rs @@ -252,7 +252,9 @@ async fn run_codex_tool_session_inner( EventMsg::AgentMessage(AgentMessageEvent { .. }) => { // TODO: think how we want to support this in the MCP } - EventMsg::TaskStarted + EventMsg::AgentReasoningRawContent(_) + | EventMsg::AgentReasoningRawContentDelta(_) + | EventMsg::TaskStarted | EventMsg::TokenCount(_) | EventMsg::AgentReasoning(_) | EventMsg::McpToolCallBegin(_) diff --git a/codex-rs/mcp-server/src/conversation_loop.rs b/codex-rs/mcp-server/src/conversation_loop.rs index 1db39a23..80c34760 100644 --- a/codex-rs/mcp-server/src/conversation_loop.rs +++ b/codex-rs/mcp-server/src/conversation_loop.rs @@ -90,7 +90,9 @@ pub async fn run_conversation_loop( EventMsg::AgentMessage(AgentMessageEvent { .. }) => { // TODO: think how we want to support this in the MCP } - EventMsg::TaskStarted + EventMsg::AgentReasoningRawContent(_) + | EventMsg::AgentReasoningRawContentDelta(_) + | EventMsg::TaskStarted | EventMsg::TokenCount(_) | EventMsg::AgentReasoning(_) | EventMsg::McpToolCallBegin(_) diff --git a/codex-rs/tui/src/bottom_pane/mod.rs b/codex-rs/tui/src/bottom_pane/mod.rs index fde0b3bd..cdb01ba0 100644 --- a/codex-rs/tui/src/bottom_pane/mod.rs +++ b/codex-rs/tui/src/bottom_pane/mod.rs @@ -138,6 +138,11 @@ impl BottomPane<'_> { view.handle_key_event(self, key_event); if !view.is_complete() { self.active_view = Some(view); + } else if self.is_task_running { + let mut v = StatusIndicatorView::new(self.app_event_tx.clone()); + v.update_text("waiting for model".to_string()); + self.active_view = Some(Box::new(v)); + self.status_view_active = true; } self.request_redraw(); InputResult::None @@ -163,6 +168,12 @@ impl BottomPane<'_> { CancellationEvent::Handled => { if !view.is_complete() { self.active_view = Some(view); + } else if self.is_task_running { + // Modal aborted but task still running – restore status indicator. + let mut v = StatusIndicatorView::new(self.app_event_tx.clone()); + v.update_text("waiting for model".to_string()); + self.active_view = Some(Box::new(v)); + self.status_view_active = true; } self.show_ctrl_c_quit_hint(); } @@ -202,15 +213,20 @@ impl BottomPane<'_> { handled_by_view = true; } - // Fallback: if the current active view did not consume status updates, - // present an overlay above the composer. - if !handled_by_view { + // Fallback: if the current active view did not consume status updates + // and no modal view is active, present an overlay above the composer. + // If a modal is active, do NOT render the overlay to avoid drawing + // over the dialog. + if !handled_by_view && self.active_view.is_none() { if self.live_status.is_none() { self.live_status = Some(StatusIndicatorWidget::new(self.app_event_tx.clone())); } if let Some(status) = &mut self.live_status { status.update_text(text); } + } else if !handled_by_view { + // Ensure any previous overlay is cleared when a modal becomes active. + self.live_status = None; } self.request_redraw(); } @@ -296,6 +312,8 @@ impl BottomPane<'_> { // Otherwise create a new approval modal overlay. let modal = ApprovalModalView::new(request, self.app_event_tx.clone()); self.active_view = Some(Box::new(modal)); + // Hide any overlay status while a modal is visible. 
+ self.live_status = None; self.status_view_active = false; self.request_redraw() } @@ -368,16 +386,18 @@ impl WidgetRef for &BottomPane<'_> { y_offset = y_offset.saturating_add(1); } if let Some(status) = &self.live_status { - let live_h = status.desired_height(area.width).min(area.height); + let live_h = status + .desired_height(area.width) + .min(area.height.saturating_sub(y_offset)); if live_h > 0 { let live_rect = Rect { x: area.x, - y: area.y, + y: area.y + y_offset, width: area.width, height: live_h, }; status.render_ref(live_rect, buf); - y_offset = live_h; + y_offset = y_offset.saturating_add(live_h); } } @@ -540,6 +560,122 @@ mod tests { ); } + #[test] + fn overlay_not_shown_above_approval_modal() { + let (tx_raw, _rx) = channel::(); + let tx = AppEventSender::new(tx_raw); + let mut pane = BottomPane::new(BottomPaneParams { + app_event_tx: tx, + has_input_focus: true, + enhanced_keys_supported: false, + }); + + // Create an approval modal (active view). + pane.push_approval_request(exec_request()); + // Attempt to update status; this should NOT create an overlay while modal is visible. + pane.update_status_text("running command".to_string()); + + // Render and verify the top row does not include the Working header overlay. + let area = Rect::new(0, 0, 60, 6); + let mut buf = Buffer::empty(area); + (&pane).render_ref(area, &mut buf); + + let mut r0 = String::new(); + for x in 0..area.width { + r0.push(buf[(x, 0)].symbol().chars().next().unwrap_or(' ')); + } + assert!( + !r0.contains("Working"), + "overlay Working header should not render above modal" + ); + } + + #[test] + fn composer_not_shown_after_denied_if_task_running() { + let (tx_raw, rx) = channel::(); + let tx = AppEventSender::new(tx_raw); + let mut pane = BottomPane::new(BottomPaneParams { + app_event_tx: tx.clone(), + has_input_focus: true, + enhanced_keys_supported: false, + }); + + // Start a running task so the status indicator replaces the composer. + pane.set_task_running(true); + pane.update_status_text("waiting for model".to_string()); + + // Push an approval modal (e.g., command approval) which should hide the status view. + pane.push_approval_request(exec_request()); + + // Simulate pressing 'n' (deny) on the modal. + use crossterm::event::KeyCode; + use crossterm::event::KeyEvent; + use crossterm::event::KeyModifiers; + pane.handle_key_event(KeyEvent::new(KeyCode::Char('n'), KeyModifiers::NONE)); + + // After denial, since the task is still running, the status indicator + // should be restored as the active view; the composer should NOT be visible. + assert!( + pane.status_view_active, + "status view should be active after denial" + ); + assert!(pane.active_view.is_some(), "active view should be present"); + + // Render and ensure the top row includes the Working header instead of the composer. + // Give the animation thread a moment to tick. + std::thread::sleep(std::time::Duration::from_millis(120)); + let area = Rect::new(0, 0, 40, 3); + let mut buf = Buffer::empty(area); + (&pane).render_ref(area, &mut buf); + let mut row0 = String::new(); + for x in 0..area.width { + row0.push(buf[(x, 0)].symbol().chars().next().unwrap_or(' ')); + } + assert!( + row0.contains("Working"), + "expected Working header after denial: {row0:?}" + ); + + // Drain the channel to avoid unused warnings. 
+ drop(rx); + } + + #[test] + fn status_indicator_visible_during_command_execution() { + let (tx_raw, _rx) = channel::(); + let tx = AppEventSender::new(tx_raw); + let mut pane = BottomPane::new(BottomPaneParams { + app_event_tx: tx, + has_input_focus: true, + enhanced_keys_supported: false, + }); + + // Begin a task: show initial status. + pane.set_task_running(true); + pane.update_status_text("waiting for model".to_string()); + + // As a long-running command begins (post-approval), ensure the status + // indicator is visible while we wait for the command to run. + pane.update_status_text("running command".to_string()); + + // Allow some frames so the animation thread ticks. + std::thread::sleep(std::time::Duration::from_millis(120)); + + // Render and confirm the line contains the "Working" header. + let area = Rect::new(0, 0, 40, 3); + let mut buf = Buffer::empty(area); + (&pane).render_ref(area, &mut buf); + + let mut row0 = String::new(); + for x in 0..area.width { + row0.push(buf[(x, 0)].symbol().chars().next().unwrap_or(' ')); + } + assert!( + row0.contains("Working"), + "expected Working header: {row0:?}" + ); + } + #[test] fn bottom_padding_present_for_status_view() { let (tx_raw, _rx) = channel::(); diff --git a/codex-rs/tui/src/chatwidget.rs b/codex-rs/tui/src/chatwidget.rs index f63810b6..94bd2f12 100644 --- a/codex-rs/tui/src/chatwidget.rs +++ b/codex-rs/tui/src/chatwidget.rs @@ -9,6 +9,8 @@ use codex_core::protocol::AgentMessageDeltaEvent; use codex_core::protocol::AgentMessageEvent; use codex_core::protocol::AgentReasoningDeltaEvent; use codex_core::protocol::AgentReasoningEvent; +use codex_core::protocol::AgentReasoningRawContentDeltaEvent; +use codex_core::protocol::AgentReasoningRawContentEvent; use codex_core::protocol::ApplyPatchApprovalRequestEvent; use codex_core::protocol::ErrorEvent; use codex_core::protocol::Event; @@ -61,6 +63,7 @@ pub(crate) struct ChatWidget<'a> { initial_user_message: Option, token_usage: TokenUsage, reasoning_buffer: String, + content_buffer: String, // Buffer for streaming assistant answer text; we do not surface partial // We wait for the final AgentMessage event and then emit the full text // at once into scrollback so the history contains a single message. @@ -101,6 +104,24 @@ fn create_initial_user_message(text: String, image_paths: Vec) -> Optio } impl ChatWidget<'_> { + fn emit_stream_header(&mut self, kind: StreamKind) { + use ratatui::text::Line as RLine; + if self.stream_header_emitted { + return; + } + let header = match kind { + StreamKind::Reasoning => RLine::from("thinking".magenta().italic()), + StreamKind::Answer => RLine::from("codex".magenta().bold()), + }; + self.app_event_tx + .send(AppEvent::InsertHistory(vec![header])); + self.stream_header_emitted = true; + } + fn finalize_active_stream(&mut self) { + if let Some(kind) = self.current_stream { + self.finalize_stream(kind); + } + } pub(crate) fn new( config: Config, app_event_tx: AppEventSender, @@ -161,6 +182,7 @@ impl ChatWidget<'_> { ), token_usage: TokenUsage::default(), reasoning_buffer: String::new(), + content_buffer: String::new(), answer_buffer: String::new(), running_commands: HashMap::new(), live_builder: RowBuilder::new(80), @@ -276,6 +298,20 @@ impl ChatWidget<'_> { self.finalize_stream(StreamKind::Reasoning); self.request_redraw(); } + EventMsg::AgentReasoningRawContentDelta(AgentReasoningRawContentDeltaEvent { + delta, + }) => { + // Treat raw reasoning content the same as summarized reasoning for UI flow. 
+ self.begin_stream(StreamKind::Reasoning); + self.reasoning_buffer.push_str(&delta); + self.stream_push_and_maybe_commit(&delta); + self.request_redraw(); + } + EventMsg::AgentReasoningRawContent(AgentReasoningRawContentEvent { text: _ }) => { + // Finalize the raw reasoning stream just like the summarized reasoning event. + self.finalize_stream(StreamKind::Reasoning); + self.request_redraw(); + } EventMsg::TaskStarted => { self.bottom_pane.clear_ctrl_c_quit_hint(); self.bottom_pane.set_task_running(true); @@ -299,6 +335,14 @@ impl ChatWidget<'_> { EventMsg::Error(ErrorEvent { message }) => { self.add_to_history(HistoryCell::new_error_event(message.clone())); self.bottom_pane.set_task_running(false); + self.bottom_pane.clear_live_ring(); + self.live_builder = RowBuilder::new(self.live_builder.width()); + self.current_stream = None; + self.stream_header_emitted = false; + self.answer_buffer.clear(); + self.reasoning_buffer.clear(); + self.content_buffer.clear(); + self.request_redraw(); } EventMsg::PlanUpdate(update) => { // Commit plan updates directly to history (no status-line preview). @@ -310,6 +354,7 @@ impl ChatWidget<'_> { cwd, reason, }) => { + self.finalize_active_stream(); // Log a background summary immediately so the history is chronological. let cmdline = strip_bash_lc_and_escape(&command); let text = format!( @@ -336,6 +381,7 @@ impl ChatWidget<'_> { reason, grant_root, }) => { + self.finalize_active_stream(); // ------------------------------------------------------------------ // Before we even prompt the user for approval we surface the patch // summary in the main conversation so that the dialog appears in a @@ -365,6 +411,10 @@ impl ChatWidget<'_> { command, cwd, }) => { + self.finalize_active_stream(); + // Ensure the status indicator is visible while the command runs. + self.bottom_pane + .update_status_text("running command".to_string()); self.running_commands.insert( call_id, RunningCommand { @@ -408,6 +458,7 @@ impl ChatWidget<'_> { call_id: _, invocation, }) => { + self.finalize_active_stream(); self.add_to_history(HistoryCell::new_active_mcp_tool_call(invocation)); } EventMsg::McpToolCallEnd(McpToolCallEndEvent { @@ -451,7 +502,9 @@ impl ChatWidget<'_> { /// Update the live log preview while a task is running. pub(crate) fn update_latest_log(&mut self, line: String) { - self.bottom_pane.update_status_text(line); + if self.bottom_pane.is_task_running() { + self.bottom_pane.update_status_text(line); + } } fn request_redraw(&mut self) { @@ -478,8 +531,15 @@ impl ChatWidget<'_> { if self.bottom_pane.is_task_running() { self.bottom_pane.clear_ctrl_c_quit_hint(); self.submit_op(Op::Interrupt); + self.bottom_pane.set_task_running(false); + self.bottom_pane.clear_live_ring(); + self.live_builder = RowBuilder::new(self.live_builder.width()); + self.current_stream = None; + self.stream_header_emitted = false; self.answer_buffer.clear(); self.reasoning_buffer.clear(); + self.content_buffer.clear(); + self.request_redraw(); CancellationEvent::Ignored } else if self.bottom_pane.ctrl_c_quit_hint_visible() { self.submit_op(Op::Shutdown); @@ -518,6 +578,12 @@ impl ChatWidget<'_> { impl ChatWidget<'_> { fn begin_stream(&mut self, kind: StreamKind) { + if let Some(current) = self.current_stream { + if current != kind { + self.finalize_stream(current); + } + } + if self.current_stream != Some(kind) { self.current_stream = Some(kind); self.stream_header_emitted = false; @@ -526,6 +592,7 @@ impl ChatWidget<'_> { // Ensure the waiting status is visible (composer replaced). 
self.bottom_pane .update_status_text("waiting for model".to_string()); + self.emit_stream_header(kind); } }
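
For anyone trying the new streaming path against a local Ollama server, the sketch below combines the `show_raw_agent_reasoning` option added in this patch with a chat-completions provider entry in `config.toml`. This is only an illustrative sketch: the `model_providers` keys (`name`, `base_url`, `wire_api`) and the model name are assumptions based on the broader provider configuration documented elsewhere in `config.md`, not part of this diff, so adjust them to your setup.

```toml
# Hypothetical config.toml sketch (not part of this diff): route Codex to a local
# Ollama server over the Chat Completions wire API and surface raw reasoning
# content when the model actually emits it.
model = "llama3.1"            # placeholder; use whatever model Ollama is serving
model_provider = "ollama"

# New option introduced in this patch; defaults to false.
show_raw_agent_reasoning = true

[model_providers.ollama]
name = "Ollama"
base_url = "http://localhost:11434/v1"  # Ollama's OpenAI-compatible endpoint (assumed default)
wire_api = "chat"                       # use the streaming chat completions path
```

With `show_raw_agent_reasoning = true`, `ModelClient` wraps the response stream with `AggregatedChatStream::streaming_mode` instead of `aggregate()`, so `ReasoningContentDelta` events are forwarded live and surfaced as `AgentReasoningRawContentDelta`, rather than being emitted only as a single aggregated item at `Completed`.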