use std::time::Duration;

use crate::ModelProviderInfo;
use crate::client_common::Prompt;
use crate::client_common::ResponseEvent;
use crate::client_common::ResponseStream;
use crate::default_client::LlmxHttpClient;
use crate::error::ConnectionFailedError;
use crate::error::LlmxErr;
use crate::error::ResponseStreamFailed;
use crate::error::Result;
use crate::error::RetryLimitReachedError;
use crate::error::UnexpectedResponseError;
use crate::model_family::ModelFamily;
use crate::tools::spec::create_tools_json_for_chat_completions_api;
use crate::util::backoff;
use bytes::Bytes;
use eventsource_stream::Eventsource;
use futures::Stream;
use futures::StreamExt;
use futures::TryStreamExt;
use llmx_otel::otel_event_manager::OtelEventManager;
use llmx_protocol::models::ContentItem;
use llmx_protocol::models::FunctionCallOutputContentItem;
use llmx_protocol::models::ReasoningItemContent;
use llmx_protocol::models::ResponseItem;
use llmx_protocol::protocol::SessionSource;
use llmx_protocol::protocol::SubAgentSource;
use llmx_protocol::protocol::TokenUsage;
use reqwest::StatusCode;
use serde_json::json;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use tokio::sync::mpsc;
use tokio::time::timeout;
use tracing::debug;
use tracing::trace;

/// Implementation for the classic Chat Completions API.
pub(crate) async fn stream_chat_completions(
    prompt: &Prompt,
    model_family: &ModelFamily,
    client: &LlmxHttpClient,
    provider: &ModelProviderInfo,
    otel_event_manager: &OtelEventManager,
    session_source: &SessionSource,
) -> Result<ResponseStream> {
    if prompt.output_schema.is_some() {
        return Err(LlmxErr::UnsupportedOperation(
            "output_schema is not supported for Chat Completions API".to_string(),
        ));
    }

    // Build messages array.
    let mut messages = Vec::<serde_json::Value>::new();

    let full_instructions = prompt.get_full_instructions(model_family);
    messages.push(json!({"role": "system", "content": full_instructions}));

    let input = prompt.get_formatted_input();

    // Pre-scan: map Reasoning blocks to the adjacent assistant anchor after the last user.
    // - If the last emitted message is a user message, drop all reasoning.
    // - Otherwise, for each Reasoning item after the last user message, attach it
    //   to the immediate previous assistant message (stop turns) or the immediate
    //   next assistant anchor (tool-call turns: function/local shell call, or assistant message).
    let mut reasoning_by_anchor_index: std::collections::HashMap<usize, String> =
        std::collections::HashMap::new();

    // Determine the last role that would be emitted to Chat Completions.
    let mut last_emitted_role: Option<&str> = None;
    for item in &input {
        match item {
            ResponseItem::Message { role, .. } => last_emitted_role = Some(role.as_str()),
            ResponseItem::FunctionCall { .. } | ResponseItem::LocalShellCall { .. } => {
                last_emitted_role = Some("assistant")
            }
            ResponseItem::FunctionCallOutput { .. } => last_emitted_role = Some("tool"),
            ResponseItem::Reasoning { .. } | ResponseItem::Other => {}
            ResponseItem::CustomToolCall { .. } => {}
            ResponseItem::CustomToolCallOutput { .. } => {}
            ResponseItem::WebSearchCall { .. } => {}
            ResponseItem::GhostSnapshot { .. } => {}
        }
    }

    // Find the last user message index in the input.
    let mut last_user_index: Option<usize> = None;
    for (idx, item) in input.iter().enumerate() {
        if let ResponseItem::Message { role, .. } = item
            && role == "user"
        {
            last_user_index = Some(idx);
        }
    }

    // Attach reasoning only if the conversation does not end with a user message.
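    // Illustrative examples (hypothetical histories) of how the anchoring rules above play out:
    //   [user, reasoning, function_call, function_call_output]
    //     -> the reasoning is attached to the following function_call anchor.
    //   [user, assistant, reasoning]
    //     -> the reasoning is attached to the preceding assistant message.
    //   [assistant, reasoning, user]
    //     -> the turn ends with a user message, so all reasoning is dropped.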
    if !matches!(last_emitted_role, Some("user")) {
        for (idx, item) in input.iter().enumerate() {
            // Only consider reasoning that appears after the last user message.
            if let Some(u_idx) = last_user_index
                && idx <= u_idx
            {
                continue;
            }
            if let ResponseItem::Reasoning {
                content: Some(items),
                ..
            } = item
            {
                let mut text = String::new();
                for entry in items {
                    match entry {
                        ReasoningItemContent::ReasoningText { text: segment }
                        | ReasoningItemContent::Text { text: segment } => text.push_str(segment),
                    }
                }
                if text.trim().is_empty() {
                    continue;
                }
                // Prefer the immediate previous assistant message (stop turns).
                let mut attached = false;
                if idx > 0
                    && let ResponseItem::Message { role, .. } = &input[idx - 1]
                    && role == "assistant"
                {
                    reasoning_by_anchor_index
                        .entry(idx - 1)
                        .and_modify(|v| v.push_str(&text))
                        .or_insert(text.clone());
                    attached = true;
                }
                // Otherwise, attach to the immediate next assistant anchor (tool-calls or assistant message).
                if !attached && idx + 1 < input.len() {
                    match &input[idx + 1] {
                        ResponseItem::FunctionCall { .. } | ResponseItem::LocalShellCall { .. } => {
                            reasoning_by_anchor_index
                                .entry(idx + 1)
                                .and_modify(|v| v.push_str(&text))
                                .or_insert(text.clone());
                        }
                        ResponseItem::Message { role, .. } if role == "assistant" => {
                            reasoning_by_anchor_index
                                .entry(idx + 1)
                                .and_modify(|v| v.push_str(&text))
                                .or_insert(text.clone());
                        }
                        _ => {}
                    }
                }
            }
        }
    }

    // Track the last assistant text we emitted to avoid duplicate assistant messages
    // in the outbound Chat Completions payload (can happen if a final
    // aggregated assistant message was recorded alongside an earlier partial).
    let mut last_assistant_text: Option<String> = None;

    // Build a set of which call_ids have outputs.
    // We'll use this to ensure we never send a FunctionCall without its corresponding output.
    let mut call_ids_with_outputs: std::collections::HashSet<String> =
        std::collections::HashSet::new();

    // First pass: collect all call_ids that have outputs.
    for item in input.iter() {
        if let ResponseItem::FunctionCallOutput { call_id, .. } = item {
            call_ids_with_outputs.insert(call_id.clone());
        }
    }

    debug!("=== Chat Completions Request Debug ===");
    debug!("Input items count: {}", input.len());
    debug!("Call IDs with outputs: {:?}", call_ids_with_outputs);

    // Second pass: find the first FunctionCall that doesn't have an output.
    let mut cutoff_at_idx: Option<usize> = None;
    for (idx, item) in input.iter().enumerate() {
        if let ResponseItem::FunctionCall { call_id, name, .. } = item {
            if !call_ids_with_outputs.contains(call_id) {
                debug!(
                    "Found unanswered function call '{}' (call_id: {}) at index {}",
                    name, call_id, idx
                );
                cutoff_at_idx = Some(idx);
                break;
            }
        }
    }

    if let Some(cutoff) = cutoff_at_idx {
        debug!("Cutting off at index {} to avoid orphaned tool calls", cutoff);
    } else {
        debug!("No unanswered function calls found, processing all items");
    }

    // Track whether the MOST RECENT FunctionCall with each call_id was skipped.
    // This allows the same call_id to be retried - we only skip outputs for the specific skipped calls.
    let mut call_id_skip_state: std::collections::HashMap<String, bool> =
        std::collections::HashMap::new();

    for (idx, item) in input.iter().enumerate() {
        // Stop processing if we've reached an unanswered function call.
        if let Some(cutoff) = cutoff_at_idx {
            if idx >= cutoff {
                debug!("Stopping at index {} due to unanswered function call", idx);
                break;
            }
        }

        debug!(
            "Processing item {} of type: {}",
            idx,
            match item {
                ResponseItem::Message { role, .. } => format!("Message(role={})", role),
                ResponseItem::FunctionCall { name, call_id, .. } =>
                    format!("FunctionCall(name={}, call_id={})", name, call_id),
                ResponseItem::FunctionCallOutput { call_id, .. } =>
                    format!("FunctionCallOutput(call_id={})", call_id),
                ResponseItem::LocalShellCall { .. } => "LocalShellCall".to_string(),
                ResponseItem::CustomToolCall { .. } => "CustomToolCall".to_string(),
                ResponseItem::CustomToolCallOutput { .. } => "CustomToolCallOutput".to_string(),
                ResponseItem::Reasoning { .. } => "Reasoning".to_string(),
                ResponseItem::WebSearchCall { .. } => "WebSearchCall".to_string(),
                ResponseItem::GhostSnapshot { .. } => "GhostSnapshot".to_string(),
                ResponseItem::Other => "Other".to_string(),
            }
        );

        match item {
            ResponseItem::Message { role, content, .. } => {
                // Build content either as a plain string (typical for assistant text)
                // or as an array of content items when images are present (user/tool multimodal).
                let mut text = String::new();
                let mut items: Vec<serde_json::Value> = Vec::new();
                let mut saw_image = false;
                for c in content {
                    match c {
                        ContentItem::InputText { text: t } | ContentItem::OutputText { text: t } => {
                            text.push_str(t);
                            // Only add text content blocks that are non-empty.
                            if !t.trim().is_empty() {
                                items.push(json!({"type": "text", "text": t}));
                            }
                        }
                        ContentItem::InputImage { image_url } => {
                            saw_image = true;
                            items.push(json!({"type": "image_url", "image_url": {"url": image_url}}));
                        }
                    }
                }
                // Skip messages with empty or whitespace-only text content (unless they contain images).
                if text.trim().is_empty() && !saw_image {
                    continue;
                }

                // Skip exact-duplicate assistant messages.
                if role == "assistant" {
                    if let Some(prev) = &last_assistant_text
                        && prev == &text
                    {
                        continue;
                    }
                    last_assistant_text = Some(text.clone());
                }

                // For assistant messages, always send a plain string for compatibility.
                // For user messages, if an image is present, send an array of content items.
                let content_value = if role == "assistant" {
                    json!(text)
                } else if saw_image {
                    json!(items)
                } else {
                    json!(text)
                };

                let mut msg = json!({"role": role, "content": content_value});
                if role == "assistant"
                    && let Some(reasoning) = reasoning_by_anchor_index.get(&idx)
                    && let Some(obj) = msg.as_object_mut()
                {
                    obj.insert("reasoning".to_string(), json!(reasoning));
                }
                messages.push(msg);
            }
            ResponseItem::FunctionCall {
                name,
                arguments,
                call_id,
                ..
            } => {
                // Validate that arguments is valid JSON before sending to the API.
                // If invalid, skip this function call to avoid API errors.
                if serde_json::from_str::<serde_json::Value>(arguments).is_err() {
                    debug!(
                        "Skipping malformed function call with invalid JSON arguments: {}",
                        arguments
                    );
                    // Mark this call_id's most recent state as skipped.
                    call_id_skip_state.insert(call_id.clone(), true);
                    continue;
                }
                // Mark this call_id's most recent state as NOT skipped (valid call).
                call_id_skip_state.insert(call_id.clone(), false);

                let mut msg = json!({
                    "role": "assistant",
                    "content": null,
                    "tool_calls": [{
                        "id": call_id,
                        "type": "function",
                        "function": {
                            "name": name,
                            "arguments": arguments,
                        }
                    }]
                });
                if let Some(reasoning) = reasoning_by_anchor_index.get(&idx)
                    && let Some(obj) = msg.as_object_mut()
                {
                    obj.insert("reasoning".to_string(), json!(reasoning));
                }
                messages.push(msg);
            }
            ResponseItem::LocalShellCall {
                id,
                call_id: _,
                status,
                action,
            } => {
                // Confirm with API team.
                let mut msg = json!({
                    "role": "assistant",
                    "content": null,
                    "tool_calls": [{
                        "id": id.clone().unwrap_or_else(|| "".to_string()),
                        "type": "local_shell_call",
                        "status": status,
                        "action": action,
                    }]
                });
                if let Some(reasoning) = reasoning_by_anchor_index.get(&idx)
                    && let Some(obj) = msg.as_object_mut()
                {
                    obj.insert("reasoning".to_string(), json!(reasoning));
                }
                messages.push(msg);
            }
            ResponseItem::FunctionCallOutput { call_id, output } => {
                // Skip outputs only if the MOST RECENT FunctionCall with this call_id was skipped.
                if call_id_skip_state.get(call_id) == Some(&true) {
                    debug!(
                        "Skipping function call output for most recent skipped call_id: {}",
                        call_id
                    );
                    continue;
                }
                // Prefer structured content items when available (e.g., images),
                // otherwise fall back to the legacy plain-string content.
                let content_value = if let Some(items) = &output.content_items {
                    let mapped: Vec<serde_json::Value> = items
                        .iter()
                        .map(|it| match it {
                            FunctionCallOutputContentItem::InputText { text } => {
                                json!({"type": "text", "text": text})
                            }
                            FunctionCallOutputContentItem::InputImage { image_url } => {
                                json!({"type": "image_url", "image_url": {"url": image_url}})
                            }
                        })
                        .collect();
                    json!(mapped)
                } else {
                    json!(output.content)
                };
                messages.push(json!({
                    "role": "tool",
                    "tool_call_id": call_id,
                    "content": content_value,
                }));
            }
            ResponseItem::CustomToolCall {
                id,
                call_id: _,
                name,
                input,
                status: _,
            } => {
                messages.push(json!({
                    "role": "assistant",
                    "content": null,
                    "tool_calls": [{
                        "id": id,
                        "type": "custom",
                        "custom": {
                            "name": name,
                            "input": input,
                        }
                    }]
                }));
            }
            ResponseItem::CustomToolCallOutput { call_id, output } => {
                messages.push(json!({
                    "role": "tool",
                    "tool_call_id": call_id,
                    "content": output,
                }));
            }
            ResponseItem::GhostSnapshot { .. } => {
                // Ghost snapshots annotate history but are not sent to the model.
                continue;
            }
            ResponseItem::Reasoning { .. }
            | ResponseItem::WebSearchCall { .. }
            | ResponseItem::Other => {
                // Omit these items from the conversation history.
                continue;
            }
        }
    }

    debug!("Built {} messages for API request", messages.len());
    debug!("=== End Chat Completions Request Debug ===");

    let tools_json = create_tools_json_for_chat_completions_api(&prompt.tools)?;
    let mut payload = json!({
        "model": model_family.slug,
        "messages": messages,
        "stream": true,
        "tools": tools_json,
    });

    // Add max_tokens - required by the Anthropic Messages API.
    // Use a sensible default of 8192 if not configured.
    if let Some(obj) = payload.as_object_mut() {
        obj.insert("max_tokens".to_string(), json!(8192));
    }

    debug!(
        "POST to {}: {}",
        provider.get_full_url(&None),
        serde_json::to_string_pretty(&payload).unwrap_or_default()
    );

    let mut attempt = 0;
    let max_retries = provider.request_max_retries();
    loop {
        attempt += 1;

        let mut req_builder = provider.create_request_builder(client, &None).await?;

        // Include subagent header only for subagent sessions.
        if let SessionSource::SubAgent(sub) = session_source.clone() {
            let subagent = if let SubAgentSource::Other(label) = sub {
                label
            } else {
                serde_json::to_value(&sub)
                    .ok()
                    .and_then(|v| v.as_str().map(std::string::ToString::to_string))
                    .unwrap_or_else(|| "other".to_string())
            };
            req_builder = req_builder.header("x-openai-subagent", subagent);
        }

        let res = otel_event_manager
            .log_request(attempt, || {
                req_builder
                    .header(reqwest::header::ACCEPT, "text/event-stream")
                    .json(&payload)
                    .send()
            })
            .await;

        match res {
            Ok(resp) if resp.status().is_success() => {
                let (tx_event, rx_event) = mpsc::channel::<Result<ResponseEvent>>(1600);

                let stream = resp.bytes_stream().map_err(|e| {
                    LlmxErr::ResponseStreamFailed(ResponseStreamFailed {
                        source: e,
                        request_id: None,
                    })
                });

                tokio::spawn(process_chat_sse(
                    stream,
                    tx_event,
                    provider.stream_idle_timeout(),
                    otel_event_manager.clone(),
                ));

                return Ok(ResponseStream { rx_event });
            }
            Ok(res) => {
                let status = res.status();
                if !(status == StatusCode::TOO_MANY_REQUESTS || status.is_server_error()) {
                    let body = (res.text().await).unwrap_or_default();
                    return Err(LlmxErr::UnexpectedStatus(UnexpectedResponseError {
                        status,
                        body,
                        request_id: None,
                    }));
                }

                if attempt > max_retries {
                    return Err(LlmxErr::RetryLimit(RetryLimitReachedError {
                        status,
                        request_id: None,
                    }));
                }

                let retry_after_secs = res
                    .headers()
                    .get(reqwest::header::RETRY_AFTER)
                    .and_then(|v| v.to_str().ok())
                    .and_then(|s| s.parse::<u64>().ok());

                let delay = retry_after_secs
                    .map(|s| Duration::from_millis(s * 1_000))
                    .unwrap_or_else(|| backoff(attempt));
                tokio::time::sleep(delay).await;
            }
            Err(e) => {
                if attempt > max_retries {
                    return Err(LlmxErr::ConnectionFailed(ConnectionFailedError { source: e }));
                }
                let delay = backoff(attempt);
                tokio::time::sleep(delay).await;
            }
        }
    }
}

async fn append_assistant_text(
    tx_event: &mpsc::Sender<Result<ResponseEvent>>,
    assistant_item: &mut Option<ResponseItem>,
    text: String,
) {
    if assistant_item.is_none() {
        let item = ResponseItem::Message {
            id: None,
            role: "assistant".to_string(),
            content: vec![],
        };
        *assistant_item = Some(item.clone());
        let _ = tx_event
            .send(Ok(ResponseEvent::OutputItemAdded(item)))
            .await;
    }
    if let Some(ResponseItem::Message { content, .. }) = assistant_item {
        content.push(ContentItem::OutputText { text: text.clone() });
        let _ = tx_event
            .send(Ok(ResponseEvent::OutputTextDelta(text.clone())))
            .await;
    }
}

async fn append_reasoning_text(
    tx_event: &mpsc::Sender<Result<ResponseEvent>>,
    reasoning_item: &mut Option<ResponseItem>,
    text: String,
) {
    if reasoning_item.is_none() {
        let item = ResponseItem::Reasoning {
            id: String::new(),
            summary: Vec::new(),
            content: Some(vec![]),
            encrypted_content: None,
        };
        *reasoning_item = Some(item.clone());
        let _ = tx_event
            .send(Ok(ResponseEvent::OutputItemAdded(item)))
            .await;
    }
    if let Some(ResponseItem::Reasoning {
        content: Some(content),
        ..
    }) = reasoning_item
    {
        content.push(ReasoningItemContent::ReasoningText { text: text.clone() });
        let _ = tx_event
            .send(Ok(ResponseEvent::ReasoningContentDelta(text.clone())))
            .await;
    }
}

/// Lightweight SSE processor for the Chat Completions streaming format. The
/// output is mapped onto Llmx's internal [`ResponseEvent`] so that the rest
/// of the pipeline can stay agnostic of the underlying wire format.
async fn process_chat_sse<S>(
    stream: S,
    tx_event: mpsc::Sender<Result<ResponseEvent>>,
    idle_timeout: Duration,
    otel_event_manager: OtelEventManager,
) where
    S: Stream<Item = Result<Bytes>> + Unpin,
{
    let mut stream = stream.eventsource();
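    // Each SSE `data:` payload handled below is one JSON chunk. Illustrative, abbreviated
    // shapes (the tool name "shell" is only an example):
    //   content delta:   {"choices":[{"delta":{"content":"Hel"},"finish_reason":null}]}
    //   tool-call delta: {"choices":[{"delta":{"tool_calls":[{"id":"call_1",
    //                     "function":{"name":"shell","arguments":"{\"cmd"}}]},"finish_reason":null}]}
    //   end of turn:     {"choices":[{"delta":{},"finish_reason":"stop"}]}
    // followed by the literal sentinel "[DONE]".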
    // State to accumulate a function call across streaming chunks.
    // OpenAI may split the `arguments` string over multiple `delta` events
    // until the chunk whose `finish_reason` is `tool_calls` is emitted. We
    // keep collecting the pieces here and forward a single
    // `ResponseItem::FunctionCall` once the call is complete.
    #[derive(Default)]
    struct FunctionCallState {
        name: Option<String>,
        arguments: String,
        call_id: Option<String>,
        active: bool,
    }

    let mut fn_call_state = FunctionCallState::default();
    let mut assistant_item: Option<ResponseItem> = None;
    let mut reasoning_item: Option<ResponseItem> = None;
    let mut token_usage: Option<TokenUsage> = None;

    loop {
        let start = std::time::Instant::now();
        let response = timeout(idle_timeout, stream.next()).await;
        let duration = start.elapsed();
        otel_event_manager.log_sse_event(&response, duration);

        let sse = match response {
            Ok(Some(Ok(ev))) => ev,
            Ok(Some(Err(e))) => {
                let _ = tx_event
                    .send(Err(LlmxErr::Stream(e.to_string(), None)))
                    .await;
                return;
            }
            Ok(None) => {
                // Stream closed gracefully – emit Completed with dummy id.
                let _ = tx_event
                    .send(Ok(ResponseEvent::Completed {
                        response_id: String::new(),
                        token_usage: token_usage.clone(),
                    }))
                    .await;
                return;
            }
            Err(_) => {
                let _ = tx_event
                    .send(Err(LlmxErr::Stream(
                        "idle timeout waiting for SSE".into(),
                        None,
                    )))
                    .await;
                return;
            }
        };

        // OpenAI Chat streaming sends a literal string "[DONE]" when finished.
        if sse.data.trim() == "[DONE]" {
            // Emit any finalized items before closing so downstream consumers receive
            // terminal events for both assistant content and raw reasoning.
            if let Some(item) = assistant_item {
                let _ = tx_event.send(Ok(ResponseEvent::OutputItemDone(item))).await;
            }
            if let Some(item) = reasoning_item {
                let _ = tx_event.send(Ok(ResponseEvent::OutputItemDone(item))).await;
            }
            let _ = tx_event
                .send(Ok(ResponseEvent::Completed {
                    response_id: String::new(),
                    token_usage: token_usage.clone(),
                }))
                .await;
            return;
        }

        // Parse the JSON chunk.
        let chunk: serde_json::Value = match serde_json::from_str(&sse.data) {
            Ok(v) => v,
            Err(_) => continue,
        };
        trace!("chat_completions received SSE chunk: {chunk:?}");

        // Parse usage data if present (typically comes before [DONE]).
        if let Some(usage_obj) = chunk.get("usage")
            && let (Some(prompt_tokens), Some(completion_tokens), Some(total_tokens)) = (
                usage_obj
                    .get("prompt_tokens")
                    .and_then(serde_json::Value::as_i64),
                usage_obj
                    .get("completion_tokens")
                    .and_then(serde_json::Value::as_i64),
                usage_obj
                    .get("total_tokens")
                    .and_then(serde_json::Value::as_i64),
            )
        {
            // Extract cached_tokens from prompt_tokens_details if present.
            let cached_tokens = usage_obj
                .get("prompt_tokens_details")
                .and_then(|d| d.get("cached_tokens"))
                .and_then(serde_json::Value::as_i64)
                .unwrap_or(0);
            // Extract reasoning_tokens from completion_tokens_details if present.
            let reasoning_tokens = usage_obj
                .get("completion_tokens_details")
                .and_then(|d| d.get("reasoning_tokens"))
                .and_then(serde_json::Value::as_i64)
                .unwrap_or(0);

            token_usage = Some(TokenUsage {
                input_tokens: prompt_tokens,
                cached_input_tokens: cached_tokens,
                output_tokens: completion_tokens,
                reasoning_output_tokens: reasoning_tokens,
                total_tokens,
            });
        }

        // Check for error chunks
        // (e.g., { "error": { "message": "...", "type": "...", "code": "..." } }).
        if let Some(error_obj) = chunk.get("error") {
            let error_message = error_obj
                .get("message")
                .and_then(|m| m.as_str())
                .unwrap_or("Unknown error");

            // Send the error through the Result channel and stop processing.
            let _ = tx_event
                .send(Err(LlmxErr::UnexpectedStatus(UnexpectedResponseError {
                    status: StatusCode::OK, // Stream errors come with 200 status
                    body: error_message.to_string(),
                    request_id: None,
                })))
                .await;
            break;
        }

        let choice_opt = chunk.get("choices").and_then(|c| c.get(0));

        if let Some(choice) = choice_opt {
            // Handle assistant content tokens as streaming deltas.
            if let Some(content) = choice
                .get("delta")
                .and_then(|d| d.get("content"))
                .and_then(|c| c.as_str())
                && !content.is_empty()
            {
                append_assistant_text(&tx_event, &mut assistant_item, content.to_string()).await;
            }

            // Forward any reasoning/thinking deltas if present.
            // Some providers stream `reasoning` as a plain string while others
            // nest the text under an object (e.g. `{ "reasoning": { "text": "…" } }`).
            if let Some(reasoning_val) = choice.get("delta").and_then(|d| d.get("reasoning")) {
                let mut maybe_text = reasoning_val
                    .as_str()
                    .map(str::to_string)
                    .filter(|s| !s.is_empty());
                if maybe_text.is_none() && reasoning_val.is_object() {
                    if let Some(s) = reasoning_val
                        .get("text")
                        .and_then(|t| t.as_str())
                        .filter(|s| !s.is_empty())
                    {
                        maybe_text = Some(s.to_string());
                    } else if let Some(s) = reasoning_val
                        .get("content")
                        .and_then(|t| t.as_str())
                        .filter(|s| !s.is_empty())
                    {
                        maybe_text = Some(s.to_string());
                    }
                }
                if let Some(reasoning) = maybe_text {
                    // Accumulate so we can emit a terminal Reasoning item at the end.
                    append_reasoning_text(&tx_event, &mut reasoning_item, reasoning).await;
                }
            }

            // Some providers only include reasoning on the final message object.
            if let Some(message_reasoning) = choice.get("message").and_then(|m| m.get("reasoning")) {
                // Accept either a plain string or an object with { text | content }.
                if let Some(s) = message_reasoning.as_str() {
                    if !s.is_empty() {
                        append_reasoning_text(&tx_event, &mut reasoning_item, s.to_string()).await;
                    }
                } else if let Some(obj) = message_reasoning.as_object()
                    && let Some(s) = obj
                        .get("text")
                        .and_then(|v| v.as_str())
                        .or_else(|| obj.get("content").and_then(|v| v.as_str()))
                    && !s.is_empty()
                {
                    append_reasoning_text(&tx_event, &mut reasoning_item, s.to_string()).await;
                }
            }

            // Handle streaming function / tool calls.
            if let Some(tool_calls) = choice
                .get("delta")
                .and_then(|d| d.get("tool_calls"))
                .and_then(|tc| tc.as_array())
                && let Some(tool_call) = tool_calls.first()
            {
                // Mark that we have an active function call in progress.
                fn_call_state.active = true;

                // Extract call_id if present.
                if let Some(id) = tool_call.get("id").and_then(|v| v.as_str()) {
                    fn_call_state.call_id.get_or_insert_with(|| id.to_string());
                }

                // Extract function details if present.
                if let Some(function) = tool_call.get("function") {
                    if let Some(name) = function.get("name").and_then(|n| n.as_str()) {
                        fn_call_state.name.get_or_insert_with(|| name.to_string());
                    }
                    if let Some(args_fragment) = function.get("arguments").and_then(|a| a.as_str())
                    {
                        fn_call_state.arguments.push_str(args_fragment);
                    }
                }
            }

            // Emit end-of-turn when finish_reason signals completion.
            if let Some(finish_reason) = choice.get("finish_reason").and_then(|v| v.as_str()) {
                match finish_reason {
                    "tool_calls" if fn_call_state.active => {
                        // First, flush the terminal raw reasoning so UIs can finalize
                        // the reasoning stream before any exec/tool events begin.
                        if let Some(item) = reasoning_item.take() {
                            let _ = tx_event.send(Ok(ResponseEvent::OutputItemDone(item))).await;
                        }
                        // Then emit the FunctionCall response item.
                        let item = ResponseItem::FunctionCall {
                            id: None,
                            name: fn_call_state.name.clone().unwrap_or_else(|| "".to_string()),
                            arguments: fn_call_state.arguments.clone(),
                            call_id: fn_call_state.call_id.clone().unwrap_or_else(String::new),
                        };
                        let _ = tx_event.send(Ok(ResponseEvent::OutputItemDone(item))).await;
                    }
                    "stop" => {
                        // Regular turn without tool-call. Emit the final assistant message
                        // as a single OutputItemDone so non-delta consumers see the result.
                        if let Some(item) = assistant_item.take() {
                            let _ = tx_event.send(Ok(ResponseEvent::OutputItemDone(item))).await;
                        }
                        // Also emit a terminal Reasoning item so UIs can finalize raw reasoning.
                        if let Some(item) = reasoning_item.take() {
                            let _ = tx_event.send(Ok(ResponseEvent::OutputItemDone(item))).await;
                        }
                    }
                    _ => {}
                }

                // Emit Completed regardless of reason so the agent can advance.
                let _ = tx_event
                    .send(Ok(ResponseEvent::Completed {
                        response_id: String::new(),
                        token_usage: token_usage.clone(),
                    }))
                    .await;

                // Prepare for potential next turn (should not happen in same stream).
                // fn_call_state = FunctionCallState::default();

                return; // End processing for this SSE stream.
            }
        }
    }
}

/// Optional client-side aggregation helper.
///
/// Stream adapter that merges the incremental `OutputItemDone` chunks coming from
/// [`process_chat_sse`] into a *running* assistant message, **suppressing the
/// per-token deltas**. The stream stays silent while the model is thinking
/// and only emits two events per turn:
///
/// 1. `ResponseEvent::OutputItemDone` with the *complete* assistant message
///    (fully concatenated).
/// 2. The original `ResponseEvent::Completed` right after it.
///
/// This mirrors the behaviour the TypeScript CLI exposes to its higher layers.
///
/// The adapter is intentionally *lossless*: callers who do **not** opt in via
/// [`AggregateStreamExt::aggregate()`] keep receiving the original unmodified
/// events.
#[derive(Copy, Clone, Eq, PartialEq)]
enum AggregateMode {
    AggregatedOnly,
    Streaming,
}

pub(crate) struct AggregatedChatStream<S> {
    inner: S,
    cumulative: String,
    cumulative_reasoning: String,
    pending: std::collections::VecDeque<ResponseEvent>,
    mode: AggregateMode,
}

impl<S> Stream for AggregatedChatStream<S>
where
    S: Stream<Item = Result<ResponseEvent>> + Unpin,
{
    type Item = Result<ResponseEvent>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();

        // First, flush any buffered events from the previous call.
        if let Some(ev) = this.pending.pop_front() {
            return Poll::Ready(Some(Ok(ev)));
        }

        loop {
            match Pin::new(&mut this.inner).poll_next(cx) {
                Poll::Pending => return Poll::Pending,
                Poll::Ready(None) => return Poll::Ready(None),
                Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),
                Poll::Ready(Some(Ok(ResponseEvent::OutputItemDone(item)))) => {
                    // If this is an incremental assistant message chunk, accumulate but
                    // do NOT emit yet. Forward any other item (e.g. FunctionCall) right
                    // away so downstream consumers see it.
                    let is_assistant_message = matches!(
                        &item,
                        llmx_protocol::models::ResponseItem::Message { role, .. } if role == "assistant"
                    );

                    if is_assistant_message {
                        match this.mode {
                            AggregateMode::AggregatedOnly => {
                                // Only use the final assistant message if we have not
                                // seen any deltas; otherwise, deltas already built the
                                // cumulative text and this would duplicate it.
                                if this.cumulative.is_empty()
                                    && let llmx_protocol::models::ResponseItem::Message {
                                        content,
                                        ..
                                    } = &item
                                    && let Some(text) = content.iter().find_map(|c| match c {
                                        llmx_protocol::models::ContentItem::OutputText { text } => {
                                            Some(text)
                                        }
                                        _ => None,
                                    })
                                {
                                    this.cumulative.push_str(text);
                                }
                                // Swallow the assistant message here; emit on Completed.
                                continue;
                            }
                            AggregateMode::Streaming => {
                                // In streaming mode, if we have not seen any deltas, forward
                                // the final assistant message directly. If deltas were seen,
                                // suppress the final message to avoid duplication.
                                if this.cumulative.is_empty() {
                                    return Poll::Ready(Some(Ok(ResponseEvent::OutputItemDone(
                                        item,
                                    ))));
                                } else {
                                    continue;
                                }
                            }
                        }
                    }

                    // Not an assistant message – forward immediately.
                    return Poll::Ready(Some(Ok(ResponseEvent::OutputItemDone(item))));
                }
                Poll::Ready(Some(Ok(ResponseEvent::RateLimits(snapshot)))) => {
                    return Poll::Ready(Some(Ok(ResponseEvent::RateLimits(snapshot))));
                }
                Poll::Ready(Some(Ok(ResponseEvent::Completed {
                    response_id,
                    token_usage,
                }))) => {
                    // Build any aggregated items in the correct order: Reasoning first, then Message.
                    let mut emitted_any = false;

                    if !this.cumulative_reasoning.is_empty()
                        && matches!(this.mode, AggregateMode::AggregatedOnly)
                    {
                        let aggregated_reasoning =
                            llmx_protocol::models::ResponseItem::Reasoning {
                                id: String::new(),
                                summary: Vec::new(),
                                content: Some(vec![
                                    llmx_protocol::models::ReasoningItemContent::ReasoningText {
                                        text: std::mem::take(&mut this.cumulative_reasoning),
                                    },
                                ]),
                                encrypted_content: None,
                            };
                        this.pending
                            .push_back(ResponseEvent::OutputItemDone(aggregated_reasoning));
                        emitted_any = true;
                    }

                    // Always emit the final aggregated assistant message when any
                    // content deltas have been observed. In AggregatedOnly mode this
                    // is the sole assistant output; in Streaming mode this finalizes
                    // the streamed deltas into a terminal OutputItemDone so callers
                    // can persist/render the message once per turn.
                    if !this.cumulative.is_empty() {
                        let aggregated_message = llmx_protocol::models::ResponseItem::Message {
                            id: None,
                            role: "assistant".to_string(),
                            content: vec![llmx_protocol::models::ContentItem::OutputText {
                                text: std::mem::take(&mut this.cumulative),
                            }],
                        };
                        this.pending
                            .push_back(ResponseEvent::OutputItemDone(aggregated_message));
                        emitted_any = true;
                    }

                    // Always emit Completed last when anything was aggregated.
                    if emitted_any {
                        this.pending.push_back(ResponseEvent::Completed {
                            response_id: response_id.clone(),
                            token_usage: token_usage.clone(),
                        });
                        // Return the first pending event now.
                        if let Some(ev) = this.pending.pop_front() {
                            return Poll::Ready(Some(Ok(ev)));
                        }
                    }

                    // Nothing aggregated – forward Completed directly.
                    return Poll::Ready(Some(Ok(ResponseEvent::Completed {
                        response_id,
                        token_usage,
                    })));
                }
                Poll::Ready(Some(Ok(ResponseEvent::Created))) => {
                    // These events are exclusive to the Responses API and
                    // will never appear in a Chat Completions stream.
                    continue;
                }
                Poll::Ready(Some(Ok(ResponseEvent::OutputTextDelta(delta)))) => {
                    // Always accumulate deltas so we can emit a final OutputItemDone at Completed.
                    this.cumulative.push_str(&delta);
                    if matches!(this.mode, AggregateMode::Streaming) {
                        // In streaming mode, also forward the delta immediately.
                        return Poll::Ready(Some(Ok(ResponseEvent::OutputTextDelta(delta))));
                    } else {
                        continue;
                    }
                }
                Poll::Ready(Some(Ok(ResponseEvent::ReasoningContentDelta(delta)))) => {
                    // Always accumulate reasoning deltas so we can emit a final Reasoning item at Completed.
                    this.cumulative_reasoning.push_str(&delta);
                    if matches!(this.mode, AggregateMode::Streaming) {
                        // In streaming mode, also forward the delta immediately.
                        return Poll::Ready(Some(Ok(ResponseEvent::ReasoningContentDelta(delta))));
                    } else {
                        continue;
                    }
                }
                Poll::Ready(Some(Ok(ResponseEvent::ReasoningSummaryDelta(_)))) => {
                    continue;
                }
                Poll::Ready(Some(Ok(ResponseEvent::ReasoningSummaryPartAdded))) => {
                    continue;
                }
                Poll::Ready(Some(Ok(ResponseEvent::OutputItemAdded(item)))) => {
                    return Poll::Ready(Some(Ok(ResponseEvent::OutputItemAdded(item))));
                }
            }
        }
    }
}

/// Extension trait that activates aggregation on any stream of [`ResponseEvent`].
pub(crate) trait AggregateStreamExt: Stream<Item = Result<ResponseEvent>> + Sized {
    /// Returns a new stream that emits **only** the final assistant message
    /// per turn instead of every incremental delta. The produced
    /// `ResponseEvent` sequence for a typical text turn looks like:
    ///
    /// ```ignore
    /// OutputItemDone(<complete assistant message>)
    /// Completed
    /// ```
    ///
    /// No other `OutputItemDone` events will be seen by the caller.
    ///
    /// Usage:
    ///
    /// ```ignore
    /// let mut agg_stream = client.stream(&prompt).await?.aggregate();
    /// while let Some(event) = agg_stream.next().await {
    ///     // event now contains cumulative text
    /// }
    /// ```
    fn aggregate(self) -> AggregatedChatStream<Self> {
        AggregatedChatStream::new(self, AggregateMode::AggregatedOnly)
    }
}

impl<T> AggregateStreamExt for T where T: Stream<Item = Result<ResponseEvent>> + Sized {}

impl<S> AggregatedChatStream<S> {
    fn new(inner: S, mode: AggregateMode) -> Self {
        AggregatedChatStream {
            inner,
            cumulative: String::new(),
            cumulative_reasoning: String::new(),
            pending: std::collections::VecDeque::new(),
            mode,
        }
    }

    pub(crate) fn streaming_mode(inner: S) -> Self {
        Self::new(inner, AggregateMode::Streaming)
    }
}
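
// A minimal sketch of the aggregation contract above, not part of the original module.
// It assumes tokio is available as a dev-dependency with the `macros` feature and that
// the `ResponseEvent` / `ResponseItem` variants used here match the definitions
// referenced elsewhere in this file.
#[cfg(test)]
mod aggregate_sketch_tests {
    use super::*;
    use crate::error::Result;
    use futures::StreamExt;
    use futures::stream;

    #[tokio::test]
    async fn aggregate_folds_text_deltas_into_one_assistant_message() {
        // Simulate the events process_chat_sse would emit for a plain text turn.
        let events: Vec<Result<ResponseEvent>> = vec![
            Ok(ResponseEvent::OutputTextDelta("Hel".to_string())),
            Ok(ResponseEvent::OutputTextDelta("lo".to_string())),
            Ok(ResponseEvent::Completed {
                response_id: String::new(),
                token_usage: None,
            }),
        ];
        let mut aggregated = stream::iter(events).aggregate();

        // First event: a single OutputItemDone carrying the concatenated assistant text.
        let first = match aggregated.next().await {
            Some(Ok(ev)) => ev,
            _ => panic!("expected first aggregated event"),
        };
        match first {
            ResponseEvent::OutputItemDone(ResponseItem::Message { role, content, .. }) => {
                assert_eq!(role, "assistant");
                let text = content.iter().find_map(|c| match c {
                    ContentItem::OutputText { text } => Some(text.clone()),
                    _ => None,
                });
                assert_eq!(text.as_deref(), Some("Hello"));
            }
            _ => panic!("expected aggregated assistant message"),
        }

        // Second event: the original Completed marker, forwarded last.
        let second = match aggregated.next().await {
            Some(Ok(ev)) => ev,
            _ => panic!("expected Completed event"),
        };
        assert!(matches!(second, ResponseEvent::Completed { .. }));
    }
}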