This release represents a comprehensive transformation of the codebase from Codex to LLMX, enhanced with LiteLLM integration to support 100+ LLM providers through a unified API.

## Major Changes

### Phase 1: Repository & Infrastructure Setup

- Established new repository structure and branching strategy
- Created comprehensive project documentation (CLAUDE.md, LITELLM-SETUP.md)
- Set up development environment and tooling configuration

### Phase 2: Rust Workspace Transformation

- Renamed all Rust crates from `codex-*` to `llmx-*` (30+ crates)
- Updated package names, binary names, and workspace members
- Renamed core modules: codex.rs → llmx.rs, codex_delegate.rs → llmx_delegate.rs
- Updated all internal references, imports, and type names
- Renamed directories: codex-rs/ → llmx-rs/, codex-backend-openapi-models/ → llmx-backend-openapi-models/
- Fixed all Rust compilation errors after the mass rename

### Phase 3: LiteLLM Integration

- Integrated LiteLLM for multi-provider LLM support (Anthropic, OpenAI, Azure, Google AI, AWS Bedrock, etc.)
- Implemented OpenAI-compatible Chat Completions API support
- Added model family detection and provider-specific handling
- Updated authentication to support LiteLLM API keys
- Renamed environment variables: OPENAI_BASE_URL → LLMX_BASE_URL
- Added LLMX_API_KEY for unified authentication
- Enhanced error handling for Chat Completions API responses
- Implemented fallback mechanisms between the Responses API and the Chat Completions API

### Phase 4: TypeScript/Node.js Components

- Renamed npm package: @codex/codex-cli → @valknar/llmx
- Updated TypeScript SDK to use the new LLMX APIs and endpoints
- Fixed all TypeScript compilation and linting errors
- Updated SDK tests to support both API backends
- Enhanced the mock server to handle multiple API formats
- Updated build scripts for cross-platform packaging

### Phase 5: Configuration & Documentation

- Updated all configuration files to use LLMX naming
- Rewrote README and documentation for LLMX branding
- Updated config paths: ~/.codex/ → ~/.llmx/
- Added a comprehensive LiteLLM setup guide
- Updated all user-facing strings and help text
- Created release plan and migration documentation

### Phase 6: Testing & Validation

- Fixed all Rust tests for the new naming scheme
- Updated snapshot tests in the TUI (36 frame files)
- Fixed authentication storage tests
- Updated Chat Completions payload and SSE tests
- Fixed SDK tests for the new API endpoints
- Ensured compatibility with the Claude Sonnet 4.5 model
- Fixed test environment variables (LLMX_API_KEY, LLMX_BASE_URL)

### Phase 7: Build & Release Pipeline

- Updated GitHub Actions workflows for the LLMX binary names
- Fixed rust-release.yml to reference llmx-rs/ instead of codex-rs/
- Updated CI/CD pipelines for the new package names
- Made Apple code signing optional in the release workflow
- Enhanced npm packaging resilience for partial platform builds
- Added Windows sandbox support to the workspace
- Updated dotslash configuration for the new binary names

### Phase 8: Final Polish

- Renamed all assets (.github images, labels, templates)
- Updated VSCode and DevContainer configurations
- Fixed all clippy warnings and formatting issues
- Applied cargo fmt and prettier formatting across the codebase
- Updated issue templates and pull request templates
- Fixed all remaining UI text references

## Technical Details

**Breaking Changes:**

- Binary name changed from `codex` to `llmx`
- Config directory changed from `~/.codex/` to `~/.llmx/`
- Environment variables renamed (CODEX_* → LLMX_*)
- npm package renamed to `@valknar/llmx`

**New Features:**

- Support for 100+ LLM providers via LiteLLM
- Unified authentication with LLMX_API_KEY
- Enhanced model provider detection and handling
- Improved error handling and fallback mechanisms

**Files Changed:**

- 578 files modified across Rust, TypeScript, and documentation
- 30+ Rust crates renamed and updated
- Complete rebrand of UI, CLI, and documentation
- All tests updated and passing

**Dependencies:**

- Updated Cargo.lock with new package names
- Updated npm dependencies in llmx-cli
- Enhanced OpenAPI models for the LLMX backend

This release establishes LLMX as a standalone project with comprehensive LiteLLM integration, maintaining functional parity with the existing behavior while opening support for a wide ecosystem of LLM providers.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
Co-Authored-By: Sebastian Krüger <support@pivoine.art>
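For illustration, the renamed environment-variable scheme can be exercised with a small sketch like the one below. This is a hypothetical helper, not part of the llmx-rs sources; the legacy-variable fallback and the default LiteLLM proxy URL are assumptions for the example.

```rust
use std::env;

/// Hypothetical helper: resolve the endpoint and key from the renamed
/// variables, falling back to the legacy OpenAI name during migration.
fn resolve_llmx_env() -> (String, Option<String>) {
    // LLMX_BASE_URL replaces OPENAI_BASE_URL; default to a local LiteLLM proxy.
    let base_url = env::var("LLMX_BASE_URL")
        .or_else(|_| env::var("OPENAI_BASE_URL"))
        .unwrap_or_else(|_| "http://localhost:4000".to_string());

    // LLMX_API_KEY is the unified key for providers routed through LiteLLM.
    let api_key = env::var("LLMX_API_KEY").ok();

    (base_url, api_key)
}

fn main() {
    let (base_url, api_key) = resolve_llmx_env();
    println!("base_url={base_url}, api key set: {}", api_key.is_some());
}
```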
use std::io::BufRead;
use std::path::Path;
use std::sync::Arc;
use std::sync::OnceLock;
use std::time::Duration;

use bytes::Bytes;
use chrono::DateTime;
use chrono::Utc;
use eventsource_stream::Eventsource;
use futures::prelude::*;
use llmx_app_server_protocol::AuthMode;
use llmx_otel::otel_event_manager::OtelEventManager;
use llmx_protocol::ConversationId;
use llmx_protocol::config_types::ReasoningEffort as ReasoningEffortConfig;
use llmx_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig;
use llmx_protocol::models::ResponseItem;
use llmx_protocol::protocol::SessionSource;
use regex_lite::Regex;
use reqwest::StatusCode;
use reqwest::header::HeaderMap;
use serde::Deserialize;
use serde::Serialize;
use serde_json::Value;
use tokio::sync::mpsc;
use tokio::time::timeout;
use tokio_util::io::ReaderStream;
use tracing::debug;
use tracing::trace;
use tracing::warn;

use crate::AuthManager;
use crate::auth::LlmxAuth;
use crate::auth::RefreshTokenError;
use crate::chat_completions::AggregateStreamExt;
use crate::chat_completions::stream_chat_completions;
use crate::client_common::Prompt;
use crate::client_common::ResponseEvent;
use crate::client_common::ResponseStream;
use crate::client_common::ResponsesApiRequest;
use crate::client_common::create_reasoning_param_for_request;
use crate::client_common::create_text_param_for_request;
use crate::config::Config;
use crate::default_client::LlmxHttpClient;
use crate::default_client::create_client;
use crate::error::ConnectionFailedError;
use crate::error::LlmxErr;
use crate::error::ResponseStreamFailed;
use crate::error::Result;
use crate::error::RetryLimitReachedError;
use crate::error::UnexpectedResponseError;
use crate::error::UsageLimitReachedError;
use crate::flags::LLMX_RS_SSE_FIXTURE;
use crate::model_family::ModelFamily;
use crate::model_provider_info::ModelProviderInfo;
use crate::model_provider_info::WireApi;
use crate::openai_model_info::get_model_info;
use crate::protocol::RateLimitSnapshot;
use crate::protocol::RateLimitWindow;
use crate::protocol::TokenUsage;
use crate::token_data::PlanType;
use crate::tools::spec::create_tools_json_for_responses_api;
use crate::util::backoff;

#[derive(Debug, Deserialize)]
struct ErrorResponse {
    error: Error,
}

#[derive(Debug, Deserialize)]
struct Error {
    r#type: Option<String>,
    code: Option<String>,
    message: Option<String>,

    // Optional fields available on "usage_limit_reached" and "usage_not_included" errors
    plan_type: Option<PlanType>,
    resets_at: Option<i64>,
}

#[derive(Debug, Clone)]
pub struct ModelClient {
    config: Arc<Config>,
    auth_manager: Option<Arc<AuthManager>>,
    otel_event_manager: OtelEventManager,
    client: LlmxHttpClient,
    provider: ModelProviderInfo,
    conversation_id: ConversationId,
    effort: Option<ReasoningEffortConfig>,
    summary: ReasoningSummaryConfig,
    session_source: SessionSource,
}

#[allow(clippy::too_many_arguments)]
impl ModelClient {
    pub fn new(
        config: Arc<Config>,
        auth_manager: Option<Arc<AuthManager>>,
        otel_event_manager: OtelEventManager,
        provider: ModelProviderInfo,
        effort: Option<ReasoningEffortConfig>,
        summary: ReasoningSummaryConfig,
        conversation_id: ConversationId,
        session_source: SessionSource,
    ) -> Self {
        let client = create_client();

        Self {
            config,
            auth_manager,
            otel_event_manager,
            client,
            provider,
            conversation_id,
            effort,
            summary,
            session_source,
        }
    }

    pub fn get_model_context_window(&self) -> Option<i64> {
        let pct = self.config.model_family.effective_context_window_percent;
        self.config
            .model_context_window
            .or_else(|| get_model_info(&self.config.model_family).map(|info| info.context_window))
            .map(|w| w.saturating_mul(pct) / 100)
    }

    pub fn get_auto_compact_token_limit(&self) -> Option<i64> {
        self.config.model_auto_compact_token_limit.or_else(|| {
            get_model_info(&self.config.model_family).and_then(|info| info.auto_compact_token_limit)
        })
    }

    pub fn config(&self) -> Arc<Config> {
        Arc::clone(&self.config)
    }

    pub fn provider(&self) -> &ModelProviderInfo {
        &self.provider
    }

    pub async fn stream(&self, prompt: &Prompt) -> Result<ResponseStream> {
        match self.provider.wire_api {
            WireApi::Responses => self.stream_responses(prompt).await,
            WireApi::Chat => {
                // Create the raw streaming connection first.
                let response_stream = stream_chat_completions(
                    prompt,
                    &self.config.model_family,
                    &self.client,
                    &self.provider,
                    &self.otel_event_manager,
                    &self.session_source,
                )
                .await?;

                // Wrap it with the aggregation adapter so callers see *only*
                // the final assistant message per turn (matching the
                // behaviour of the Responses API).
                let mut aggregated = if self.config.show_raw_agent_reasoning {
                    crate::chat_completions::AggregatedChatStream::streaming_mode(response_stream)
                } else {
                    response_stream.aggregate()
                };

                // Bridge the aggregated stream back into a standard
                // `ResponseStream` by forwarding events through a channel.
                let (tx, rx) = mpsc::channel::<Result<ResponseEvent>>(16);

                tokio::spawn(async move {
                    use futures::StreamExt;
                    while let Some(ev) = aggregated.next().await {
                        // Exit early if receiver hung up.
                        if tx.send(ev).await.is_err() {
                            break;
                        }
                    }
                });

                Ok(ResponseStream { rx_event: rx })
            }
        }
    }

    /// Implementation for the OpenAI *Responses* experimental API.
    async fn stream_responses(&self, prompt: &Prompt) -> Result<ResponseStream> {
        if let Some(path) = &*LLMX_RS_SSE_FIXTURE {
            // short circuit for tests
            warn!(path, "Streaming from fixture");
            return stream_from_fixture(
                path,
                self.provider.clone(),
                self.otel_event_manager.clone(),
            )
            .await;
        }

        let auth_manager = self.auth_manager.clone();

        let full_instructions = prompt.get_full_instructions(&self.config.model_family);
        let tools_json = create_tools_json_for_responses_api(&prompt.tools)?;
        let reasoning = create_reasoning_param_for_request(
            &self.config.model_family,
            self.effort,
            self.summary,
        );

        let include: Vec<String> = if reasoning.is_some() {
            vec!["reasoning.encrypted_content".to_string()]
        } else {
            vec![]
        };

        let input_with_instructions = prompt.get_formatted_input();

        let verbosity = if self.config.model_family.support_verbosity {
            self.config.model_verbosity
        } else {
            if self.config.model_verbosity.is_some() {
                warn!(
                    "model_verbosity is set but ignored as the model does not support verbosity: {}",
                    self.config.model_family.family
                );
            }
            None
        };

        // Only include `text.verbosity` for GPT-5 family models
        let text = create_text_param_for_request(verbosity, &prompt.output_schema);

        // In general, we want to explicitly send `store: false` when using the Responses API,
        // but in practice, the Azure Responses API rejects `store: false`:
        //
        // - If store = false and id is sent an error is thrown that ID is not found
        // - If store = false and id is not sent an error is thrown that ID is required
        //
        // For Azure, we send `store: true` and preserve reasoning item IDs.
        let azure_workaround = self.provider.is_azure_responses_endpoint();

        let payload = ResponsesApiRequest {
            model: &self.config.model,
            instructions: &full_instructions,
            input: &input_with_instructions,
            tools: &tools_json,
            tool_choice: "auto",
            parallel_tool_calls: prompt.parallel_tool_calls,
            reasoning,
            store: azure_workaround,
            stream: true,
            include,
            prompt_cache_key: Some(self.conversation_id.to_string()),
            text,
        };

        let mut payload_json = serde_json::to_value(&payload)?;
        if azure_workaround {
            attach_item_ids(&mut payload_json, &input_with_instructions);
        }

        let max_attempts = self.provider.request_max_retries();
        for attempt in 0..=max_attempts {
            match self
                .attempt_stream_responses(attempt, &payload_json, &auth_manager)
                .await
            {
                Ok(stream) => {
                    return Ok(stream);
                }
                Err(StreamAttemptError::Fatal(e)) => {
                    return Err(e);
                }
                Err(retryable_attempt_error) => {
                    if attempt == max_attempts {
                        return Err(retryable_attempt_error.into_error());
                    }

                    tokio::time::sleep(retryable_attempt_error.delay(attempt)).await;
                }
            }
        }

        unreachable!("stream_responses_attempt should always return");
    }

    /// Single attempt to start a streaming Responses API call.
    async fn attempt_stream_responses(
        &self,
        attempt: u64,
        payload_json: &Value,
        auth_manager: &Option<Arc<AuthManager>>,
    ) -> std::result::Result<ResponseStream, StreamAttemptError> {
        // Always fetch the latest auth in case a prior attempt refreshed the token.
        let auth = auth_manager.as_ref().and_then(|m| m.auth());

        trace!(
            "POST to {}: {:?}",
            self.provider.get_full_url(&auth),
            serde_json::to_string(payload_json)
                .unwrap_or("<unable to serialize payload>".to_string())
        );

        let mut req_builder = self
            .provider
            .create_request_builder(&self.client, &auth)
            .await
            .map_err(StreamAttemptError::Fatal)?;

        // Include subagent header only for subagent sessions.
        if let SessionSource::SubAgent(sub) = &self.session_source {
            let subagent = if let crate::protocol::SubAgentSource::Other(label) = sub {
                label.clone()
            } else {
                serde_json::to_value(sub)
                    .ok()
                    .and_then(|v| v.as_str().map(std::string::ToString::to_string))
                    .unwrap_or_else(|| "other".to_string())
            };
            req_builder = req_builder.header("x-openai-subagent", subagent);
        }

        req_builder = req_builder
            // Send session_id for compatibility.
            .header("conversation_id", self.conversation_id.to_string())
            .header("session_id", self.conversation_id.to_string())
            .header(reqwest::header::ACCEPT, "text/event-stream")
            .json(payload_json);

        if let Some(auth) = auth.as_ref()
            && auth.mode == AuthMode::ChatGPT
            && let Some(account_id) = auth.get_account_id()
        {
            req_builder = req_builder.header("chatgpt-account-id", account_id);
        }

        let res = self
            .otel_event_manager
            .log_request(attempt, || req_builder.send())
            .await;

        let mut request_id = None;
        if let Ok(resp) = &res {
            request_id = resp
                .headers()
                .get("cf-ray")
                .map(|v| v.to_str().unwrap_or_default().to_string());
        }

        match res {
            Ok(resp) if resp.status().is_success() => {
                let (tx_event, rx_event) = mpsc::channel::<Result<ResponseEvent>>(1600);

                if let Some(snapshot) = parse_rate_limit_snapshot(resp.headers())
                    && tx_event
                        .send(Ok(ResponseEvent::RateLimits(snapshot)))
                        .await
                        .is_err()
                {
                    debug!("receiver dropped rate limit snapshot event");
                }

                // spawn task to process SSE
                let stream = resp.bytes_stream().map_err(move |e| {
                    LlmxErr::ResponseStreamFailed(ResponseStreamFailed {
                        source: e,
                        request_id: request_id.clone(),
                    })
                });
                tokio::spawn(process_sse(
                    stream,
                    tx_event,
                    self.provider.stream_idle_timeout(),
                    self.otel_event_manager.clone(),
                ));

                Ok(ResponseStream { rx_event })
            }
            Ok(res) => {
                let status = res.status();

                // Pull out Retry‑After header if present.
                let retry_after_secs = res
                    .headers()
                    .get(reqwest::header::RETRY_AFTER)
                    .and_then(|v| v.to_str().ok())
                    .and_then(|s| s.parse::<u64>().ok());
                let retry_after = retry_after_secs.map(|s| Duration::from_millis(s * 1_000));

                if status == StatusCode::UNAUTHORIZED
                    && let Some(manager) = auth_manager.as_ref()
                    && let Some(auth) = auth.as_ref()
                    && auth.mode == AuthMode::ChatGPT
                    && let Err(err) = manager.refresh_token().await
                {
                    let stream_error = match err {
                        RefreshTokenError::Permanent(failed) => {
                            StreamAttemptError::Fatal(LlmxErr::RefreshTokenFailed(failed))
                        }
                        RefreshTokenError::Transient(other) => {
                            StreamAttemptError::RetryableTransportError(LlmxErr::Io(other))
                        }
                    };
                    return Err(stream_error);
                }

                // The OpenAI Responses endpoint returns structured JSON bodies even for 4xx/5xx
                // errors. When we bubble early with only the HTTP status the caller sees an opaque
                // "unexpected status 400 Bad Request" which makes debugging nearly impossible.
                // Instead, read (and include) the response text so higher layers and users see the
                // exact error message (e.g. "Unknown parameter: 'input[0].metadata'"). The body is
                // small and this branch only runs on error paths so the extra allocation is
                // negligible.
                if !(status == StatusCode::TOO_MANY_REQUESTS
                    || status == StatusCode::UNAUTHORIZED
                    || status.is_server_error())
                {
                    // Surface the error body to callers. Use `unwrap_or_default` per Clippy.
                    let body = res.text().await.unwrap_or_default();
                    return Err(StreamAttemptError::Fatal(LlmxErr::UnexpectedStatus(
                        UnexpectedResponseError {
                            status,
                            body,
                            request_id: None,
                        },
                    )));
                }

                if status == StatusCode::TOO_MANY_REQUESTS {
                    let rate_limit_snapshot = parse_rate_limit_snapshot(res.headers());
                    let body = res.json::<ErrorResponse>().await.ok();
                    if let Some(ErrorResponse { error }) = body {
                        if error.r#type.as_deref() == Some("usage_limit_reached") {
                            // Prefer the plan_type provided in the error message if present
                            // because it's more up to date than the one encoded in the auth
                            // token.
                            let plan_type = error
                                .plan_type
                                .or_else(|| auth.as_ref().and_then(LlmxAuth::get_plan_type));
                            let resets_at = error
                                .resets_at
                                .and_then(|seconds| DateTime::<Utc>::from_timestamp(seconds, 0));
                            let llmx_err = LlmxErr::UsageLimitReached(UsageLimitReachedError {
                                plan_type,
                                resets_at,
                                rate_limits: rate_limit_snapshot,
                            });
                            return Err(StreamAttemptError::Fatal(llmx_err));
                        } else if error.r#type.as_deref() == Some("usage_not_included") {
                            return Err(StreamAttemptError::Fatal(LlmxErr::UsageNotIncluded));
                        } else if is_quota_exceeded_error(&error) {
                            return Err(StreamAttemptError::Fatal(LlmxErr::QuotaExceeded));
                        }
                    }
                }

                Err(StreamAttemptError::RetryableHttpError {
                    status,
                    retry_after,
                    request_id,
                })
            }
            Err(e) => Err(StreamAttemptError::RetryableTransportError(
                LlmxErr::ConnectionFailed(ConnectionFailedError { source: e }),
            )),
        }
    }

    pub fn get_provider(&self) -> ModelProviderInfo {
        self.provider.clone()
    }

    pub fn get_otel_event_manager(&self) -> OtelEventManager {
        self.otel_event_manager.clone()
    }

    pub fn get_session_source(&self) -> SessionSource {
        self.session_source.clone()
    }

    /// Returns the currently configured model slug.
    pub fn get_model(&self) -> String {
        self.config.model.clone()
    }

    /// Returns the currently configured model family.
    pub fn get_model_family(&self) -> ModelFamily {
        self.config.model_family.clone()
    }

    /// Returns the current reasoning effort setting.
    pub fn get_reasoning_effort(&self) -> Option<ReasoningEffortConfig> {
        self.effort
    }

    /// Returns the current reasoning summary setting.
    pub fn get_reasoning_summary(&self) -> ReasoningSummaryConfig {
        self.summary
    }

    pub fn get_auth_manager(&self) -> Option<Arc<AuthManager>> {
        self.auth_manager.clone()
    }
}

enum StreamAttemptError {
    RetryableHttpError {
        status: StatusCode,
        retry_after: Option<Duration>,
        request_id: Option<String>,
    },
    RetryableTransportError(LlmxErr),
    Fatal(LlmxErr),
}

impl StreamAttemptError {
    /// attempt is 0-based.
    fn delay(&self, attempt: u64) -> Duration {
        // backoff() uses 1-based attempts.
        let backoff_attempt = attempt + 1;
        match self {
            Self::RetryableHttpError { retry_after, .. } => {
                retry_after.unwrap_or_else(|| backoff(backoff_attempt))
            }
            Self::RetryableTransportError { .. } => backoff(backoff_attempt),
            Self::Fatal(_) => {
                // Should not be called on Fatal errors.
                Duration::from_secs(0)
            }
        }
    }

    fn into_error(self) -> LlmxErr {
        match self {
            Self::RetryableHttpError {
                status, request_id, ..
            } => {
                if status == StatusCode::INTERNAL_SERVER_ERROR {
                    LlmxErr::InternalServerError
                } else {
                    LlmxErr::RetryLimit(RetryLimitReachedError { status, request_id })
                }
            }
            Self::RetryableTransportError(error) => error,
            Self::Fatal(error) => error,
        }
    }
}

#[derive(Debug, Deserialize, Serialize)]
struct SseEvent {
    #[serde(rename = "type")]
    kind: String,
    response: Option<Value>,
    item: Option<Value>,
    delta: Option<String>,
}

#[derive(Debug, Deserialize)]
struct ResponseCompleted {
    id: String,
    usage: Option<ResponseCompletedUsage>,
}

#[derive(Debug, Deserialize)]
struct ResponseCompletedUsage {
    input_tokens: i64,
    input_tokens_details: Option<ResponseCompletedInputTokensDetails>,
    output_tokens: i64,
    output_tokens_details: Option<ResponseCompletedOutputTokensDetails>,
    total_tokens: i64,
}

impl From<ResponseCompletedUsage> for TokenUsage {
    fn from(val: ResponseCompletedUsage) -> Self {
        TokenUsage {
            input_tokens: val.input_tokens,
            cached_input_tokens: val
                .input_tokens_details
                .map(|d| d.cached_tokens)
                .unwrap_or(0),
            output_tokens: val.output_tokens,
            reasoning_output_tokens: val
                .output_tokens_details
                .map(|d| d.reasoning_tokens)
                .unwrap_or(0),
            total_tokens: val.total_tokens,
        }
    }
}

#[derive(Debug, Deserialize)]
struct ResponseCompletedInputTokensDetails {
    cached_tokens: i64,
}

#[derive(Debug, Deserialize)]
struct ResponseCompletedOutputTokensDetails {
    reasoning_tokens: i64,
}

fn attach_item_ids(payload_json: &mut Value, original_items: &[ResponseItem]) {
    let Some(input_value) = payload_json.get_mut("input") else {
        return;
    };
    let serde_json::Value::Array(items) = input_value else {
        return;
    };

    for (value, item) in items.iter_mut().zip(original_items.iter()) {
        if let ResponseItem::Reasoning { id, .. }
        | ResponseItem::Message { id: Some(id), .. }
        | ResponseItem::WebSearchCall { id: Some(id), .. }
        | ResponseItem::FunctionCall { id: Some(id), .. }
        | ResponseItem::LocalShellCall { id: Some(id), .. }
        | ResponseItem::CustomToolCall { id: Some(id), .. } = item
        {
            if id.is_empty() {
                continue;
            }

            if let Some(obj) = value.as_object_mut() {
                obj.insert("id".to_string(), Value::String(id.clone()));
            }
        }
    }
}

fn parse_rate_limit_snapshot(headers: &HeaderMap) -> Option<RateLimitSnapshot> {
    let primary = parse_rate_limit_window(
        headers,
        "x-llmx-primary-used-percent",
        "x-llmx-primary-window-minutes",
        "x-llmx-primary-reset-at",
    );

    let secondary = parse_rate_limit_window(
        headers,
        "x-llmx-secondary-used-percent",
        "x-llmx-secondary-window-minutes",
        "x-llmx-secondary-reset-at",
    );

    Some(RateLimitSnapshot { primary, secondary })
}

fn parse_rate_limit_window(
    headers: &HeaderMap,
    used_percent_header: &str,
    window_minutes_header: &str,
    resets_at_header: &str,
) -> Option<RateLimitWindow> {
    let used_percent: Option<f64> = parse_header_f64(headers, used_percent_header);

    used_percent.and_then(|used_percent| {
        let window_minutes = parse_header_i64(headers, window_minutes_header);
        let resets_at = parse_header_i64(headers, resets_at_header);

        let has_data = used_percent != 0.0
            || window_minutes.is_some_and(|minutes| minutes != 0)
            || resets_at.is_some();

        has_data.then_some(RateLimitWindow {
            used_percent,
            window_minutes,
            resets_at,
        })
    })
}

fn parse_header_f64(headers: &HeaderMap, name: &str) -> Option<f64> {
    parse_header_str(headers, name)?
        .parse::<f64>()
        .ok()
        .filter(|v| v.is_finite())
}

fn parse_header_i64(headers: &HeaderMap, name: &str) -> Option<i64> {
    parse_header_str(headers, name)?.parse::<i64>().ok()
}

fn parse_header_str<'a>(headers: &'a HeaderMap, name: &str) -> Option<&'a str> {
    headers.get(name)?.to_str().ok()
}

async fn process_sse<S>(
    stream: S,
    tx_event: mpsc::Sender<Result<ResponseEvent>>,
    idle_timeout: Duration,
    otel_event_manager: OtelEventManager,
) where
    S: Stream<Item = Result<Bytes>> + Unpin,
{
    let mut stream = stream.eventsource();

    // If the stream stays completely silent for an extended period treat it as disconnected.
    // The response id returned from the "complete" message.
    let mut response_completed: Option<ResponseCompleted> = None;
    let mut response_error: Option<LlmxErr> = None;

    loop {
        let start = std::time::Instant::now();
        let response = timeout(idle_timeout, stream.next()).await;
        let duration = start.elapsed();
        otel_event_manager.log_sse_event(&response, duration);

        let sse = match response {
            Ok(Some(Ok(sse))) => sse,
            Ok(Some(Err(e))) => {
                debug!("SSE Error: {e:#}");
                let event = LlmxErr::Stream(e.to_string(), None);
                let _ = tx_event.send(Err(event)).await;
                return;
            }
            Ok(None) => {
                match response_completed {
                    Some(ResponseCompleted {
                        id: response_id,
                        usage,
                    }) => {
                        if let Some(token_usage) = &usage {
                            otel_event_manager.sse_event_completed(
                                token_usage.input_tokens,
                                token_usage.output_tokens,
                                token_usage
                                    .input_tokens_details
                                    .as_ref()
                                    .map(|d| d.cached_tokens),
                                token_usage
                                    .output_tokens_details
                                    .as_ref()
                                    .map(|d| d.reasoning_tokens),
                                token_usage.total_tokens,
                            );
                        }
                        let event = ResponseEvent::Completed {
                            response_id,
                            token_usage: usage.map(Into::into),
                        };
                        let _ = tx_event.send(Ok(event)).await;
                    }
                    None => {
                        let error = response_error.unwrap_or(LlmxErr::Stream(
                            "stream closed before response.completed".into(),
                            None,
                        ));
                        otel_event_manager.see_event_completed_failed(&error);

                        let _ = tx_event.send(Err(error)).await;
                    }
                }
                return;
            }
            Err(_) => {
                let _ = tx_event
                    .send(Err(LlmxErr::Stream(
                        "idle timeout waiting for SSE".into(),
                        None,
                    )))
                    .await;
                return;
            }
        };

        let raw = sse.data.clone();
        trace!("SSE event: {}", raw);

        let event: SseEvent = match serde_json::from_str(&sse.data) {
            Ok(event) => event,
            Err(e) => {
                debug!("Failed to parse SSE event: {e}, data: {}", &sse.data);
                continue;
            }
        };

        match event.kind.as_str() {
            // Individual output item finalised. Forward immediately so the
            // rest of the agent can stream assistant text/functions *live*
            // instead of waiting for the final `response.completed` envelope.
            //
            // IMPORTANT: We used to ignore these events and forward the
            // duplicated `output` array embedded in the `response.completed`
            // payload. That produced two concrete issues:
            //   1. No real‑time streaming – the user only saw output after the
            //      entire turn had finished, which broke the "typing" UX and
            //      made long‑running turns look stalled.
            //   2. Duplicate `function_call_output` items – both the
            //      individual *and* the completed array were forwarded, which
            //      confused the backend and triggered 400
            //      "previous_response_not_found" errors because the duplicated
            //      IDs did not match the incremental turn chain.
            //
            // The fix is to forward the incremental events *as they come* and
            // drop the duplicated list inside `response.completed`.
            "response.output_item.done" => {
                let Some(item_val) = event.item else { continue };
                let Ok(item) = serde_json::from_value::<ResponseItem>(item_val) else {
                    debug!("failed to parse ResponseItem from output_item.done");
                    continue;
                };

                let event = ResponseEvent::OutputItemDone(item);
                if tx_event.send(Ok(event)).await.is_err() {
                    return;
                }
            }
            "response.output_text.delta" => {
                if let Some(delta) = event.delta {
                    let event = ResponseEvent::OutputTextDelta(delta);
                    if tx_event.send(Ok(event)).await.is_err() {
                        return;
                    }
                }
            }
            "response.reasoning_summary_text.delta" => {
                if let Some(delta) = event.delta {
                    let event = ResponseEvent::ReasoningSummaryDelta(delta);
                    if tx_event.send(Ok(event)).await.is_err() {
                        return;
                    }
                }
            }
            "response.reasoning_text.delta" => {
                if let Some(delta) = event.delta {
                    let event = ResponseEvent::ReasoningContentDelta(delta);
                    if tx_event.send(Ok(event)).await.is_err() {
                        return;
                    }
                }
            }
            "response.created" => {
                if event.response.is_some() {
                    let _ = tx_event.send(Ok(ResponseEvent::Created {})).await;
                }
            }
            "response.failed" => {
                if let Some(resp_val) = event.response {
                    response_error = Some(LlmxErr::Stream(
                        "response.failed event received".to_string(),
                        None,
                    ));

                    let error = resp_val.get("error");

                    if let Some(error) = error {
                        match serde_json::from_value::<Error>(error.clone()) {
                            Ok(error) => {
                                if is_context_window_error(&error) {
                                    response_error = Some(LlmxErr::ContextWindowExceeded);
                                } else if is_quota_exceeded_error(&error) {
                                    response_error = Some(LlmxErr::QuotaExceeded);
                                } else {
                                    let delay = try_parse_retry_after(&error);
                                    let message = error.message.clone().unwrap_or_default();
                                    response_error = Some(LlmxErr::Stream(message, delay));
                                }
                            }
                            Err(e) => {
                                let error = format!("failed to parse ErrorResponse: {e}");
                                debug!(error);
                                response_error = Some(LlmxErr::Stream(error, None))
                            }
                        }
                    }
                }
            }
            // Final response completed – includes array of output items & id
            "response.completed" => {
                if let Some(resp_val) = event.response {
                    match serde_json::from_value::<ResponseCompleted>(resp_val) {
                        Ok(r) => {
                            response_completed = Some(r);
                        }
                        Err(e) => {
                            let error = format!("failed to parse ResponseCompleted: {e}");
                            debug!(error);
                            response_error = Some(LlmxErr::Stream(error, None));
                            continue;
                        }
                    };
                };
            }
            "response.content_part.done"
            | "response.function_call_arguments.delta"
            | "response.custom_tool_call_input.delta"
            | "response.custom_tool_call_input.done" // also emitted as response.output_item.done
            | "response.in_progress"
            | "response.output_text.done" => {}
            "response.output_item.added" => {
                let Some(item_val) = event.item else { continue };
                let Ok(item) = serde_json::from_value::<ResponseItem>(item_val) else {
                    debug!("failed to parse ResponseItem from output_item.done");
                    continue;
                };

                let event = ResponseEvent::OutputItemAdded(item);
                if tx_event.send(Ok(event)).await.is_err() {
                    return;
                }
            }
            "response.reasoning_summary_part.added" => {
                // Boundary between reasoning summary sections (e.g., titles).
                let event = ResponseEvent::ReasoningSummaryPartAdded;
                if tx_event.send(Ok(event)).await.is_err() {
                    return;
                }
            }
            "response.reasoning_summary_text.done" => {}
            _ => {}
        }
    }
}

/// used in tests to stream from a text SSE file
async fn stream_from_fixture(
    path: impl AsRef<Path>,
    provider: ModelProviderInfo,
    otel_event_manager: OtelEventManager,
) -> Result<ResponseStream> {
    let (tx_event, rx_event) = mpsc::channel::<Result<ResponseEvent>>(1600);
    let f = std::fs::File::open(path.as_ref())?;
    let lines = std::io::BufReader::new(f).lines();

    // insert \n\n after each line for proper SSE parsing
    let mut content = String::new();
    for line in lines {
        content.push_str(&line?);
        content.push_str("\n\n");
    }

    let rdr = std::io::Cursor::new(content);
    let stream = ReaderStream::new(rdr).map_err(LlmxErr::Io);
    tokio::spawn(process_sse(
        stream,
        tx_event,
        provider.stream_idle_timeout(),
        otel_event_manager,
    ));
    Ok(ResponseStream { rx_event })
}

fn rate_limit_regex() -> &'static Regex {
    static RE: OnceLock<Regex> = OnceLock::new();

    // Match both OpenAI-style messages like "Please try again in 1.898s"
    // and Azure OpenAI-style messages like "Try again in 35 seconds".
    #[expect(clippy::unwrap_used)]
    RE.get_or_init(|| Regex::new(r"(?i)try again in\s*(\d+(?:\.\d+)?)\s*(s|ms|seconds?)").unwrap())
}

fn try_parse_retry_after(err: &Error) -> Option<Duration> {
    if err.code != Some("rate_limit_exceeded".to_string()) {
        return None;
    }

    // parse retry hints like "try again in 1.898s" or
    // "Try again in 35 seconds" using regex
    let re = rate_limit_regex();
    if let Some(message) = &err.message
        && let Some(captures) = re.captures(message)
    {
        let seconds = captures.get(1);
        let unit = captures.get(2);

        if let (Some(value), Some(unit)) = (seconds, unit) {
            let value = value.as_str().parse::<f64>().ok()?;
            let unit = unit.as_str().to_ascii_lowercase();

            if unit == "s" || unit.starts_with("second") {
                return Some(Duration::from_secs_f64(value));
            } else if unit == "ms" {
                return Some(Duration::from_millis(value as u64));
            }
        }
    }
    None
}

fn is_context_window_error(error: &Error) -> bool {
    error.code.as_deref() == Some("context_length_exceeded")
}

fn is_quota_exceeded_error(error: &Error) -> bool {
    error.code.as_deref() == Some("insufficient_quota")
}

#[cfg(test)]
mod tests {
    use super::*;
    use assert_matches::assert_matches;
    use serde_json::json;
    use tokio::sync::mpsc;
    use tokio_test::io::Builder as IoBuilder;
    use tokio_util::io::ReaderStream;

    // ────────────────────────────
    // Helpers
    // ────────────────────────────

    /// Runs the SSE parser on pre-chunked byte slices and returns every event
    /// (including any final `Err` from a stream-closure check).
    async fn collect_events(
        chunks: &[&[u8]],
        provider: ModelProviderInfo,
        otel_event_manager: OtelEventManager,
    ) -> Vec<Result<ResponseEvent>> {
        let mut builder = IoBuilder::new();
        for chunk in chunks {
            builder.read(chunk);
        }

        let reader = builder.build();
        let stream = ReaderStream::new(reader).map_err(LlmxErr::Io);
        let (tx, mut rx) = mpsc::channel::<Result<ResponseEvent>>(16);
        tokio::spawn(process_sse(
            stream,
            tx,
            provider.stream_idle_timeout(),
            otel_event_manager,
        ));

        let mut events = Vec::new();
        while let Some(ev) = rx.recv().await {
            events.push(ev);
        }
        events
    }

    /// Builds an in-memory SSE stream from JSON fixtures and returns only the
    /// successfully parsed events (panics on internal channel errors).
    async fn run_sse(
        events: Vec<serde_json::Value>,
        provider: ModelProviderInfo,
        otel_event_manager: OtelEventManager,
    ) -> Vec<ResponseEvent> {
        let mut body = String::new();
        for e in events {
            let kind = e
                .get("type")
                .and_then(|v| v.as_str())
                .expect("fixture event missing type");
            if e.as_object().map(|o| o.len() == 1).unwrap_or(false) {
                body.push_str(&format!("event: {kind}\n\n"));
            } else {
                body.push_str(&format!("event: {kind}\ndata: {e}\n\n"));
            }
        }

        let (tx, mut rx) = mpsc::channel::<Result<ResponseEvent>>(8);
        let stream = ReaderStream::new(std::io::Cursor::new(body)).map_err(LlmxErr::Io);
        tokio::spawn(process_sse(
            stream,
            tx,
            provider.stream_idle_timeout(),
            otel_event_manager,
        ));

        let mut out = Vec::new();
        while let Some(ev) = rx.recv().await {
            out.push(ev.expect("channel closed"));
        }
        out
    }

    fn otel_event_manager() -> OtelEventManager {
        OtelEventManager::new(
            ConversationId::new(),
            "test",
            "test",
            None,
            Some("test@test.com".to_string()),
            Some(AuthMode::ChatGPT),
            false,
            "test".to_string(),
        )
    }

    // ────────────────────────────
    // Tests from `implement-test-for-responses-api-sse-parser`
    // ────────────────────────────

    #[tokio::test]
    async fn parses_items_and_completed() {
        let item1 = json!({
            "type": "response.output_item.done",
            "item": {
                "type": "message",
                "role": "assistant",
                "content": [{"type": "output_text", "text": "Hello"}]
            }
        })
        .to_string();

        let item2 = json!({
            "type": "response.output_item.done",
            "item": {
                "type": "message",
                "role": "assistant",
                "content": [{"type": "output_text", "text": "World"}]
            }
        })
        .to_string();

        let completed = json!({
            "type": "response.completed",
            "response": { "id": "resp1" }
        })
        .to_string();

        let sse1 = format!("event: response.output_item.done\ndata: {item1}\n\n");
        let sse2 = format!("event: response.output_item.done\ndata: {item2}\n\n");
        let sse3 = format!("event: response.completed\ndata: {completed}\n\n");

        let provider = ModelProviderInfo {
            name: "test".to_string(),
            base_url: Some("https://test.com".to_string()),
            env_key: Some("TEST_API_KEY".to_string()),
            env_key_instructions: None,
            experimental_bearer_token: None,
            wire_api: WireApi::Responses,
            query_params: None,
            http_headers: None,
            env_http_headers: None,
            request_max_retries: Some(0),
            stream_max_retries: Some(0),
            stream_idle_timeout_ms: Some(1000),
            requires_openai_auth: false,
        };

        let otel_event_manager = otel_event_manager();

        let events = collect_events(
            &[sse1.as_bytes(), sse2.as_bytes(), sse3.as_bytes()],
            provider,
            otel_event_manager,
        )
        .await;

        assert_eq!(events.len(), 3);

        matches!(
            &events[0],
            Ok(ResponseEvent::OutputItemDone(ResponseItem::Message { role, .. }))
                if role == "assistant"
        );

        matches!(
            &events[1],
            Ok(ResponseEvent::OutputItemDone(ResponseItem::Message { role, .. }))
                if role == "assistant"
        );

        match &events[2] {
            Ok(ResponseEvent::Completed {
                response_id,
                token_usage,
            }) => {
                assert_eq!(response_id, "resp1");
                assert!(token_usage.is_none());
            }
            other => panic!("unexpected third event: {other:?}"),
        }
    }

    #[tokio::test]
    async fn error_when_missing_completed() {
        let item1 = json!({
            "type": "response.output_item.done",
            "item": {
                "type": "message",
                "role": "assistant",
                "content": [{"type": "output_text", "text": "Hello"}]
            }
        })
        .to_string();

        let sse1 = format!("event: response.output_item.done\ndata: {item1}\n\n");
        let provider = ModelProviderInfo {
            name: "test".to_string(),
            base_url: Some("https://test.com".to_string()),
            env_key: Some("TEST_API_KEY".to_string()),
            env_key_instructions: None,
            experimental_bearer_token: None,
            wire_api: WireApi::Responses,
            query_params: None,
            http_headers: None,
            env_http_headers: None,
            request_max_retries: Some(0),
            stream_max_retries: Some(0),
            stream_idle_timeout_ms: Some(1000),
            requires_openai_auth: false,
        };

        let otel_event_manager = otel_event_manager();

        let events = collect_events(&[sse1.as_bytes()], provider, otel_event_manager).await;

        assert_eq!(events.len(), 2);

        matches!(events[0], Ok(ResponseEvent::OutputItemDone(_)));

        match &events[1] {
            Err(LlmxErr::Stream(msg, _)) => {
                assert_eq!(msg, "stream closed before response.completed")
            }
            other => panic!("unexpected second event: {other:?}"),
        }
    }

    #[tokio::test]
    async fn error_when_error_event() {
        let raw_error = r#"{"type":"response.failed","sequence_number":3,"response":{"id":"resp_689bcf18d7f08194bf3440ba62fe05d803fee0cdac429894","object":"response","created_at":1755041560,"status":"failed","background":false,"error":{"code":"rate_limit_exceeded","message":"Rate limit reached for gpt-5 in organization org-AAA on tokens per min (TPM): Limit 30000, Used 22999, Requested 12528. Please try again in 11.054s. Visit https://platform.openai.com/account/rate-limits to learn more."}, "usage":null,"user":null,"metadata":{}}}"#;

        let sse1 = format!("event: response.failed\ndata: {raw_error}\n\n");
        let provider = ModelProviderInfo {
            name: "test".to_string(),
            base_url: Some("https://test.com".to_string()),
            env_key: Some("TEST_API_KEY".to_string()),
            env_key_instructions: None,
            experimental_bearer_token: None,
            wire_api: WireApi::Responses,
            query_params: None,
            http_headers: None,
            env_http_headers: None,
            request_max_retries: Some(0),
            stream_max_retries: Some(0),
            stream_idle_timeout_ms: Some(1000),
            requires_openai_auth: false,
        };

        let otel_event_manager = otel_event_manager();

        let events = collect_events(&[sse1.as_bytes()], provider, otel_event_manager).await;

        assert_eq!(events.len(), 1);

        match &events[0] {
            Err(LlmxErr::Stream(msg, delay)) => {
                assert_eq!(
                    msg,
                    "Rate limit reached for gpt-5 in organization org-AAA on tokens per min (TPM): Limit 30000, Used 22999, Requested 12528. Please try again in 11.054s. Visit https://platform.openai.com/account/rate-limits to learn more."
                );
                assert_eq!(*delay, Some(Duration::from_secs_f64(11.054)));
            }
            other => panic!("unexpected second event: {other:?}"),
        }
    }

    #[tokio::test]
    async fn context_window_error_is_fatal() {
        let raw_error = r#"{"type":"response.failed","sequence_number":3,"response":{"id":"resp_5c66275b97b9baef1ed95550adb3b7ec13b17aafd1d2f11b","object":"response","created_at":1759510079,"status":"failed","background":false,"error":{"code":"context_length_exceeded","message":"Your input exceeds the context window of this model. Please adjust your input and try again."},"usage":null,"user":null,"metadata":{}}}"#;

        let sse1 = format!("event: response.failed\ndata: {raw_error}\n\n");
        let provider = ModelProviderInfo {
            name: "test".to_string(),
            base_url: Some("https://test.com".to_string()),
            env_key: Some("TEST_API_KEY".to_string()),
            env_key_instructions: None,
            experimental_bearer_token: None,
            wire_api: WireApi::Responses,
            query_params: None,
            http_headers: None,
            env_http_headers: None,
            request_max_retries: Some(0),
            stream_max_retries: Some(0),
            stream_idle_timeout_ms: Some(1000),
            requires_openai_auth: false,
        };

        let otel_event_manager = otel_event_manager();

        let events = collect_events(&[sse1.as_bytes()], provider, otel_event_manager).await;

        assert_eq!(events.len(), 1);

        match &events[0] {
            Err(err @ LlmxErr::ContextWindowExceeded) => {
                assert_eq!(err.to_string(), LlmxErr::ContextWindowExceeded.to_string());
            }
            other => panic!("unexpected context window event: {other:?}"),
        }
    }

    #[tokio::test]
    async fn context_window_error_with_newline_is_fatal() {
        let raw_error = r#"{"type":"response.failed","sequence_number":4,"response":{"id":"resp_fatal_newline","object":"response","created_at":1759510080,"status":"failed","background":false,"error":{"code":"context_length_exceeded","message":"Your input exceeds the context window of this model. Please adjust your input and try\nagain."},"usage":null,"user":null,"metadata":{}}}"#;

        let sse1 = format!("event: response.failed\ndata: {raw_error}\n\n");
        let provider = ModelProviderInfo {
            name: "test".to_string(),
            base_url: Some("https://test.com".to_string()),
            env_key: Some("TEST_API_KEY".to_string()),
            env_key_instructions: None,
            experimental_bearer_token: None,
            wire_api: WireApi::Responses,
            query_params: None,
            http_headers: None,
            env_http_headers: None,
            request_max_retries: Some(0),
            stream_max_retries: Some(0),
            stream_idle_timeout_ms: Some(1000),
            requires_openai_auth: false,
        };

        let otel_event_manager = otel_event_manager();

        let events = collect_events(&[sse1.as_bytes()], provider, otel_event_manager).await;

        assert_eq!(events.len(), 1);

        match &events[0] {
            Err(err @ LlmxErr::ContextWindowExceeded) => {
                assert_eq!(err.to_string(), LlmxErr::ContextWindowExceeded.to_string());
            }
            other => panic!("unexpected context window event: {other:?}"),
        }
    }

    #[tokio::test]
    async fn quota_exceeded_error_is_fatal() {
        let raw_error = r#"{"type":"response.failed","sequence_number":3,"response":{"id":"resp_fatal_quota","object":"response","created_at":1759771626,"status":"failed","background":false,"error":{"code":"insufficient_quota","message":"You exceeded your current quota, please check your plan and billing details. For more information on this error, read the docs: https://platform.openai.com/docs/guides/error-codes/api-errors."},"incomplete_details":null}}"#;

        let sse1 = format!("event: response.failed\ndata: {raw_error}\n\n");
        let provider = ModelProviderInfo {
            name: "test".to_string(),
            base_url: Some("https://test.com".to_string()),
            env_key: Some("TEST_API_KEY".to_string()),
            env_key_instructions: None,
            experimental_bearer_token: None,
            wire_api: WireApi::Responses,
            query_params: None,
            http_headers: None,
            env_http_headers: None,
            request_max_retries: Some(0),
            stream_max_retries: Some(0),
            stream_idle_timeout_ms: Some(1000),
            requires_openai_auth: false,
        };

        let otel_event_manager = otel_event_manager();

        let events = collect_events(&[sse1.as_bytes()], provider, otel_event_manager).await;

        assert_eq!(events.len(), 1);

        match &events[0] {
            Err(err @ LlmxErr::QuotaExceeded) => {
                assert_eq!(err.to_string(), LlmxErr::QuotaExceeded.to_string());
            }
            other => panic!("unexpected quota exceeded event: {other:?}"),
        }
    }

    // ────────────────────────────
    // Table-driven test from `main`
    // ────────────────────────────

    /// Verifies that the adapter produces the right `ResponseEvent` for a
    /// variety of incoming `type` values.
    #[tokio::test]
    async fn table_driven_event_kinds() {
        struct TestCase {
            name: &'static str,
            event: serde_json::Value,
            expect_first: fn(&ResponseEvent) -> bool,
            expected_len: usize,
        }

        fn is_created(ev: &ResponseEvent) -> bool {
            matches!(ev, ResponseEvent::Created)
        }
        fn is_output(ev: &ResponseEvent) -> bool {
            matches!(ev, ResponseEvent::OutputItemDone(_))
        }
        fn is_completed(ev: &ResponseEvent) -> bool {
            matches!(ev, ResponseEvent::Completed { .. })
        }

        let completed = json!({
            "type": "response.completed",
            "response": {
                "id": "c",
                "usage": {
                    "input_tokens": 0,
                    "input_tokens_details": null,
                    "output_tokens": 0,
                    "output_tokens_details": null,
                    "total_tokens": 0
                },
                "output": []
            }
        });

        let cases = vec![
            TestCase {
                name: "created",
                event: json!({"type": "response.created", "response": {}}),
                expect_first: is_created,
                expected_len: 2,
            },
            TestCase {
                name: "output_item.done",
                event: json!({
                    "type": "response.output_item.done",
                    "item": {
                        "type": "message",
                        "role": "assistant",
                        "content": [
                            {"type": "output_text", "text": "hi"}
                        ]
                    }
                }),
                expect_first: is_output,
                expected_len: 2,
            },
            TestCase {
                name: "unknown",
                event: json!({"type": "response.new_tool_event"}),
                expect_first: is_completed,
                expected_len: 1,
            },
        ];

        for case in cases {
            let mut evs = vec![case.event];
            evs.push(completed.clone());

            let provider = ModelProviderInfo {
                name: "test".to_string(),
                base_url: Some("https://test.com".to_string()),
                env_key: Some("TEST_API_KEY".to_string()),
                env_key_instructions: None,
                experimental_bearer_token: None,
                wire_api: WireApi::Responses,
                query_params: None,
                http_headers: None,
                env_http_headers: None,
                request_max_retries: Some(0),
                stream_max_retries: Some(0),
                stream_idle_timeout_ms: Some(1000),
                requires_openai_auth: false,
            };

            let otel_event_manager = otel_event_manager();

            let out = run_sse(evs, provider, otel_event_manager).await;
            assert_eq!(out.len(), case.expected_len, "case {}", case.name);
            assert!(
                (case.expect_first)(&out[0]),
                "first event mismatch in case {}",
                case.name
            );
        }
    }

    #[test]
    fn test_try_parse_retry_after() {
        let err = Error {
            r#type: None,
            message: Some("Rate limit reached for gpt-5 in organization org- on tokens per min (TPM): Limit 1, Used 1, Requested 19304. Please try again in 28ms. Visit https://platform.openai.com/account/rate-limits to learn more.".to_string()),
            code: Some("rate_limit_exceeded".to_string()),
            plan_type: None,
            resets_at: None,
        };

        let delay = try_parse_retry_after(&err);
        assert_eq!(delay, Some(Duration::from_millis(28)));
    }

    #[test]
    fn test_try_parse_retry_after_no_delay() {
        let err = Error {
            r#type: None,
            message: Some("Rate limit reached for gpt-5 in organization <ORG> on tokens per min (TPM): Limit 30000, Used 6899, Requested 24050. Please try again in 1.898s. Visit https://platform.openai.com/account/rate-limits to learn more.".to_string()),
            code: Some("rate_limit_exceeded".to_string()),
            plan_type: None,
            resets_at: None,
        };
        let delay = try_parse_retry_after(&err);
        assert_eq!(delay, Some(Duration::from_secs_f64(1.898)));
    }

    #[test]
    fn test_try_parse_retry_after_azure() {
        let err = Error {
            r#type: None,
            message: Some("Rate limit exceeded. Try again in 35 seconds.".to_string()),
            code: Some("rate_limit_exceeded".to_string()),
            plan_type: None,
            resets_at: None,
        };
        let delay = try_parse_retry_after(&err);
        assert_eq!(delay, Some(Duration::from_secs(35)));
    }

    #[test]
    fn error_response_deserializes_schema_known_plan_type_and_serializes_back() {
        use crate::token_data::KnownPlan;
        use crate::token_data::PlanType;

        let json =
            r#"{"error":{"type":"usage_limit_reached","plan_type":"pro","resets_at":1704067200}}"#;
        let resp: ErrorResponse = serde_json::from_str(json).expect("should deserialize schema");

        assert_matches!(resp.error.plan_type, Some(PlanType::Known(KnownPlan::Pro)));

        let plan_json = serde_json::to_string(&resp.error.plan_type).expect("serialize plan_type");
        assert_eq!(plan_json, "\"pro\"");
    }

    #[test]
    fn error_response_deserializes_schema_unknown_plan_type_and_serializes_back() {
        use crate::token_data::PlanType;

        let json =
            r#"{"error":{"type":"usage_limit_reached","plan_type":"vip","resets_at":1704067260}}"#;
        let resp: ErrorResponse = serde_json::from_str(json).expect("should deserialize schema");

        assert_matches!(resp.error.plan_type, Some(PlanType::Unknown(ref s)) if s == "vip");

        let plan_json = serde_json::to_string(&resp.error.plan_type).expect("serialize plan_type");
        assert_eq!(plan_json, "\"vip\"");
    }
}