From 8227a5ba1b0edb313264bedb1cc3e427fa7a46fd Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Tue, 23 Sep 2025 15:56:34 -0700 Subject: [PATCH] Send limits when getting rate limited (#4102) Users need visibility on rate limits when they are rate limited. --- codex-rs/core/src/client.rs | 8 ++- codex-rs/core/src/client_common.rs | 4 +- codex-rs/core/src/codex.rs | 69 +++++++++++-------- codex-rs/core/src/error.rs | 22 ++++++ codex-rs/core/tests/suite/client.rs | 103 +++++++++++++++++++++++++++- codex-rs/protocol/src/protocol.rs | 4 +- codex-rs/tui/src/chatwidget.rs | 16 +++-- codex-rs/tui/src/history_cell.rs | 6 +- 8 files changed, 186 insertions(+), 46 deletions(-) diff --git a/codex-rs/core/src/client.rs b/codex-rs/core/src/client.rs index f15983e8..73f36109 100644 --- a/codex-rs/core/src/client.rs +++ b/codex-rs/core/src/client.rs @@ -42,7 +42,7 @@ use crate::model_provider_info::ModelProviderInfo; use crate::model_provider_info::WireApi; use crate::openai_model_info::get_model_info; use crate::openai_tools::create_tools_json_for_responses_api; -use crate::protocol::RateLimitSnapshotEvent; +use crate::protocol::RateLimitSnapshot; use crate::protocol::TokenUsage; use crate::token_data::PlanType; use crate::util::backoff; @@ -330,6 +330,7 @@ impl ModelClient { } if status == StatusCode::TOO_MANY_REQUESTS { + let rate_limit_snapshot = parse_rate_limit_snapshot(res.headers()); let body = res.json::().await.ok(); if let Some(ErrorResponse { error }) = body { if error.r#type.as_deref() == Some("usage_limit_reached") { @@ -343,6 +344,7 @@ impl ModelClient { return Err(CodexErr::UsageLimitReached(UsageLimitReachedError { plan_type, resets_in_seconds, + rate_limits: rate_limit_snapshot, })); } else if error.r#type.as_deref() == Some("usage_not_included") { return Err(CodexErr::UsageNotIncluded); @@ -485,7 +487,7 @@ fn attach_item_ids(payload_json: &mut Value, original_items: &[ResponseItem]) { } } -fn parse_rate_limit_snapshot(headers: &HeaderMap) -> Option { +fn parse_rate_limit_snapshot(headers: &HeaderMap) -> Option { let primary_used_percent = parse_header_f64(headers, "x-codex-primary-used-percent")?; let secondary_used_percent = parse_header_f64(headers, "x-codex-secondary-used-percent")?; let primary_to_secondary_ratio_percent = @@ -493,7 +495,7 @@ fn parse_rate_limit_snapshot(headers: &HeaderMap) -> Option, history: ConversationHistory, token_info: Option, - latest_rate_limits: Option, + latest_rate_limits: Option, } /// Context for an initialized model agent @@ -739,31 +739,42 @@ impl Session { async fn update_token_usage_info( &self, + sub_id: &str, turn_context: &TurnContext, token_usage: Option<&TokenUsage>, ) { - let mut state = self.state.lock().await; - if let Some(token_usage) = token_usage { - let info = TokenUsageInfo::new_or_append( - &state.token_info, - &Some(token_usage.clone()), - turn_context.client.get_model_context_window(), - ); - state.token_info = info; + { + let mut state = self.state.lock().await; + if let Some(token_usage) = token_usage { + let info = TokenUsageInfo::new_or_append( + &state.token_info, + &Some(token_usage.clone()), + turn_context.client.get_model_context_window(), + ); + state.token_info = info; + } } + self.send_token_count_event(sub_id).await; } - async fn update_rate_limits(&self, new_rate_limits: RateLimitSnapshotEvent) { - let mut state = self.state.lock().await; - state.latest_rate_limits = Some(new_rate_limits); + async fn update_rate_limits(&self, sub_id: &str, new_rate_limits: RateLimitSnapshot) { + { + let mut state = self.state.lock().await; + state.latest_rate_limits = Some(new_rate_limits); + } + self.send_token_count_event(sub_id).await; } - async fn get_token_count_event(&self) -> TokenCountEvent { - let state = self.state.lock().await; - TokenCountEvent { - info: state.token_info.clone(), - rate_limits: state.latest_rate_limits.clone(), - } + async fn send_token_count_event(&self, sub_id: &str) { + let (info, rate_limits) = { + let state = self.state.lock().await; + (state.token_info.clone(), state.latest_rate_limits.clone()) + }; + let event = Event { + id: sub_id.to_string(), + msg: EventMsg::TokenCount(TokenCountEvent { info, rate_limits }), + }; + self.send_event(event).await; } /// Record a user input item to conversation history and also persist a @@ -1957,9 +1968,14 @@ async fn run_turn( Ok(output) => return Ok(output), Err(CodexErr::Interrupted) => return Err(CodexErr::Interrupted), Err(CodexErr::EnvVar(var)) => return Err(CodexErr::EnvVar(var)), - Err(e @ (CodexErr::UsageLimitReached(_) | CodexErr::UsageNotIncluded)) => { - return Err(e); + Err(CodexErr::UsageLimitReached(e)) => { + let rate_limits = e.rate_limits.clone(); + if let Some(rate_limits) = rate_limits { + sess.update_rate_limits(&sub_id, rate_limits).await; + } + return Err(CodexErr::UsageLimitReached(e)); } + Err(CodexErr::UsageNotIncluded) => return Err(CodexErr::UsageNotIncluded), Err(e) => { // Use the configured provider-specific stream retry budget. let max_retries = turn_context.client.get_provider().stream_max_retries(); @@ -2132,20 +2148,13 @@ async fn try_run_turn( ResponseEvent::RateLimits(snapshot) => { // Update internal state with latest rate limits, but defer sending until // token usage is available to avoid duplicate TokenCount events. - sess.update_rate_limits(snapshot).await; + sess.update_rate_limits(sub_id, snapshot).await; } ResponseEvent::Completed { response_id: _, token_usage, } => { - sess.update_token_usage_info(turn_context, token_usage.as_ref()) - .await; - let token_event = sess.get_token_count_event().await; - let _ = sess - .send_event(Event { - id: sub_id.to_string(), - msg: EventMsg::TokenCount(token_event), - }) + sess.update_token_usage_info(sub_id, turn_context, token_usage.as_ref()) .await; let unified_diff = turn_diff_tracker.get_unified_diff(); diff --git a/codex-rs/core/src/error.rs b/codex-rs/core/src/error.rs index a5150abd..9840e167 100644 --- a/codex-rs/core/src/error.rs +++ b/codex-rs/core/src/error.rs @@ -2,6 +2,7 @@ use crate::exec::ExecToolCallOutput; use crate::token_data::KnownPlan; use crate::token_data::PlanType; use codex_protocol::mcp_protocol::ConversationId; +use codex_protocol::protocol::RateLimitSnapshot; use reqwest::StatusCode; use serde_json; use std::io; @@ -138,6 +139,7 @@ pub enum CodexErr { pub struct UsageLimitReachedError { pub(crate) plan_type: Option, pub(crate) resets_in_seconds: Option, + pub(crate) rate_limits: Option, } impl std::fmt::Display for UsageLimitReachedError { @@ -266,11 +268,22 @@ pub fn get_error_message_ui(e: &CodexErr) -> String { mod tests { use super::*; + fn rate_limit_snapshot() -> RateLimitSnapshot { + RateLimitSnapshot { + primary_used_percent: 0.5, + secondary_used_percent: 0.3, + primary_to_secondary_ratio_percent: 0.7, + primary_window_minutes: 60, + secondary_window_minutes: 120, + } + } + #[test] fn usage_limit_reached_error_formats_plus_plan() { let err = UsageLimitReachedError { plan_type: Some(PlanType::Known(KnownPlan::Plus)), resets_in_seconds: None, + rate_limits: Some(rate_limit_snapshot()), }; assert_eq!( err.to_string(), @@ -283,6 +296,7 @@ mod tests { let err = UsageLimitReachedError { plan_type: Some(PlanType::Known(KnownPlan::Free)), resets_in_seconds: Some(3600), + rate_limits: Some(rate_limit_snapshot()), }; assert_eq!( err.to_string(), @@ -295,6 +309,7 @@ mod tests { let err = UsageLimitReachedError { plan_type: None, resets_in_seconds: None, + rate_limits: Some(rate_limit_snapshot()), }; assert_eq!( err.to_string(), @@ -307,6 +322,7 @@ mod tests { let err = UsageLimitReachedError { plan_type: Some(PlanType::Known(KnownPlan::Team)), resets_in_seconds: Some(3600), + rate_limits: Some(rate_limit_snapshot()), }; assert_eq!( err.to_string(), @@ -319,6 +335,7 @@ mod tests { let err = UsageLimitReachedError { plan_type: Some(PlanType::Known(KnownPlan::Business)), resets_in_seconds: None, + rate_limits: Some(rate_limit_snapshot()), }; assert_eq!( err.to_string(), @@ -331,6 +348,7 @@ mod tests { let err = UsageLimitReachedError { plan_type: Some(PlanType::Known(KnownPlan::Pro)), resets_in_seconds: None, + rate_limits: Some(rate_limit_snapshot()), }; assert_eq!( err.to_string(), @@ -343,6 +361,7 @@ mod tests { let err = UsageLimitReachedError { plan_type: None, resets_in_seconds: Some(5 * 60), + rate_limits: Some(rate_limit_snapshot()), }; assert_eq!( err.to_string(), @@ -355,6 +374,7 @@ mod tests { let err = UsageLimitReachedError { plan_type: Some(PlanType::Known(KnownPlan::Plus)), resets_in_seconds: Some(3 * 3600 + 32 * 60), + rate_limits: Some(rate_limit_snapshot()), }; assert_eq!( err.to_string(), @@ -367,6 +387,7 @@ mod tests { let err = UsageLimitReachedError { plan_type: None, resets_in_seconds: Some(2 * 86_400 + 3 * 3600 + 5 * 60), + rate_limits: Some(rate_limit_snapshot()), }; assert_eq!( err.to_string(), @@ -379,6 +400,7 @@ mod tests { let err = UsageLimitReachedError { plan_type: None, resets_in_seconds: Some(30), + rate_limits: Some(rate_limit_snapshot()), }; assert_eq!( err.to_string(), diff --git a/codex-rs/core/tests/suite/client.rs b/codex-rs/core/tests/suite/client.rs index d885faaf..f6a73d13 100644 --- a/codex-rs/core/tests/suite/client.rs +++ b/codex-rs/core/tests/suite/client.rs @@ -23,6 +23,7 @@ use core_test_support::load_default_config_for_test; use core_test_support::load_sse_fixture_with_id; use core_test_support::non_sandbox_test; use core_test_support::responses; +use core_test_support::test_codex::test_codex; use core_test_support::wait_for_event; use futures::StreamExt; use serde_json::json; @@ -797,7 +798,33 @@ async fn token_count_includes_rate_limits_snapshot() { .await .unwrap(); - let token_event = wait_for_event(&codex, |msg| matches!(msg, EventMsg::TokenCount(_))).await; + let first_token_event = + wait_for_event(&codex, |msg| matches!(msg, EventMsg::TokenCount(_))).await; + let rate_limit_only = match first_token_event { + EventMsg::TokenCount(ev) => ev, + _ => unreachable!(), + }; + + let rate_limit_json = serde_json::to_value(&rate_limit_only).unwrap(); + pretty_assertions::assert_eq!( + rate_limit_json, + json!({ + "info": null, + "rate_limits": { + "primary_used_percent": 12.5, + "secondary_used_percent": 40.0, + "primary_to_secondary_ratio_percent": 75.0, + "primary_window_minutes": 10, + "secondary_window_minutes": 60 + } + }) + ); + + let token_event = wait_for_event( + &codex, + |msg| matches!(msg, EventMsg::TokenCount(ev) if ev.info.is_some()), + ) + .await; let final_payload = match token_event { EventMsg::TokenCount(ev) => ev, _ => unreachable!(), @@ -846,6 +873,80 @@ async fn token_count_includes_rate_limits_snapshot() { wait_for_event(&codex, |msg| matches!(msg, EventMsg::TaskComplete(_))).await; } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn usage_limit_error_emits_rate_limit_event() -> anyhow::Result<()> { + let server = MockServer::start().await; + + let response = ResponseTemplate::new(429) + .insert_header("x-codex-primary-used-percent", "100.0") + .insert_header("x-codex-secondary-used-percent", "87.5") + .insert_header("x-codex-primary-over-secondary-limit-percent", "95.0") + .insert_header("x-codex-primary-window-minutes", "15") + .insert_header("x-codex-secondary-window-minutes", "60") + .set_body_json(json!({ + "error": { + "type": "usage_limit_reached", + "message": "limit reached", + "resets_in_seconds": 42, + "plan_type": "pro" + } + })); + + Mock::given(method("POST")) + .and(path("/v1/responses")) + .respond_with(response) + .expect(1) + .mount(&server) + .await; + + let mut builder = test_codex(); + let codex_fixture = builder.build(&server).await?; + let codex = codex_fixture.codex.clone(); + + let expected_limits = json!({ + "primary_used_percent": 100.0, + "secondary_used_percent": 87.5, + "primary_to_secondary_ratio_percent": 95.0, + "primary_window_minutes": 15, + "secondary_window_minutes": 60 + }); + + let submission_id = codex + .submit(Op::UserInput { + items: vec![InputItem::Text { + text: "hello".into(), + }], + }) + .await + .expect("submission should succeed while emitting usage limit error events"); + + let token_event = wait_for_event(&codex, |msg| matches!(msg, EventMsg::TokenCount(_))).await; + let EventMsg::TokenCount(event) = token_event else { + unreachable!(); + }; + + let event_json = serde_json::to_value(&event).expect("serialize token count event"); + pretty_assertions::assert_eq!( + event_json, + json!({ + "info": null, + "rate_limits": expected_limits + }) + ); + + let error_event = wait_for_event(&codex, |msg| matches!(msg, EventMsg::Error(_))).await; + let EventMsg::Error(error_event) = error_event else { + unreachable!(); + }; + assert!( + error_event.message.to_lowercase().contains("usage limit"), + "unexpected error message for submission {submission_id}: {}", + error_event.message + ); + + Ok(()) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn azure_overrides_assign_properties_used_for_responses_url() { let existing_env_var_with_random_value = if cfg!(windows) { "USERNAME" } else { "USER" }; diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index c45a89d7..70159bd1 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -592,11 +592,11 @@ impl TokenUsageInfo { #[derive(Debug, Clone, Deserialize, Serialize, TS)] pub struct TokenCountEvent { pub info: Option, - pub rate_limits: Option, + pub rate_limits: Option, } #[derive(Debug, Clone, Deserialize, Serialize, TS)] -pub struct RateLimitSnapshotEvent { +pub struct RateLimitSnapshot { /// Percentage (0-100) of the primary window that has been consumed. pub primary_used_percent: f64, /// Percentage (0-100) of the secondary window that has been consumed. diff --git a/codex-rs/tui/src/chatwidget.rs b/codex-rs/tui/src/chatwidget.rs index b665d525..6e4026b4 100644 --- a/codex-rs/tui/src/chatwidget.rs +++ b/codex-rs/tui/src/chatwidget.rs @@ -30,7 +30,7 @@ use codex_core::protocol::McpToolCallBeginEvent; use codex_core::protocol::McpToolCallEndEvent; use codex_core::protocol::Op; use codex_core::protocol::PatchApplyBeginEvent; -use codex_core::protocol::RateLimitSnapshotEvent; +use codex_core::protocol::RateLimitSnapshot; use codex_core::protocol::ReviewRequest; use codex_core::protocol::StreamErrorEvent; use codex_core::protocol::TaskCompleteEvent; @@ -132,6 +132,10 @@ impl RateLimitWarningState { secondary_used_percent: f64, primary_used_percent: f64, ) -> Vec { + if secondary_used_percent == 100.0 || primary_used_percent == 100.0 { + return Vec::new(); + } + let mut warnings = Vec::new(); let mut highest_secondary: Option = None; @@ -185,7 +189,7 @@ pub(crate) struct ChatWidget { session_header: SessionHeader, initial_user_message: Option, token_info: Option, - rate_limit_snapshot: Option, + rate_limit_snapshot: Option, rate_limit_warnings: RateLimitWarningState, // Stream lifecycle controller stream_controller: Option, @@ -353,11 +357,13 @@ impl ChatWidget { } pub(crate) fn set_token_info(&mut self, info: Option) { - self.bottom_pane.set_token_usage(info.clone()); - self.token_info = info; + if info.is_some() { + self.bottom_pane.set_token_usage(info.clone()); + self.token_info = info; + } } - fn on_rate_limit_snapshot(&mut self, snapshot: Option) { + fn on_rate_limit_snapshot(&mut self, snapshot: Option) { if let Some(snapshot) = snapshot { let warnings = self.rate_limit_warnings.take_warnings( snapshot.secondary_used_percent, diff --git a/codex-rs/tui/src/history_cell.rs b/codex-rs/tui/src/history_cell.rs index 1a55b2a8..7406f98d 100644 --- a/codex-rs/tui/src/history_cell.rs +++ b/codex-rs/tui/src/history_cell.rs @@ -24,7 +24,7 @@ use codex_core::plan_tool::UpdatePlanArgs; use codex_core::project_doc::discover_project_doc_paths; use codex_core::protocol::FileChange; use codex_core::protocol::McpInvocation; -use codex_core::protocol::RateLimitSnapshotEvent; +use codex_core::protocol::RateLimitSnapshot; use codex_core::protocol::SandboxPolicy; use codex_core::protocol::SessionConfiguredEvent; use codex_core::protocol::TokenUsage; @@ -1082,7 +1082,7 @@ pub(crate) fn new_status_output( config: &Config, usage: &TokenUsage, session_id: &Option, - rate_limits: Option<&RateLimitSnapshotEvent>, + rate_limits: Option<&RateLimitSnapshot>, ) -> PlainHistoryCell { let mut lines: Vec> = Vec::new(); lines.push("/status".magenta().into()); @@ -1611,7 +1611,7 @@ fn format_mcp_invocation<'a>(invocation: McpInvocation) -> Line<'a> { invocation_spans.into() } -fn build_status_limit_lines(snapshot: Option<&RateLimitSnapshotEvent>) -> Vec> { +fn build_status_limit_lines(snapshot: Option<&RateLimitSnapshot>) -> Vec> { let mut lines: Vec> = vec![vec![padded_emoji("⏱️").into(), "Usage Limits".bold()].into()];