Forward Rate limits to the UI (#3965)
Rate-limit information currently arrives in the response headers. This change forwards it to clients for better transparency; the UI/UX plans that have been discussed depend on this information.
This commit is contained in:
@@ -716,6 +716,9 @@ where
|
||||
// Not an assistant message – forward immediately.
|
||||
return Poll::Ready(Some(Ok(ResponseEvent::OutputItemDone(item))));
|
||||
}
|
||||
Poll::Ready(Some(Ok(ResponseEvent::RateLimits(snapshot)))) => {
|
||||
return Poll::Ready(Some(Ok(ResponseEvent::RateLimits(snapshot))));
|
||||
}
|
||||
Poll::Ready(Some(Ok(ResponseEvent::Completed {
|
||||
response_id,
|
||||
token_usage,
|
||||
|
||||
@@ -11,6 +11,7 @@ use eventsource_stream::Eventsource;
|
||||
use futures::prelude::*;
|
||||
use regex_lite::Regex;
|
||||
use reqwest::StatusCode;
|
||||
use reqwest::header::HeaderMap;
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
use serde_json::Value;
|
||||
@@ -40,6 +41,7 @@ use crate::model_provider_info::ModelProviderInfo;
|
||||
use crate::model_provider_info::WireApi;
|
||||
use crate::openai_model_info::get_model_info;
|
||||
use crate::openai_tools::create_tools_json_for_responses_api;
|
||||
use crate::protocol::RateLimitSnapshotEvent;
|
||||
use crate::protocol::TokenUsage;
|
||||
use crate::token_data::PlanType;
|
||||
use crate::util::backoff;
|
||||
@@ -274,6 +276,15 @@ impl ModelClient {
|
||||
Ok(resp) if resp.status().is_success() => {
|
||||
let (tx_event, rx_event) = mpsc::channel::<Result<ResponseEvent>>(1600);
|
||||
|
||||
if let Some(snapshot) = parse_rate_limit_snapshot(resp.headers())
|
||||
&& tx_event
|
||||
.send(Ok(ResponseEvent::RateLimits(snapshot)))
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
debug!("receiver dropped rate limit snapshot event");
|
||||
}
|
||||
|
||||
// spawn task to process SSE
|
||||
let stream = resp.bytes_stream().map_err(CodexErr::Reqwest);
|
||||
tokio::spawn(process_sse(
|
||||
@@ -473,6 +484,38 @@ fn attach_item_ids(payload_json: &mut Value, original_items: &[ResponseItem]) {
|
||||
}
|
||||
}
|
||||
|
||||
/// Builds a `RateLimitSnapshotEvent` from the `x-codex-*` rate-limit headers
/// of a response.
///
/// All five headers are required: each lookup uses `?`, so if any header is
/// missing or fails to parse the function returns `None` and no partial
/// snapshot is produced.
///
/// Note the naming: the `x-codex-protection-*` headers populate the
/// `weekly_*` fields of the snapshot.
fn parse_rate_limit_snapshot(headers: &HeaderMap) -> Option<RateLimitSnapshotEvent> {
|
||||
let primary_used_percent = parse_header_f64(headers, "x-codex-primary-used-percent")?;
|
||||
let weekly_used_percent = parse_header_f64(headers, "x-codex-protection-used-percent")?;
|
||||
let primary_to_weekly_ratio_percent =
|
||||
parse_header_f64(headers, "x-codex-primary-over-protection-limit-percent")?;
|
||||
let primary_window_minutes = parse_header_u64(headers, "x-codex-primary-window-minutes")?;
|
||||
let weekly_window_minutes = parse_header_u64(headers, "x-codex-protection-window-minutes")?;
|
||||
|
||||
Some(RateLimitSnapshotEvent {
|
||||
primary_used_percent,
|
||||
weekly_used_percent,
|
||||
primary_to_weekly_ratio_percent,
|
||||
primary_window_minutes,
|
||||
weekly_window_minutes,
|
||||
})
|
||||
}
|
||||
|
||||
/// Reads header `name` and parses its value as an `f64`.
///
/// Returns `None` when `parse_header_str` yields no value, when the text does
/// not parse as a float, or when the parsed value is not finite (NaN or an
/// infinity is rejected by the `is_finite` filter).
fn parse_header_f64(headers: &HeaderMap, name: &str) -> Option<f64> {
|
||||
parse_header_str(headers, name)?
|
||||
.parse::<f64>()
|
||||
.ok()
|
||||
.filter(|v| v.is_finite())
|
||||
}
|
||||
|
||||
/// Reads header `name` and parses its value as a `u64`.
///
/// Returns `None` when `parse_header_str` yields no value or the text does
/// not parse as an unsigned 64-bit integer.
fn parse_header_u64(headers: &HeaderMap, name: &str) -> Option<u64> {
|
||||
parse_header_str(headers, name)?.parse::<u64>().ok()
|
||||
}
|
||||
|
||||
/// Looks up header `name` and returns its value borrowed as a `&str`.
///
/// Returns `None` when the header is absent or when `to_str()` on the value
/// fails. The returned slice borrows from `headers`.
fn parse_header_str<'a>(headers: &'a HeaderMap, name: &str) -> Option<&'a str> {
|
||||
headers.get(name)?.to_str().ok()
|
||||
}
|
||||
|
||||
async fn process_sse<S>(
|
||||
stream: S,
|
||||
tx_event: mpsc::Sender<Result<ResponseEvent>>,
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use crate::error::Result;
|
||||
use crate::model_family::ModelFamily;
|
||||
use crate::openai_tools::OpenAiTool;
|
||||
use crate::protocol::RateLimitSnapshotEvent;
|
||||
use crate::protocol::TokenUsage;
|
||||
use codex_apply_patch::APPLY_PATCH_TOOL_INSTRUCTIONS;
|
||||
use codex_protocol::config_types::ReasoningEffort as ReasoningEffortConfig;
|
||||
@@ -78,6 +79,7 @@ pub enum ResponseEvent {
|
||||
WebSearchCallBegin {
|
||||
call_id: String,
|
||||
},
|
||||
RateLimits(RateLimitSnapshotEvent),
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
|
||||
@@ -98,6 +98,7 @@ use crate::protocol::ListCustomPromptsResponseEvent;
|
||||
use crate::protocol::Op;
|
||||
use crate::protocol::PatchApplyBeginEvent;
|
||||
use crate::protocol::PatchApplyEndEvent;
|
||||
use crate::protocol::RateLimitSnapshotEvent;
|
||||
use crate::protocol::ReviewDecision;
|
||||
use crate::protocol::ReviewOutputEvent;
|
||||
use crate::protocol::SandboxPolicy;
|
||||
@@ -105,6 +106,7 @@ use crate::protocol::SessionConfiguredEvent;
|
||||
use crate::protocol::StreamErrorEvent;
|
||||
use crate::protocol::Submission;
|
||||
use crate::protocol::TaskCompleteEvent;
|
||||
use crate::protocol::TokenCountEvent;
|
||||
use crate::protocol::TokenUsage;
|
||||
use crate::protocol::TokenUsageInfo;
|
||||
use crate::protocol::TurnDiffEvent;
|
||||
@@ -257,6 +259,7 @@ struct State {
|
||||
pending_input: Vec<ResponseInputItem>,
|
||||
history: ConversationHistory,
|
||||
token_info: Option<TokenUsageInfo>,
|
||||
latest_rate_limits: Option<RateLimitSnapshotEvent>,
|
||||
}
|
||||
|
||||
/// Context for an initialized model agent
|
||||
@@ -738,16 +741,30 @@ impl Session {
|
||||
async fn update_token_usage_info(
|
||||
&self,
|
||||
turn_context: &TurnContext,
|
||||
token_usage: &Option<TokenUsage>,
|
||||
) -> Option<TokenUsageInfo> {
|
||||
token_usage: Option<&TokenUsage>,
|
||||
) {
|
||||
let mut state = self.state.lock().await;
|
||||
let info = TokenUsageInfo::new_or_append(
|
||||
&state.token_info,
|
||||
token_usage,
|
||||
turn_context.client.get_model_context_window(),
|
||||
);
|
||||
state.token_info = info.clone();
|
||||
info
|
||||
if let Some(token_usage) = token_usage {
|
||||
let info = TokenUsageInfo::new_or_append(
|
||||
&state.token_info,
|
||||
&Some(token_usage.clone()),
|
||||
turn_context.client.get_model_context_window(),
|
||||
);
|
||||
state.token_info = info;
|
||||
}
|
||||
}
|
||||
|
||||
/// Stores `new_rate_limits` as the latest rate-limit snapshot in the session
/// state, replacing any previously stored snapshot.
///
/// Only updates state (under the state lock); it does not emit an event.
async fn update_rate_limits(&self, new_rate_limits: RateLimitSnapshotEvent) {
|
||||
let mut state = self.state.lock().await;
|
||||
state.latest_rate_limits = Some(new_rate_limits);
|
||||
}
|
||||
|
||||
/// Builds a `TokenCountEvent` from the current session state.
///
/// Takes the state lock and clones both the stored token usage info and the
/// latest rate-limit snapshot, so the returned event is independent of later
/// state changes.
async fn get_token_count_event(&self) -> TokenCountEvent {
|
||||
let state = self.state.lock().await;
|
||||
TokenCountEvent {
|
||||
info: state.token_info.clone(),
|
||||
|
||||
rate_limits: state.latest_rate_limits.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Record a user input item to conversation history and also persist a
|
||||
@@ -2136,17 +2153,22 @@ async fn try_run_turn(
|
||||
})
|
||||
.await;
|
||||
}
|
||||
ResponseEvent::RateLimits(snapshot) => {
|
||||
// Update internal state with latest rate limits, but defer sending until
|
||||
// token usage is available to avoid duplicate TokenCount events.
|
||||
sess.update_rate_limits(snapshot).await;
|
||||
}
|
||||
ResponseEvent::Completed {
|
||||
response_id: _,
|
||||
token_usage,
|
||||
} => {
|
||||
let info = sess
|
||||
.update_token_usage_info(turn_context, &token_usage)
|
||||
sess.update_token_usage_info(turn_context, token_usage.as_ref())
|
||||
.await;
|
||||
let token_event = sess.get_token_count_event().await;
|
||||
let _ = sess
|
||||
.send_event(Event {
|
||||
id: sub_id.to_string(),
|
||||
msg: EventMsg::TokenCount(crate::protocol::TokenCountEvent { info }),
|
||||
msg: EventMsg::TokenCount(token_event),
|
||||
})
|
||||
.await;
|
||||
|
||||
|
||||
@@ -22,6 +22,7 @@ use codex_protocol::models::ReasoningItemReasoningSummary;
|
||||
use codex_protocol::models::WebSearchAction;
|
||||
use core_test_support::load_default_config_for_test;
|
||||
use core_test_support::load_sse_fixture_with_id;
|
||||
use core_test_support::responses;
|
||||
use core_test_support::wait_for_event;
|
||||
use futures::StreamExt;
|
||||
use serde_json::json;
|
||||
@@ -776,6 +777,100 @@ async fn azure_responses_request_includes_store_and_reasoning_ids() {
|
||||
assert_eq!(body["input"][5]["id"].as_str(), Some("custom-tool-id"));
|
||||
}
|
||||
|
||||
// End-to-end check that the `TokenCount` event emitted for a completed
// response carries both the token usage and the rate-limit snapshot taken
// from the response headers.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn token_count_includes_rate_limits_snapshot() {
|
||||
let server = MockServer::start().await;
|
||||
|
||||
// The SSE body contains a single completed event reporting 123 tokens.
let sse_body = responses::sse(vec![responses::ev_completed_with_tokens("resp_rate", 123)]);
|
||||
|
||||
// Mock response carrying all five `x-codex-*` rate-limit headers; their
// values are the ones asserted in the `rate_limits` payload below.
let response = ResponseTemplate::new(200)
|
||||
.insert_header("content-type", "text/event-stream")
|
||||
.insert_header("x-codex-primary-used-percent", "12.5")
|
||||
.insert_header("x-codex-protection-used-percent", "40.0")
|
||||
.insert_header("x-codex-primary-over-protection-limit-percent", "75.0")
|
||||
.insert_header("x-codex-primary-window-minutes", "10")
|
||||
.insert_header("x-codex-protection-window-minutes", "60")
|
||||
.set_body_raw(sse_body, "text/event-stream");
|
||||
|
||||
Mock::given(method("POST"))
|
||||
.and(path("/v1/responses"))
|
||||
.respond_with(response)
|
||||
.expect(1)
|
||||
.mount(&server)
|
||||
.await;
|
||||
|
||||
// Point the built-in "openai" provider at the mock server.
let mut provider = built_in_model_providers()["openai"].clone();
|
||||
provider.base_url = Some(format!("{}/v1", server.uri()));
|
||||
|
||||
let home = TempDir::new().unwrap();
|
||||
let mut config = load_default_config_for_test(&home);
|
||||
config.model_provider = provider;
|
||||
|
||||
let conversation_manager = ConversationManager::with_auth(CodexAuth::from_api_key("test"));
|
||||
let codex = conversation_manager
|
||||
.new_conversation(config)
|
||||
.await
|
||||
.expect("create conversation")
|
||||
.conversation;
|
||||
|
||||
codex
|
||||
.submit(Op::UserInput {
|
||||
items: vec![InputItem::Text {
|
||||
text: "hello".into(),
|
||||
}],
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let token_event = wait_for_event(&codex, |msg| matches!(msg, EventMsg::TokenCount(_))).await;
|
||||
let final_payload = match token_event {
|
||||
EventMsg::TokenCount(ev) => ev,
|
||||
// `wait_for_event` was called with a predicate that only matches TokenCount.
_ => unreachable!(),
|
||||
};
|
||||
// Assert full JSON for the final token count event (usage + rate limits)
|
||||
let final_json = serde_json::to_value(&final_payload).unwrap();
|
||||
pretty_assertions::assert_eq!(
|
||||
final_json,
|
||||
json!({
|
||||
"info": {
|
||||
"total_token_usage": {
|
||||
"input_tokens": 123,
|
||||
"cached_input_tokens": 0,
|
||||
"output_tokens": 0,
|
||||
"reasoning_output_tokens": 0,
|
||||
"total_tokens": 123
|
||||
},
|
||||
"last_token_usage": {
|
||||
"input_tokens": 123,
|
||||
"cached_input_tokens": 0,
|
||||
"output_tokens": 0,
|
||||
"reasoning_output_tokens": 0,
|
||||
"total_tokens": 123
|
||||
},
|
||||
// Default model is gpt-5 in tests → 272000 context window
|
||||
"model_context_window": 272000
|
||||
},
|
||||
"rate_limits": {
|
||||
"primary_used_percent": 12.5,
|
||||
"weekly_used_percent": 40.0,
|
||||
"primary_to_weekly_ratio_percent": 75.0,
|
||||
"primary_window_minutes": 10,
|
||||
"weekly_window_minutes": 60
|
||||
}
|
||||
})
|
||||
);
|
||||
// Typed spot-checks on the same payload, in addition to the JSON comparison.
let usage = final_payload
|
||||
.info
|
||||
.expect("token usage info should be recorded after completion");
|
||||
assert_eq!(usage.total_token_usage.total_tokens, 123);
|
||||
let final_snapshot = final_payload
|
||||
.rate_limits
|
||||
.expect("latest rate limit snapshot should be retained");
|
||||
assert_eq!(final_snapshot.primary_used_percent, 12.5);
|
||||
|
||||
// Wait for the task to report completion before the test returns.
wait_for_event(&codex, |msg| matches!(msg, EventMsg::TaskComplete(_))).await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn azure_overrides_assign_properties_used_for_responses_url() {
|
||||
let existing_env_var_with_random_value = if cfg!(windows) { "USERNAME" } else { "USER" };
|
||||
|
||||
@@ -589,6 +589,21 @@ impl TokenUsageInfo {
|
||||
/// Payload of the `TokenCount` event: token usage plus, when available, the
/// most recent rate-limit snapshot.
#[derive(Debug, Clone, Deserialize, Serialize, TS)]
|
||||
pub struct TokenCountEvent {
|
||||
/// Token usage information, if any has been recorded.
pub info: Option<TokenUsageInfo>,
|
||||
/// Latest rate-limit snapshot, if one has been recorded.
pub rate_limits: Option<RateLimitSnapshotEvent>,
|
||||
}
|
||||
|
||||
/// Snapshot of rate-limit usage, carried in `TokenCountEvent::rate_limits`.
///
/// Terminology: the "weekly" fields describe the window that the server-side
/// `x-codex-protection-*` headers refer to as the "protection" window.
#[derive(Debug, Clone, Deserialize, Serialize, TS)]
|
||||
pub struct RateLimitSnapshotEvent {
|
||||
/// Percentage (0-100) of the primary window that has been consumed.
|
||||
pub primary_used_percent: f64,
|
||||
/// Percentage (0-100) of the weekly ("protection") window that has been consumed.
|
||||
pub weekly_used_percent: f64,
|
||||
/// Size of the primary window relative to the weekly window (0-100).
|
||||
pub primary_to_weekly_ratio_percent: f64,
|
||||
/// Rolling window duration for the primary limit, in minutes.
|
||||
pub primary_window_minutes: u64,
|
||||
/// Rolling window duration for the weekly ("protection") limit, in minutes.
|
||||
pub weekly_window_minutes: u64,
|
||||
}
|
||||
|
||||
// Includes prompts, tools and space to call compact.
|
||||
|
||||
Reference in New Issue
Block a user