From 04504d8218465326ed19f967e773b29b152e916b Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Sat, 20 Sep 2025 21:26:16 -0700 Subject: [PATCH] Forward Rate limits to the UI (#3965) We currently get information about rate limits in the response headers. We want to forward them to the clients to have better transparency. UI/UX plans have been discussed and this information is needed. --- codex-rs/core/src/chat_completions.rs | 3 + codex-rs/core/src/client.rs | 43 ++++++++++++ codex-rs/core/src/client_common.rs | 2 + codex-rs/core/src/codex.rs | 46 +++++++++---- codex-rs/core/tests/suite/client.rs | 95 +++++++++++++++++++++++++++ codex-rs/protocol/src/protocol.rs | 15 +++++ 6 files changed, 192 insertions(+), 12 deletions(-) diff --git a/codex-rs/core/src/chat_completions.rs b/codex-rs/core/src/chat_completions.rs index fc8602de..f666cc11 100644 --- a/codex-rs/core/src/chat_completions.rs +++ b/codex-rs/core/src/chat_completions.rs @@ -716,6 +716,9 @@ where // Not an assistant message – forward immediately. return Poll::Ready(Some(Ok(ResponseEvent::OutputItemDone(item)))); } + Poll::Ready(Some(Ok(ResponseEvent::RateLimits(snapshot)))) => { + return Poll::Ready(Some(Ok(ResponseEvent::RateLimits(snapshot)))); + } Poll::Ready(Some(Ok(ResponseEvent::Completed { response_id, token_usage, diff --git a/codex-rs/core/src/client.rs b/codex-rs/core/src/client.rs index 055c3afa..57848d38 100644 --- a/codex-rs/core/src/client.rs +++ b/codex-rs/core/src/client.rs @@ -11,6 +11,7 @@ use eventsource_stream::Eventsource; use futures::prelude::*; use regex_lite::Regex; use reqwest::StatusCode; +use reqwest::header::HeaderMap; use serde::Deserialize; use serde::Serialize; use serde_json::Value; @@ -40,6 +41,7 @@ use crate::model_provider_info::ModelProviderInfo; use crate::model_provider_info::WireApi; use crate::openai_model_info::get_model_info; use crate::openai_tools::create_tools_json_for_responses_api; +use crate::protocol::RateLimitSnapshotEvent; use crate::protocol::TokenUsage; use crate::token_data::PlanType; use crate::util::backoff; @@ -274,6 +276,15 @@ impl ModelClient { Ok(resp) if resp.status().is_success() => { let (tx_event, rx_event) = mpsc::channel::>(1600); + if let Some(snapshot) = parse_rate_limit_snapshot(resp.headers()) + && tx_event + .send(Ok(ResponseEvent::RateLimits(snapshot))) + .await + .is_err() + { + debug!("receiver dropped rate limit snapshot event"); + } + // spawn task to process SSE let stream = resp.bytes_stream().map_err(CodexErr::Reqwest); tokio::spawn(process_sse( @@ -473,6 +484,38 @@ fn attach_item_ids(payload_json: &mut Value, original_items: &[ResponseItem]) { } } +fn parse_rate_limit_snapshot(headers: &HeaderMap) -> Option { + let primary_used_percent = parse_header_f64(headers, "x-codex-primary-used-percent")?; + let weekly_used_percent = parse_header_f64(headers, "x-codex-protection-used-percent")?; + let primary_to_weekly_ratio_percent = + parse_header_f64(headers, "x-codex-primary-over-protection-limit-percent")?; + let primary_window_minutes = parse_header_u64(headers, "x-codex-primary-window-minutes")?; + let weekly_window_minutes = parse_header_u64(headers, "x-codex-protection-window-minutes")?; + + Some(RateLimitSnapshotEvent { + primary_used_percent, + weekly_used_percent, + primary_to_weekly_ratio_percent, + primary_window_minutes, + weekly_window_minutes, + }) +} + +fn parse_header_f64(headers: &HeaderMap, name: &str) -> Option { + parse_header_str(headers, name)? + .parse::() + .ok() + .filter(|v| v.is_finite()) +} + +fn parse_header_u64(headers: &HeaderMap, name: &str) -> Option { + parse_header_str(headers, name)?.parse::().ok() +} + +fn parse_header_str<'a>(headers: &'a HeaderMap, name: &str) -> Option<&'a str> { + headers.get(name)?.to_str().ok() +} + async fn process_sse( stream: S, tx_event: mpsc::Sender>, diff --git a/codex-rs/core/src/client_common.rs b/codex-rs/core/src/client_common.rs index 3a5eb5b1..15bfb5d4 100644 --- a/codex-rs/core/src/client_common.rs +++ b/codex-rs/core/src/client_common.rs @@ -1,6 +1,7 @@ use crate::error::Result; use crate::model_family::ModelFamily; use crate::openai_tools::OpenAiTool; +use crate::protocol::RateLimitSnapshotEvent; use crate::protocol::TokenUsage; use codex_apply_patch::APPLY_PATCH_TOOL_INSTRUCTIONS; use codex_protocol::config_types::ReasoningEffort as ReasoningEffortConfig; @@ -78,6 +79,7 @@ pub enum ResponseEvent { WebSearchCallBegin { call_id: String, }, + RateLimits(RateLimitSnapshotEvent), } #[derive(Debug, Serialize)] diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 8852b258..05ef0937 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -98,6 +98,7 @@ use crate::protocol::ListCustomPromptsResponseEvent; use crate::protocol::Op; use crate::protocol::PatchApplyBeginEvent; use crate::protocol::PatchApplyEndEvent; +use crate::protocol::RateLimitSnapshotEvent; use crate::protocol::ReviewDecision; use crate::protocol::ReviewOutputEvent; use crate::protocol::SandboxPolicy; @@ -105,6 +106,7 @@ use crate::protocol::SessionConfiguredEvent; use crate::protocol::StreamErrorEvent; use crate::protocol::Submission; use crate::protocol::TaskCompleteEvent; +use crate::protocol::TokenCountEvent; use crate::protocol::TokenUsage; use crate::protocol::TokenUsageInfo; use crate::protocol::TurnDiffEvent; @@ -257,6 +259,7 @@ struct State { pending_input: Vec, history: ConversationHistory, token_info: Option, + latest_rate_limits: Option, } /// Context for an initialized model agent @@ -738,16 +741,30 @@ impl Session { async fn update_token_usage_info( &self, turn_context: &TurnContext, - token_usage: &Option, - ) -> Option { + token_usage: Option<&TokenUsage>, + ) { let mut state = self.state.lock().await; - let info = TokenUsageInfo::new_or_append( - &state.token_info, - token_usage, - turn_context.client.get_model_context_window(), - ); - state.token_info = info.clone(); - info + if let Some(token_usage) = token_usage { + let info = TokenUsageInfo::new_or_append( + &state.token_info, + &Some(token_usage.clone()), + turn_context.client.get_model_context_window(), + ); + state.token_info = info; + } + } + + async fn update_rate_limits(&self, new_rate_limits: RateLimitSnapshotEvent) { + let mut state = self.state.lock().await; + state.latest_rate_limits = Some(new_rate_limits); + } + + async fn get_token_count_event(&self) -> TokenCountEvent { + let state = self.state.lock().await; + TokenCountEvent { + info: state.token_info.clone(), + rate_limits: state.latest_rate_limits.clone(), + } } /// Record a user input item to conversation history and also persist a @@ -2136,17 +2153,22 @@ async fn try_run_turn( }) .await; } + ResponseEvent::RateLimits(snapshot) => { + // Update internal state with latest rate limits, but defer sending until + // token usage is available to avoid duplicate TokenCount events. + sess.update_rate_limits(snapshot).await; + } ResponseEvent::Completed { response_id: _, token_usage, } => { - let info = sess - .update_token_usage_info(turn_context, &token_usage) + sess.update_token_usage_info(turn_context, token_usage.as_ref()) .await; + let token_event = sess.get_token_count_event().await; let _ = sess .send_event(Event { id: sub_id.to_string(), - msg: EventMsg::TokenCount(crate::protocol::TokenCountEvent { info }), + msg: EventMsg::TokenCount(token_event), }) .await; diff --git a/codex-rs/core/tests/suite/client.rs b/codex-rs/core/tests/suite/client.rs index cfc6f5f4..d0ae608c 100644 --- a/codex-rs/core/tests/suite/client.rs +++ b/codex-rs/core/tests/suite/client.rs @@ -22,6 +22,7 @@ use codex_protocol::models::ReasoningItemReasoningSummary; use codex_protocol::models::WebSearchAction; use core_test_support::load_default_config_for_test; use core_test_support::load_sse_fixture_with_id; +use core_test_support::responses; use core_test_support::wait_for_event; use futures::StreamExt; use serde_json::json; @@ -776,6 +777,100 @@ async fn azure_responses_request_includes_store_and_reasoning_ids() { assert_eq!(body["input"][5]["id"].as_str(), Some("custom-tool-id")); } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn token_count_includes_rate_limits_snapshot() { + let server = MockServer::start().await; + + let sse_body = responses::sse(vec![responses::ev_completed_with_tokens("resp_rate", 123)]); + + let response = ResponseTemplate::new(200) + .insert_header("content-type", "text/event-stream") + .insert_header("x-codex-primary-used-percent", "12.5") + .insert_header("x-codex-protection-used-percent", "40.0") + .insert_header("x-codex-primary-over-protection-limit-percent", "75.0") + .insert_header("x-codex-primary-window-minutes", "10") + .insert_header("x-codex-protection-window-minutes", "60") + .set_body_raw(sse_body, "text/event-stream"); + + Mock::given(method("POST")) + .and(path("/v1/responses")) + .respond_with(response) + .expect(1) + .mount(&server) + .await; + + let mut provider = built_in_model_providers()["openai"].clone(); + provider.base_url = Some(format!("{}/v1", server.uri())); + + let home = TempDir::new().unwrap(); + let mut config = load_default_config_for_test(&home); + config.model_provider = provider; + + let conversation_manager = ConversationManager::with_auth(CodexAuth::from_api_key("test")); + let codex = conversation_manager + .new_conversation(config) + .await + .expect("create conversation") + .conversation; + + codex + .submit(Op::UserInput { + items: vec![InputItem::Text { + text: "hello".into(), + }], + }) + .await + .unwrap(); + + let token_event = wait_for_event(&codex, |msg| matches!(msg, EventMsg::TokenCount(_))).await; + let final_payload = match token_event { + EventMsg::TokenCount(ev) => ev, + _ => unreachable!(), + }; + // Assert full JSON for the final token count event (usage + rate limits) + let final_json = serde_json::to_value(&final_payload).unwrap(); + pretty_assertions::assert_eq!( + final_json, + json!({ + "info": { + "total_token_usage": { + "input_tokens": 123, + "cached_input_tokens": 0, + "output_tokens": 0, + "reasoning_output_tokens": 0, + "total_tokens": 123 + }, + "last_token_usage": { + "input_tokens": 123, + "cached_input_tokens": 0, + "output_tokens": 0, + "reasoning_output_tokens": 0, + "total_tokens": 123 + }, + // Default model is gpt-5 in tests → 272000 context window + "model_context_window": 272000 + }, + "rate_limits": { + "primary_used_percent": 12.5, + "weekly_used_percent": 40.0, + "primary_to_weekly_ratio_percent": 75.0, + "primary_window_minutes": 10, + "weekly_window_minutes": 60 + } + }) + ); + let usage = final_payload + .info + .expect("token usage info should be recorded after completion"); + assert_eq!(usage.total_token_usage.total_tokens, 123); + let final_snapshot = final_payload + .rate_limits + .expect("latest rate limit snapshot should be retained"); + assert_eq!(final_snapshot.primary_used_percent, 12.5); + + wait_for_event(&codex, |msg| matches!(msg, EventMsg::TaskComplete(_))).await; +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn azure_overrides_assign_properties_used_for_responses_url() { let existing_env_var_with_random_value = if cfg!(windows) { "USERNAME" } else { "USER" }; diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index 8009ac9b..edcdcbeb 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -589,6 +589,21 @@ impl TokenUsageInfo { #[derive(Debug, Clone, Deserialize, Serialize, TS)] pub struct TokenCountEvent { pub info: Option, + pub rate_limits: Option, +} + +#[derive(Debug, Clone, Deserialize, Serialize, TS)] +pub struct RateLimitSnapshotEvent { + /// Percentage (0-100) of the primary window that has been consumed. + pub primary_used_percent: f64, + /// Percentage (0-100) of the protection window that has been consumed. + pub weekly_used_percent: f64, + /// Size of the primary window relative to weekly (0-100). + pub primary_to_weekly_ratio_percent: f64, + /// Rolling window duration for the primary limit, in minutes. + pub primary_window_minutes: u64, + /// Rolling window duration for the weekly limit, in minutes. + pub weekly_window_minutes: u64, } // Includes prompts, tools and space to call compact.