Revert to 3f13ebce10 without rewriting history. Wrong merge

This commit is contained in:
Ahmed Ibrahim
2025-08-04 17:03:24 -07:00
parent 1a33de34b0
commit e38ce39c51
9 changed files with 55 additions and 211 deletions

View File

@@ -207,7 +207,6 @@ async fn process_chat_sse<S>(
}
let mut fn_call_state = FunctionCallState::default();
let mut assistant_text = String::new();
loop {
let sse = match timeout(idle_timeout, stream.next()).await {
@@ -255,42 +254,21 @@ async fn process_chat_sse<S>(
let choice_opt = chunk.get("choices").and_then(|c| c.get(0));
if let Some(choice) = choice_opt {
// Handle assistant content tokens as streaming deltas.
// Handle assistant content tokens.
if let Some(content) = choice
.get("delta")
.and_then(|d| d.get("content"))
.and_then(|c| c.as_str())
{
if !content.is_empty() {
assistant_text.push_str(content);
let _ = tx_event
.send(Ok(ResponseEvent::OutputTextDelta(content.to_string())))
.await;
}
}
let item = ResponseItem::Message {
role: "assistant".to_string(),
content: vec![ContentItem::OutputText {
text: content.to_string(),
}],
id: None,
};
// Forward any reasoning/thinking deltas if present.
if let Some(reasoning) = choice
.get("delta")
.and_then(|d| d.get("reasoning"))
.and_then(|c| c.as_str())
{
let _ = tx_event
.send(Ok(ResponseEvent::ReasoningSummaryDelta(
reasoning.to_string(),
)))
.await;
}
if let Some(reasoning_content) = choice
.get("delta")
.and_then(|d| d.get("reasoning_content"))
.and_then(|c| c.as_str())
{
let _ = tx_event
.send(Ok(ResponseEvent::ReasoningSummaryDelta(
reasoning_content.to_string(),
)))
.await;
let _ = tx_event.send(Ok(ResponseEvent::OutputItemDone(item))).await;
}
// Handle streaming function / tool calls.
@@ -339,18 +317,7 @@ async fn process_chat_sse<S>(
let _ = tx_event.send(Ok(ResponseEvent::OutputItemDone(item))).await;
}
"stop" => {
// Regular turn without tool-call. Emit the final assistant message
// as a single OutputItemDone so non-delta consumers see the result.
if !assistant_text.is_empty() {
let item = ResponseItem::Message {
role: "assistant".to_string(),
content: vec![ContentItem::OutputText {
text: std::mem::take(&mut assistant_text),
}],
id: None,
};
let _ = tx_event.send(Ok(ResponseEvent::OutputItemDone(item))).await;
}
// Regular turn without tool-call.
}
_ => {}
}
@@ -391,10 +358,7 @@ async fn process_chat_sse<S>(
pub(crate) struct AggregatedChatStream<S> {
inner: S,
cumulative: String,
cumulative_reasoning: String,
pending: std::collections::VecDeque<ResponseEvent>,
// When true, do not emit a cumulative assistant message at Completed.
streaming_mode: bool,
pending_completed: Option<ResponseEvent>,
}
impl<S> Stream for AggregatedChatStream<S>
@@ -406,8 +370,8 @@ where
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
// First, flush any buffered events from the previous call.
if let Some(ev) = this.pending.pop_front() {
// First, flush any buffered Completed event from the previous call.
if let Some(ev) = this.pending_completed.take() {
return Poll::Ready(Some(Ok(ev)));
}
@@ -424,21 +388,16 @@ where
let is_assistant_delta = matches!(&item, crate::models::ResponseItem::Message { role, .. } if role == "assistant");
if is_assistant_delta {
// Only use the final assistant message if we have not
// seen any deltas; otherwise, deltas already built the
// cumulative text and this would duplicate it.
if this.cumulative.is_empty() {
if let crate::models::ResponseItem::Message { content, .. } = &item {
if let Some(text) = content.iter().find_map(|c| match c {
crate::models::ContentItem::OutputText { text } => Some(text),
_ => None,
}) {
this.cumulative.push_str(text);
}
if let crate::models::ResponseItem::Message { content, .. } = &item {
if let Some(text) = content.iter().find_map(|c| match c {
crate::models::ContentItem::OutputText { text } => Some(text),
_ => None,
}) {
this.cumulative.push_str(text);
}
}
// Swallow assistant message here; emit on Completed.
// Swallow partial assistant chunk; keep polling.
continue;
}
@@ -449,48 +408,24 @@ where
response_id,
token_usage,
}))) => {
// Build any aggregated items in the correct order: Reasoning first, then Message.
let mut emitted_any = false;
if !this.cumulative_reasoning.is_empty() {
let aggregated_reasoning = crate::models::ResponseItem::Reasoning {
id: String::new(),
summary: vec![
crate::models::ReasoningItemReasoningSummary::SummaryText {
text: std::mem::take(&mut this.cumulative_reasoning),
},
],
content: None,
encrypted_content: None,
};
this.pending
.push_back(ResponseEvent::OutputItemDone(aggregated_reasoning));
emitted_any = true;
}
if !this.cumulative.is_empty() {
let aggregated_message = crate::models::ResponseItem::Message {
let aggregated_item = crate::models::ResponseItem::Message {
id: None,
role: "assistant".to_string(),
content: vec![crate::models::ContentItem::OutputText {
text: std::mem::take(&mut this.cumulative),
}],
};
this.pending
.push_back(ResponseEvent::OutputItemDone(aggregated_message));
emitted_any = true;
}
// Always emit Completed last when anything was aggregated.
if emitted_any {
this.pending.push_back(ResponseEvent::Completed {
response_id: response_id.clone(),
token_usage: token_usage.clone(),
// Buffer Completed so it is returned *after* the aggregated message.
this.pending_completed = Some(ResponseEvent::Completed {
response_id,
token_usage,
});
// Return the first pending event now.
if let Some(ev) = this.pending.pop_front() {
return Poll::Ready(Some(Ok(ev)));
}
return Poll::Ready(Some(Ok(ResponseEvent::OutputItemDone(
aggregated_item,
))));
}
// Nothing aggregated forward Completed directly.
@@ -504,25 +439,11 @@ where
// will never appear in a Chat Completions stream.
continue;
}
Poll::Ready(Some(Ok(ResponseEvent::OutputTextDelta(delta)))) => {
// Always accumulate deltas so we can emit a final OutputItemDone at Completed.
this.cumulative.push_str(&delta);
if this.streaming_mode {
// In streaming mode, also forward the delta immediately.
return Poll::Ready(Some(Ok(ResponseEvent::OutputTextDelta(delta))));
} else {
continue;
}
}
Poll::Ready(Some(Ok(ResponseEvent::ReasoningSummaryDelta(delta)))) => {
// Always accumulate reasoning deltas so we can emit a final Reasoning item at Completed.
this.cumulative_reasoning.push_str(&delta);
if this.streaming_mode {
// In streaming mode, also forward the delta immediately.
return Poll::Ready(Some(Ok(ResponseEvent::ReasoningSummaryDelta(delta))));
} else {
continue;
}
Poll::Ready(Some(Ok(ResponseEvent::OutputTextDelta(_))))
| Poll::Ready(Some(Ok(ResponseEvent::ReasoningSummaryDelta(_)))) => {
// Deltas are ignored here since aggregation waits for the
// final OutputItemDone.
continue;
}
}
}
@@ -554,23 +475,9 @@ pub(crate) trait AggregateStreamExt: Stream<Item = Result<ResponseEvent>> + Size
AggregatedChatStream {
inner: self,
cumulative: String::new(),
cumulative_reasoning: String::new(),
pending: std::collections::VecDeque::new(),
streaming_mode: false,
pending_completed: None,
}
}
}
impl<T> AggregateStreamExt for T where T: Stream<Item = Result<ResponseEvent>> + Sized {}
impl<S> AggregatedChatStream<S> {
pub(crate) fn streaming_mode(inner: S) -> Self {
AggregatedChatStream {
inner,
cumulative: String::new(),
cumulative_reasoning: String::new(),
pending: std::collections::VecDeque::new(),
streaming_mode: true,
}
}
}

View File

@@ -93,11 +93,7 @@ impl ModelClient {
// Wrap it with the aggregation adapter so callers see *only*
// the final assistant message per turn (matching the
// behaviour of the Responses API).
let mut aggregated = if !self.config.hide_agent_reasoning {
crate::chat_completions::AggregatedChatStream::streaming_mode(response_stream)
} else {
response_stream.aggregate()
};
let mut aggregated = response_stream.aggregate();
// Bridge the aggregated stream back into a standard
// `ResponseStream` by forwarding events through a channel.
@@ -442,7 +438,7 @@ async fn process_sse<S>(
}
}
}
"response.reasoning_summary_text.delta" | "response.reasoning_text.delta" => {
"response.reasoning_summary_text.delta" => {
if let Some(delta) = event.delta {
let event = ResponseEvent::ReasoningSummaryDelta(delta);
if tx_event.send(Ok(event)).await.is_err() {

View File

@@ -56,7 +56,6 @@ use crate::mcp_tool_call::handle_mcp_tool_call;
use crate::models::ContentItem;
use crate::models::FunctionCallOutputPayload;
use crate::models::LocalShellAction;
use crate::models::ReasoningItemContent;
use crate::models::ReasoningItemReasoningSummary;
use crate::models::ResponseInputItem;
use crate::models::ResponseItem;
@@ -65,7 +64,6 @@ use crate::plan_tool::handle_update_plan;
use crate::project_doc::get_user_instructions;
use crate::protocol::AgentMessageDeltaEvent;
use crate::protocol::AgentMessageEvent;
use crate::protocol::AgentReasoningContentEvent;
use crate::protocol::AgentReasoningDeltaEvent;
use crate::protocol::AgentReasoningEvent;
use crate::protocol::ApplyPatchApprovalRequestEvent;
@@ -229,7 +227,6 @@ pub(crate) struct Session {
state: Mutex<State>,
codex_linux_sandbox_exe: Option<PathBuf>,
user_shell: shell::Shell,
hide_agent_reasoning: bool,
}
impl Session {
@@ -825,7 +822,6 @@ async fn submission_loop(
codex_linux_sandbox_exe: config.codex_linux_sandbox_exe.clone(),
disable_response_storage,
user_shell: default_shell,
hide_agent_reasoning: config.hide_agent_reasoning,
}));
// Patch restored state into the newly created session.
@@ -1136,7 +1132,6 @@ async fn run_task(sess: Arc<Session>, sub_id: String, input: Vec<InputItem>) {
ResponseItem::Reasoning {
id,
summary,
content,
encrypted_content,
},
None,
@@ -1144,7 +1139,6 @@ async fn run_task(sess: Arc<Session>, sub_id: String, input: Vec<InputItem>) {
items_to_record_in_conversation_history.push(ResponseItem::Reasoning {
id: id.clone(),
summary: summary.clone(),
content: content.clone(),
encrypted_content: encrypted_content.clone(),
});
}
@@ -1387,13 +1381,11 @@ async fn try_run_turn(
sess.tx_event.send(event).await.ok();
}
ResponseEvent::ReasoningSummaryDelta(delta) => {
if !sess.hide_agent_reasoning {
let event = Event {
id: sub_id.to_string(),
msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta }),
};
sess.tx_event.send(event).await.ok();
}
let event = Event {
id: sub_id.to_string(),
msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta }),
};
sess.tx_event.send(event).await.ok();
}
}
}
@@ -1501,36 +1493,16 @@ async fn handle_response_item(
}
None
}
ResponseItem::Reasoning {
id: _,
summary,
content,
encrypted_content: _,
} => {
if !sess.hide_agent_reasoning {
for item in summary {
let text = match item {
ReasoningItemReasoningSummary::SummaryText { text } => text,
};
let event = Event {
id: sub_id.to_string(),
msg: EventMsg::AgentReasoning(AgentReasoningEvent { text }),
};
sess.tx_event.send(event).await.ok();
}
}
if !sess.hide_agent_reasoning && content.is_some() {
let content = content.unwrap();
for item in content {
let text = match item {
ReasoningItemContent::ReasoningText { text } => text,
};
let event = Event {
id: sub_id.to_string(),
msg: EventMsg::AgentReasoningContent(AgentReasoningContentEvent { text }),
};
sess.tx_event.send(event).await.ok();
}
ResponseItem::Reasoning { summary, .. } => {
for item in summary {
let text = match item {
ReasoningItemReasoningSummary::SummaryText { text } => text,
};
let event = Event {
id: sub_id.to_string(),
msg: EventMsg::AgentReasoning(AgentReasoningEvent { text }),
};
sess.tx_event.send(event).await.ok();
}
None
}

View File

@@ -488,10 +488,6 @@ impl Config {
Self::get_base_instructions(experimental_instructions_path, &resolved_cwd)?;
let base_instructions = base_instructions.or(file_base_instructions);
// Resolve hide/show reasoning flags with consistent precedence:
// if hide is true, force show_reasoning_content to false.
let hide_agent_reasoning_val = cfg.hide_agent_reasoning.unwrap_or(false);
let config = Self {
model,
model_context_window,
@@ -521,7 +517,7 @@ impl Config {
tui: cfg.tui.unwrap_or_default(),
codex_linux_sandbox_exe,
hide_agent_reasoning: hide_agent_reasoning_val,
hide_agent_reasoning: cfg.hide_agent_reasoning.unwrap_or(false),
model_reasoning_effort: config_profile
.model_reasoning_effort
.or(cfg.model_reasoning_effort)

View File

@@ -45,8 +45,6 @@ pub enum ResponseItem {
Reasoning {
id: String,
summary: Vec<ReasoningItemReasoningSummary>,
#[serde(default, skip_serializing_if = "Option::is_none")]
content: Option<Vec<ReasoningItemContent>>,
encrypted_content: Option<String>,
},
LocalShellCall {
@@ -138,12 +136,6 @@ pub enum ReasoningItemReasoningSummary {
SummaryText { text: String },
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ReasoningItemContent {
ReasoningText { text: String },
}
impl From<Vec<InputItem>> for ResponseInputItem {
fn from(items: Vec<InputItem>) -> Self {
Self::Message {

View File

@@ -359,9 +359,6 @@ pub enum EventMsg {
/// Agent reasoning delta event from agent.
AgentReasoningDelta(AgentReasoningDeltaEvent),
/// Raw chain-of-thought from agent.
AgentReasoningContent(AgentReasoningContentEvent),
/// Ack the client's configure message.
SessionConfigured(SessionConfiguredEvent),
@@ -467,11 +464,6 @@ pub struct AgentReasoningEvent {
pub text: String,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct AgentReasoningContentEvent {
pub text: String,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct AgentReasoningDeltaEvent {
pub delta: String,

View File

@@ -4,7 +4,6 @@ use codex_core::config::Config;
use codex_core::plan_tool::UpdatePlanArgs;
use codex_core::protocol::AgentMessageDeltaEvent;
use codex_core::protocol::AgentMessageEvent;
use codex_core::protocol::AgentReasoningContentEvent;
use codex_core::protocol::AgentReasoningDeltaEvent;
use codex_core::protocol::BackgroundEventEvent;
use codex_core::protocol::ErrorEvent;
@@ -204,14 +203,6 @@ impl EventProcessor for EventProcessorWithHumanOutput {
#[allow(clippy::expect_used)]
std::io::stdout().flush().expect("could not flush stdout");
}
EventMsg::AgentReasoningContent(AgentReasoningContentEvent { text }) => {
if !self.show_agent_reasoning {
return CodexStatus::Running;
}
print!("{text}");
#[allow(clippy::expect_used)]
std::io::stdout().flush().expect("could not flush stdout");
}
EventMsg::AgentMessage(AgentMessageEvent { message }) => {
// if answer_started is false, this means we haven't received any
// delta. Thus, we need to print the message as a new answer.

View File

@@ -252,8 +252,7 @@ async fn run_codex_tool_session_inner(
EventMsg::AgentMessage(AgentMessageEvent { .. }) => {
// TODO: think how we want to support this in the MCP
}
EventMsg::AgentReasoningContent(_)
| EventMsg::TaskStarted
EventMsg::TaskStarted
| EventMsg::TokenCount(_)
| EventMsg::AgentReasoning(_)
| EventMsg::McpToolCallBegin(_)

View File

@@ -90,8 +90,7 @@ pub async fn run_conversation_loop(
EventMsg::AgentMessage(AgentMessageEvent { .. }) => {
// TODO: think how we want to support this in the MCP
}
EventMsg::AgentReasoningContent(_)
| EventMsg::TaskStarted
EventMsg::TaskStarted
| EventMsg::TokenCount(_)
| EventMsg::AgentReasoning(_)
| EventMsg::McpToolCallBegin(_)