From 9846adeabf8593458bc9e3cef28e0dd6c89a59d1 Mon Sep 17 00:00:00 2001 From: aibrahim-oai Date: Fri, 18 Jul 2025 12:12:39 -0700 Subject: [PATCH] Refactor env settings into config (#1601) ## Summary - add OpenAI retry and timeout fields to Config - inject these settings in tests instead of mutating env vars - plumb Config values through client and chat completions logic - document new configuration options ## Testing - `cargo test -p codex-core --no-run` ------ https://chatgpt.com/codex/tasks/task_i_68792c5b04cc832195c03050c8b6ea94 --------- Co-authored-by: Michael Bolin --- codex-rs/config.md | 28 +++++- codex-rs/core/src/chat_completions.rs | 22 ++-- codex-rs/core/src/client.rs | 105 ++++++++++++++++---- codex-rs/core/src/codex.rs | 37 +++++-- codex-rs/core/src/config.rs | 6 ++ codex-rs/core/src/flags.rs | 8 -- codex-rs/core/src/model_provider_info.rs | 48 ++++++++- codex-rs/core/tests/cli_stream.rs | 2 +- codex-rs/core/tests/client.rs | 10 +- codex-rs/core/tests/live_agent.rs | 22 +--- codex-rs/core/tests/previous_response_id.rs | 13 ++- codex-rs/core/tests/stream_no_completed.rs | 21 ++-- 12 files changed, 228 insertions(+), 94 deletions(-) diff --git a/codex-rs/config.md b/codex-rs/config.md index 438b7e76..3d38ded1 100644 --- a/codex-rs/config.md +++ b/codex-rs/config.md @@ -92,6 +92,32 @@ http_headers = { "X-Example-Header" = "example-value" } env_http_headers = { "X-Example-Features": "EXAMPLE_FEATURES" } ``` +### Per-provider network tuning + +The following optional settings control retry behaviour and streaming idle timeouts **per model provider**. They must be specified inside the corresponding `[model_providers.<id>]` block in `config.toml`. (Older releases accepted top‑level keys; those are now ignored.) + +Example: + +```toml +[model_providers.openai] +name = "OpenAI" +base_url = "https://api.openai.com/v1" +env_key = "OPENAI_API_KEY" +# network tuning overrides (all optional; fall back to built‑in defaults) +request_max_retries = 4 # retry failed HTTP requests +stream_max_retries = 10 # retry dropped SSE streams +stream_idle_timeout_ms = 300000 # 5m idle timeout +``` + +#### request_max_retries +How many times Codex will retry a failed HTTP request to the model provider. Defaults to `4`. + +#### stream_max_retries +Number of times Codex will attempt to reconnect when a streaming response is interrupted. Defaults to `10`. + +#### stream_idle_timeout_ms +How long Codex will wait for activity on a streaming response before treating the connection as lost. Defaults to `300_000` (5 minutes). + ## model_provider Identifies which provider to use from the `model_providers` map. Defaults to `"openai"`. You can override the `base_url` for the built-in `openai` provider via the `OPENAI_BASE_URL` environment variable. @@ -444,7 +470,7 @@ Currently, `"vscode"` is the default, though Codex does not verify VS Code is in ## hide_agent_reasoning -Codex intermittently emits "reasoning" events that show the model’s internal "thinking" before it produces a final answer. Some users may find these events distracting, especially in CI logs or minimal terminal output.
Setting `hide_agent_reasoning` to `true` suppresses these events in **both** the TUI as well as the headless `exec` sub-command: diff --git a/codex-rs/core/src/chat_completions.rs b/codex-rs/core/src/chat_completions.rs index ad7b5595..35045c8e 100644 --- a/codex-rs/core/src/chat_completions.rs +++ b/codex-rs/core/src/chat_completions.rs @@ -21,8 +21,6 @@ use crate::client_common::ResponseEvent; use crate::client_common::ResponseStream; use crate::error::CodexErr; use crate::error::Result; -use crate::flags::OPENAI_REQUEST_MAX_RETRIES; -use crate::flags::OPENAI_STREAM_IDLE_TIMEOUT_MS; use crate::models::ContentItem; use crate::models::ResponseItem; use crate::openai_tools::create_tools_json_for_chat_completions_api; @@ -121,6 +119,7 @@ pub(crate) async fn stream_chat_completions( ); let mut attempt = 0; + let max_retries = provider.request_max_retries(); loop { attempt += 1; @@ -136,7 +135,11 @@ pub(crate) async fn stream_chat_completions( Ok(resp) if resp.status().is_success() => { let (tx_event, rx_event) = mpsc::channel::<Result<ResponseEvent>>(1600); let stream = resp.bytes_stream().map_err(CodexErr::Reqwest); - tokio::spawn(process_chat_sse(stream, tx_event)); + tokio::spawn(process_chat_sse( + stream, + tx_event, + provider.stream_idle_timeout(), + )); return Ok(ResponseStream { rx_event }); } Ok(res) => { @@ -146,7 +149,7 @@ pub(crate) async fn stream_chat_completions( return Err(CodexErr::UnexpectedStatus(status, body)); } - if attempt > *OPENAI_REQUEST_MAX_RETRIES { + if attempt > max_retries { return Err(CodexErr::RetryLimit(status)); } @@ -162,7 +165,7 @@ pub(crate) async fn stream_chat_completions( tokio::time::sleep(delay).await; } Err(e) => { - if attempt > *OPENAI_REQUEST_MAX_RETRIES { + if attempt > max_retries { return Err(e.into()); } let delay = backoff(attempt); @@ -175,14 +178,15 @@ pub(crate) async fn stream_chat_completions( /// Lightweight SSE processor for the Chat Completions streaming format. The /// output is mapped onto Codex's internal [`ResponseEvent`] so that the rest /// of the pipeline can stay agnostic of the underlying wire format. -async fn process_chat_sse<S>(stream: S, tx_event: mpsc::Sender<Result<ResponseEvent>>) -where +async fn process_chat_sse<S>( + stream: S, + tx_event: mpsc::Sender<Result<ResponseEvent>>, + idle_timeout: Duration, +) where S: Stream<Item = Result<bytes::Bytes, CodexErr>> + Unpin, { let mut stream = stream.eventsource(); - let idle_timeout = *OPENAI_STREAM_IDLE_TIMEOUT_MS; - // State to accumulate a function call across streaming chunks. // OpenAI may split the `arguments` string over multiple `delta` events // until the chunk whose `finish_reason` is `tool_calls` is emitted.
We diff --git a/codex-rs/core/src/client.rs b/codex-rs/core/src/client.rs index ae7904b8..62fcabe0 100644 --- a/codex-rs/core/src/client.rs +++ b/codex-rs/core/src/client.rs @@ -30,8 +30,6 @@ use crate::config_types::ReasoningSummary as ReasoningSummaryConfig; use crate::error::CodexErr; use crate::error::Result; use crate::flags::CODEX_RS_SSE_FIXTURE; -use crate::flags::OPENAI_REQUEST_MAX_RETRIES; -use crate::flags::OPENAI_STREAM_IDLE_TIMEOUT_MS; use crate::model_provider_info::ModelProviderInfo; use crate::model_provider_info::WireApi; use crate::models::ResponseItem; @@ -113,7 +111,7 @@ impl ModelClient { if let Some(path) = &*CODEX_RS_SSE_FIXTURE { // short circuit for tests warn!(path, "Streaming from fixture"); - return stream_from_fixture(path).await; + return stream_from_fixture(path, self.provider.clone()).await; } let full_instructions = prompt.get_full_instructions(&self.config.model); @@ -140,6 +138,7 @@ impl ModelClient { ); let mut attempt = 0; + let max_retries = self.provider.request_max_retries(); loop { attempt += 1; @@ -158,7 +157,11 @@ impl ModelClient { // spawn task to process SSE let stream = resp.bytes_stream().map_err(CodexErr::Reqwest); - tokio::spawn(process_sse(stream, tx_event)); + tokio::spawn(process_sse( + stream, + tx_event, + self.provider.stream_idle_timeout(), + )); return Ok(ResponseStream { rx_event }); } @@ -177,7 +180,7 @@ impl ModelClient { return Err(CodexErr::UnexpectedStatus(status, body)); } - if attempt > *OPENAI_REQUEST_MAX_RETRIES { + if attempt > max_retries { return Err(CodexErr::RetryLimit(status)); } @@ -194,7 +197,7 @@ impl ModelClient { tokio::time::sleep(delay).await; } Err(e) => { - if attempt > *OPENAI_REQUEST_MAX_RETRIES { + if attempt > max_retries { return Err(e.into()); } let delay = backoff(attempt); @@ -203,6 +206,10 @@ impl ModelClient { } } } + + pub fn get_provider(&self) -> ModelProviderInfo { + self.provider.clone() + } } #[derive(Debug, Deserialize, Serialize)] @@ -254,14 +261,16 @@ struct ResponseCompletedOutputTokensDetails { reasoning_tokens: u64, } -async fn process_sse<S>(stream: S, tx_event: mpsc::Sender<Result<ResponseEvent>>) -where +async fn process_sse<S>( + stream: S, + tx_event: mpsc::Sender<Result<ResponseEvent>>, + idle_timeout: Duration, +) where S: Stream<Item = Result<bytes::Bytes, CodexErr>> + Unpin, { let mut stream = stream.eventsource(); // If the stream stays completely silent for an extended period treat it as disconnected. - let idle_timeout = *OPENAI_STREAM_IDLE_TIMEOUT_MS; // The response id returned from the "complete" message. let mut response_completed: Option<ResponseCompleted> = None; @@ -322,7 +331,7 @@ where // duplicated `output` array embedded in the `response.completed` // payload. That produced two concrete issues: // 1. No real‑time streaming – the user only saw output after the - // entire turn had finished, which broke the “typing” UX and + // entire turn had finished, which broke the "typing" UX and // made long‑running turns look stalled. // 2.
Duplicate `function_call_output` items – both the // individual *and* the completed array were forwarded, which @@ -395,7 +404,10 @@ where } /// used in tests to stream from a text SSE file -async fn stream_from_fixture(path: impl AsRef<Path>) -> Result<ResponseStream> { +async fn stream_from_fixture( + path: impl AsRef<Path>, + provider: ModelProviderInfo, +) -> Result<ResponseStream> { let (tx_event, rx_event) = mpsc::channel::<Result<ResponseEvent>>(1600); let f = std::fs::File::open(path.as_ref())?; let lines = std::io::BufReader::new(f).lines(); @@ -409,7 +421,11 @@ async fn stream_from_fixture(path: impl AsRef<Path>) -> Result<ResponseStream> { let rdr = std::io::Cursor::new(content); let stream = ReaderStream::new(rdr).map_err(CodexErr::Io); - tokio::spawn(process_sse(stream, tx_event)); + tokio::spawn(process_sse( + stream, + tx_event, + provider.stream_idle_timeout(), + )); Ok(ResponseStream { rx_event }) } @@ -429,7 +445,10 @@ mod tests { /// Runs the SSE parser on pre-chunked byte slices and returns every event /// (including any final `Err` from a stream-closure check). - async fn collect_events(chunks: &[&[u8]]) -> Vec<Result<ResponseEvent>> { + async fn collect_events( + chunks: &[&[u8]], + provider: ModelProviderInfo, + ) -> Vec<Result<ResponseEvent>> { let mut builder = IoBuilder::new(); for chunk in chunks { builder.read(chunk); @@ -438,7 +457,7 @@ mod tests { let reader = builder.build(); let stream = ReaderStream::new(reader).map_err(CodexErr::Io); let (tx, mut rx) = mpsc::channel::<Result<ResponseEvent>>(16); - tokio::spawn(process_sse(stream, tx)); + tokio::spawn(process_sse(stream, tx, provider.stream_idle_timeout())); let mut events = Vec::new(); while let Some(ev) = rx.recv().await { @@ -449,7 +468,10 @@ mod tests { /// Builds an in-memory SSE stream from JSON fixtures and returns only the /// successfully parsed events (panics on internal channel errors). - async fn run_sse(events: Vec<serde_json::Value>) -> Vec<ResponseEvent> { + async fn run_sse( + events: Vec<serde_json::Value>, + provider: ModelProviderInfo, + ) -> Vec<ResponseEvent> { let mut body = String::new(); for e in events { let kind = e @@ -465,7 +487,7 @@ mod tests { let (tx, mut rx) = mpsc::channel::<Result<ResponseEvent>>(8); let stream = ReaderStream::new(std::io::Cursor::new(body)).map_err(CodexErr::Io); - tokio::spawn(process_sse(stream, tx)); + tokio::spawn(process_sse(stream, tx, provider.stream_idle_timeout())); let mut out = Vec::new(); while let Some(ev) = rx.recv().await { @@ -510,7 +532,25 @@ mod tests { let sse2 = format!("event: response.output_item.done\ndata: {item2}\n\n"); let sse3 = format!("event: response.completed\ndata: {completed}\n\n"); - let events = collect_events(&[sse1.as_bytes(), sse2.as_bytes(), sse3.as_bytes()]).await; + let provider = ModelProviderInfo { + name: "test".to_string(), + base_url: "https://test.com".to_string(), + env_key: Some("TEST_API_KEY".to_string()), + env_key_instructions: None, + wire_api: WireApi::Responses, + query_params: None, + http_headers: None, + env_http_headers: None, + request_max_retries: Some(0), + stream_max_retries: Some(0), + stream_idle_timeout_ms: Some(1000), + }; + + let events = collect_events( + &[sse1.as_bytes(), sse2.as_bytes(), sse3.as_bytes()], + provider, + ) + .await; assert_eq!(events.len(), 3); @@ -551,8 +591,21 @@ mod tests { .to_string(); let sse1 = format!("event: response.output_item.done\ndata: {item1}\n\n"); + let provider = ModelProviderInfo { + name: "test".to_string(), + base_url: "https://test.com".to_string(), + env_key: Some("TEST_API_KEY".to_string()), + env_key_instructions: None, + wire_api: WireApi::Responses, + query_params: None, + http_headers: None, + env_http_headers: None, + request_max_retries: Some(0), + stream_max_retries: Some(0),
stream_idle_timeout_ms: Some(1000), + }; - let events = collect_events(&[sse1.as_bytes()]).await; + let events = collect_events(&[sse1.as_bytes()], provider).await; assert_eq!(events.len(), 2); @@ -640,7 +693,21 @@ mod tests { let mut evs = vec![case.event]; evs.push(completed.clone()); - let out = run_sse(evs).await; + let provider = ModelProviderInfo { + name: "test".to_string(), + base_url: "https://test.com".to_string(), + env_key: Some("TEST_API_KEY".to_string()), + env_key_instructions: None, + wire_api: WireApi::Responses, + query_params: None, + http_headers: None, + env_http_headers: None, + request_max_retries: Some(0), + stream_max_retries: Some(0), + stream_idle_timeout_ms: Some(1000), + }; + + let out = run_sse(evs, provider).await; assert_eq!(out.len(), case.expected_len, "case {}", case.name); assert!( (case.expect_first)(&out[0]), diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 246198c0..df1ffad5 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -49,7 +49,6 @@ use crate::exec::ExecToolCallOutput; use crate::exec::SandboxType; use crate::exec::process_exec_tool_call; use crate::exec_env::create_env; -use crate::flags::OPENAI_STREAM_MAX_RETRIES; use crate::mcp_connection_manager::McpConnectionManager; use crate::mcp_tool_call::handle_mcp_tool_call; use crate::models::ContentItem; @@ -1027,12 +1026,13 @@ async fn run_turn( Err(CodexErr::Interrupted) => return Err(CodexErr::Interrupted), Err(CodexErr::EnvVar(var)) => return Err(CodexErr::EnvVar(var)), Err(e) => { - if retries < *OPENAI_STREAM_MAX_RETRIES { + // Use the configured provider-specific stream retry budget. + let max_retries = sess.client.get_provider().stream_max_retries(); + if retries < max_retries { retries += 1; let delay = backoff(retries); warn!( - "stream disconnected - retrying turn ({retries}/{} in {delay:?})...", - *OPENAI_STREAM_MAX_RETRIES + "stream disconnected - retrying turn ({retries}/{max_retries} in {delay:?})...", ); // Surface retry information to any UI/front‑end so the @@ -1041,8 +1041,7 @@ async fn run_turn( sess.notify_background_event( &sub_id, format!( - "stream error: {e}; retrying {retries}/{} in {:?}…", - *OPENAI_STREAM_MAX_RETRIES, delay + "stream error: {e}; retrying {retries}/{max_retries} in {delay:?}…" ), ) .await; @@ -1124,7 +1123,28 @@ async fn try_run_turn( let mut stream = sess.client.clone().stream(&prompt).await?; let mut output = Vec::new(); - while let Some(Ok(event)) = stream.next().await { + loop { + // Poll the next item from the model stream. We must inspect *both* Ok and Err + // cases so that transient stream failures (e.g., dropped SSE connection before + // `response.completed`) bubble up and trigger the caller's retry logic. + let event = stream.next().await; + let Some(event) = event else { + // Channel closed without yielding a final Completed event or explicit error. + // Treat as a disconnected stream so the caller can retry. + return Err(CodexErr::Stream( + "stream closed before response.completed".into(), + )); + }; + + let event = match event { + Ok(ev) => ev, + Err(e) => { + // Propagate the underlying stream error to the caller (run_turn), which + // will apply the configured `stream_max_retries` policy. 
+ return Err(e); + } + }; + match event { ResponseEvent::Created => { let mut state = sess.state.lock().unwrap(); @@ -1165,7 +1185,7 @@ async fn try_run_turn( let mut state = sess.state.lock().unwrap(); state.previous_response_id = Some(response_id); - break; + return Ok(output); } ResponseEvent::OutputTextDelta(delta) => { let event = Event { @@ -1183,7 +1203,6 @@ async fn try_run_turn( } } } - Ok(output) } async fn handle_response_item( diff --git a/codex-rs/core/src/config.rs b/codex-rs/core/src/config.rs index d67e692f..d5b28453 100644 --- a/codex-rs/core/src/config.rs +++ b/codex-rs/core/src/config.rs @@ -682,6 +682,9 @@ name = "OpenAI using Chat Completions" base_url = "https://api.openai.com/v1" env_key = "OPENAI_API_KEY" wire_api = "chat" +request_max_retries = 4 # retry failed HTTP requests +stream_max_retries = 10 # retry dropped SSE streams +stream_idle_timeout_ms = 300000 # 5m idle timeout [profiles.o3] model = "o3" @@ -722,6 +725,9 @@ disable_response_storage = true query_params: None, http_headers: None, env_http_headers: None, + request_max_retries: Some(4), + stream_max_retries: Some(10), + stream_idle_timeout_ms: Some(300_000), }; let model_provider_map = { let mut model_provider_map = built_in_model_providers(); diff --git a/codex-rs/core/src/flags.rs b/codex-rs/core/src/flags.rs index c21ef670..c1504054 100644 --- a/codex-rs/core/src/flags.rs +++ b/codex-rs/core/src/flags.rs @@ -11,14 +11,6 @@ env_flags! { pub OPENAI_TIMEOUT_MS: Duration = Duration::from_millis(300_000), |value| { value.parse().map(Duration::from_millis) }; - pub OPENAI_REQUEST_MAX_RETRIES: u64 = 4; - pub OPENAI_STREAM_MAX_RETRIES: u64 = 10; - - // We generally don't want to disconnect; this updates the timeout to be five minutes - // which matches the upstream typescript codex impl. - pub OPENAI_STREAM_IDLE_TIMEOUT_MS: Duration = Duration::from_millis(300_000), |value| { - value.parse().map(Duration::from_millis) - }; /// Fixture path for offline tests (see client.rs). pub CODEX_RS_SSE_FIXTURE: Option<&str> = None; diff --git a/codex-rs/core/src/model_provider_info.rs b/codex-rs/core/src/model_provider_info.rs index b38c912d..72ef58c6 100644 --- a/codex-rs/core/src/model_provider_info.rs +++ b/codex-rs/core/src/model_provider_info.rs @@ -9,6 +9,7 @@ use serde::Deserialize; use serde::Serialize; use std::collections::HashMap; use std::env::VarError; +use std::time::Duration; use crate::error::EnvVarError; use crate::openai_api_key::get_openai_api_key; @@ -16,6 +17,9 @@ use crate::openai_api_key::get_openai_api_key; /// Value for the `OpenAI-Originator` header that is sent with requests to /// OpenAI. const OPENAI_ORIGINATOR_HEADER: &str = "codex_cli_rs"; +const DEFAULT_STREAM_IDLE_TIMEOUT_MS: u64 = 300_000; +const DEFAULT_STREAM_MAX_RETRIES: u64 = 10; +const DEFAULT_REQUEST_MAX_RETRIES: u64 = 4; /// Wire protocol that the provider speaks. Most third-party services only /// implement the classic OpenAI Chat Completions JSON schema, whereas OpenAI @@ -26,7 +30,7 @@ const OPENAI_ORIGINATOR_HEADER: &str = "codex_cli_rs"; #[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "lowercase")] pub enum WireApi { - /// The experimental “Responses” API exposed by OpenAI at `/v1/responses`. + /// The experimental "Responses" API exposed by OpenAI at `/v1/responses`. Responses, /// Regular Chat Completions compatible with `/v1/chat/completions`. @@ -64,6 +68,16 @@ pub struct ModelProviderInfo { /// value should be used. 
If the environment variable is not set, or the /// value is empty, the header will not be included in the request. pub env_http_headers: Option<HashMap<String, String>>, + + /// Maximum number of times to retry a failed HTTP request to this provider. + pub request_max_retries: Option<u64>, + + /// Number of times to retry reconnecting a dropped streaming response before failing. + pub stream_max_retries: Option<u64>, + + /// Idle timeout (in milliseconds) to wait for activity on a streaming response before treating + /// the connection as lost. + pub stream_idle_timeout_ms: Option<u64>, } impl ModelProviderInfo { @@ -161,6 +175,25 @@ impl ModelProviderInfo { None => Ok(None), } } + + /// Effective maximum number of request retries for this provider. + pub fn request_max_retries(&self) -> u64 { + self.request_max_retries + .unwrap_or(DEFAULT_REQUEST_MAX_RETRIES) + } + + /// Effective maximum number of stream reconnection attempts for this provider. + pub fn stream_max_retries(&self) -> u64 { + self.stream_max_retries + .unwrap_or(DEFAULT_STREAM_MAX_RETRIES) + } + + /// Effective idle timeout for streaming responses. + pub fn stream_idle_timeout(&self) -> Duration { + self.stream_idle_timeout_ms + .map(Duration::from_millis) + .unwrap_or(Duration::from_millis(DEFAULT_STREAM_IDLE_TIMEOUT_MS)) + } } /// Built-in default provider list. @@ -205,6 +238,10 @@ pub fn built_in_model_providers() -> HashMap<String, ModelProviderInfo> { .into_iter() .collect(), ), + // Use global defaults for retry/timeout unless overridden in config.toml. + request_max_retries: None, + stream_max_retries: None, + stream_idle_timeout_ms: None, }, ), ] @@ -234,6 +271,9 @@ base_url = "http://localhost:11434/v1" query_params: None, http_headers: None, env_http_headers: None, + request_max_retries: None, + stream_max_retries: None, + stream_idle_timeout_ms: None, }; let provider: ModelProviderInfo = toml::from_str(azure_provider_toml).unwrap(); @@ -259,6 +299,9 @@ query_params = { api-version = "2025-04-01-preview" } }), http_headers: None, env_http_headers: None, + request_max_retries: None, + stream_max_retries: None, + stream_idle_timeout_ms: None, }; let provider: ModelProviderInfo = toml::from_str(azure_provider_toml).unwrap(); @@ -287,6 +330,9 @@ env_http_headers = { "X-Example-Env-Header" = "EXAMPLE_ENV_VAR" } env_http_headers: Some(maplit::hashmap! { "X-Example-Env-Header".to_string() => "EXAMPLE_ENV_VAR".to_string(), }), + request_max_retries: None, + stream_max_retries: None, + stream_idle_timeout_ms: None, }; let provider: ModelProviderInfo = toml::from_str(azure_provider_toml).unwrap(); diff --git a/codex-rs/core/tests/cli_stream.rs b/codex-rs/core/tests/cli_stream.rs index 3669b93f..23ee0a3c 100644 --- a/codex-rs/core/tests/cli_stream.rs +++ b/codex-rs/core/tests/cli_stream.rs @@ -173,7 +173,7 @@ async fn integration_creates_and_checks_session_file() { // 5. Sessions are written asynchronously; wait briefly for the directory to appear.
let sessions_dir = home.path().join("sessions"); let start = Instant::now(); - while !sessions_dir.exists() && start.elapsed() < Duration::from_secs(2) { + while !sessions_dir.exists() && start.elapsed() < Duration::from_secs(3) { std::thread::sleep(Duration::from_millis(50)); } diff --git a/codex-rs/core/tests/client.rs b/codex-rs/core/tests/client.rs index f4fb58f5..964710b8 100644 --- a/codex-rs/core/tests/client.rs +++ b/codex-rs/core/tests/client.rs @@ -49,13 +49,6 @@ async fn includes_session_id_and_model_headers_in_request() { .mount(&server) .await; - // Environment - // Update environment – `set_var` is `unsafe` starting with the 2024 - // edition so we group the calls into a single `unsafe { … }` block. - unsafe { - std::env::set_var("OPENAI_REQUEST_MAX_RETRIES", "0"); - std::env::set_var("OPENAI_STREAM_MAX_RETRIES", "0"); - } let model_provider = ModelProviderInfo { name: "openai".into(), base_url: format!("{}/v1", server.uri()), @@ -72,6 +65,9 @@ async fn includes_session_id_and_model_headers_in_request() { .collect(), ), env_http_headers: None, + request_max_retries: Some(0), + stream_max_retries: Some(0), + stream_idle_timeout_ms: None, }; // Init session diff --git a/codex-rs/core/tests/live_agent.rs b/codex-rs/core/tests/live_agent.rs index c21f9d00..26a5539d 100644 --- a/codex-rs/core/tests/live_agent.rs +++ b/codex-rs/core/tests/live_agent.rs @@ -45,22 +45,10 @@ async fn spawn_codex() -> Result<Codex> { "OPENAI_API_KEY must be set for live tests" ); - // Environment tweaks to keep the tests snappy and inexpensive while still - // exercising retry/robustness logic. - // - // NOTE: Starting with the 2024 edition `std::env::set_var` is `unsafe` - // because changing the process environment races with any other threads - // that might be performing environment look-ups at the same time. - // Restrict the unsafety to this tiny block that happens at the very - // beginning of the test, before we spawn any background tasks that could - // observe the environment. - unsafe { - std::env::set_var("OPENAI_REQUEST_MAX_RETRIES", "2"); - std::env::set_var("OPENAI_STREAM_MAX_RETRIES", "2"); - } - let codex_home = TempDir::new().unwrap(); - let config = load_default_config_for_test(&codex_home); + let mut config = load_default_config_for_test(&codex_home); + config.model_provider.request_max_retries = Some(2); + config.model_provider.stream_max_retries = Some(2); let (agent, _init_id) = Codex::spawn(config, std::sync::Arc::new(Notify::new())).await?; Ok(agent) @@ -79,7 +67,7 @@ async fn live_streaming_and_prev_id_reset() { let codex = spawn_codex().await.unwrap(); - // ---------- Task 1 ---------- + // ---------- Task 1 ---------- codex .submit(Op::UserInput { items: vec![InputItem::Text { @@ -113,7 +101,7 @@ async fn live_streaming_and_prev_id_reset() { "Agent did not stream any AgentMessage before TaskComplete" ); - // ---------- Task 2 (same session) ---------- + // ---------- Task 2 (same session) ---------- codex .submit(Op::UserInput { items: vec![InputItem::Text { diff --git a/codex-rs/core/tests/previous_response_id.rs b/codex-rs/core/tests/previous_response_id.rs index e64271a0..9630cc10 100644 --- a/codex-rs/core/tests/previous_response_id.rs +++ b/codex-rs/core/tests/previous_response_id.rs @@ -88,13 +88,8 @@ async fn keeps_previous_response_id_between_tasks() { .mount(&server) .await; - // Environment - // Update environment – `set_var` is `unsafe` starting with the 2024 - // edition so we group the calls into a single `unsafe { … }` block.
- unsafe { - std::env::set_var("OPENAI_REQUEST_MAX_RETRIES", "0"); - std::env::set_var("OPENAI_STREAM_MAX_RETRIES", "0"); - } + // Configure retry behavior explicitly to avoid mutating process-wide + // environment variables. let model_provider = ModelProviderInfo { name: "openai".into(), base_url: format!("{}/v1", server.uri()), @@ -107,6 +102,10 @@ async fn keeps_previous_response_id_between_tasks() { query_params: None, http_headers: None, env_http_headers: None, + // disable retries so we don't get duplicate calls in this test + request_max_retries: Some(0), + stream_max_retries: Some(0), + stream_idle_timeout_ms: None, }; // Init session diff --git a/codex-rs/core/tests/stream_no_completed.rs b/codex-rs/core/tests/stream_no_completed.rs index 8883eff3..f2de5de1 100644 --- a/codex-rs/core/tests/stream_no_completed.rs +++ b/codex-rs/core/tests/stream_no_completed.rs @@ -32,8 +32,6 @@ fn sse_completed(id: &str) -> String { } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -// this test is flaky (has race conditions), so we ignore it for now -#[ignore] async fn retries_on_early_close() { #![allow(clippy::unwrap_used)] @@ -72,19 +70,8 @@ async fn retries_on_early_close() { .mount(&server) .await; - // Environment - // - // As of Rust 2024 `std::env::set_var` has been made `unsafe` because - // mutating the process environment is inherently racy when other threads - // are running. We therefore have to wrap every call in an explicit - // `unsafe` block. These are limited to the test-setup section so the - // scope is very small and clearly delineated. - - unsafe { - std::env::set_var("OPENAI_REQUEST_MAX_RETRIES", "0"); - std::env::set_var("OPENAI_STREAM_MAX_RETRIES", "1"); - std::env::set_var("OPENAI_STREAM_IDLE_TIMEOUT_MS", "2000"); - } + // Configure retry behavior explicitly to avoid mutating process-wide + // environment variables. let model_provider = ModelProviderInfo { name: "openai".into(), @@ -98,6 +85,10 @@ async fn retries_on_early_close() { query_params: None, http_headers: None, env_http_headers: None, + // exercise retry path: first attempt yields incomplete stream, so allow 1 retry + request_max_retries: Some(0), + stream_max_retries: Some(1), + stream_idle_timeout_ms: Some(2000), }; let ctrl_c = std::sync::Arc::new(tokio::sync::Notify::new());