From 3429e82e45b6956bd49950ec36bef8fb424e6002 Mon Sep 17 00:00:00 2001 From: pakrym-oai Date: Wed, 29 Oct 2025 15:33:57 -0700 Subject: [PATCH] Add item streaming events (#5546) Adds AgentMessageContentDelta, ReasoningContentDelta, ReasoningRawContentDelta item streaming events while maintaining compatibility for old events. --------- Co-authored-by: Owen Lin --- codex-rs/Cargo.lock | 60 +++--- codex-rs/core/src/chat_completions.rs | 131 +++++++------ codex-rs/core/src/client.rs | 24 +-- codex-rs/core/src/client_common.rs | 4 +- codex-rs/core/src/codex.rs | 181 +++++++++-------- codex-rs/core/src/conversation_history.rs | 12 +- codex-rs/core/src/event_mapping.rs | 13 +- codex-rs/core/src/rollout/policy.rs | 5 +- codex-rs/core/src/tasks/review.rs | 9 + codex-rs/core/src/util.rs | 9 + codex-rs/core/tests/chat_completions_sse.rs | 85 +++++--- codex-rs/core/tests/common/responses.rs | 49 +++++ codex-rs/core/tests/suite/client.rs | 4 + codex-rs/core/tests/suite/items.rs | 183 ++++++++++++++++++ .../src/event_processor_with_human_output.rs | 3 + codex-rs/mcp-server/src/codex_tool_runner.rs | 3 + codex-rs/protocol/src/protocol.rs | 125 ++++++++++++ codex-rs/tui/src/chatwidget.rs | 5 +- 18 files changed, 662 insertions(+), 243 deletions(-) diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index 9d7d9ddc..7bd60b12 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -592,9 +592,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bitflags" -version = "2.9.1" +version = "2.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b8e56985ec62d17e9c1001dc89c88ecd7dc08e47eba5ec7c29c7b5eeecde967" +checksum = "812e12b5285cc515a9c72a5c1d3b6d46a19dac5acfef5265968c166106e31dd3" [[package]] name = "block-buffer" @@ -1739,7 +1739,7 @@ version = "0.28.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "829d955a0bb380ef178a640b91779e3987da38c9aea133b20614cfed8cdea9c6" dependencies = [ - "bitflags 2.9.1", + "bitflags 2.10.0", "crossterm_winapi", "futures-core", "mio", @@ -2083,7 +2083,7 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "89a09f22a6c6069a18470eb92d2298acf25463f14256d24778e1230d789a2aec" dependencies = [ - "bitflags 2.9.1", + "bitflags 2.10.0", "objc2", ] @@ -3208,7 +3208,7 @@ version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f37dccff2791ab604f9babef0ba14fbe0be30bd368dc541e2b08d07c8aa908f3" dependencies = [ - "bitflags 2.9.1", + "bitflags 2.10.0", "inotify-sys", "libc", ] @@ -3271,7 +3271,7 @@ version = "0.7.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d93587f37623a1a17d94ef2bc9ada592f5465fe7732084ab7beefabe5c77c0c4" dependencies = [ - "bitflags 2.9.1", + "bitflags 2.10.0", "cfg-if", "libc", ] @@ -3517,7 +3517,7 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4488594b9328dee448adb906d8b126d9b7deb7cf5c22161ee591610bb1be83c0" dependencies = [ - "bitflags 2.9.1", + "bitflags 2.10.0", "libc", ] @@ -3527,7 +3527,7 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "761e49ec5fd8a5a463f9b84e877c373d888935b71c6be78f3767fe2ae6bed18e" dependencies = [ - "bitflags 2.9.1", + "bitflags 2.10.0", "libc", ] @@ -3795,7 +3795,7 @@ version = "0.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ab2156c4fce2f8df6c499cc1c763e4394b7482525bf2a9701c9d79d215f519e4" dependencies = [ - "bitflags 2.9.1", + "bitflags 2.10.0", "cfg-if", "cfg_aliases 0.1.1", "libc", @@ -3807,7 +3807,7 @@ version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "71e2746dc3a24dd78b3cfcb7be93368c6de9963d30f43a6a73998a9cf4b17b46" dependencies = [ - "bitflags 2.9.1", + "bitflags 2.10.0", "cfg-if", "cfg_aliases 0.2.1", "libc", @@ -3820,7 +3820,7 @@ version = "0.30.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "74523f3a35e05aba87a1d978330aef40f67b0304ac79c1c00b294c9830543db6" dependencies = [ - "bitflags 2.9.1", + "bitflags 2.10.0", "cfg-if", "cfg_aliases 0.2.1", "libc", @@ -3848,7 +3848,7 @@ version = "8.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4d3d07927151ff8575b7087f245456e549fea62edf0ec4e565a5ee50c8402bc3" dependencies = [ - "bitflags 2.9.1", + "bitflags 2.10.0", "fsevent-sys", "inotify", "kqueue", @@ -4018,7 +4018,7 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6f29f568bec459b0ddff777cec4fe3fd8666d82d5a40ebd0ff7e66134f89bcc" dependencies = [ - "bitflags 2.9.1", + "bitflags 2.10.0", "objc2", "objc2-core-graphics", "objc2-foundation", @@ -4030,7 +4030,7 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1c10c2894a6fed806ade6027bcd50662746363a9589d3ec9d9bef30a4e4bc166" dependencies = [ - "bitflags 2.9.1", + "bitflags 2.10.0", "dispatch2", "objc2", ] @@ -4041,7 +4041,7 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "989c6c68c13021b5c2d6b71456ebb0f9dc78d752e86a98da7c716f4f9470f5a4" dependencies = [ - "bitflags 2.9.1", + "bitflags 2.10.0", "dispatch2", "objc2", "objc2-core-foundation", @@ -4060,7 +4060,7 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "900831247d2fe1a09a683278e5384cfb8c80c79fe6b166f9d14bfdde0ea1b03c" dependencies = [ - "bitflags 2.9.1", + "bitflags 2.10.0", "objc2", "objc2-core-foundation", ] @@ -4071,7 +4071,7 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7282e9ac92529fa3457ce90ebb15f4ecbc383e8338060960760fa2cf75420c3c" dependencies = [ - "bitflags 2.9.1", + "bitflags 2.10.0", "objc2", "objc2-core-foundation", ] @@ -4103,7 +4103,7 @@ version = "0.10.73" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8505734d46c8ab1e19a1dce3aef597ad87dcb4c37e7188231769bd6bd51cebf8" dependencies = [ - "bitflags 2.9.1", + "bitflags 2.10.0", "cfg-if", "foreign-types", "libc", @@ -4435,7 +4435,7 @@ version = "0.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97baced388464909d42d89643fe4361939af9b7ce7a31ee32a168f832a70f2a0" dependencies = [ - "bitflags 2.9.1", + "bitflags 2.10.0", "crc32fast", "fdeflate", "flate2", @@ -4625,7 +4625,7 @@ version = "0.10.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "76979bea66e7875e7509c4ec5300112b316af87fa7a252ca91c448b32dfe3993" dependencies = [ - "bitflags 2.9.1", + "bitflags 2.10.0", "getopts", "memchr", "pulldown-cmark-escape", @@ -4806,7 +4806,7 @@ name = "ratatui" version = "0.29.0" source = "git+https://github.com/nornagon/ratatui?branch=nornagon-v0.29.0-patch#9b2ad1298408c45918ee9f8241a6f95498cdbed2" dependencies = [ - "bitflags 2.9.1", + "bitflags 2.10.0", "cassowary", "compact_str", "crossterm", @@ -4836,7 +4836,7 @@ version = "0.5.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7e8af0dde094006011e6a740d4879319439489813bd0bcdc7d821beaeeff48ec" dependencies = [ - "bitflags 2.9.1", + "bitflags 2.10.0", ] [[package]] @@ -5065,7 +5065,7 @@ version = "0.38.44" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fdb5bc1ae2baa591800df16c9ca78619bf65c0488b41b96ccec5d11220d8c154" dependencies = [ - "bitflags 2.9.1", + "bitflags 2.10.0", "errno", "libc", "linux-raw-sys 0.4.15", @@ -5078,7 +5078,7 @@ version = "1.0.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "11181fbabf243db407ef8df94a6ce0b2f9a733bd8be4ad02b4eda9602296cac8" dependencies = [ - "bitflags 2.9.1", + "bitflags 2.10.0", "errno", "libc", "linux-raw-sys 0.9.4", @@ -5144,7 +5144,7 @@ version = "14.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7803e8936da37efd9b6d4478277f4b2b9bb5cdb37a113e8d63222e58da647e63" dependencies = [ - "bitflags 2.9.1", + "bitflags 2.10.0", "cfg-if", "clipboard-win", "fd-lock", @@ -5343,7 +5343,7 @@ version = "2.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" dependencies = [ - "bitflags 2.9.1", + "bitflags 2.10.0", "core-foundation 0.9.4", "core-foundation-sys", "libc", @@ -5356,7 +5356,7 @@ version = "3.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b3297343eaf830f66ede390ea39da1d462b6b0c1b000f420d0a83f898bbbe6ef" dependencies = [ - "bitflags 2.9.1", + "bitflags 2.10.0", "core-foundation 0.10.1", "core-foundation-sys", "libc", @@ -6055,7 +6055,7 @@ version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c879d448e9d986b661742763247d3693ed13609438cf3d006f51f5368a5ba6b" dependencies = [ - "bitflags 2.9.1", + "bitflags 2.10.0", "core-foundation 0.9.4", "system-configuration-sys", ] @@ -6513,7 +6513,7 @@ version = "0.6.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "adc82fd73de2a9722ac5da747f12383d2bfdb93591ee6c58486e0097890f05f2" dependencies = [ - "bitflags 2.9.1", + "bitflags 2.10.0", "bytes", "futures-util", "http", @@ -7588,7 +7588,7 @@ version = "0.39.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6f42320e61fe2cfd34354ecb597f86f413484a798ba44a8ca1165c58d42da6c1" dependencies = [ - "bitflags 2.9.1", + "bitflags 2.10.0", ] [[package]] diff --git a/codex-rs/core/src/chat_completions.rs b/codex-rs/core/src/chat_completions.rs index 6200a587..9b01a6be 100644 --- a/codex-rs/core/src/chat_completions.rs +++ b/codex-rs/core/src/chat_completions.rs @@ -423,6 +423,61 @@ pub(crate) async fn stream_chat_completions( } } +async fn append_assistant_text( + tx_event: &mpsc::Sender>, + assistant_item: &mut Option, + text: String, +) { + if assistant_item.is_none() { + let item = ResponseItem::Message { + id: None, + role: "assistant".to_string(), + content: vec![], + }; + *assistant_item = Some(item.clone()); + let _ = tx_event + .send(Ok(ResponseEvent::OutputItemAdded(item))) + .await; + } + + if let Some(ResponseItem::Message { content, .. }) = assistant_item { + content.push(ContentItem::OutputText { text: text.clone() }); + let _ = tx_event + .send(Ok(ResponseEvent::OutputTextDelta(text.clone()))) + .await; + } +} + +async fn append_reasoning_text( + tx_event: &mpsc::Sender>, + reasoning_item: &mut Option, + text: String, +) { + if reasoning_item.is_none() { + let item = ResponseItem::Reasoning { + id: String::new(), + summary: Vec::new(), + content: Some(vec![]), + encrypted_content: None, + }; + *reasoning_item = Some(item.clone()); + let _ = tx_event + .send(Ok(ResponseEvent::OutputItemAdded(item))) + .await; + } + + if let Some(ResponseItem::Reasoning { + content: Some(content), + .. + }) = reasoning_item + { + content.push(ReasoningItemContent::ReasoningText { text: text.clone() }); + + let _ = tx_event + .send(Ok(ResponseEvent::ReasoningContentDelta(text.clone()))) + .await; + } +} /// Lightweight SSE processor for the Chat Completions streaming format. The /// output is mapped onto Codex's internal [`ResponseEvent`] so that the rest /// of the pipeline can stay agnostic of the underlying wire format. @@ -450,8 +505,8 @@ async fn process_chat_sse( } let mut fn_call_state = FunctionCallState::default(); - let mut assistant_text = String::new(); - let mut reasoning_text = String::new(); + let mut assistant_item: Option = None; + let mut reasoning_item: Option = None; loop { let start = std::time::Instant::now(); @@ -492,26 +547,11 @@ async fn process_chat_sse( if sse.data.trim() == "[DONE]" { // Emit any finalized items before closing so downstream consumers receive // terminal events for both assistant content and raw reasoning. - if !assistant_text.is_empty() { - let item = ResponseItem::Message { - role: "assistant".to_string(), - content: vec![ContentItem::OutputText { - text: std::mem::take(&mut assistant_text), - }], - id: None, - }; + if let Some(item) = assistant_item { let _ = tx_event.send(Ok(ResponseEvent::OutputItemDone(item))).await; } - if !reasoning_text.is_empty() { - let item = ResponseItem::Reasoning { - id: String::new(), - summary: Vec::new(), - content: Some(vec![ReasoningItemContent::ReasoningText { - text: std::mem::take(&mut reasoning_text), - }]), - encrypted_content: None, - }; + if let Some(item) = reasoning_item { let _ = tx_event.send(Ok(ResponseEvent::OutputItemDone(item))).await; } @@ -541,10 +581,7 @@ async fn process_chat_sse( .and_then(|c| c.as_str()) && !content.is_empty() { - assistant_text.push_str(content); - let _ = tx_event - .send(Ok(ResponseEvent::OutputTextDelta(content.to_string()))) - .await; + append_assistant_text(&tx_event, &mut assistant_item, content.to_string()).await; } // Forward any reasoning/thinking deltas if present. @@ -574,10 +611,7 @@ async fn process_chat_sse( if let Some(reasoning) = maybe_text { // Accumulate so we can emit a terminal Reasoning item at the end. - reasoning_text.push_str(&reasoning); - let _ = tx_event - .send(Ok(ResponseEvent::ReasoningContentDelta(reasoning))) - .await; + append_reasoning_text(&tx_event, &mut reasoning_item, reasoning).await; } } @@ -587,10 +621,7 @@ async fn process_chat_sse( // Accept either a plain string or an object with { text | content } if let Some(s) = message_reasoning.as_str() { if !s.is_empty() { - reasoning_text.push_str(s); - let _ = tx_event - .send(Ok(ResponseEvent::ReasoningContentDelta(s.to_string()))) - .await; + append_reasoning_text(&tx_event, &mut reasoning_item, s.to_string()).await; } } else if let Some(obj) = message_reasoning.as_object() && let Some(s) = obj @@ -599,10 +630,7 @@ async fn process_chat_sse( .or_else(|| obj.get("content").and_then(|v| v.as_str())) && !s.is_empty() { - reasoning_text.push_str(s); - let _ = tx_event - .send(Ok(ResponseEvent::ReasoningContentDelta(s.to_string()))) - .await; + append_reasoning_text(&tx_event, &mut reasoning_item, s.to_string()).await; } } @@ -640,15 +668,7 @@ async fn process_chat_sse( "tool_calls" if fn_call_state.active => { // First, flush the terminal raw reasoning so UIs can finalize // the reasoning stream before any exec/tool events begin. - if !reasoning_text.is_empty() { - let item = ResponseItem::Reasoning { - id: String::new(), - summary: Vec::new(), - content: Some(vec![ReasoningItemContent::ReasoningText { - text: std::mem::take(&mut reasoning_text), - }]), - encrypted_content: None, - }; + if let Some(item) = reasoning_item.take() { let _ = tx_event.send(Ok(ResponseEvent::OutputItemDone(item))).await; } @@ -665,26 +685,11 @@ async fn process_chat_sse( "stop" => { // Regular turn without tool-call. Emit the final assistant message // as a single OutputItemDone so non-delta consumers see the result. - if !assistant_text.is_empty() { - let item = ResponseItem::Message { - role: "assistant".to_string(), - content: vec![ContentItem::OutputText { - text: std::mem::take(&mut assistant_text), - }], - id: None, - }; + if let Some(item) = assistant_item.take() { let _ = tx_event.send(Ok(ResponseEvent::OutputItemDone(item))).await; } // Also emit a terminal Reasoning item so UIs can finalize raw reasoning. - if !reasoning_text.is_empty() { - let item = ResponseItem::Reasoning { - id: String::new(), - summary: Vec::new(), - content: Some(vec![ReasoningItemContent::ReasoningText { - text: std::mem::take(&mut reasoning_text), - }]), - encrypted_content: None, - }; + if let Some(item) = reasoning_item.take() { let _ = tx_event.send(Ok(ResponseEvent::OutputItemDone(item))).await; } } @@ -903,8 +908,8 @@ where Poll::Ready(Some(Ok(ResponseEvent::ReasoningSummaryPartAdded))) => { continue; } - Poll::Ready(Some(Ok(ResponseEvent::WebSearchCallBegin { call_id }))) => { - return Poll::Ready(Some(Ok(ResponseEvent::WebSearchCallBegin { call_id }))); + Poll::Ready(Some(Ok(ResponseEvent::OutputItemAdded(item)))) => { + return Poll::Ready(Some(Ok(ResponseEvent::OutputItemAdded(item)))); } } } diff --git a/codex-rs/core/src/client.rs b/codex-rs/core/src/client.rs index 5d8da794..45c84d90 100644 --- a/codex-rs/core/src/client.rs +++ b/codex-rs/core/src/client.rs @@ -869,21 +869,15 @@ async fn process_sse( | "response.in_progress" | "response.output_text.done" => {} "response.output_item.added" => { - if let Some(item) = event.item.as_ref() { - // Detect web_search_call begin and forward a synthetic event upstream. - if let Some(ty) = item.get("type").and_then(|v| v.as_str()) - && ty == "web_search_call" - { - let call_id = item - .get("id") - .and_then(|v| v.as_str()) - .unwrap_or("") - .to_string(); - let ev = ResponseEvent::WebSearchCallBegin { call_id }; - if tx_event.send(Ok(ev)).await.is_err() { - return; - } - } + let Some(item_val) = event.item else { continue }; + let Ok(item) = serde_json::from_value::(item_val) else { + debug!("failed to parse ResponseItem from output_item.done"); + continue; + }; + + let event = ResponseEvent::OutputItemAdded(item); + if tx_event.send(Ok(event)).await.is_err() { + return; } } "response.reasoning_summary_part.added" => { diff --git a/codex-rs/core/src/client_common.rs b/codex-rs/core/src/client_common.rs index 203a6302..2ac02f5f 100644 --- a/codex-rs/core/src/client_common.rs +++ b/codex-rs/core/src/client_common.rs @@ -197,6 +197,7 @@ fn strip_total_output_header(output: &str) -> Option<&str> { pub enum ResponseEvent { Created, OutputItemDone(ResponseItem), + OutputItemAdded(ResponseItem), Completed { response_id: String, token_usage: Option, @@ -205,9 +206,6 @@ pub enum ResponseEvent { ReasoningSummaryDelta(String), ReasoningContentDelta(String), ReasoningSummaryPartAdded, - WebSearchCallBegin { - call_id: String, - }, RateLimits(RateLimitSnapshot), } diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 8e45f250..f1d040b2 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -15,11 +15,13 @@ use crate::parse_turn_item; use crate::response_processing::process_items; use crate::terminal; use crate::user_notification::UserNotifier; +use crate::util::error_or_panic; use async_channel::Receiver; use async_channel::Sender; use codex_protocol::ConversationId; use codex_protocol::items::TurnItem; use codex_protocol::protocol::FileChange; +use codex_protocol::protocol::HasLegacyEvent; use codex_protocol::protocol::ItemCompletedEvent; use codex_protocol::protocol::ItemStartedEvent; use codex_protocol::protocol::RawResponseItemEvent; @@ -69,9 +71,7 @@ use crate::mcp_connection_manager::McpConnectionManager; use crate::model_family::find_family_for_model; use crate::openai_model_info::get_model_info; use crate::project_doc::get_user_instructions; -use crate::protocol::AgentMessageDeltaEvent; -use crate::protocol::AgentReasoningDeltaEvent; -use crate::protocol::AgentReasoningRawContentDeltaEvent; +use crate::protocol::AgentMessageContentDeltaEvent; use crate::protocol::AgentReasoningSectionBreakEvent; use crate::protocol::ApplyPatchApprovalRequestEvent; use crate::protocol::AskForApproval; @@ -83,6 +83,8 @@ use crate::protocol::EventMsg; use crate::protocol::ExecApprovalRequestEvent; use crate::protocol::Op; use crate::protocol::RateLimitSnapshot; +use crate::protocol::ReasoningContentDeltaEvent; +use crate::protocol::ReasoningRawContentDeltaEvent; use crate::protocol::ReviewDecision; use crate::protocol::SandboxCommandAssessment; use crate::protocol::SandboxPolicy; @@ -92,7 +94,6 @@ use crate::protocol::Submission; use crate::protocol::TokenCountEvent; use crate::protocol::TokenUsage; use crate::protocol::TurnDiffEvent; -use crate::protocol::WebSearchBeginEvent; use crate::rollout::RolloutRecorder; use crate::rollout::RolloutRecorderParams; use crate::shell; @@ -729,11 +730,21 @@ impl Session { /// Persist the event to rollout and send it to clients. pub(crate) async fn send_event(&self, turn_context: &TurnContext, msg: EventMsg) { + let legacy_source = msg.clone(); let event = Event { id: turn_context.sub_id.clone(), msg, }; self.send_event_raw(event).await; + + let show_raw_agent_reasoning = self.show_raw_agent_reasoning(); + for legacy in legacy_source.as_legacy_events(show_raw_agent_reasoning) { + let legacy_event = Event { + id: turn_context.sub_id.clone(), + msg: legacy, + }; + self.send_event_raw(legacy_event).await; + } } pub(crate) async fn send_event_raw(&self, event: Event) { @@ -757,45 +768,16 @@ impl Session { .await; } - async fn emit_turn_item_completed( - &self, - turn_context: &TurnContext, - item: TurnItem, - emit_raw_agent_reasoning: bool, - ) { + async fn emit_turn_item_completed(&self, turn_context: &TurnContext, item: TurnItem) { self.send_event( turn_context, EventMsg::ItemCompleted(ItemCompletedEvent { thread_id: self.conversation_id, turn_id: turn_context.sub_id.clone(), - item: item.clone(), + item, }), ) .await; - self.emit_turn_item_legacy_events(turn_context, &item, emit_raw_agent_reasoning) - .await; - } - - async fn emit_turn_item_started_completed( - &self, - turn_context: &TurnContext, - item: TurnItem, - emit_raw_agent_reasoning: bool, - ) { - self.emit_turn_item_started(turn_context, &item).await; - self.emit_turn_item_completed(turn_context, item, emit_raw_agent_reasoning) - .await; - } - - async fn emit_turn_item_legacy_events( - &self, - turn_context: &TurnContext, - item: &TurnItem, - emit_raw_agent_reasoning: bool, - ) { - for event in item.as_legacy_events(emit_raw_agent_reasoning) { - self.send_event(turn_context, event).await; - } } pub(crate) async fn assess_sandbox_command( @@ -1092,8 +1074,8 @@ impl Session { let turn_item = parse_turn_item(&response_item); if let Some(item @ TurnItem::UserMessage(_)) = turn_item { - self.emit_turn_item_started_completed(turn_context, item, false) - .await; + self.emit_turn_item_started(turn_context, &item).await; + self.emit_turn_item_completed(turn_context, item).await; } } @@ -1910,14 +1892,13 @@ async fn run_turn( Err(CodexErr::EnvVar(var)) => return Err(CodexErr::EnvVar(var)), Err(e @ CodexErr::Fatal(_)) => return Err(e), Err(e @ CodexErr::ContextWindowExceeded) => { - sess.set_total_tokens_full(turn_context.as_ref()).await; + sess.set_total_tokens_full(&turn_context).await; return Err(e); } Err(CodexErr::UsageLimitReached(e)) => { let rate_limits = e.rate_limits.clone(); if let Some(rate_limits) = rate_limits { - sess.update_rate_limits(turn_context.as_ref(), rate_limits) - .await; + sess.update_rate_limits(&turn_context, rate_limits).await; } return Err(CodexErr::UsageLimitReached(e)); } @@ -1939,8 +1920,8 @@ async fn run_turn( // user understands what is happening instead of staring // at a seemingly frozen screen. sess.notify_stream_error( - turn_context.as_ref(), - format!("Reconnecting... {retries}/{max_retries}"), + &turn_context, + format!("Re-connecting... {retries}/{max_retries}"), ) .await; @@ -2004,6 +1985,8 @@ async fn try_run_turn( let mut output: FuturesOrdered>> = FuturesOrdered::new(); + let mut active_item: Option = None; + loop { // Poll the next item from the model stream. We must inspect *both* Ok and Err // cases so that transient stream failures (e.g., dropped SSE connection before @@ -2035,6 +2018,7 @@ async fn try_run_turn( match event { ResponseEvent::Created => {} ResponseEvent::OutputItemDone(item) => { + let previously_active_item = active_item.take(); match ToolRouter::build_tool_call(sess.as_ref(), item.clone()) { Ok(Some(call)) => { let payload_preview = call.payload.log_payload().into_owned(); @@ -2054,14 +2038,19 @@ async fn try_run_turn( ); } Ok(None) => { - let response = handle_non_tool_response_item( - sess.as_ref(), - Arc::clone(&turn_context), - item.clone(), - sess.show_raw_agent_reasoning(), - ) - .await?; - add_completed(ProcessedResponseItem { item, response }); + if let Some(turn_item) = handle_non_tool_response_item(&item).await { + if previously_active_item.is_none() { + sess.emit_turn_item_started(&turn_context, &turn_item).await; + } + + sess.emit_turn_item_completed(&turn_context, turn_item) + .await; + } + + add_completed(ProcessedResponseItem { + item, + response: None, + }); } Err(FunctionCallError::MissingLocalShellCallId) => { let msg = "LocalShellCall without call_id or id"; @@ -2102,26 +2091,24 @@ async fn try_run_turn( } } } - ResponseEvent::WebSearchCallBegin { call_id } => { - let _ = sess - .tx_event - .send(Event { - id: turn_context.sub_id.clone(), - msg: EventMsg::WebSearchBegin(WebSearchBeginEvent { call_id }), - }) - .await; + ResponseEvent::OutputItemAdded(item) => { + if let Some(turn_item) = handle_non_tool_response_item(&item).await { + let tracked_item = turn_item.clone(); + sess.emit_turn_item_started(&turn_context, &turn_item).await; + + active_item = Some(tracked_item); + } } ResponseEvent::RateLimits(snapshot) => { // Update internal state with latest rate limits, but defer sending until // token usage is available to avoid duplicate TokenCount events. - sess.update_rate_limits(turn_context.as_ref(), snapshot) - .await; + sess.update_rate_limits(&turn_context, snapshot).await; } ResponseEvent::Completed { response_id: _, token_usage, } => { - sess.update_token_usage_info(turn_context.as_ref(), token_usage.as_ref()) + sess.update_token_usage_info(&turn_context, token_usage.as_ref()) .await; let processed_items = output.try_collect().await?; let unified_diff = { @@ -2141,12 +2128,34 @@ async fn try_run_turn( return Ok(result); } ResponseEvent::OutputTextDelta(delta) => { - let event = EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta }); - sess.send_event(&turn_context, event).await; + // In review child threads, suppress assistant text deltas; the + // UI will show a selection popup from the final ReviewOutput. + if let Some(active) = active_item.as_ref() { + let event = AgentMessageContentDeltaEvent { + thread_id: sess.conversation_id.to_string(), + turn_id: turn_context.sub_id.clone(), + item_id: active.id(), + delta: delta.clone(), + }; + sess.send_event(&turn_context, EventMsg::AgentMessageContentDelta(event)) + .await; + } else { + error_or_panic("ReasoningSummaryDelta without active item".to_string()); + } } ResponseEvent::ReasoningSummaryDelta(delta) => { - let event = EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta }); - sess.send_event(&turn_context, event).await; + if let Some(active) = active_item.as_ref() { + let event = ReasoningContentDeltaEvent { + thread_id: sess.conversation_id.to_string(), + turn_id: turn_context.sub_id.clone(), + item_id: active.id(), + delta: delta.clone(), + }; + sess.send_event(&turn_context, EventMsg::ReasoningContentDelta(event)) + .await; + } else { + error_or_panic("ReasoningSummaryDelta without active item".to_string()); + } } ResponseEvent::ReasoningSummaryPartAdded => { let event = @@ -2154,46 +2163,36 @@ async fn try_run_turn( sess.send_event(&turn_context, event).await; } ResponseEvent::ReasoningContentDelta(delta) => { - if sess.show_raw_agent_reasoning() { - let event = EventMsg::AgentReasoningRawContentDelta( - AgentReasoningRawContentDeltaEvent { delta }, - ); - sess.send_event(&turn_context, event).await; + if let Some(active) = active_item.as_ref() { + let event = ReasoningRawContentDeltaEvent { + thread_id: sess.conversation_id.to_string(), + turn_id: turn_context.sub_id.clone(), + item_id: active.id(), + delta: delta.clone(), + }; + sess.send_event(&turn_context, EventMsg::ReasoningRawContentDelta(event)) + .await; + } else { + error_or_panic("ReasoningRawContentDelta without active item".to_string()); } } } } } -async fn handle_non_tool_response_item( - sess: &Session, - turn_context: Arc, - item: ResponseItem, - show_raw_agent_reasoning: bool, -) -> CodexResult> { +async fn handle_non_tool_response_item(item: &ResponseItem) -> Option { debug!(?item, "Output item"); - match &item { + match item { ResponseItem::Message { .. } | ResponseItem::Reasoning { .. } - | ResponseItem::WebSearchCall { .. } => { - let turn_item = parse_turn_item(&item); - if let Some(turn_item) = turn_item { - sess.emit_turn_item_started_completed( - turn_context.as_ref(), - turn_item, - show_raw_agent_reasoning, - ) - .await; - } - } + | ResponseItem::WebSearchCall { .. } => parse_turn_item(item), ResponseItem::FunctionCallOutput { .. } | ResponseItem::CustomToolCallOutput { .. } => { debug!("unexpected tool output from stream"); + None } - _ => {} + _ => None, } - - Ok(None) } pub(super) fn get_last_assistant_message_from_turn(responses: &[ResponseItem]) -> Option { diff --git a/codex-rs/core/src/conversation_history.rs b/codex-rs/core/src/conversation_history.rs index 9c9e4889..bc660d1c 100644 --- a/codex-rs/core/src/conversation_history.rs +++ b/codex-rs/core/src/conversation_history.rs @@ -1,12 +1,13 @@ use codex_protocol::models::FunctionCallOutputContentItem; use codex_protocol::models::FunctionCallOutputPayload; use codex_protocol::models::ResponseItem; + +use crate::util::error_or_panic; use codex_protocol::protocol::TokenUsage; use codex_protocol::protocol::TokenUsageInfo; use codex_utils_string::take_bytes_at_char_boundary; use codex_utils_string::take_last_bytes_at_char_boundary; use std::ops::Deref; -use tracing::error; // Model-formatting limits: clients get full streams; only content sent to the model is truncated. pub(crate) const MODEL_FORMAT_MAX_BYTES: usize = 10 * 1024; // 10 KiB @@ -501,15 +502,6 @@ fn truncate_formatted_exec_output(content: &str, total_lines: usize) -> String { result } -#[inline] -fn error_or_panic(message: String) { - if cfg!(debug_assertions) || env!("CARGO_PKG_VERSION").contains("alpha") { - panic!("{message}"); - } else { - error!("{message}"); - } -} - /// Anything that is not a system message or "reasoning" message is considered /// an API message. fn is_api_message(message: &ResponseItem) -> bool { diff --git a/codex-rs/core/src/event_mapping.rs b/codex-rs/core/src/event_mapping.rs index 0c41951e..73c44fd4 100644 --- a/codex-rs/core/src/event_mapping.rs +++ b/codex-rs/core/src/event_mapping.rs @@ -11,6 +11,7 @@ use codex_protocol::models::ResponseItem; use codex_protocol::models::WebSearchAction; use codex_protocol::user_input::UserInput; use tracing::warn; +use uuid::Uuid; fn is_session_prefix(text: &str) -> bool { let trimmed = text.trim_start(); @@ -46,7 +47,7 @@ fn parse_user_message(message: &[ContentItem]) -> Option { Some(UserMessageItem::new(&content)) } -fn parse_agent_message(message: &[ContentItem]) -> AgentMessageItem { +fn parse_agent_message(id: Option<&String>, message: &[ContentItem]) -> AgentMessageItem { let mut content: Vec = Vec::new(); for content_item in message.iter() { match content_item { @@ -61,14 +62,18 @@ fn parse_agent_message(message: &[ContentItem]) -> AgentMessageItem { } } } - AgentMessageItem::new(&content) + let id = id.cloned().unwrap_or_else(|| Uuid::new_v4().to_string()); + AgentMessageItem { id, content } } pub fn parse_turn_item(item: &ResponseItem) -> Option { match item { - ResponseItem::Message { role, content, .. } => match role.as_str() { + ResponseItem::Message { role, content, id } => match role.as_str() { "user" => parse_user_message(content).map(TurnItem::UserMessage), - "assistant" => Some(TurnItem::AgentMessage(parse_agent_message(content))), + "assistant" => Some(TurnItem::AgentMessage(parse_agent_message( + id.as_ref(), + content, + ))), "system" => None, _ => None, }, diff --git a/codex-rs/core/src/rollout/policy.rs b/codex-rs/core/src/rollout/policy.rs index 0db57d1d..77c16206 100644 --- a/codex-rs/core/src/rollout/policy.rs +++ b/codex-rs/core/src/rollout/policy.rs @@ -77,6 +77,9 @@ pub(crate) fn should_persist_event_msg(ev: &EventMsg) -> bool { | EventMsg::ViewImageToolCall(_) | EventMsg::DeprecationNotice(_) | EventMsg::ItemStarted(_) - | EventMsg::ItemCompleted(_) => false, + | EventMsg::ItemCompleted(_) + | EventMsg::AgentMessageContentDelta(_) + | EventMsg::ReasoningContentDelta(_) + | EventMsg::ReasoningRawContentDelta(_) => false, } } diff --git a/codex-rs/core/src/tasks/review.rs b/codex-rs/core/src/tasks/review.rs index ba622100..08576cad 100644 --- a/codex-rs/core/src/tasks/review.rs +++ b/codex-rs/core/src/tasks/review.rs @@ -1,11 +1,13 @@ use std::sync::Arc; use async_trait::async_trait; +use codex_protocol::items::TurnItem; use codex_protocol::models::ContentItem; use codex_protocol::models::ResponseItem; use codex_protocol::protocol::Event; use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::ExitedReviewModeEvent; +use codex_protocol::protocol::ItemCompletedEvent; use codex_protocol::protocol::ReviewOutputEvent; use tokio_util::sync::CancellationToken; @@ -109,6 +111,13 @@ async fn process_review_events( } prev_agent_message = Some(event); } + // Suppress ItemCompleted for assistant messages: forwarding it would + // trigger legacy AgentMessage via as_legacy_events(), which this + // review flow intentionally hides in favor of structured output. + EventMsg::ItemCompleted(ItemCompletedEvent { + item: TurnItem::AgentMessage(_), + .. + }) => {} EventMsg::TaskComplete(task_complete) => { // Parse review output from the last agent message (if present). let out = task_complete diff --git a/codex-rs/core/src/util.rs b/codex-rs/core/src/util.rs index 59e3154b..0bce5b44 100644 --- a/codex-rs/core/src/util.rs +++ b/codex-rs/core/src/util.rs @@ -2,6 +2,7 @@ use std::time::Duration; use rand::Rng; use tracing::debug; +use tracing::error; const INITIAL_DELAY_MS: u64 = 200; const BACKOFF_FACTOR: f64 = 2.0; @@ -13,6 +14,14 @@ pub(crate) fn backoff(attempt: u64) -> Duration { Duration::from_millis((base as f64 * jitter) as u64) } +pub(crate) fn error_or_panic(message: String) { + if cfg!(debug_assertions) || env!("CARGO_PKG_VERSION").contains("alpha") { + panic!("{message}"); + } else { + error!("{message}"); + } +} + pub(crate) fn try_parse_error_message(text: &str) -> String { debug!("Parsing server error response: {}", text); let json = serde_json::from_str::(text).unwrap_or_default(); diff --git a/codex-rs/core/tests/chat_completions_sse.rs b/codex-rs/core/tests/chat_completions_sse.rs index 095b08c1..46378b08 100644 --- a/codex-rs/core/tests/chat_completions_sse.rs +++ b/codex-rs/core/tests/chat_completions_sse.rs @@ -171,19 +171,24 @@ async fn streams_text_without_reasoning() { ); let events = run_stream(sse).await; - assert_eq!(events.len(), 3, "unexpected events: {events:?}"); + assert_eq!(events.len(), 4, "unexpected events: {events:?}"); match &events[0] { + ResponseEvent::OutputItemAdded(ResponseItem::Message { .. }) => {} + other => panic!("expected initial assistant item, got {other:?}"), + } + + match &events[1] { ResponseEvent::OutputTextDelta(text) => assert_eq!(text, "hi"), other => panic!("expected text delta, got {other:?}"), } - match &events[1] { + match &events[2] { ResponseEvent::OutputItemDone(item) => assert_message(item, "hi"), other => panic!("expected terminal message, got {other:?}"), } - assert_matches!(events[2], ResponseEvent::Completed { .. }); + assert_matches!(events[3], ResponseEvent::Completed { .. }); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] @@ -202,29 +207,39 @@ async fn streams_reasoning_from_string_delta() { ); let events = run_stream(sse).await; - assert_eq!(events.len(), 5, "unexpected events: {events:?}"); + assert_eq!(events.len(), 7, "unexpected events: {events:?}"); match &events[0] { + ResponseEvent::OutputItemAdded(ResponseItem::Reasoning { .. }) => {} + other => panic!("expected initial reasoning item, got {other:?}"), + } + + match &events[1] { ResponseEvent::ReasoningContentDelta(text) => assert_eq!(text, "think1"), other => panic!("expected reasoning delta, got {other:?}"), } - match &events[1] { + match &events[2] { + ResponseEvent::OutputItemAdded(ResponseItem::Message { .. }) => {} + other => panic!("expected initial message item, got {other:?}"), + } + + match &events[3] { ResponseEvent::OutputTextDelta(text) => assert_eq!(text, "ok"), other => panic!("expected text delta, got {other:?}"), } - match &events[2] { + match &events[4] { ResponseEvent::OutputItemDone(item) => assert_reasoning(item, "think1"), - other => panic!("expected reasoning item, got {other:?}"), + other => panic!("expected terminal reasoning, got {other:?}"), } - match &events[3] { + match &events[5] { ResponseEvent::OutputItemDone(item) => assert_message(item, "ok"), - other => panic!("expected message item, got {other:?}"), + other => panic!("expected terminal message, got {other:?}"), } - assert_matches!(events[4], ResponseEvent::Completed { .. }); + assert_matches!(events[6], ResponseEvent::Completed { .. }); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] @@ -244,34 +259,44 @@ async fn streams_reasoning_from_object_delta() { ); let events = run_stream(sse).await; - assert_eq!(events.len(), 6, "unexpected events: {events:?}"); + assert_eq!(events.len(), 8, "unexpected events: {events:?}"); match &events[0] { + ResponseEvent::OutputItemAdded(ResponseItem::Reasoning { .. }) => {} + other => panic!("expected initial reasoning item, got {other:?}"), + } + + match &events[1] { ResponseEvent::ReasoningContentDelta(text) => assert_eq!(text, "partA"), other => panic!("expected reasoning delta, got {other:?}"), } - match &events[1] { + match &events[2] { ResponseEvent::ReasoningContentDelta(text) => assert_eq!(text, "partB"), other => panic!("expected reasoning delta, got {other:?}"), } - match &events[2] { + match &events[3] { + ResponseEvent::OutputItemAdded(ResponseItem::Message { .. }) => {} + other => panic!("expected initial message item, got {other:?}"), + } + + match &events[4] { ResponseEvent::OutputTextDelta(text) => assert_eq!(text, "answer"), other => panic!("expected text delta, got {other:?}"), } - match &events[3] { + match &events[5] { ResponseEvent::OutputItemDone(item) => assert_reasoning(item, "partApartB"), - other => panic!("expected reasoning item, got {other:?}"), + other => panic!("expected terminal reasoning, got {other:?}"), } - match &events[4] { + match &events[6] { ResponseEvent::OutputItemDone(item) => assert_message(item, "answer"), - other => panic!("expected message item, got {other:?}"), + other => panic!("expected terminal message, got {other:?}"), } - assert_matches!(events[5], ResponseEvent::Completed { .. }); + assert_matches!(events[7], ResponseEvent::Completed { .. }); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] @@ -286,19 +311,24 @@ async fn streams_reasoning_from_final_message() { let sse = "data: {\"choices\":[{\"message\":{\"reasoning\":\"final-cot\"},\"finish_reason\":\"stop\"}]}\n\n"; let events = run_stream(sse).await; - assert_eq!(events.len(), 3, "unexpected events: {events:?}"); + assert_eq!(events.len(), 4, "unexpected events: {events:?}"); match &events[0] { + ResponseEvent::OutputItemAdded(ResponseItem::Reasoning { .. }) => {} + other => panic!("expected initial reasoning item, got {other:?}"), + } + + match &events[1] { ResponseEvent::ReasoningContentDelta(text) => assert_eq!(text, "final-cot"), other => panic!("expected reasoning delta, got {other:?}"), } - match &events[1] { + match &events[2] { ResponseEvent::OutputItemDone(item) => assert_reasoning(item, "final-cot"), other => panic!("expected reasoning item, got {other:?}"), } - assert_matches!(events[2], ResponseEvent::Completed { .. }); + assert_matches!(events[3], ResponseEvent::Completed { .. }); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] @@ -316,19 +346,24 @@ async fn streams_reasoning_before_tool_call() { ); let events = run_stream(sse).await; - assert_eq!(events.len(), 4, "unexpected events: {events:?}"); + assert_eq!(events.len(), 5, "unexpected events: {events:?}"); match &events[0] { + ResponseEvent::OutputItemAdded(ResponseItem::Reasoning { .. }) => {} + other => panic!("expected initial reasoning item, got {other:?}"), + } + + match &events[1] { ResponseEvent::ReasoningContentDelta(text) => assert_eq!(text, "pre-tool"), other => panic!("expected reasoning delta, got {other:?}"), } - match &events[1] { + match &events[2] { ResponseEvent::OutputItemDone(item) => assert_reasoning(item, "pre-tool"), other => panic!("expected reasoning item, got {other:?}"), } - match &events[2] { + match &events[3] { ResponseEvent::OutputItemDone(ResponseItem::FunctionCall { name, arguments, @@ -342,7 +377,7 @@ async fn streams_reasoning_before_tool_call() { other => panic!("expected function call, got {other:?}"), } - assert_matches!(events[3], ResponseEvent::Completed { .. }); + assert_matches!(events[4], ResponseEvent::Completed { .. }); } #[tokio::test] diff --git a/codex-rs/core/tests/common/responses.rs b/codex-rs/core/tests/common/responses.rs index 7fc6a69c..be8acc5f 100644 --- a/codex-rs/core/tests/common/responses.rs +++ b/codex-rs/core/tests/common/responses.rs @@ -217,6 +217,25 @@ pub fn ev_assistant_message(id: &str, text: &str) -> Value { }) } +pub fn ev_message_item_added(id: &str, text: &str) -> Value { + serde_json::json!({ + "type": "response.output_item.added", + "item": { + "type": "message", + "role": "assistant", + "id": id, + "content": [{"type": "output_text", "text": text}] + } + }) +} + +pub fn ev_output_text_delta(delta: &str) -> Value { + serde_json::json!({ + "type": "response.output_text.delta", + "delta": delta, + }) +} + pub fn ev_reasoning_item(id: &str, summary: &[&str], raw_content: &[&str]) -> Value { let summary_entries: Vec = summary .iter() @@ -243,6 +262,36 @@ pub fn ev_reasoning_item(id: &str, summary: &[&str], raw_content: &[&str]) -> Va event } +pub fn ev_reasoning_item_added(id: &str, summary: &[&str]) -> Value { + let summary_entries: Vec = summary + .iter() + .map(|text| serde_json::json!({"type": "summary_text", "text": text})) + .collect(); + + serde_json::json!({ + "type": "response.output_item.added", + "item": { + "type": "reasoning", + "id": id, + "summary": summary_entries, + } + }) +} + +pub fn ev_reasoning_summary_text_delta(delta: &str) -> Value { + serde_json::json!({ + "type": "response.reasoning_summary_text.delta", + "delta": delta, + }) +} + +pub fn ev_reasoning_text_delta(delta: &str) -> Value { + serde_json::json!({ + "type": "response.reasoning_text.delta", + "delta": delta, + }) +} + pub fn ev_web_search_call_added(id: &str, status: &str, query: &str) -> Value { serde_json::json!({ "type": "response.output_item.added", diff --git a/codex-rs/core/tests/suite/client.rs b/codex-rs/core/tests/suite/client.rs index 75b1ca3c..c1354a62 100644 --- a/codex-rs/core/tests/suite/client.rs +++ b/codex-rs/core/tests/suite/client.rs @@ -1262,6 +1262,10 @@ async fn history_dedupes_streamed_and_final_messages_across_turns() { // Build a small SSE stream with deltas and a final assistant message. // We emit the same body for all 3 turns; ids vary but are unused by assertions. let sse_raw = r##"[ + {"type":"response.output_item.added", "item":{ + "type":"message", "role":"assistant", + "content":[{"type":"output_text","text":""}] + }}, {"type":"response.output_text.delta", "delta":"Hey "}, {"type":"response.output_text.delta", "delta":"there"}, {"type":"response.output_text.delta", "delta":"!\n"}, diff --git a/codex-rs/core/tests/suite/items.rs b/codex-rs/core/tests/suite/items.rs index 13c95003..7a965220 100644 --- a/codex-rs/core/tests/suite/items.rs +++ b/codex-rs/core/tests/suite/items.rs @@ -9,7 +9,12 @@ use codex_protocol::items::TurnItem; use codex_protocol::user_input::UserInput; use core_test_support::responses::ev_assistant_message; use core_test_support::responses::ev_completed; +use core_test_support::responses::ev_message_item_added; +use core_test_support::responses::ev_output_text_delta; use core_test_support::responses::ev_reasoning_item; +use core_test_support::responses::ev_reasoning_item_added; +use core_test_support::responses::ev_reasoning_summary_text_delta; +use core_test_support::responses::ev_reasoning_text_delta; use core_test_support::responses::ev_response_created; use core_test_support::responses::ev_web_search_call_added; use core_test_support::responses::ev_web_search_call_done; @@ -234,3 +239,181 @@ async fn web_search_item_is_emitted() -> anyhow::Result<()> { Ok(()) } + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn agent_message_content_delta_has_item_metadata() -> anyhow::Result<()> { + skip_if_no_network!(Ok(())); + + let server = start_mock_server().await; + + let TestCodex { + codex, + session_configured, + .. + } = test_codex().build(&server).await?; + + let stream = sse(vec![ + ev_response_created("resp-1"), + ev_message_item_added("msg-1", ""), + ev_output_text_delta("streamed response"), + ev_assistant_message("msg-1", "streamed response"), + ev_completed("resp-1"), + ]); + mount_sse_once_match(&server, any(), stream).await; + + codex + .submit(Op::UserInput { + items: vec![UserInput::Text { + text: "please stream text".into(), + }], + }) + .await?; + + let (started_turn_id, started_item) = wait_for_event_match(&codex, |ev| match ev { + EventMsg::ItemStarted(ItemStartedEvent { + turn_id, + item: TurnItem::AgentMessage(item), + .. + }) => Some((turn_id.clone(), item.clone())), + _ => None, + }) + .await; + + let delta_event = wait_for_event_match(&codex, |ev| match ev { + EventMsg::AgentMessageContentDelta(event) => Some(event.clone()), + _ => None, + }) + .await; + let legacy_delta = wait_for_event_match(&codex, |ev| match ev { + EventMsg::AgentMessageDelta(event) => Some(event.clone()), + _ => None, + }) + .await; + let completed_item = wait_for_event_match(&codex, |ev| match ev { + EventMsg::ItemCompleted(ItemCompletedEvent { + item: TurnItem::AgentMessage(item), + .. + }) => Some(item.clone()), + _ => None, + }) + .await; + + let session_id = session_configured.session_id.to_string(); + assert_eq!(delta_event.thread_id, session_id); + assert_eq!(delta_event.turn_id, started_turn_id); + assert_eq!(delta_event.item_id, started_item.id); + assert_eq!(delta_event.delta, "streamed response"); + assert_eq!(legacy_delta.delta, "streamed response"); + assert_eq!(completed_item.id, started_item.id); + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn reasoning_content_delta_has_item_metadata() -> anyhow::Result<()> { + skip_if_no_network!(Ok(())); + + let server = start_mock_server().await; + + let TestCodex { codex, .. } = test_codex().build(&server).await?; + + let stream = sse(vec![ + ev_response_created("resp-1"), + ev_reasoning_item_added("reasoning-1", &[""]), + ev_reasoning_summary_text_delta("step one"), + ev_reasoning_item("reasoning-1", &["step one"], &[]), + ev_completed("resp-1"), + ]); + mount_sse_once_match(&server, any(), stream).await; + + codex + .submit(Op::UserInput { + items: vec![UserInput::Text { + text: "reason through it".into(), + }], + }) + .await?; + + let reasoning_item = wait_for_event_match(&codex, |ev| match ev { + EventMsg::ItemStarted(ItemStartedEvent { + item: TurnItem::Reasoning(item), + .. + }) => Some(item.clone()), + _ => None, + }) + .await; + + let delta_event = wait_for_event_match(&codex, |ev| match ev { + EventMsg::ReasoningContentDelta(event) => Some(event.clone()), + _ => None, + }) + .await; + let legacy_delta = wait_for_event_match(&codex, |ev| match ev { + EventMsg::AgentReasoningDelta(event) => Some(event.clone()), + _ => None, + }) + .await; + + assert_eq!(delta_event.item_id, reasoning_item.id); + assert_eq!(delta_event.delta, "step one"); + assert_eq!(legacy_delta.delta, "step one"); + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn reasoning_raw_content_delta_respects_flag() -> anyhow::Result<()> { + skip_if_no_network!(Ok(())); + + let server = start_mock_server().await; + + let TestCodex { codex, .. } = test_codex() + .with_config(|config| { + config.show_raw_agent_reasoning = true; + }) + .build(&server) + .await?; + + let stream = sse(vec![ + ev_response_created("resp-1"), + ev_reasoning_item_added("reasoning-raw", &[""]), + ev_reasoning_text_delta("raw detail"), + ev_reasoning_item("reasoning-raw", &["complete"], &["raw detail"]), + ev_completed("resp-1"), + ]); + mount_sse_once_match(&server, any(), stream).await; + + codex + .submit(Op::UserInput { + items: vec![UserInput::Text { + text: "show raw reasoning".into(), + }], + }) + .await?; + + let reasoning_item = wait_for_event_match(&codex, |ev| match ev { + EventMsg::ItemStarted(ItemStartedEvent { + item: TurnItem::Reasoning(item), + .. + }) => Some(item.clone()), + _ => None, + }) + .await; + + let delta_event = wait_for_event_match(&codex, |ev| match ev { + EventMsg::ReasoningRawContentDelta(event) => Some(event.clone()), + _ => None, + }) + .await; + let legacy_delta = wait_for_event_match(&codex, |ev| match ev { + EventMsg::AgentReasoningRawContentDelta(event) => Some(event.clone()), + _ => None, + }) + .await; + + assert_eq!(delta_event.item_id, reasoning_item.id); + assert_eq!(delta_event.delta, "raw detail"); + assert_eq!(legacy_delta.delta, "raw detail"); + + Ok(()) +} diff --git a/codex-rs/exec/src/event_processor_with_human_output.rs b/codex-rs/exec/src/event_processor_with_human_output.rs index 0e3ac5d4..491c2f5e 100644 --- a/codex-rs/exec/src/event_processor_with_human_output.rs +++ b/codex-rs/exec/src/event_processor_with_human_output.rs @@ -519,6 +519,9 @@ impl EventProcessor for EventProcessorWithHumanOutput { | EventMsg::AgentReasoningRawContentDelta(_) | EventMsg::ItemStarted(_) | EventMsg::ItemCompleted(_) + | EventMsg::AgentMessageContentDelta(_) + | EventMsg::ReasoningContentDelta(_) + | EventMsg::ReasoningRawContentDelta(_) | EventMsg::UndoCompleted(_) | EventMsg::UndoStarted(_) => {} } diff --git a/codex-rs/mcp-server/src/codex_tool_runner.rs b/codex-rs/mcp-server/src/codex_tool_runner.rs index 48989938..365f7e17 100644 --- a/codex-rs/mcp-server/src/codex_tool_runner.rs +++ b/codex-rs/mcp-server/src/codex_tool_runner.rs @@ -288,6 +288,9 @@ async fn run_codex_tool_session_inner( | EventMsg::EnteredReviewMode(_) | EventMsg::ItemStarted(_) | EventMsg::ItemCompleted(_) + | EventMsg::AgentMessageContentDelta(_) + | EventMsg::ReasoningContentDelta(_) + | EventMsg::ReasoningRawContentDelta(_) | EventMsg::UndoStarted(_) | EventMsg::UndoCompleted(_) | EventMsg::ExitedReviewMode(_) diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index 31506061..d09ecde7 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -546,6 +546,10 @@ pub enum EventMsg { ItemStarted(ItemStartedEvent), ItemCompleted(ItemCompletedEvent), + + AgentMessageContentDelta(AgentMessageContentDeltaEvent), + ReasoningContentDelta(ReasoningContentDeltaEvent), + ReasoningRawContentDelta(ReasoningRawContentDeltaEvent), } #[derive(Debug, Clone, Deserialize, Serialize, TS, JsonSchema)] @@ -560,6 +564,17 @@ pub struct ItemStartedEvent { pub item: TurnItem, } +impl HasLegacyEvent for ItemStartedEvent { + fn as_legacy_events(&self, _: bool) -> Vec { + match &self.item { + TurnItem::WebSearch(item) => vec![EventMsg::WebSearchBegin(WebSearchBeginEvent { + call_id: item.id.clone(), + })], + _ => Vec::new(), + } + } +} + #[derive(Debug, Clone, Deserialize, Serialize, TS, JsonSchema)] pub struct ItemCompletedEvent { pub thread_id: ConversationId, @@ -567,6 +582,84 @@ pub struct ItemCompletedEvent { pub item: TurnItem, } +pub trait HasLegacyEvent { + fn as_legacy_events(&self, show_raw_agent_reasoning: bool) -> Vec; +} + +impl HasLegacyEvent for ItemCompletedEvent { + fn as_legacy_events(&self, show_raw_agent_reasoning: bool) -> Vec { + self.item.as_legacy_events(show_raw_agent_reasoning) + } +} + +#[derive(Debug, Clone, Deserialize, Serialize, TS, JsonSchema)] +pub struct AgentMessageContentDeltaEvent { + pub thread_id: String, + pub turn_id: String, + pub item_id: String, + pub delta: String, +} + +impl HasLegacyEvent for AgentMessageContentDeltaEvent { + fn as_legacy_events(&self, _: bool) -> Vec { + vec![EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { + delta: self.delta.clone(), + })] + } +} + +#[derive(Debug, Clone, Deserialize, Serialize, TS, JsonSchema)] +pub struct ReasoningContentDeltaEvent { + pub thread_id: String, + pub turn_id: String, + pub item_id: String, + pub delta: String, +} + +impl HasLegacyEvent for ReasoningContentDeltaEvent { + fn as_legacy_events(&self, _: bool) -> Vec { + vec![EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { + delta: self.delta.clone(), + })] + } +} + +#[derive(Debug, Clone, Deserialize, Serialize, TS, JsonSchema)] +pub struct ReasoningRawContentDeltaEvent { + pub thread_id: String, + pub turn_id: String, + pub item_id: String, + pub delta: String, +} + +impl HasLegacyEvent for ReasoningRawContentDeltaEvent { + fn as_legacy_events(&self, _: bool) -> Vec { + vec![EventMsg::AgentReasoningRawContentDelta( + AgentReasoningRawContentDeltaEvent { + delta: self.delta.clone(), + }, + )] + } +} + +impl HasLegacyEvent for EventMsg { + fn as_legacy_events(&self, show_raw_agent_reasoning: bool) -> Vec { + match self { + EventMsg::ItemCompleted(event) => event.as_legacy_events(show_raw_agent_reasoning), + EventMsg::AgentMessageContentDelta(event) => { + event.as_legacy_events(show_raw_agent_reasoning) + } + EventMsg::ReasoningContentDelta(event) => { + event.as_legacy_events(show_raw_agent_reasoning) + } + EventMsg::ReasoningRawContentDelta(event) => { + event.as_legacy_events(show_raw_agent_reasoning) + } + _ => Vec::new(), + } + } +} + #[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)] #[ts(optional_fields = nullable)] pub struct ExitedReviewModeEvent { @@ -1392,10 +1485,42 @@ pub enum TurnAbortReason { #[cfg(test)] mod tests { use super::*; + use crate::items::UserMessageItem; + use crate::items::WebSearchItem; use anyhow::Result; use serde_json::json; use tempfile::NamedTempFile; + #[test] + fn item_started_event_from_web_search_emits_begin_event() { + let event = ItemStartedEvent { + thread_id: ConversationId::new(), + turn_id: "turn-1".into(), + item: TurnItem::WebSearch(WebSearchItem { + id: "search-1".into(), + query: "find docs".into(), + }), + }; + + let legacy_events = event.as_legacy_events(false); + assert_eq!(legacy_events.len(), 1); + match &legacy_events[0] { + EventMsg::WebSearchBegin(event) => assert_eq!(event.call_id, "search-1"), + _ => panic!("expected WebSearchBegin event"), + } + } + + #[test] + fn item_started_event_from_non_web_search_emits_no_legacy_events() { + let event = ItemStartedEvent { + thread_id: ConversationId::new(), + turn_id: "turn-1".into(), + item: TurnItem::UserMessage(UserMessageItem::new(&[])), + }; + + assert!(event.as_legacy_events(false).is_empty()); + } + /// Serialize Event to verify that its JSON representation has the expected /// amount of nesting. #[test] diff --git a/codex-rs/tui/src/chatwidget.rs b/codex-rs/tui/src/chatwidget.rs index 229b5159..17afb9a1 100644 --- a/codex-rs/tui/src/chatwidget.rs +++ b/codex-rs/tui/src/chatwidget.rs @@ -1521,7 +1521,10 @@ impl ChatWidget { EventMsg::ExitedReviewMode(review) => self.on_exited_review_mode(review), EventMsg::RawResponseItem(_) | EventMsg::ItemStarted(_) - | EventMsg::ItemCompleted(_) => {} + | EventMsg::ItemCompleted(_) + | EventMsg::AgentMessageContentDelta(_) + | EventMsg::ReasoningContentDelta(_) + | EventMsg::ReasoningRawContentDelta(_) => {} } }