diff --git a/codex-rs/core/src/chat_completions.rs b/codex-rs/core/src/chat_completions.rs
index 5ede774b..d1b83389 100644
--- a/codex-rs/core/src/chat_completions.rs
+++ b/codex-rs/core/src/chat_completions.rs
@@ -207,6 +207,7 @@ async fn process_chat_sse(
     }

     let mut fn_call_state = FunctionCallState::default();
+    let mut assistant_text = String::new();

     loop {
         let sse = match timeout(idle_timeout, stream.next()).await {
@@ -254,21 +255,42 @@
         let choice_opt = chunk.get("choices").and_then(|c| c.get(0));

         if let Some(choice) = choice_opt {
-            // Handle assistant content tokens.
+            // Handle assistant content tokens as streaming deltas.
             if let Some(content) = choice
                 .get("delta")
                 .and_then(|d| d.get("content"))
                 .and_then(|c| c.as_str())
             {
-                let item = ResponseItem::Message {
-                    role: "assistant".to_string(),
-                    content: vec![ContentItem::OutputText {
-                        text: content.to_string(),
-                    }],
-                    id: None,
-                };
+                if !content.is_empty() {
+                    assistant_text.push_str(content);
+                    let _ = tx_event
+                        .send(Ok(ResponseEvent::OutputTextDelta(content.to_string())))
+                        .await;
+                }
+            }

-                let _ = tx_event.send(Ok(ResponseEvent::OutputItemDone(item))).await;
+            // Forward any reasoning/thinking deltas if present.
+            if let Some(reasoning) = choice
+                .get("delta")
+                .and_then(|d| d.get("reasoning"))
+                .and_then(|c| c.as_str())
+            {
+                let _ = tx_event
+                    .send(Ok(ResponseEvent::ReasoningSummaryDelta(
+                        reasoning.to_string(),
+                    )))
+                    .await;
+            }
+            if let Some(reasoning_content) = choice
+                .get("delta")
+                .and_then(|d| d.get("reasoning_content"))
+                .and_then(|c| c.as_str())
+            {
+                let _ = tx_event
+                    .send(Ok(ResponseEvent::ReasoningSummaryDelta(
+                        reasoning_content.to_string(),
+                    )))
+                    .await;
             }

             // Handle streaming function / tool calls.
@@ -317,7 +339,18 @@
                     let _ = tx_event.send(Ok(ResponseEvent::OutputItemDone(item))).await;
                 }
                 "stop" => {
-                    // Regular turn without tool-call.
+                    // Regular turn without tool-call. Emit the final assistant message
+                    // as a single OutputItemDone so non-delta consumers see the result.
+                    if !assistant_text.is_empty() {
+                        let item = ResponseItem::Message {
+                            role: "assistant".to_string(),
+                            content: vec![ContentItem::OutputText {
+                                text: std::mem::take(&mut assistant_text),
+                            }],
+                            id: None,
+                        };
+                        let _ = tx_event.send(Ok(ResponseEvent::OutputItemDone(item))).await;
+                    }
                 }
                 _ => {}
             }
@@ -358,7 +391,10 @@
 pub(crate) struct AggregatedChatStream<S> {
     inner: S,
     cumulative: String,
-    pending_completed: Option<ResponseEvent>,
+    cumulative_reasoning: String,
+    pending: std::collections::VecDeque<ResponseEvent>,
+    // When true, deltas are also forwarded to consumers as they arrive.
+    streaming_mode: bool,
 }

 impl<S> Stream for AggregatedChatStream<S>
@@ -370,8 +406,8 @@ where
     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
         let this = self.get_mut();

-        // First, flush any buffered Completed event from the previous call.
-        if let Some(ev) = this.pending_completed.take() {
+        // First, flush any buffered events from the previous call.
+        if let Some(ev) = this.pending.pop_front() {
             return Poll::Ready(Some(Ok(ev)));
         }
@@ -388,16 +424,21 @@
                 let is_assistant_delta = matches!(&item, crate::models::ResponseItem::Message { role, .. } if role == "assistant");

                 if is_assistant_delta {
-                    if let crate::models::ResponseItem::Message { content, .. } = &item {
-                        if let Some(text) = content.iter().find_map(|c| match c {
-                            crate::models::ContentItem::OutputText { text } => Some(text),
-                            _ => None,
-                        }) {
-                            this.cumulative.push_str(text);
+                    // Only use the final assistant message if we have not
+                    // seen any deltas; otherwise, deltas already built the
+                    // cumulative text and this would duplicate it.
+                    if this.cumulative.is_empty() {
+                        if let crate::models::ResponseItem::Message { content, .. } = &item {
+                            if let Some(text) = content.iter().find_map(|c| match c {
+                                crate::models::ContentItem::OutputText { text } => Some(text),
+                                _ => None,
+                            }) {
+                                this.cumulative.push_str(text);
+                            }
                         }
                     }

-                    // Swallow partial assistant chunk; keep polling.
+                    // Swallow assistant message here; emit on Completed.
                     continue;
                 }
@@ -408,24 +449,48 @@
                 response_id,
                 token_usage,
             }))) => {
+                // Build any aggregated items in the correct order: Reasoning first, then Message.
+                let mut emitted_any = false;
+
+                if !this.cumulative_reasoning.is_empty() {
+                    let aggregated_reasoning = crate::models::ResponseItem::Reasoning {
+                        id: String::new(),
+                        summary: vec![
+                            crate::models::ReasoningItemReasoningSummary::SummaryText {
+                                text: std::mem::take(&mut this.cumulative_reasoning),
+                            },
+                        ],
+                        content: None,
+                        encrypted_content: None,
+                    };
+                    this.pending
+                        .push_back(ResponseEvent::OutputItemDone(aggregated_reasoning));
+                    emitted_any = true;
+                }
+
                 if !this.cumulative.is_empty() {
-                    let aggregated_item = crate::models::ResponseItem::Message {
+                    let aggregated_message = crate::models::ResponseItem::Message {
                         id: None,
                         role: "assistant".to_string(),
                         content: vec![crate::models::ContentItem::OutputText {
                             text: std::mem::take(&mut this.cumulative),
                         }],
                     };
+                    this.pending
+                        .push_back(ResponseEvent::OutputItemDone(aggregated_message));
+                    emitted_any = true;
+                }

-                    // Buffer Completed so it is returned *after* the aggregated message.
-                    this.pending_completed = Some(ResponseEvent::Completed {
-                        response_id,
-                        token_usage,
+                // Always emit Completed last when anything was aggregated.
+                if emitted_any {
+                    this.pending.push_back(ResponseEvent::Completed {
+                        response_id: response_id.clone(),
+                        token_usage: token_usage.clone(),
                     });
-
-                    return Poll::Ready(Some(Ok(ResponseEvent::OutputItemDone(
-                        aggregated_item,
-                    ))));
+                    // Return the first pending event now.
+                    if let Some(ev) = this.pending.pop_front() {
+                        return Poll::Ready(Some(Ok(ev)));
+                    }
                 }

                 // Nothing aggregated – forward Completed directly.
@@ -439,11 +504,25 @@ where
                 // will never appear in a Chat Completions stream.
                 continue;
             }
-            Poll::Ready(Some(Ok(ResponseEvent::OutputTextDelta(_))))
-            | Poll::Ready(Some(Ok(ResponseEvent::ReasoningSummaryDelta(_)))) => {
-                // Deltas are ignored here since aggregation waits for the
-                // final OutputItemDone.
-                continue;
+            Poll::Ready(Some(Ok(ResponseEvent::OutputTextDelta(delta)))) => {
+                // Always accumulate deltas so we can emit a final OutputItemDone at Completed.
+                this.cumulative.push_str(&delta);
+                if this.streaming_mode {
+                    // In streaming mode, also forward the delta immediately.
+                    return Poll::Ready(Some(Ok(ResponseEvent::OutputTextDelta(delta))));
+                } else {
+                    continue;
+                }
+            }
+            Poll::Ready(Some(Ok(ResponseEvent::ReasoningSummaryDelta(delta)))) => {
+                // Always accumulate reasoning deltas so we can emit a final Reasoning item at Completed.
+                this.cumulative_reasoning.push_str(&delta);
+                if this.streaming_mode {
+                    // In streaming mode, also forward the delta immediately.
+                    return Poll::Ready(Some(Ok(ResponseEvent::ReasoningSummaryDelta(delta))));
+                } else {
+                    continue;
+                }
             }
         }
     }
@@ -475,9 +554,23 @@ pub(crate) trait AggregateStreamExt: Stream<Item = Result<ResponseEvent>> + Sized
         AggregatedChatStream {
             inner: self,
             cumulative: String::new(),
-            pending_completed: None,
+            cumulative_reasoning: String::new(),
+            pending: std::collections::VecDeque::new(),
+            streaming_mode: false,
         }
     }
 }

 impl<T> AggregateStreamExt for T where T: Stream<Item = Result<ResponseEvent>> + Sized {}
+
+impl<S> AggregatedChatStream<S> {
+    pub(crate) fn streaming_mode(inner: S) -> Self {
+        AggregatedChatStream {
+            inner,
+            cumulative: String::new(),
+            cumulative_reasoning: String::new(),
+            pending: std::collections::VecDeque::new(),
+            streaming_mode: true,
+        }
+    }
+}
diff --git a/codex-rs/core/src/client.rs b/codex-rs/core/src/client.rs
index b9ea6b13..8685bc54 100644
--- a/codex-rs/core/src/client.rs
+++ b/codex-rs/core/src/client.rs
@@ -93,7 +93,13 @@ impl ModelClient {
         // Wrap it with the aggregation adapter so callers see *only*
         // the final assistant message per turn (matching the
         // behaviour of the Responses API).
-        let mut aggregated = response_stream.aggregate();
+        let mut aggregated = if self.config.show_reasoning_content
+            && !self.config.hide_agent_reasoning
+        {
+            crate::chat_completions::AggregatedChatStream::streaming_mode(response_stream)
+        } else {
+            response_stream.aggregate()
+        };

         // Bridge the aggregated stream back into a standard
         // `ResponseStream` by forwarding events through a channel.
@@ -438,7 +444,7 @@ async fn process_sse(
                 }
             }
         }
-        "response.reasoning_summary_text.delta" => {
+        "response.reasoning_summary_text.delta" | "response.reasoning_text.delta" => {
             if let Some(delta) = event.delta {
                 let event = ResponseEvent::ReasoningSummaryDelta(delta);
                 if tx_event.send(Ok(event)).await.is_err() {
diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs
index 568d87c4..18bcf626 100644
--- a/codex-rs/core/src/codex.rs
+++ b/codex-rs/core/src/codex.rs
@@ -56,6 +56,7 @@ use crate::mcp_tool_call::handle_mcp_tool_call;
 use crate::models::ContentItem;
 use crate::models::FunctionCallOutputPayload;
 use crate::models::LocalShellAction;
+use crate::models::ReasoningItemContent;
 use crate::models::ReasoningItemReasoningSummary;
 use crate::models::ResponseInputItem;
 use crate::models::ResponseItem;
@@ -64,6 +65,7 @@ use crate::plan_tool::handle_update_plan;
 use crate::project_doc::get_user_instructions;
 use crate::protocol::AgentMessageDeltaEvent;
 use crate::protocol::AgentMessageEvent;
+use crate::protocol::AgentReasoningContentEvent;
 use crate::protocol::AgentReasoningDeltaEvent;
 use crate::protocol::AgentReasoningEvent;
 use crate::protocol::ApplyPatchApprovalRequestEvent;
@@ -227,6 +229,8 @@ pub(crate) struct Session {
     state: Mutex<State>,
     codex_linux_sandbox_exe: Option<PathBuf>,
     user_shell: shell::Shell,
+    show_reasoning_content: bool,
+    hide_agent_reasoning: bool,
 }

 impl Session {
@@ -822,6 +826,8 @@ async fn submission_loop(
                     codex_linux_sandbox_exe: config.codex_linux_sandbox_exe.clone(),
                     disable_response_storage,
                     user_shell: default_shell,
+                    show_reasoning_content: config.show_reasoning_content,
+                    hide_agent_reasoning: config.hide_agent_reasoning,
                 }));

                 // Patch restored state into the newly created session.
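Note: the adapter's contract is easier to see in isolation than across the hunks above. Below is a minimal, self-contained sketch of the two modes; `Event` and `Aggregator` are illustrative stand-ins for `ResponseEvent` and `AggregatedChatStream`, not the crate's actual types. In both modes the aggregated `Reasoning` item precedes the assistant `Message` and `Completed` comes last; streaming mode additionally forwards every delta as it arrives.

```rust
use std::collections::VecDeque;

// Illustrative stand-in for `ResponseEvent` (assumption: simplified shape).
#[derive(Debug, Clone)]
enum Event {
    TextDelta(String),
    ReasoningDelta(String),
    ReasoningDone(String), // aggregated reasoning, emitted before the message
    MessageDone(String),   // aggregated assistant message
    Completed,
}

// Illustrative stand-in for `AggregatedChatStream`'s buffering logic.
struct Aggregator {
    cumulative: String,
    cumulative_reasoning: String,
    streaming_mode: bool,
}

impl Aggregator {
    fn new(streaming_mode: bool) -> Self {
        Self {
            cumulative: String::new(),
            cumulative_reasoning: String::new(),
            streaming_mode,
        }
    }

    // Feed one upstream event; return what a consumer should observe.
    fn feed(&mut self, ev: Event) -> VecDeque<Event> {
        let mut out = VecDeque::new();
        match ev {
            Event::TextDelta(d) => {
                self.cumulative.push_str(&d); // always accumulate
                if self.streaming_mode {
                    out.push_back(Event::TextDelta(d)); // forward immediately
                }
            }
            Event::ReasoningDelta(d) => {
                self.cumulative_reasoning.push_str(&d);
                if self.streaming_mode {
                    out.push_back(Event::ReasoningDelta(d));
                }
            }
            Event::Completed => {
                // Ordering contract: Reasoning first, then Message, then Completed.
                if !self.cumulative_reasoning.is_empty() {
                    out.push_back(Event::ReasoningDone(std::mem::take(
                        &mut self.cumulative_reasoning,
                    )));
                }
                if !self.cumulative.is_empty() {
                    out.push_back(Event::MessageDone(std::mem::take(&mut self.cumulative)));
                }
                out.push_back(Event::Completed);
            }
            other => out.push_back(other),
        }
        out
    }
}

fn main() {
    for mode in [false, true] {
        println!("streaming_mode = {mode}:");
        let mut agg = Aggregator::new(mode);
        let upstream = [
            Event::ReasoningDelta("thinking...".into()),
            Event::TextDelta("Hello".into()),
            Event::TextDelta(", world".into()),
            Event::Completed,
        ];
        for ev in upstream {
            for seen in agg.feed(ev) {
                println!("  -> {seen:?}");
            }
        }
    }
}
```

Running this prints only the three aggregated events in the default mode, and the three deltas plus the same three aggregated events in streaming mode, which is why client.rs can select one adapter type for both paths.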
@@ -1132,6 +1138,7 @@ async fn run_task(sess: Arc<Session>, sub_id: String, input: Vec<InputItem>) {
             ResponseItem::Reasoning {
                 id,
                 summary,
+                content,
                 encrypted_content,
             },
             None,
@@ -1139,6 +1146,7 @@ async fn run_task(sess: Arc<Session>, sub_id: String, input: Vec<InputItem>) {
             items_to_record_in_conversation_history.push(ResponseItem::Reasoning {
                 id: id.clone(),
                 summary: summary.clone(),
+                content: content.clone(),
                 encrypted_content: encrypted_content.clone(),
             });
         }
@@ -1381,11 +1389,13 @@ async fn try_run_turn(
                 sess.tx_event.send(event).await.ok();
             }
             ResponseEvent::ReasoningSummaryDelta(delta) => {
-                let event = Event {
-                    id: sub_id.to_string(),
-                    msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta }),
-                };
-                sess.tx_event.send(event).await.ok();
+                if !sess.hide_agent_reasoning {
+                    let event = Event {
+                        id: sub_id.to_string(),
+                        msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta }),
+                    };
+                    sess.tx_event.send(event).await.ok();
+                }
             }
         }
     }
@@ -1493,16 +1503,36 @@ async fn handle_response_item(
             }
             None
         }
-        ResponseItem::Reasoning { summary, .. } => {
-            for item in summary {
-                let text = match item {
-                    ReasoningItemReasoningSummary::SummaryText { text } => text,
-                };
-                let event = Event {
-                    id: sub_id.to_string(),
-                    msg: EventMsg::AgentReasoning(AgentReasoningEvent { text }),
-                };
-                sess.tx_event.send(event).await.ok();
+        ResponseItem::Reasoning {
+            id: _,
+            summary,
+            content,
+            encrypted_content: _,
+        } => {
+            if !sess.hide_agent_reasoning {
+                for item in summary {
+                    let text = match item {
+                        ReasoningItemReasoningSummary::SummaryText { text } => text,
+                    };
+                    let event = Event {
+                        id: sub_id.to_string(),
+                        msg: EventMsg::AgentReasoning(AgentReasoningEvent { text }),
+                    };
+                    sess.tx_event.send(event).await.ok();
+                }
+            }
+            if !sess.hide_agent_reasoning && sess.show_reasoning_content && content.is_some() {
+                let content = content.unwrap();
+                for item in content {
+                    let text = match item {
+                        ReasoningItemContent::ReasoningText { text } => text,
+                    };
+                    let event = Event {
+                        id: sub_id.to_string(),
+                        msg: EventMsg::AgentReasoningContent(AgentReasoningContentEvent { text }),
+                    };
+                    sess.tx_event.send(event).await.ok();
+                }
             }
             None
         }
diff --git a/codex-rs/core/src/config.rs b/codex-rs/core/src/config.rs
index b43dc56b..3277ca08 100644
--- a/codex-rs/core/src/config.rs
+++ b/codex-rs/core/src/config.rs
@@ -57,6 +57,10 @@ pub struct Config {
     /// users are only interested in the final agent responses.
     pub hide_agent_reasoning: bool,

+    /// When `true`, the raw chain-of-thought text from reasoning events will be
+    /// displayed in the UI in addition to the reasoning summaries.
+    pub show_reasoning_content: bool,
+
     /// Disable server-side response storage (sends the full conversation
     /// context with every request). Currently necessary for OpenAI customers
     /// who have opted into Zero Data Retention (ZDR).
@@ -325,6 +329,10 @@ pub struct ConfigToml {
     /// UI/output. Defaults to `false`.
     pub hide_agent_reasoning: Option<bool>,

+    /// When set to `true`, raw chain-of-thought text from reasoning events will
+    /// be shown in the UI.
+    pub show_reasoning_content: Option<bool>,
+
     pub model_reasoning_effort: Option<ReasoningEffort>,
     pub model_reasoning_summary: Option<ReasoningSummary>,

@@ -488,6 +496,19 @@ impl Config {
             Self::get_base_instructions(experimental_instructions_path, &resolved_cwd)?;
         let base_instructions = base_instructions.or(file_base_instructions);

+        // Resolve hide/show reasoning flags with consistent precedence:
+        // if hide is true, force show_reasoning_content to false.
+        let hide_agent_reasoning_val = cfg.hide_agent_reasoning.unwrap_or(false);
+        let show_reasoning_content_val = if hide_agent_reasoning_val {
+            false
+        } else {
+            cfg.show_reasoning_content.unwrap_or(false)
+        };
+
+        if cfg.hide_agent_reasoning == Some(true) && cfg.show_reasoning_content == Some(true) {
+            tracing::warn!("Ignoring show_reasoning_content because hide_agent_reasoning is true");
+        }
+
         let config = Self {
             model,
             model_context_window,
@@ -517,7 +538,8 @@ impl Config {
             tui: cfg.tui.unwrap_or_default(),
             codex_linux_sandbox_exe,

-            hide_agent_reasoning: cfg.hide_agent_reasoning.unwrap_or(false),
+            hide_agent_reasoning: hide_agent_reasoning_val,
+            show_reasoning_content: show_reasoning_content_val,
             model_reasoning_effort: config_profile
                 .model_reasoning_effort
                 .or(cfg.model_reasoning_effort)
@@ -891,6 +913,7 @@ disable_response_storage = true
             tui: Tui::default(),
             codex_linux_sandbox_exe: None,
             hide_agent_reasoning: false,
+            show_reasoning_content: false,
             model_reasoning_effort: ReasoningEffort::High,
             model_reasoning_summary: ReasoningSummary::Detailed,
             model_supports_reasoning_summaries: false,
@@ -941,6 +964,7 @@ disable_response_storage = true
             tui: Tui::default(),
             codex_linux_sandbox_exe: None,
             hide_agent_reasoning: false,
+            show_reasoning_content: false,
             model_reasoning_effort: ReasoningEffort::default(),
             model_reasoning_summary: ReasoningSummary::default(),
             model_supports_reasoning_summaries: false,
@@ -1006,6 +1030,7 @@ disable_response_storage = true
             tui: Tui::default(),
             codex_linux_sandbox_exe: None,
             hide_agent_reasoning: false,
+            show_reasoning_content: false,
             model_reasoning_effort: ReasoningEffort::default(),
             model_reasoning_summary: ReasoningSummary::default(),
             model_supports_reasoning_summaries: false,
diff --git a/codex-rs/core/src/models.rs b/codex-rs/core/src/models.rs
index 16640491..98d8727e 100644
--- a/codex-rs/core/src/models.rs
+++ b/codex-rs/core/src/models.rs
@@ -45,6 +45,8 @@ pub enum ResponseItem {
     Reasoning {
         id: String,
         summary: Vec<ReasoningItemReasoningSummary>,
+        #[serde(default, skip_serializing_if = "Option::is_none")]
+        content: Option<Vec<ReasoningItemContent>>,
         encrypted_content: Option<String>,
     },
     LocalShellCall {
@@ -136,6 +138,12 @@ pub enum ReasoningItemReasoningSummary {
     SummaryText { text: String },
 }

+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(tag = "type", rename_all = "snake_case")]
+pub enum ReasoningItemContent {
+    ReasoningText { text: String },
+}
+
 impl From<Vec<InputItem>> for ResponseInputItem {
     fn from(items: Vec<InputItem>) -> Self {
         Self::Message {
diff --git a/codex-rs/core/src/protocol.rs b/codex-rs/core/src/protocol.rs
index 82591a2c..1e073362 100644
--- a/codex-rs/core/src/protocol.rs
+++ b/codex-rs/core/src/protocol.rs
@@ -359,6 +359,9 @@ pub enum EventMsg {
     /// Agent reasoning delta event from agent.
     AgentReasoningDelta(AgentReasoningDeltaEvent),

+    /// Raw chain-of-thought from agent.
+    AgentReasoningContent(AgentReasoningContentEvent),
+
     /// Ack the client's configure message.
     SessionConfigured(SessionConfiguredEvent),
@@ -464,6 +467,11 @@ pub struct AgentReasoningEvent {
     pub text: String,
 }

+#[derive(Debug, Clone, Deserialize, Serialize)]
+pub struct AgentReasoningContentEvent {
+    pub text: String,
+}
+
 #[derive(Debug, Clone, Deserialize, Serialize)]
 pub struct AgentReasoningDeltaEvent {
     pub delta: String,
 }
diff --git a/codex-rs/exec/src/event_processor_with_human_output.rs b/codex-rs/exec/src/event_processor_with_human_output.rs
index 7703c138..1f8fe3c0 100644
--- a/codex-rs/exec/src/event_processor_with_human_output.rs
+++ b/codex-rs/exec/src/event_processor_with_human_output.rs
@@ -4,6 +4,7 @@ use codex_core::config::Config;
 use codex_core::plan_tool::UpdatePlanArgs;
 use codex_core::protocol::AgentMessageDeltaEvent;
 use codex_core::protocol::AgentMessageEvent;
+use codex_core::protocol::AgentReasoningContentEvent;
 use codex_core::protocol::AgentReasoningDeltaEvent;
 use codex_core::protocol::BackgroundEventEvent;
 use codex_core::protocol::ErrorEvent;
@@ -203,6 +204,14 @@ impl EventProcessor for EventProcessorWithHumanOutput {
                 #[allow(clippy::expect_used)]
                 std::io::stdout().flush().expect("could not flush stdout");
             }
+            EventMsg::AgentReasoningContent(AgentReasoningContentEvent { text }) => {
+                if !self.show_agent_reasoning {
+                    return CodexStatus::Running;
+                }
+                print!("{text}");
+                #[allow(clippy::expect_used)]
+                std::io::stdout().flush().expect("could not flush stdout");
+            }
             EventMsg::AgentMessage(AgentMessageEvent { message }) => {
                 // if answer_started is false, this means we haven't received any
                 // delta. Thus, we need to print the message as a new answer.
diff --git a/codex-rs/mcp-server/src/codex_tool_runner.rs b/codex-rs/mcp-server/src/codex_tool_runner.rs
index 205dfa46..3d32d8de 100644
--- a/codex-rs/mcp-server/src/codex_tool_runner.rs
+++ b/codex-rs/mcp-server/src/codex_tool_runner.rs
@@ -252,7 +252,8 @@ async fn run_codex_tool_session_inner(
             EventMsg::AgentMessage(AgentMessageEvent { .. }) => {
                 // TODO: think how we want to support this in the MCP
             }
-            EventMsg::TaskStarted
+            EventMsg::AgentReasoningContent(_)
+            | EventMsg::TaskStarted
             | EventMsg::TokenCount(_)
             | EventMsg::AgentReasoning(_)
             | EventMsg::McpToolCallBegin(_)
diff --git a/codex-rs/mcp-server/src/conversation_loop.rs b/codex-rs/mcp-server/src/conversation_loop.rs
index 1db39a23..5b95c313 100644
--- a/codex-rs/mcp-server/src/conversation_loop.rs
+++ b/codex-rs/mcp-server/src/conversation_loop.rs
@@ -90,7 +90,8 @@ pub async fn run_conversation_loop(
             EventMsg::AgentMessage(AgentMessageEvent { .. }) => {
                 // TODO: think how we want to support this in the MCP
             }
-            EventMsg::TaskStarted
+            EventMsg::AgentReasoningContent(_)
+            | EventMsg::TaskStarted
             | EventMsg::TokenCount(_)
             | EventMsg::AgentReasoning(_)
             | EventMsg::McpToolCallBegin(_)
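Note: the flag precedence introduced in config.rs reduces to a small pure function. A minimal sketch follows, assuming a hypothetical helper `resolve_reasoning_flags`; in the diff itself the logic is written inline where `Config` is constructed.

```rust
// Sketch of the resolution rule added in config.rs:
// `hide_agent_reasoning = true` always wins over `show_reasoning_content`.
// `resolve_reasoning_flags` is an illustrative name, not a function in the PR.
fn resolve_reasoning_flags(
    hide_agent_reasoning: Option<bool>,
    show_reasoning_content: Option<bool>,
) -> (bool, bool) {
    let hide = hide_agent_reasoning.unwrap_or(false);
    let show = if hide {
        false // raw chain-of-thought is suppressed whenever reasoning is hidden
    } else {
        show_reasoning_content.unwrap_or(false)
    };
    (hide, show)
}

fn main() {
    // Both unset: nothing hidden, raw content off by default.
    assert_eq!(resolve_reasoning_flags(None, None), (false, false));
    // Show requested and nothing hidden: raw content on.
    assert_eq!(resolve_reasoning_flags(None, Some(true)), (false, true));
    // Conflict: hide wins and show is forced off (the real code also logs a warning).
    assert_eq!(resolve_reasoning_flags(Some(true), Some(true)), (true, false));
    println!("flag precedence holds");
}
```

The same precedence is re-checked at runtime: client.rs only selects `streaming_mode` when `show_reasoning_content && !hide_agent_reasoning`, and codex.rs gates both `AgentReasoning*` event families on `hide_agent_reasoning` first.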