Refactor env settings into config (#1601)

## Summary
- add OpenAI retry and timeout fields to Config
- inject these settings in tests instead of mutating env vars
- plumb Config values through client and chat completions logic
- document new configuration options
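
For context, the three knobs that used to be process-wide environment flags become optional per-provider config fields in this change. A minimal sketch of a provider entry using the new keys (values mirror the documented defaults in the config docs hunk below; the `codex_core::model_provider_info` module path is assumed from this diff's `use` lines):

```rust
// Sketch only: deserializes a provider entry the same way the crate's
// own tests do (see the `toml::from_str` fixtures in this diff).
use codex_core::model_provider_info::ModelProviderInfo; // path assumed

fn main() {
    let provider_toml = r#"
name = "OpenAI using Chat Completions"
base_url = "https://api.openai.com/v1"
env_key = "OPENAI_API_KEY"
wire_api = "chat"
request_max_retries = 4         # retry failed HTTP requests
stream_max_retries = 10         # retry dropped SSE streams
stream_idle_timeout_ms = 300000 # 5m idle timeout
"#;
    let provider: ModelProviderInfo = toml::from_str(provider_toml).unwrap();
    assert_eq!(provider.request_max_retries(), 4);
}
```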

## Testing
- `cargo test -p codex-core --no-run`

------
https://chatgpt.com/codex/tasks/task_i_68792c5b04cc832195c03050c8b6ea94

---------

Co-authored-by: Michael Bolin <mbolin@openai.com>
Commit 9846adeabf (parent d5a2148deb)
Author: aibrahim-oai
Date: 2025-07-18 12:12:39 -07:00
Committed by: GitHub
12 changed files with 228 additions and 94 deletions

File: codex-rs/core/src/chat_completions.rs

@@ -21,8 +21,6 @@ use crate::client_common::ResponseEvent;
use crate::client_common::ResponseStream;
use crate::error::CodexErr;
use crate::error::Result;
use crate::flags::OPENAI_REQUEST_MAX_RETRIES;
use crate::flags::OPENAI_STREAM_IDLE_TIMEOUT_MS;
use crate::models::ContentItem;
use crate::models::ResponseItem;
use crate::openai_tools::create_tools_json_for_chat_completions_api;
@@ -121,6 +119,7 @@ pub(crate) async fn stream_chat_completions(
);
let mut attempt = 0;
let max_retries = provider.request_max_retries();
loop {
attempt += 1;
@@ -136,7 +135,11 @@ pub(crate) async fn stream_chat_completions(
Ok(resp) if resp.status().is_success() => {
let (tx_event, rx_event) = mpsc::channel::<Result<ResponseEvent>>(1600);
let stream = resp.bytes_stream().map_err(CodexErr::Reqwest);
tokio::spawn(process_chat_sse(stream, tx_event));
tokio::spawn(process_chat_sse(
stream,
tx_event,
provider.stream_idle_timeout(),
));
return Ok(ResponseStream { rx_event });
}
Ok(res) => {
@@ -146,7 +149,7 @@ pub(crate) async fn stream_chat_completions(
return Err(CodexErr::UnexpectedStatus(status, body));
}
if attempt > *OPENAI_REQUEST_MAX_RETRIES {
if attempt > max_retries {
return Err(CodexErr::RetryLimit(status));
}
@@ -162,7 +165,7 @@ pub(crate) async fn stream_chat_completions(
tokio::time::sleep(delay).await;
}
Err(e) => {
if attempt > *OPENAI_REQUEST_MAX_RETRIES {
if attempt > max_retries {
return Err(e.into());
}
let delay = backoff(attempt);
@@ -175,14 +178,15 @@ pub(crate) async fn stream_chat_completions(
/// Lightweight SSE processor for the Chat Completions streaming format. The
/// output is mapped onto Codex's internal [`ResponseEvent`] so that the rest
/// of the pipeline can stay agnostic of the underlying wire format.
async fn process_chat_sse<S>(stream: S, tx_event: mpsc::Sender<Result<ResponseEvent>>)
where
async fn process_chat_sse<S>(
stream: S,
tx_event: mpsc::Sender<Result<ResponseEvent>>,
idle_timeout: Duration,
) where
S: Stream<Item = Result<Bytes>> + Unpin,
{
let mut stream = stream.eventsource();
let idle_timeout = *OPENAI_STREAM_IDLE_TIMEOUT_MS;
// State to accumulate a function call across streaming chunks.
// OpenAI may split the `arguments` string over multiple `delta` events
// until the chunk whose `finish_reason` is `tool_calls` is emitted. We

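The `idle_timeout` parameter threaded in above replaces the old `OPENAI_STREAM_IDLE_TIMEOUT_MS` flag read. As a rough sketch of how such an idle timeout is typically enforced on an SSE stream (a hypothetical helper, not code from this diff; the actual loop lives further down in `process_chat_sse`):

```rust
use std::time::Duration;
use futures::{Stream, StreamExt};

/// Hypothetical helper: yield the next item, or `Err(Elapsed)` if the
/// stream stays silent for longer than `idle_timeout`.
async fn next_with_idle_timeout<S, T>(
    stream: &mut S,
    idle_timeout: Duration,
) -> Result<Option<T>, tokio::time::error::Elapsed>
where
    S: Stream<Item = T> + Unpin,
{
    // `Ok(Some(item))` = next event, `Ok(None)` = clean end of stream,
    // `Err(Elapsed)` = no activity within `idle_timeout`, i.e. treat the
    // connection as lost and let the caller retry.
    tokio::time::timeout(idle_timeout, stream.next()).await
}
```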
File: codex-rs/core/src/client.rs

@@ -30,8 +30,6 @@ use crate::config_types::ReasoningSummary as ReasoningSummaryConfig;
use crate::error::CodexErr;
use crate::error::Result;
use crate::flags::CODEX_RS_SSE_FIXTURE;
use crate::flags::OPENAI_REQUEST_MAX_RETRIES;
use crate::flags::OPENAI_STREAM_IDLE_TIMEOUT_MS;
use crate::model_provider_info::ModelProviderInfo;
use crate::model_provider_info::WireApi;
use crate::models::ResponseItem;
@@ -113,7 +111,7 @@ impl ModelClient {
if let Some(path) = &*CODEX_RS_SSE_FIXTURE {
// short circuit for tests
warn!(path, "Streaming from fixture");
return stream_from_fixture(path).await;
return stream_from_fixture(path, self.provider.clone()).await;
}
let full_instructions = prompt.get_full_instructions(&self.config.model);
@@ -140,6 +138,7 @@ impl ModelClient {
);
let mut attempt = 0;
let max_retries = self.provider.request_max_retries();
loop {
attempt += 1;
@@ -158,7 +157,11 @@ impl ModelClient {
// spawn task to process SSE
let stream = resp.bytes_stream().map_err(CodexErr::Reqwest);
tokio::spawn(process_sse(stream, tx_event));
tokio::spawn(process_sse(
stream,
tx_event,
self.provider.stream_idle_timeout(),
));
return Ok(ResponseStream { rx_event });
}
@@ -177,7 +180,7 @@ impl ModelClient {
return Err(CodexErr::UnexpectedStatus(status, body));
}
if attempt > *OPENAI_REQUEST_MAX_RETRIES {
if attempt > max_retries {
return Err(CodexErr::RetryLimit(status));
}
@@ -194,7 +197,7 @@ impl ModelClient {
tokio::time::sleep(delay).await;
}
Err(e) => {
if attempt > *OPENAI_REQUEST_MAX_RETRIES {
if attempt > max_retries {
return Err(e.into());
}
let delay = backoff(attempt);
@@ -203,6 +206,10 @@ impl ModelClient {
}
}
}
pub fn get_provider(&self) -> ModelProviderInfo {
self.provider.clone()
}
}
#[derive(Debug, Deserialize, Serialize)]
@@ -254,14 +261,16 @@ struct ResponseCompletedOutputTokensDetails {
reasoning_tokens: u64,
}
async fn process_sse<S>(stream: S, tx_event: mpsc::Sender<Result<ResponseEvent>>)
where
async fn process_sse<S>(
stream: S,
tx_event: mpsc::Sender<Result<ResponseEvent>>,
idle_timeout: Duration,
) where
S: Stream<Item = Result<Bytes>> + Unpin,
{
let mut stream = stream.eventsource();
// If the stream stays completely silent for an extended period, treat it as disconnected.
let idle_timeout = *OPENAI_STREAM_IDLE_TIMEOUT_MS;
// The response id returned from the "complete" message.
let mut response_completed: Option<ResponseCompleted> = None;
@@ -322,7 +331,7 @@ where
// duplicated `output` array embedded in the `response.completed`
// payload. That produced two concrete issues:
// 1. No realtime streaming – the user only saw output after the
// entire turn had finished, which broke the typing UX and
// entire turn had finished, which broke the "typing" UX and
made long-running turns look stalled.
// 2. Duplicate `function_call_output` items – both the
// individual *and* the completed array were forwarded, which
@@ -395,7 +404,10 @@ where
}
/// used in tests to stream from a text SSE file
async fn stream_from_fixture(path: impl AsRef<Path>) -> Result<ResponseStream> {
async fn stream_from_fixture(
path: impl AsRef<Path>,
provider: ModelProviderInfo,
) -> Result<ResponseStream> {
let (tx_event, rx_event) = mpsc::channel::<Result<ResponseEvent>>(1600);
let f = std::fs::File::open(path.as_ref())?;
let lines = std::io::BufReader::new(f).lines();
@@ -409,7 +421,11 @@ async fn stream_from_fixture(path: impl AsRef<Path>) -> Result<ResponseStream> {
let rdr = std::io::Cursor::new(content);
let stream = ReaderStream::new(rdr).map_err(CodexErr::Io);
tokio::spawn(process_sse(stream, tx_event));
tokio::spawn(process_sse(
stream,
tx_event,
provider.stream_idle_timeout(),
));
Ok(ResponseStream { rx_event })
}
@@ -429,7 +445,10 @@ mod tests {
/// Runs the SSE parser on pre-chunked byte slices and returns every event
/// (including any final `Err` from a stream-closure check).
async fn collect_events(chunks: &[&[u8]]) -> Vec<Result<ResponseEvent>> {
async fn collect_events(
chunks: &[&[u8]],
provider: ModelProviderInfo,
) -> Vec<Result<ResponseEvent>> {
let mut builder = IoBuilder::new();
for chunk in chunks {
builder.read(chunk);
@@ -438,7 +457,7 @@ mod tests {
let reader = builder.build();
let stream = ReaderStream::new(reader).map_err(CodexErr::Io);
let (tx, mut rx) = mpsc::channel::<Result<ResponseEvent>>(16);
tokio::spawn(process_sse(stream, tx));
tokio::spawn(process_sse(stream, tx, provider.stream_idle_timeout()));
let mut events = Vec::new();
while let Some(ev) = rx.recv().await {
@@ -449,7 +468,10 @@ mod tests {
/// Builds an in-memory SSE stream from JSON fixtures and returns only the
/// successfully parsed events (panics on internal channel errors).
async fn run_sse(events: Vec<serde_json::Value>) -> Vec<ResponseEvent> {
async fn run_sse(
events: Vec<serde_json::Value>,
provider: ModelProviderInfo,
) -> Vec<ResponseEvent> {
let mut body = String::new();
for e in events {
let kind = e
@@ -465,7 +487,7 @@ mod tests {
let (tx, mut rx) = mpsc::channel::<Result<ResponseEvent>>(8);
let stream = ReaderStream::new(std::io::Cursor::new(body)).map_err(CodexErr::Io);
tokio::spawn(process_sse(stream, tx));
tokio::spawn(process_sse(stream, tx, provider.stream_idle_timeout()));
let mut out = Vec::new();
while let Some(ev) = rx.recv().await {
@@ -510,7 +532,25 @@ mod tests {
let sse2 = format!("event: response.output_item.done\ndata: {item2}\n\n");
let sse3 = format!("event: response.completed\ndata: {completed}\n\n");
let events = collect_events(&[sse1.as_bytes(), sse2.as_bytes(), sse3.as_bytes()]).await;
let provider = ModelProviderInfo {
name: "test".to_string(),
base_url: "https://test.com".to_string(),
env_key: Some("TEST_API_KEY".to_string()),
env_key_instructions: None,
wire_api: WireApi::Responses,
query_params: None,
http_headers: None,
env_http_headers: None,
request_max_retries: Some(0),
stream_max_retries: Some(0),
stream_idle_timeout_ms: Some(1000),
};
let events = collect_events(
&[sse1.as_bytes(), sse2.as_bytes(), sse3.as_bytes()],
provider,
)
.await;
assert_eq!(events.len(), 3);
@@ -551,8 +591,21 @@ mod tests {
.to_string();
let sse1 = format!("event: response.output_item.done\ndata: {item1}\n\n");
let provider = ModelProviderInfo {
name: "test".to_string(),
base_url: "https://test.com".to_string(),
env_key: Some("TEST_API_KEY".to_string()),
env_key_instructions: None,
wire_api: WireApi::Responses,
query_params: None,
http_headers: None,
env_http_headers: None,
request_max_retries: Some(0),
stream_max_retries: Some(0),
stream_idle_timeout_ms: Some(1000),
};
let events = collect_events(&[sse1.as_bytes()]).await;
let events = collect_events(&[sse1.as_bytes()], provider).await;
assert_eq!(events.len(), 2);
@@ -640,7 +693,21 @@ mod tests {
let mut evs = vec![case.event];
evs.push(completed.clone());
let out = run_sse(evs).await;
let provider = ModelProviderInfo {
name: "test".to_string(),
base_url: "https://test.com".to_string(),
env_key: Some("TEST_API_KEY".to_string()),
env_key_instructions: None,
wire_api: WireApi::Responses,
query_params: None,
http_headers: None,
env_http_headers: None,
request_max_retries: Some(0),
stream_max_retries: Some(0),
stream_idle_timeout_ms: Some(1000),
};
let out = run_sse(evs, provider).await;
assert_eq!(out.len(), case.expected_len, "case {}", case.name);
assert!(
(case.expect_first)(&out[0]),

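The three tests above construct the same `ModelProviderInfo` literal by hand. A test-local helper along these lines (hypothetical, not part of this commit) would capture the shared fixture:

```rust
// Hypothetical test helper (not in this diff): the provider fixture the
// tests above repeat, with retries disabled and a 1s idle timeout so
// failures surface immediately.
fn test_provider() -> ModelProviderInfo {
    ModelProviderInfo {
        name: "test".to_string(),
        base_url: "https://test.com".to_string(),
        env_key: Some("TEST_API_KEY".to_string()),
        env_key_instructions: None,
        wire_api: WireApi::Responses,
        query_params: None,
        http_headers: None,
        env_http_headers: None,
        request_max_retries: Some(0),
        stream_max_retries: Some(0),
        stream_idle_timeout_ms: Some(1000),
    }
}
```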
File: codex-rs/core/src/codex.rs

@@ -49,7 +49,6 @@ use crate::exec::ExecToolCallOutput;
use crate::exec::SandboxType;
use crate::exec::process_exec_tool_call;
use crate::exec_env::create_env;
use crate::flags::OPENAI_STREAM_MAX_RETRIES;
use crate::mcp_connection_manager::McpConnectionManager;
use crate::mcp_tool_call::handle_mcp_tool_call;
use crate::models::ContentItem;
@@ -1027,12 +1026,13 @@ async fn run_turn(
Err(CodexErr::Interrupted) => return Err(CodexErr::Interrupted),
Err(CodexErr::EnvVar(var)) => return Err(CodexErr::EnvVar(var)),
Err(e) => {
if retries < *OPENAI_STREAM_MAX_RETRIES {
// Use the configured provider-specific stream retry budget.
let max_retries = sess.client.get_provider().stream_max_retries();
if retries < max_retries {
retries += 1;
let delay = backoff(retries);
warn!(
"stream disconnected - retrying turn ({retries}/{} in {delay:?})...",
*OPENAI_STREAM_MAX_RETRIES
"stream disconnected - retrying turn ({retries}/{max_retries} in {delay:?})...",
);
// Surface retry information to any UI/frontend so the
@@ -1041,8 +1041,7 @@ async fn run_turn(
sess.notify_background_event(
&sub_id,
format!(
"stream error: {e}; retrying {retries}/{} in {:?}",
*OPENAI_STREAM_MAX_RETRIES, delay
"stream error: {e}; retrying {retries}/{max_retries} in {delay:?}"
),
)
.await;
@@ -1124,7 +1123,28 @@ async fn try_run_turn(
let mut stream = sess.client.clone().stream(&prompt).await?;
let mut output = Vec::new();
while let Some(Ok(event)) = stream.next().await {
loop {
// Poll the next item from the model stream. We must inspect *both* Ok and Err
// cases so that transient stream failures (e.g., dropped SSE connection before
// `response.completed`) bubble up and trigger the caller's retry logic.
let event = stream.next().await;
let Some(event) = event else {
// Channel closed without yielding a final Completed event or explicit error.
// Treat as a disconnected stream so the caller can retry.
return Err(CodexErr::Stream(
"stream closed before response.completed".into(),
));
};
let event = match event {
Ok(ev) => ev,
Err(e) => {
// Propagate the underlying stream error to the caller (run_turn), which
// will apply the configured `stream_max_retries` policy.
return Err(e);
}
};
match event {
ResponseEvent::Created => {
let mut state = sess.state.lock().unwrap();
@@ -1165,7 +1185,7 @@ async fn try_run_turn(
let mut state = sess.state.lock().unwrap();
state.previous_response_id = Some(response_id);
break;
return Ok(output);
}
ResponseEvent::OutputTextDelta(delta) => {
let event = Event {
@@ -1183,7 +1203,6 @@ async fn try_run_turn(
}
}
}
Ok(output)
}
async fn handle_response_item(

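`run_turn` delegates the retry delay to a `backoff` helper that is not shown in this diff. As a sketch of the usual shape only (assuming exponential growth with a cap; the real helper in codex-core may use different constants or add jitter):

```rust
use std::time::Duration;

// Assumed shape only, not the crate's actual implementation: exponential
// backoff that doubles per attempt and is capped at 10s so late retries
// do not stall the turn indefinitely.
fn backoff(attempt: u64) -> Duration {
    let base_ms: u64 = 200;
    let exp = attempt.min(6); // clamp the exponent to avoid overflow
    Duration::from_millis((base_ms << exp).min(10_000))
}
```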
File: codex-rs/core/src/config.rs

@@ -682,6 +682,9 @@ name = "OpenAI using Chat Completions"
base_url = "https://api.openai.com/v1"
env_key = "OPENAI_API_KEY"
wire_api = "chat"
request_max_retries = 4 # retry failed HTTP requests
stream_max_retries = 10 # retry dropped SSE streams
stream_idle_timeout_ms = 300000 # 5m idle timeout
[profiles.o3]
model = "o3"
@@ -722,6 +725,9 @@ disable_response_storage = true
query_params: None,
http_headers: None,
env_http_headers: None,
request_max_retries: Some(4),
stream_max_retries: Some(10),
stream_idle_timeout_ms: Some(300_000),
};
let model_provider_map = {
let mut model_provider_map = built_in_model_providers();

File: codex-rs/core/src/flags.rs

@@ -11,14 +11,6 @@ env_flags! {
pub OPENAI_TIMEOUT_MS: Duration = Duration::from_millis(300_000), |value| {
value.parse().map(Duration::from_millis)
};
pub OPENAI_REQUEST_MAX_RETRIES: u64 = 4;
pub OPENAI_STREAM_MAX_RETRIES: u64 = 10;
// We generally don't want to disconnect; this updates the timeout to be five minutes
// which matches the upstream typescript codex impl.
pub OPENAI_STREAM_IDLE_TIMEOUT_MS: Duration = Duration::from_millis(300_000), |value| {
value.parse().map(Duration::from_millis)
};
/// Fixture path for offline tests (see client.rs).
pub CODEX_RS_SSE_FIXTURE: Option<&str> = None;

File: codex-rs/core/src/model_provider_info.rs

@@ -9,6 +9,7 @@ use serde::Deserialize;
use serde::Serialize;
use std::collections::HashMap;
use std::env::VarError;
use std::time::Duration;
use crate::error::EnvVarError;
use crate::openai_api_key::get_openai_api_key;
@@ -16,6 +17,9 @@ use crate::openai_api_key::get_openai_api_key;
/// Value for the `OpenAI-Originator` header that is sent with requests to
/// OpenAI.
const OPENAI_ORIGINATOR_HEADER: &str = "codex_cli_rs";
const DEFAULT_STREAM_IDLE_TIMEOUT_MS: u64 = 300_000;
const DEFAULT_STREAM_MAX_RETRIES: u64 = 10;
const DEFAULT_REQUEST_MAX_RETRIES: u64 = 4;
/// Wire protocol that the provider speaks. Most third-party services only
/// implement the classic OpenAI Chat Completions JSON schema, whereas OpenAI
@@ -26,7 +30,7 @@ const OPENAI_ORIGINATOR_HEADER: &str = "codex_cli_rs";
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum WireApi {
/// The experimental Responses API exposed by OpenAI at `/v1/responses`.
/// The experimental "Responses" API exposed by OpenAI at `/v1/responses`.
Responses,
/// Regular Chat Completions compatible with `/v1/chat/completions`.
@@ -64,6 +68,16 @@ pub struct ModelProviderInfo {
/// value should be used. If the environment variable is not set, or the
/// value is empty, the header will not be included in the request.
pub env_http_headers: Option<HashMap<String, String>>,
/// Maximum number of times to retry a failed HTTP request to this provider.
pub request_max_retries: Option<u64>,
/// Number of times to retry reconnecting a dropped streaming response before failing.
pub stream_max_retries: Option<u64>,
/// Idle timeout (in milliseconds) to wait for activity on a streaming response before treating
/// the connection as lost.
pub stream_idle_timeout_ms: Option<u64>,
}
impl ModelProviderInfo {
@@ -161,6 +175,25 @@ impl ModelProviderInfo {
None => Ok(None),
}
}
/// Effective maximum number of request retries for this provider.
pub fn request_max_retries(&self) -> u64 {
self.request_max_retries
.unwrap_or(DEFAULT_REQUEST_MAX_RETRIES)
}
/// Effective maximum number of stream reconnection attempts for this provider.
pub fn stream_max_retries(&self) -> u64 {
self.stream_max_retries
.unwrap_or(DEFAULT_STREAM_MAX_RETRIES)
}
/// Effective idle timeout for streaming responses.
pub fn stream_idle_timeout(&self) -> Duration {
self.stream_idle_timeout_ms
.map(Duration::from_millis)
.unwrap_or(Duration::from_millis(DEFAULT_STREAM_IDLE_TIMEOUT_MS))
}
}
/// Built-in default provider list.
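Taken together, the accessors give every call site a single place to resolve the `Option` overrides against the defaults above. A usage sketch, written in the style of this file's tests (struct fields as declared in this diff):

```rust
#[test]
fn provider_overrides_fall_back_to_defaults() {
    let provider = ModelProviderInfo {
        name: "example".to_string(),
        base_url: "https://example.com/v1".to_string(),
        env_key: None,
        env_key_instructions: None,
        wire_api: WireApi::Chat,
        query_params: None,
        http_headers: None,
        env_http_headers: None,
        request_max_retries: None,            // falls back to DEFAULT_REQUEST_MAX_RETRIES (4)
        stream_max_retries: None,             // falls back to DEFAULT_STREAM_MAX_RETRIES (10)
        stream_idle_timeout_ms: Some(60_000), // explicit 60s override
    };
    assert_eq!(provider.request_max_retries(), 4);
    assert_eq!(provider.stream_max_retries(), 10);
    assert_eq!(provider.stream_idle_timeout(), Duration::from_millis(60_000));
}
```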
@@ -205,6 +238,10 @@ pub fn built_in_model_providers() -> HashMap<String, ModelProviderInfo> {
.into_iter()
.collect(),
),
// Use global defaults for retry/timeout unless overridden in config.toml.
request_max_retries: None,
stream_max_retries: None,
stream_idle_timeout_ms: None,
},
),
]
@@ -234,6 +271,9 @@ base_url = "http://localhost:11434/v1"
query_params: None,
http_headers: None,
env_http_headers: None,
request_max_retries: None,
stream_max_retries: None,
stream_idle_timeout_ms: None,
};
let provider: ModelProviderInfo = toml::from_str(azure_provider_toml).unwrap();
@@ -259,6 +299,9 @@ query_params = { api-version = "2025-04-01-preview" }
}),
http_headers: None,
env_http_headers: None,
request_max_retries: None,
stream_max_retries: None,
stream_idle_timeout_ms: None,
};
let provider: ModelProviderInfo = toml::from_str(azure_provider_toml).unwrap();
@@ -287,6 +330,9 @@ env_http_headers = { "X-Example-Env-Header" = "EXAMPLE_ENV_VAR" }
env_http_headers: Some(maplit::hashmap! {
"X-Example-Env-Header".to_string() => "EXAMPLE_ENV_VAR".to_string(),
}),
request_max_retries: None,
stream_max_retries: None,
stream_idle_timeout_ms: None,
};
let provider: ModelProviderInfo = toml::from_str(azure_provider_toml).unwrap();
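
One property worth noting from these fixtures: all three new keys are optional, so existing provider configs keep working unchanged. When a key is omitted it deserializes to `None` and the accessor falls back to the crate default, e.g. (a sketch in the style of the tests above):

```rust
#[test]
fn omitted_retry_settings_use_defaults() {
    let minimal_toml = r#"
name = "Ollama"
base_url = "http://localhost:11434/v1"
"#;
    let provider: ModelProviderInfo = toml::from_str(minimal_toml).unwrap();
    // No overrides in the TOML, so the effective values are the defaults:
    // 4 request retries, 10 stream retries, 5m idle timeout.
    assert_eq!(provider.request_max_retries(), 4);
    assert_eq!(provider.stream_max_retries(), 10);
    assert_eq!(
        provider.stream_idle_timeout(),
        std::time::Duration::from_millis(300_000)
    );
}
```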