diff --git a/codex-rs/core/Cargo.toml b/codex-rs/core/Cargo.toml index 1ea1422b..3fb99d1f 100644 --- a/codex-rs/core/Cargo.toml +++ b/codex-rs/core/Cargo.toml @@ -29,6 +29,7 @@ mcp-types = { path = "../mcp-types" } mime_guess = "2.0" os_info = "3.12.0" rand = "0.9" +regex-lite = "0.1.6" reqwest = { version = "0.12", features = ["json", "stream"] } serde = { version = "1", features = ["derive"] } serde_bytes = "0.11" @@ -76,7 +77,6 @@ core_test_support = { path = "tests/common" } maplit = "1.0.2" predicates = "3" pretty_assertions = "1.4.1" -regex-lite = "0.1.6" tempfile = "3" tokio-test = "0.4" walkdir = "2.5.0" diff --git a/codex-rs/core/src/chat_completions.rs b/codex-rs/core/src/chat_completions.rs index 840e808f..fc0ceca5 100644 --- a/codex-rs/core/src/chat_completions.rs +++ b/codex-rs/core/src/chat_completions.rs @@ -213,7 +213,9 @@ async fn process_chat_sse( let sse = match timeout(idle_timeout, stream.next()).await { Ok(Some(Ok(ev))) => ev, Ok(Some(Err(e))) => { - let _ = tx_event.send(Err(CodexErr::Stream(e.to_string()))).await; + let _ = tx_event + .send(Err(CodexErr::Stream(e.to_string(), None))) + .await; return; } Ok(None) => { @@ -228,7 +230,10 @@ async fn process_chat_sse( } Err(_) => { let _ = tx_event - .send(Err(CodexErr::Stream("idle timeout waiting for SSE".into()))) + .send(Err(CodexErr::Stream( + "idle timeout waiting for SSE".into(), + None, + ))) .await; return; } diff --git a/codex-rs/core/src/client.rs b/codex-rs/core/src/client.rs index f0229d45..135782bc 100644 --- a/codex-rs/core/src/client.rs +++ b/codex-rs/core/src/client.rs @@ -1,5 +1,6 @@ use std::io::BufRead; use std::path::Path; +use std::sync::OnceLock; use std::time::Duration; use bytes::Bytes; @@ -7,6 +8,7 @@ use codex_login::AuthMode; use codex_login::CodexAuth; use eventsource_stream::Eventsource; use futures::prelude::*; +use regex_lite::Regex; use reqwest::StatusCode; use serde::Deserialize; use serde::Serialize; @@ -49,7 +51,9 @@ struct ErrorResponse { #[derive(Debug, 
Deserialize)] struct Error { - r#type: String, + r#type: Option<String>, + code: Option<String>, + message: Option<String>, } #[derive(Clone)] @@ -263,14 +267,18 @@ impl ModelClient { if status == StatusCode::TOO_MANY_REQUESTS { let body = res.json::<ErrorResponse>().await.ok(); if let Some(ErrorResponse { - error: Error { r#type, .. }, + error: + Error { + r#type: Some(error_type), + .. + }, }) = body { - if r#type == "usage_limit_reached" { + if error_type == "usage_limit_reached" { return Err(CodexErr::UsageLimitReached(UsageLimitReachedError { plan_type: auth.and_then(|a| a.get_plan_type()), })); - } else if r#type == "usage_not_included" { + } else if error_type == "usage_not_included" { return Err(CodexErr::UsageNotIncluded); } } @@ -366,13 +374,14 @@ async fn process_sse( // If the stream stays completely silent for an extended period treat it as disconnected. // The response id returned from the "complete" message. let mut response_completed: Option<ResponseCompleted> = None; + let mut response_error: Option<CodexErr> = None; loop { let sse = match timeout(idle_timeout, stream.next()).await { Ok(Some(Ok(sse))) => sse, Ok(Some(Err(e))) => { debug!("SSE Error: {e:#}"); - let event = CodexErr::Stream(e.to_string()); + let event = CodexErr::Stream(e.to_string(), None); let _ = tx_event.send(Err(event)).await; return; } @@ -390,9 +399,10 @@ } None => { let _ = tx_event - .send(Err(CodexErr::Stream( + .send(Err(response_error.unwrap_or(CodexErr::Stream( "stream closed before response.completed".into(), - ))) + None, + )))) .await; } } @@ -400,7 +410,10 @@ } Err(_) => { let _ = tx_event - .send(Err(CodexErr::Stream("idle timeout waiting for SSE".into()))) + .send(Err(CodexErr::Stream( + "idle timeout waiting for SSE".into(), + None, + ))) .await; return; } @@ -478,15 +491,25 @@ } "response.failed" => { if let Some(resp_val) = event.response { - let error = resp_val - .get("error") - .and_then(|v| v.get("message")) - .and_then(|v| v.as_str()) - 
.unwrap_or("response.failed event received"); + response_error = Some(CodexErr::Stream( + "response.failed event received".to_string(), + None, + )); - let _ = tx_event - .send(Err(CodexErr::Stream(error.to_string()))) - .await; + let error = resp_val.get("error"); + + if let Some(error) = error { + match serde_json::from_value::<Error>(error.clone()) { + Ok(error) => { + let delay = try_parse_retry_after(&error); + let message = error.message.unwrap_or_default(); + response_error = Some(CodexErr::Stream(message, delay)); + } + Err(e) => { + debug!("failed to parse ErrorResponse: {e}"); + } + } + } } } // Final response completed – includes array of output items & id @@ -550,6 +573,40 @@ async fn stream_from_fixture( Ok(ResponseStream { rx_event }) } +fn rate_limit_regex() -> &'static Regex { + static RE: OnceLock<Regex> = OnceLock::new(); + + #[expect(clippy::unwrap_used)] + RE.get_or_init(|| Regex::new(r"Please try again in (\d+(?:\.\d+)?)(s|ms)").unwrap()) +} + +fn try_parse_retry_after(err: &Error) -> Option<Duration> { + if err.code != Some("rate_limit_exceeded".to_string()) { + return None; + } + + // parse the Please try again in 1.898s format using regex + let re = rate_limit_regex(); + if let Some(message) = &err.message + && let Some(captures) = re.captures(message) + { + let seconds = captures.get(1); + let unit = captures.get(2); + + if let (Some(value), Some(unit)) = (seconds, unit) { + let value = value.as_str().parse::<f64>().ok()?; + let unit = unit.as_str(); + + if unit == "s" { + return Some(Duration::from_secs_f64(value)); + } else if unit == "ms" { + return Some(Duration::from_millis(value as u64)); + } + } + } + None +} + #[cfg(test)] mod tests { #![allow(clippy::expect_used, clippy::unwrap_used)] @@ -735,13 +792,49 @@ mod tests { matches!(events[0], Ok(ResponseEvent::OutputItemDone(_))); match &events[1] { - Err(CodexErr::Stream(msg)) => { + Err(CodexErr::Stream(msg, _)) => { assert_eq!(msg, "stream closed before response.completed") } other => panic!("unexpected second 
event: {other:?}"), } } + #[tokio::test] + async fn error_when_error_event() { + let raw_error = r#"{"type":"response.failed","sequence_number":3,"response":{"id":"resp_689bcf18d7f08194bf3440ba62fe05d803fee0cdac429894","object":"response","created_at":1755041560,"status":"failed","background":false,"error":{"code":"rate_limit_exceeded","message":"Rate limit reached for gpt-5 in organization org-AAA on tokens per min (TPM): Limit 30000, Used 22999, Requested 12528. Please try again in 11.054s. Visit https://platform.openai.com/account/rate-limits to learn more."}, "usage":null,"user":null,"metadata":{}}}"#; + + let sse1 = format!("event: response.failed\ndata: {raw_error}\n\n"); + let provider = ModelProviderInfo { + name: "test".to_string(), + base_url: Some("https://test.com".to_string()), + env_key: Some("TEST_API_KEY".to_string()), + env_key_instructions: None, + wire_api: WireApi::Responses, + query_params: None, + http_headers: None, + env_http_headers: None, + request_max_retries: Some(0), + stream_max_retries: Some(0), + stream_idle_timeout_ms: Some(1000), + requires_openai_auth: false, + }; + + let events = collect_events(&[sse1.as_bytes()], provider).await; + + assert_eq!(events.len(), 1); + + match &events[0] { + Err(CodexErr::Stream(msg, delay)) => { + assert_eq!( + msg, + "Rate limit reached for gpt-5 in organization org-AAA on tokens per min (TPM): Limit 30000, Used 22999, Requested 12528. Please try again in 11.054s. Visit https://platform.openai.com/account/rate-limits to learn more." 
+ ); + assert_eq!(*delay, Some(Duration::from_secs_f64(11.054))); + } + other => panic!("unexpected second event: {other:?}"), + } + } + // ──────────────────────────── // Table-driven test from `main` // ──────────────────────────── @@ -840,4 +933,27 @@ mod tests { ); } } + + #[test] + fn test_try_parse_retry_after() { + let err = Error { + r#type: None, + message: Some("Rate limit reached for gpt-5 in organization org- on tokens per min (TPM): Limit 1, Used 1, Requested 19304. Please try again in 28ms. Visit https://platform.openai.com/account/rate-limits to learn more.".to_string()), + code: Some("rate_limit_exceeded".to_string()), + }; + + let delay = try_parse_retry_after(&err); + assert_eq!(delay, Some(Duration::from_millis(28))); + } + + #[test] + fn test_try_parse_retry_after_no_delay() { + let err = Error { + r#type: None, + message: Some("Rate limit reached for gpt-5 in organization on tokens per min (TPM): Limit 30000, Used 6899, Requested 24050. Please try again in 1.898s. Visit https://platform.openai.com/account/rate-limits to learn more.".to_string()), + code: Some("rate_limit_exceeded".to_string()), + }; + let delay = try_parse_retry_after(&err); + assert_eq!(delay, Some(Duration::from_secs_f64(1.898))); + } } diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 5ac1392b..207f75a7 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -1281,7 +1281,10 @@ async fn run_turn( let max_retries = sess.client.get_provider().stream_max_retries(); if retries < max_retries { retries += 1; - let delay = backoff(retries); + let delay = match e { + CodexErr::Stream(_, Some(delay)) => delay, + _ => backoff(retries), + }; warn!( "stream disconnected - retrying turn ({retries}/{max_retries} in {delay:?})...", ); @@ -1391,6 +1394,7 @@ async fn try_run_turn( // Treat as a disconnected stream so the caller can retry. 
return Err(CodexErr::Stream( "stream closed before response.completed".into(), + None, )); }; @@ -2201,6 +2205,7 @@ async fn drain_to_completed(sess: &Session, sub_id: &str, prompt: &Prompt) -> Co let Some(event) = maybe_event else { return Err(CodexErr::Stream( "stream closed before response.completed".into(), + None, )); }; match event { @@ -2218,6 +2223,7 @@ None => { return Err(CodexErr::Stream( "token_usage was None in ResponseEvent::Completed".into(), + None, )); } }; diff --git a/codex-rs/core/src/error.rs b/codex-rs/core/src/error.rs index 4df66633..356c2301 100644 --- a/codex-rs/core/src/error.rs +++ b/codex-rs/core/src/error.rs @@ -1,6 +1,7 @@ use reqwest::StatusCode; use serde_json; use std::io; +use std::time::Duration; use thiserror::Error; use tokio::task::JoinError; use uuid::Uuid; @@ -42,8 +43,10 @@ pub enum CodexErr { /// handshake has succeeded but **before** it finished emitting `response.completed`. /// /// The Session loop treats this as a transient error and will automatically retry the turn. + /// + /// Optionally includes the requested delay before retrying the turn. #[error("stream disconnected before completion: {0}")] - Stream(String), + Stream(String, Option<Duration>),