diff --git a/codex-rs/core/src/chat_completions.rs b/codex-rs/core/src/chat_completions.rs index d1b83389..5ede774b 100644 --- a/codex-rs/core/src/chat_completions.rs +++ b/codex-rs/core/src/chat_completions.rs @@ -207,7 +207,6 @@ async fn process_chat_sse( } let mut fn_call_state = FunctionCallState::default(); - let mut assistant_text = String::new(); loop { let sse = match timeout(idle_timeout, stream.next()).await { @@ -255,42 +254,21 @@ async fn process_chat_sse( let choice_opt = chunk.get("choices").and_then(|c| c.get(0)); if let Some(choice) = choice_opt { - // Handle assistant content tokens as streaming deltas. + // Handle assistant content tokens. if let Some(content) = choice .get("delta") .and_then(|d| d.get("content")) .and_then(|c| c.as_str()) { - if !content.is_empty() { - assistant_text.push_str(content); - let _ = tx_event - .send(Ok(ResponseEvent::OutputTextDelta(content.to_string()))) - .await; - } - } + let item = ResponseItem::Message { + role: "assistant".to_string(), + content: vec![ContentItem::OutputText { + text: content.to_string(), + }], + id: None, + }; - // Forward any reasoning/thinking deltas if present. - if let Some(reasoning) = choice - .get("delta") - .and_then(|d| d.get("reasoning")) - .and_then(|c| c.as_str()) - { - let _ = tx_event - .send(Ok(ResponseEvent::ReasoningSummaryDelta( - reasoning.to_string(), - ))) - .await; - } - if let Some(reasoning_content) = choice - .get("delta") - .and_then(|d| d.get("reasoning_content")) - .and_then(|c| c.as_str()) - { - let _ = tx_event - .send(Ok(ResponseEvent::ReasoningSummaryDelta( - reasoning_content.to_string(), - ))) - .await; + let _ = tx_event.send(Ok(ResponseEvent::OutputItemDone(item))).await; } // Handle streaming function / tool calls. @@ -339,18 +317,7 @@ async fn process_chat_sse( let _ = tx_event.send(Ok(ResponseEvent::OutputItemDone(item))).await; } "stop" => { - // Regular turn without tool-call. Emit the final assistant message - // as a single OutputItemDone so non-delta consumers see the result. - if !assistant_text.is_empty() { - let item = ResponseItem::Message { - role: "assistant".to_string(), - content: vec![ContentItem::OutputText { - text: std::mem::take(&mut assistant_text), - }], - id: None, - }; - let _ = tx_event.send(Ok(ResponseEvent::OutputItemDone(item))).await; - } + // Regular turn without tool-call. } _ => {} } @@ -391,10 +358,7 @@ async fn process_chat_sse( pub(crate) struct AggregatedChatStream { inner: S, cumulative: String, - cumulative_reasoning: String, - pending: std::collections::VecDeque, - // When true, do not emit a cumulative assistant message at Completed. - streaming_mode: bool, + pending_completed: Option, } impl Stream for AggregatedChatStream @@ -406,8 +370,8 @@ where fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.get_mut(); - // First, flush any buffered events from the previous call. - if let Some(ev) = this.pending.pop_front() { + // First, flush any buffered Completed event from the previous call. + if let Some(ev) = this.pending_completed.take() { return Poll::Ready(Some(Ok(ev))); } @@ -424,21 +388,16 @@ where let is_assistant_delta = matches!(&item, crate::models::ResponseItem::Message { role, .. } if role == "assistant"); if is_assistant_delta { - // Only use the final assistant message if we have not - // seen any deltas; otherwise, deltas already built the - // cumulative text and this would duplicate it. - if this.cumulative.is_empty() { - if let crate::models::ResponseItem::Message { content, .. } = &item { - if let Some(text) = content.iter().find_map(|c| match c { - crate::models::ContentItem::OutputText { text } => Some(text), - _ => None, - }) { - this.cumulative.push_str(text); - } + if let crate::models::ResponseItem::Message { content, .. } = &item { + if let Some(text) = content.iter().find_map(|c| match c { + crate::models::ContentItem::OutputText { text } => Some(text), + _ => None, + }) { + this.cumulative.push_str(text); } } - // Swallow assistant message here; emit on Completed. + // Swallow partial assistant chunk; keep polling. continue; } @@ -449,48 +408,24 @@ where response_id, token_usage, }))) => { - // Build any aggregated items in the correct order: Reasoning first, then Message. - let mut emitted_any = false; - - if !this.cumulative_reasoning.is_empty() { - let aggregated_reasoning = crate::models::ResponseItem::Reasoning { - id: String::new(), - summary: vec![ - crate::models::ReasoningItemReasoningSummary::SummaryText { - text: std::mem::take(&mut this.cumulative_reasoning), - }, - ], - content: None, - encrypted_content: None, - }; - this.pending - .push_back(ResponseEvent::OutputItemDone(aggregated_reasoning)); - emitted_any = true; - } - if !this.cumulative.is_empty() { - let aggregated_message = crate::models::ResponseItem::Message { + let aggregated_item = crate::models::ResponseItem::Message { id: None, role: "assistant".to_string(), content: vec![crate::models::ContentItem::OutputText { text: std::mem::take(&mut this.cumulative), }], }; - this.pending - .push_back(ResponseEvent::OutputItemDone(aggregated_message)); - emitted_any = true; - } - // Always emit Completed last when anything was aggregated. - if emitted_any { - this.pending.push_back(ResponseEvent::Completed { - response_id: response_id.clone(), - token_usage: token_usage.clone(), + // Buffer Completed so it is returned *after* the aggregated message. + this.pending_completed = Some(ResponseEvent::Completed { + response_id, + token_usage, }); - // Return the first pending event now. - if let Some(ev) = this.pending.pop_front() { - return Poll::Ready(Some(Ok(ev))); - } + + return Poll::Ready(Some(Ok(ResponseEvent::OutputItemDone( + aggregated_item, + )))); } // Nothing aggregated – forward Completed directly. @@ -504,25 +439,11 @@ where // will never appear in a Chat Completions stream. continue; } - Poll::Ready(Some(Ok(ResponseEvent::OutputTextDelta(delta)))) => { - // Always accumulate deltas so we can emit a final OutputItemDone at Completed. - this.cumulative.push_str(&delta); - if this.streaming_mode { - // In streaming mode, also forward the delta immediately. - return Poll::Ready(Some(Ok(ResponseEvent::OutputTextDelta(delta)))); - } else { - continue; - } - } - Poll::Ready(Some(Ok(ResponseEvent::ReasoningSummaryDelta(delta)))) => { - // Always accumulate reasoning deltas so we can emit a final Reasoning item at Completed. - this.cumulative_reasoning.push_str(&delta); - if this.streaming_mode { - // In streaming mode, also forward the delta immediately. - return Poll::Ready(Some(Ok(ResponseEvent::ReasoningSummaryDelta(delta)))); - } else { - continue; - } + Poll::Ready(Some(Ok(ResponseEvent::OutputTextDelta(_)))) + | Poll::Ready(Some(Ok(ResponseEvent::ReasoningSummaryDelta(_)))) => { + // Deltas are ignored here since aggregation waits for the + // final OutputItemDone. + continue; } } } @@ -554,23 +475,9 @@ pub(crate) trait AggregateStreamExt: Stream> + Size AggregatedChatStream { inner: self, cumulative: String::new(), - cumulative_reasoning: String::new(), - pending: std::collections::VecDeque::new(), - streaming_mode: false, + pending_completed: None, } } } impl AggregateStreamExt for T where T: Stream> + Sized {} - -impl AggregatedChatStream { - pub(crate) fn streaming_mode(inner: S) -> Self { - AggregatedChatStream { - inner, - cumulative: String::new(), - cumulative_reasoning: String::new(), - pending: std::collections::VecDeque::new(), - streaming_mode: true, - } - } -} diff --git a/codex-rs/core/src/client.rs b/codex-rs/core/src/client.rs index fd530e0c..b9ea6b13 100644 --- a/codex-rs/core/src/client.rs +++ b/codex-rs/core/src/client.rs @@ -93,11 +93,7 @@ impl ModelClient { // Wrap it with the aggregation adapter so callers see *only* // the final assistant message per turn (matching the // behaviour of the Responses API). - let mut aggregated = if !self.config.hide_agent_reasoning { - crate::chat_completions::AggregatedChatStream::streaming_mode(response_stream) - } else { - response_stream.aggregate() - }; + let mut aggregated = response_stream.aggregate(); // Bridge the aggregated stream back into a standard // `ResponseStream` by forwarding events through a channel. @@ -442,7 +438,7 @@ async fn process_sse( } } } - "response.reasoning_summary_text.delta" | "response.reasoning_text.delta" => { + "response.reasoning_summary_text.delta" => { if let Some(delta) = event.delta { let event = ResponseEvent::ReasoningSummaryDelta(delta); if tx_event.send(Ok(event)).await.is_err() { diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index caebaed2..568d87c4 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -56,7 +56,6 @@ use crate::mcp_tool_call::handle_mcp_tool_call; use crate::models::ContentItem; use crate::models::FunctionCallOutputPayload; use crate::models::LocalShellAction; -use crate::models::ReasoningItemContent; use crate::models::ReasoningItemReasoningSummary; use crate::models::ResponseInputItem; use crate::models::ResponseItem; @@ -65,7 +64,6 @@ use crate::plan_tool::handle_update_plan; use crate::project_doc::get_user_instructions; use crate::protocol::AgentMessageDeltaEvent; use crate::protocol::AgentMessageEvent; -use crate::protocol::AgentReasoningContentEvent; use crate::protocol::AgentReasoningDeltaEvent; use crate::protocol::AgentReasoningEvent; use crate::protocol::ApplyPatchApprovalRequestEvent; @@ -229,7 +227,6 @@ pub(crate) struct Session { state: Mutex, codex_linux_sandbox_exe: Option, user_shell: shell::Shell, - hide_agent_reasoning: bool, } impl Session { @@ -825,7 +822,6 @@ async fn submission_loop( codex_linux_sandbox_exe: config.codex_linux_sandbox_exe.clone(), disable_response_storage, user_shell: default_shell, - hide_agent_reasoning: config.hide_agent_reasoning, })); // Patch restored state into the newly created session. @@ -1136,7 +1132,6 @@ async fn run_task(sess: Arc, sub_id: String, input: Vec) { ResponseItem::Reasoning { id, summary, - content, encrypted_content, }, None, @@ -1144,7 +1139,6 @@ async fn run_task(sess: Arc, sub_id: String, input: Vec) { items_to_record_in_conversation_history.push(ResponseItem::Reasoning { id: id.clone(), summary: summary.clone(), - content: content.clone(), encrypted_content: encrypted_content.clone(), }); } @@ -1387,13 +1381,11 @@ async fn try_run_turn( sess.tx_event.send(event).await.ok(); } ResponseEvent::ReasoningSummaryDelta(delta) => { - if !sess.hide_agent_reasoning { - let event = Event { - id: sub_id.to_string(), - msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta }), - }; - sess.tx_event.send(event).await.ok(); - } + let event = Event { + id: sub_id.to_string(), + msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta }), + }; + sess.tx_event.send(event).await.ok(); } } } @@ -1501,36 +1493,16 @@ async fn handle_response_item( } None } - ResponseItem::Reasoning { - id: _, - summary, - content, - encrypted_content: _, - } => { - if !sess.hide_agent_reasoning { - for item in summary { - let text = match item { - ReasoningItemReasoningSummary::SummaryText { text } => text, - }; - let event = Event { - id: sub_id.to_string(), - msg: EventMsg::AgentReasoning(AgentReasoningEvent { text }), - }; - sess.tx_event.send(event).await.ok(); - } - } - if !sess.hide_agent_reasoning && content.is_some() { - let content = content.unwrap(); - for item in content { - let text = match item { - ReasoningItemContent::ReasoningText { text } => text, - }; - let event = Event { - id: sub_id.to_string(), - msg: EventMsg::AgentReasoningContent(AgentReasoningContentEvent { text }), - }; - sess.tx_event.send(event).await.ok(); - } + ResponseItem::Reasoning { summary, .. } => { + for item in summary { + let text = match item { + ReasoningItemReasoningSummary::SummaryText { text } => text, + }; + let event = Event { + id: sub_id.to_string(), + msg: EventMsg::AgentReasoning(AgentReasoningEvent { text }), + }; + sess.tx_event.send(event).await.ok(); } None } diff --git a/codex-rs/core/src/config.rs b/codex-rs/core/src/config.rs index 302c468b..b43dc56b 100644 --- a/codex-rs/core/src/config.rs +++ b/codex-rs/core/src/config.rs @@ -488,10 +488,6 @@ impl Config { Self::get_base_instructions(experimental_instructions_path, &resolved_cwd)?; let base_instructions = base_instructions.or(file_base_instructions); - // Resolve hide/show reasoning flags with consistent precedence: - // if hide is true, force show_reasoning_content to false. - let hide_agent_reasoning_val = cfg.hide_agent_reasoning.unwrap_or(false); - let config = Self { model, model_context_window, @@ -521,7 +517,7 @@ impl Config { tui: cfg.tui.unwrap_or_default(), codex_linux_sandbox_exe, - hide_agent_reasoning: hide_agent_reasoning_val, + hide_agent_reasoning: cfg.hide_agent_reasoning.unwrap_or(false), model_reasoning_effort: config_profile .model_reasoning_effort .or(cfg.model_reasoning_effort) diff --git a/codex-rs/core/src/models.rs b/codex-rs/core/src/models.rs index 98d8727e..16640491 100644 --- a/codex-rs/core/src/models.rs +++ b/codex-rs/core/src/models.rs @@ -45,8 +45,6 @@ pub enum ResponseItem { Reasoning { id: String, summary: Vec, - #[serde(default, skip_serializing_if = "Option::is_none")] - content: Option>, encrypted_content: Option, }, LocalShellCall { @@ -138,12 +136,6 @@ pub enum ReasoningItemReasoningSummary { SummaryText { text: String }, } -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(tag = "type", rename_all = "snake_case")] -pub enum ReasoningItemContent { - ReasoningText { text: String }, -} - impl From> for ResponseInputItem { fn from(items: Vec) -> Self { Self::Message { diff --git a/codex-rs/core/src/protocol.rs b/codex-rs/core/src/protocol.rs index 1e073362..82591a2c 100644 --- a/codex-rs/core/src/protocol.rs +++ b/codex-rs/core/src/protocol.rs @@ -359,9 +359,6 @@ pub enum EventMsg { /// Agent reasoning delta event from agent. AgentReasoningDelta(AgentReasoningDeltaEvent), - /// Raw chain-of-thought from agent. - AgentReasoningContent(AgentReasoningContentEvent), - /// Ack the client's configure message. SessionConfigured(SessionConfiguredEvent), @@ -467,11 +464,6 @@ pub struct AgentReasoningEvent { pub text: String, } -#[derive(Debug, Clone, Deserialize, Serialize)] -pub struct AgentReasoningContentEvent { - pub text: String, -} - #[derive(Debug, Clone, Deserialize, Serialize)] pub struct AgentReasoningDeltaEvent { pub delta: String, diff --git a/codex-rs/exec/src/event_processor_with_human_output.rs b/codex-rs/exec/src/event_processor_with_human_output.rs index 1f8fe3c0..7703c138 100644 --- a/codex-rs/exec/src/event_processor_with_human_output.rs +++ b/codex-rs/exec/src/event_processor_with_human_output.rs @@ -4,7 +4,6 @@ use codex_core::config::Config; use codex_core::plan_tool::UpdatePlanArgs; use codex_core::protocol::AgentMessageDeltaEvent; use codex_core::protocol::AgentMessageEvent; -use codex_core::protocol::AgentReasoningContentEvent; use codex_core::protocol::AgentReasoningDeltaEvent; use codex_core::protocol::BackgroundEventEvent; use codex_core::protocol::ErrorEvent; @@ -204,14 +203,6 @@ impl EventProcessor for EventProcessorWithHumanOutput { #[allow(clippy::expect_used)] std::io::stdout().flush().expect("could not flush stdout"); } - EventMsg::AgentReasoningContent(AgentReasoningContentEvent { text }) => { - if !self.show_agent_reasoning { - return CodexStatus::Running; - } - print!("{text}"); - #[allow(clippy::expect_used)] - std::io::stdout().flush().expect("could not flush stdout"); - } EventMsg::AgentMessage(AgentMessageEvent { message }) => { // if answer_started is false, this means we haven't received any // delta. Thus, we need to print the message as a new answer. diff --git a/codex-rs/mcp-server/src/codex_tool_runner.rs b/codex-rs/mcp-server/src/codex_tool_runner.rs index 3d32d8de..205dfa46 100644 --- a/codex-rs/mcp-server/src/codex_tool_runner.rs +++ b/codex-rs/mcp-server/src/codex_tool_runner.rs @@ -252,8 +252,7 @@ async fn run_codex_tool_session_inner( EventMsg::AgentMessage(AgentMessageEvent { .. }) => { // TODO: think how we want to support this in the MCP } - EventMsg::AgentReasoningContent(_) - | EventMsg::TaskStarted + EventMsg::TaskStarted | EventMsg::TokenCount(_) | EventMsg::AgentReasoning(_) | EventMsg::McpToolCallBegin(_) diff --git a/codex-rs/mcp-server/src/conversation_loop.rs b/codex-rs/mcp-server/src/conversation_loop.rs index 5b95c313..1db39a23 100644 --- a/codex-rs/mcp-server/src/conversation_loop.rs +++ b/codex-rs/mcp-server/src/conversation_loop.rs @@ -90,8 +90,7 @@ pub async fn run_conversation_loop( EventMsg::AgentMessage(AgentMessageEvent { .. }) => { // TODO: think how we want to support this in the MCP } - EventMsg::AgentReasoningContent(_) - | EventMsg::TaskStarted + EventMsg::TaskStarted | EventMsg::TokenCount(_) | EventMsg::AgentReasoning(_) | EventMsg::McpToolCallBegin(_)