Wait for requested delay in rate limit errors (#2266)

Fixes: https://github.com/openai/codex/issues/2131

The response doesn't expose the delay in a separate field (yet), so parse it out of the error message.
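
For context, the rate-limit text the server sends looks like `Please try again in 11.054s` or `Please try again in 28ms`. A minimal, self-contained sketch of the extraction (the function name here is illustrative; the commit's actual helper is `try_parse_retry_after` in client.rs):

```rust
use std::time::Duration;

use regex_lite::Regex;

// Illustrative sketch only: pull the requested wait out of a rate-limit
// message and convert it to a Duration.
fn parse_retry_hint(message: &str) -> Option<Duration> {
    let re = Regex::new(r"Please try again in (\d+(?:\.\d+)?)(s|ms)").ok()?;
    let caps = re.captures(message)?;
    let value: f64 = caps.get(1)?.as_str().parse().ok()?;
    match caps.get(2)?.as_str() {
        "s" => Some(Duration::from_secs_f64(value)),
        "ms" => Some(Duration::from_millis(value as u64)),
        _ => None,
    }
}
```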
pakrym-oai
2025-08-13 15:43:54 -07:00
committed by GitHub
parent 37fc4185ef
commit 41eb59a07d
5 changed files with 152 additions and 22 deletions


@@ -29,6 +29,7 @@ mcp-types = { path = "../mcp-types" }
mime_guess = "2.0"
os_info = "3.12.0"
rand = "0.9"
regex-lite = "0.1.6"
reqwest = { version = "0.12", features = ["json", "stream"] }
serde = { version = "1", features = ["derive"] }
serde_bytes = "0.11"
@@ -76,7 +77,6 @@ core_test_support = { path = "tests/common" }
maplit = "1.0.2"
predicates = "3"
pretty_assertions = "1.4.1"
regex-lite = "0.1.6"
tempfile = "3"
tokio-test = "0.4"
walkdir = "2.5.0"


@@ -213,7 +213,9 @@ async fn process_chat_sse<S>(
let sse = match timeout(idle_timeout, stream.next()).await {
Ok(Some(Ok(ev))) => ev,
Ok(Some(Err(e))) => {
let _ = tx_event.send(Err(CodexErr::Stream(e.to_string()))).await;
let _ = tx_event
.send(Err(CodexErr::Stream(e.to_string(), None)))
.await;
return;
}
Ok(None) => {
@@ -228,7 +230,10 @@ async fn process_chat_sse<S>(
}
Err(_) => {
let _ = tx_event
.send(Err(CodexErr::Stream("idle timeout waiting for SSE".into())))
.send(Err(CodexErr::Stream(
"idle timeout waiting for SSE".into(),
None,
)))
.await;
return;
}


@@ -1,5 +1,6 @@
use std::io::BufRead;
use std::path::Path;
use std::sync::OnceLock;
use std::time::Duration;
use bytes::Bytes;
@@ -7,6 +8,7 @@ use codex_login::AuthMode;
use codex_login::CodexAuth;
use eventsource_stream::Eventsource;
use futures::prelude::*;
use regex_lite::Regex;
use reqwest::StatusCode;
use serde::Deserialize;
use serde::Serialize;
@@ -49,7 +51,9 @@ struct ErrorResponse {
#[derive(Debug, Deserialize)]
struct Error {
r#type: String,
r#type: Option<String>,
code: Option<String>,
message: Option<String>,
}
#[derive(Clone)]
@@ -263,14 +267,18 @@ impl ModelClient {
if status == StatusCode::TOO_MANY_REQUESTS {
let body = res.json::<ErrorResponse>().await.ok();
if let Some(ErrorResponse {
error: Error { r#type, .. },
error:
Error {
r#type: Some(error_type),
..
},
}) = body
{
if r#type == "usage_limit_reached" {
if error_type == "usage_limit_reached" {
return Err(CodexErr::UsageLimitReached(UsageLimitReachedError {
plan_type: auth.and_then(|a| a.get_plan_type()),
}));
} else if r#type == "usage_not_included" {
} else if error_type == "usage_not_included" {
return Err(CodexErr::UsageNotIncluded);
}
}
@@ -366,13 +374,14 @@ async fn process_sse<S>(
// If the stream stays completely silent for an extended period treat it as disconnected.
// The response id returned from the "complete" message.
let mut response_completed: Option<ResponseCompleted> = None;
let mut response_error: Option<CodexErr> = None;
loop {
let sse = match timeout(idle_timeout, stream.next()).await {
Ok(Some(Ok(sse))) => sse,
Ok(Some(Err(e))) => {
debug!("SSE Error: {e:#}");
let event = CodexErr::Stream(e.to_string());
let event = CodexErr::Stream(e.to_string(), None);
let _ = tx_event.send(Err(event)).await;
return;
}
@@ -390,9 +399,10 @@ async fn process_sse<S>(
}
None => {
let _ = tx_event
.send(Err(CodexErr::Stream(
.send(Err(response_error.unwrap_or(CodexErr::Stream(
"stream closed before response.completed".into(),
)))
None,
))))
.await;
}
}
@@ -400,7 +410,10 @@ async fn process_sse<S>(
}
Err(_) => {
let _ = tx_event
.send(Err(CodexErr::Stream("idle timeout waiting for SSE".into())))
.send(Err(CodexErr::Stream(
"idle timeout waiting for SSE".into(),
None,
)))
.await;
return;
}
@@ -478,15 +491,25 @@ async fn process_sse<S>(
}
"response.failed" => {
if let Some(resp_val) = event.response {
let error = resp_val
.get("error")
.and_then(|v| v.get("message"))
.and_then(|v| v.as_str())
.unwrap_or("response.failed event received");
response_error = Some(CodexErr::Stream(
"response.failed event received".to_string(),
None,
));
let _ = tx_event
.send(Err(CodexErr::Stream(error.to_string())))
.await;
let error = resp_val.get("error");
if let Some(error) = error {
match serde_json::from_value::<Error>(error.clone()) {
Ok(error) => {
let delay = try_parse_retry_after(&error);
let message = error.message.unwrap_or_default();
response_error = Some(CodexErr::Stream(message, delay));
}
Err(e) => {
debug!("failed to parse ErrorResponse: {e}");
}
}
}
}
}
// Final response completed includes array of output items & id
@@ -550,6 +573,40 @@ async fn stream_from_fixture(
Ok(ResponseStream { rx_event })
}
fn rate_limit_regex() -> &'static Regex {
static RE: OnceLock<Regex> = OnceLock::new();
#[expect(clippy::unwrap_used)]
RE.get_or_init(|| Regex::new(r"Please try again in (\d+(?:\.\d+)?)(s|ms)").unwrap())
}
fn try_parse_retry_after(err: &Error) -> Option<Duration> {
if err.code != Some("rate_limit_exceeded".to_string()) {
return None;
}
// parse the Please try again in 1.898s format using regex
let re = rate_limit_regex();
if let Some(message) = &err.message
&& let Some(captures) = re.captures(message)
{
let seconds = captures.get(1);
let unit = captures.get(2);
if let (Some(value), Some(unit)) = (seconds, unit) {
let value = value.as_str().parse::<f64>().ok()?;
let unit = unit.as_str();
if unit == "s" {
return Some(Duration::from_secs_f64(value));
} else if unit == "ms" {
return Some(Duration::from_millis(value as u64));
}
}
}
None
}
#[cfg(test)]
mod tests {
#![allow(clippy::expect_used, clippy::unwrap_used)]
@@ -735,13 +792,49 @@ mod tests {
matches!(events[0], Ok(ResponseEvent::OutputItemDone(_)));
match &events[1] {
Err(CodexErr::Stream(msg)) => {
Err(CodexErr::Stream(msg, _)) => {
assert_eq!(msg, "stream closed before response.completed")
}
other => panic!("unexpected second event: {other:?}"),
}
}
#[tokio::test]
async fn error_when_error_event() {
let raw_error = r#"{"type":"response.failed","sequence_number":3,"response":{"id":"resp_689bcf18d7f08194bf3440ba62fe05d803fee0cdac429894","object":"response","created_at":1755041560,"status":"failed","background":false,"error":{"code":"rate_limit_exceeded","message":"Rate limit reached for gpt-5 in organization org-AAA on tokens per min (TPM): Limit 30000, Used 22999, Requested 12528. Please try again in 11.054s. Visit https://platform.openai.com/account/rate-limits to learn more."}, "usage":null,"user":null,"metadata":{}}}"#;
let sse1 = format!("event: response.failed\ndata: {raw_error}\n\n");
let provider = ModelProviderInfo {
name: "test".to_string(),
base_url: Some("https://test.com".to_string()),
env_key: Some("TEST_API_KEY".to_string()),
env_key_instructions: None,
wire_api: WireApi::Responses,
query_params: None,
http_headers: None,
env_http_headers: None,
request_max_retries: Some(0),
stream_max_retries: Some(0),
stream_idle_timeout_ms: Some(1000),
requires_openai_auth: false,
};
let events = collect_events(&[sse1.as_bytes()], provider).await;
assert_eq!(events.len(), 1);
match &events[0] {
Err(CodexErr::Stream(msg, delay)) => {
assert_eq!(
msg,
"Rate limit reached for gpt-5 in organization org-AAA on tokens per min (TPM): Limit 30000, Used 22999, Requested 12528. Please try again in 11.054s. Visit https://platform.openai.com/account/rate-limits to learn more."
);
assert_eq!(*delay, Some(Duration::from_secs_f64(11.054)));
}
other => panic!("unexpected second event: {other:?}"),
}
}
// ────────────────────────────
// Table-driven test from `main`
// ────────────────────────────
@@ -840,4 +933,27 @@ mod tests {
);
}
}
#[test]
fn test_try_parse_retry_after() {
let err = Error {
r#type: None,
message: Some("Rate limit reached for gpt-5 in organization org- on tokens per min (TPM): Limit 1, Used 1, Requested 19304. Please try again in 28ms. Visit https://platform.openai.com/account/rate-limits to learn more.".to_string()),
code: Some("rate_limit_exceeded".to_string()),
};
let delay = try_parse_retry_after(&err);
assert_eq!(delay, Some(Duration::from_millis(28)));
}
#[test]
fn test_try_parse_retry_after_no_delay() {
let err = Error {
r#type: None,
message: Some("Rate limit reached for gpt-5 in organization <ORG> on tokens per min (TPM): Limit 30000, Used 6899, Requested 24050. Please try again in 1.898s. Visit https://platform.openai.com/account/rate-limits to learn more.".to_string()),
code: Some("rate_limit_exceeded".to_string()),
};
let delay = try_parse_retry_after(&err);
assert_eq!(delay, Some(Duration::from_secs_f64(1.898)));
}
}
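
A note on the deserialization above: because every field on `Error` is now `Option`, a `response.failed` payload that omits `type` still parses cleanly. A self-contained sketch of that path, with the struct re-declared locally and the JSON abbreviated from the fixture in the test (assumptions for illustration, not the crate's exact wiring):

```rust
use serde::Deserialize;
use serde_json::json;

// Local re-declaration of the all-optional error shape so the snippet
// compiles on its own.
#[derive(Debug, Deserialize)]
struct Error {
    r#type: Option<String>,
    code: Option<String>,
    message: Option<String>,
}

fn main() {
    // Abbreviated response.failed payload; only the `error` object matters here.
    let payload = json!({
        "type": "response.failed",
        "response": {
            "error": {
                "code": "rate_limit_exceeded",
                "message": "Rate limit reached. Please try again in 11.054s."
            }
        }
    });

    if let Some(error) = payload.get("response").and_then(|r| r.get("error")) {
        match serde_json::from_value::<Error>(error.clone()) {
            // The missing `type` field simply deserializes to None.
            Ok(error) => println!("code={:?} message={:?}", error.code, error.message),
            Err(e) => eprintln!("failed to parse Error: {e}"),
        }
    }
}
```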


@@ -1281,7 +1281,10 @@ async fn run_turn(
let max_retries = sess.client.get_provider().stream_max_retries();
if retries < max_retries {
retries += 1;
let delay = backoff(retries);
let delay = match e {
CodexErr::Stream(_, Some(delay)) => delay,
_ => backoff(retries),
};
warn!(
"stream disconnected - retrying turn ({retries}/{max_retries} in {delay:?})...",
);
@@ -1391,6 +1394,7 @@ async fn try_run_turn(
// Treat as a disconnected stream so the caller can retry.
return Err(CodexErr::Stream(
"stream closed before response.completed".into(),
None,
));
};
@@ -2201,6 +2205,7 @@ async fn drain_to_completed(sess: &Session, sub_id: &str, prompt: &Prompt) -> Co
let Some(event) = maybe_event else {
return Err(CodexErr::Stream(
"stream closed before response.completed".into(),
None,
));
};
match event {
@@ -2218,6 +2223,7 @@ async fn drain_to_completed(sess: &Session, sub_id: &str, prompt: &Prompt) -> Co
None => {
return Err(CodexErr::Stream(
"token_usage was None in ResponseEvent::Completed".into(),
None,
));
}
};
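
The retry path now prefers the delay the server asked for whenever the stream error carries one. A minimal sketch of that selection, using a hypothetical exponential `backoff` stand-in (the crate's real backoff helper is not shown in this diff):

```rust
use std::time::Duration;

// Hypothetical stand-in for the crate's backoff() helper; the real
// parameters will differ.
fn backoff(attempt: u64) -> Duration {
    Duration::from_millis(200u64.saturating_mul(2u64.saturating_pow(attempt as u32)))
}

// Sketch of the choice made in run_turn: use the delay parsed from the
// rate-limit message when present, otherwise fall back to backoff.
fn retry_delay(requested: Option<Duration>, attempt: u64) -> Duration {
    requested.unwrap_or_else(|| backoff(attempt))
}

fn main() {
    assert_eq!(
        retry_delay(Some(Duration::from_secs_f64(11.054)), 1),
        Duration::from_secs_f64(11.054)
    );
    assert_eq!(retry_delay(None, 3), Duration::from_millis(1600));
}
```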


@@ -1,6 +1,7 @@
use reqwest::StatusCode;
use serde_json;
use std::io;
use std::time::Duration;
use thiserror::Error;
use tokio::task::JoinError;
use uuid::Uuid;
@@ -42,8 +43,10 @@ pub enum CodexErr {
/// handshake has succeeded but **before** it finished emitting `response.completed`.
///
/// The Session loop treats this as a transient error and will automatically retry the turn.
///
/// Optionally includes the requested delay before retrying the turn.
#[error("stream disconnected before completion: {0}")]
Stream(String),
Stream(String, Option<Duration>),
#[error("no conversation with id: {0}")]
ConversationNotFound(Uuid),
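
For completeness, a compact illustration of the changed variant shape and how callers can destructure it; the enum below is a trimmed stand-in for illustration, not the full CodexErr:

```rust
use std::time::Duration;

use thiserror::Error;

// Trimmed stand-in: only the variant touched by this change.
#[derive(Debug, Error)]
enum CodexErr {
    /// The optional Duration carries the server-requested retry delay, when
    /// the rate-limit message contained one.
    #[error("stream disconnected before completion: {0}")]
    Stream(String, Option<Duration>),
}

fn main() {
    let err = CodexErr::Stream(
        "Please try again in 11.054s".into(),
        Some(Duration::from_secs_f64(11.054)),
    );
    // {0} in the #[error] attribute keeps the display string unchanged; the
    // delay rides along for the retry loop to pick up.
    println!("{err}");
    if let CodexErr::Stream(_, Some(delay)) = &err {
        println!("retry after {delay:?}");
    }
}
```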