[codex-rs] Reliability pass on networking (#658)

We currently see a behavior that looks like this:
```
2025-04-25T16:52:24.552789Z  WARN codex_core::codex: stream disconnected - retrying turn (1/10 in 232ms)...
codex> event: BackgroundEvent { message: "stream error: stream disconnected before completion: Transport error: error decoding response body; retrying 1/10 in 232ms…" }
2025-04-25T16:52:54.789885Z  WARN codex_core::codex: stream disconnected - retrying turn (2/10 in 418ms)...
codex> event: BackgroundEvent { message: "stream error: stream disconnected before completion: Transport error: error decoding response body; retrying 2/10 in 418ms…" }
```

This PR contains a few different fixes that attempt to resolve/improve
this:
1. **Remove overall client timeout.** I think
[this](https://github.com/openai/codex/pull/658/files#diff-c39945d3c42f29b506ff54b7fa2be0795b06d7ad97f1bf33956f60e3c6f19c19L173)
is probably the big fix: it looks like the overall request timeout was
firing even while events were still coming through, which disconnected
us right in the middle of a healthy stream.
2. **Cap response sizes.** We were frequently sending much larger
responses (captured command output) than the upstream TypeScript
`codex`, which definitely wasn't helping. [Fix
here](https://github.com/openai/codex/pull/658/files#diff-d792bef59aa3ee8cb0cbad8b176dbfefe451c227ac89919da7c3e536a9d6cdc0R21-R26)
for that one.
3. **Much higher idle timeout.** Our idle timeout was much lower than
the TypeScript implementation's; it is now five minutes, matching upstream.
4. **Gentler backoff.** We were backing off much too aggressively;
[this](https://github.com/openai/codex/pull/658/files#diff-5d5959b95c6239e6188516da5c6b7eb78154cd9cfedfb9f753d30a7b6d6b8b06R30-R33)
lowers the exponential growth factor from 2.0 to 1.3 while keeping the
jitter (see the sketch after this list).
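
To make the backoff change concrete, here is a rough, self-contained sketch (not the actual codex-rs code) of the old and new base schedules, reproduced from the last diff in this commit; jitter is left out so the growth is easy to read:

```rust
// Old schedule: 200ms doubling each attempt; new schedule: 200ms growing by 1.3x.
// Both are illustrative re-implementations of the formulas in the backoff diff below.
fn old_backoff_ms(attempt: u64) -> u64 {
    200 * (1u64 << (attempt - 1)) // 200, 400, 800, 1600, ... ~102s by attempt 10
}

fn new_backoff_ms(attempt: u64) -> u64 {
    (200.0 * 1.3f64.powi(attempt.saturating_sub(1) as i32)) as u64 // 200, 260, 338, ... ~2.1s by attempt 10
}

fn main() {
    for attempt in 1..=10 {
        println!(
            "attempt {attempt}: old {} ms, new {} ms",
            old_backoff_ms(attempt),
            new_backoff_ms(attempt)
        );
    }
}
```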

I was seeing that `stream error: stream disconnected` behavior
constantly, and anecdotally I can no longer reproduce it. It also feels
much snappier.

Commit d7a40195e6 (parent 4760aa1eb9), authored by oai-ragona on 2025-04-25 11:44:22 -07:00 and committed by GitHub.
4 changed files with 41 additions and 19 deletions.


```diff
@@ -28,7 +28,6 @@ use crate::flags::CODEX_RS_SSE_FIXTURE;
 use crate::flags::OPENAI_API_BASE;
 use crate::flags::OPENAI_REQUEST_MAX_RETRIES;
 use crate::flags::OPENAI_STREAM_IDLE_TIMEOUT_MS;
-use crate::flags::OPENAI_TIMEOUT_MS;
 use crate::models::ResponseInputItem;
 use crate::models::ResponseItem;
 use crate::util::backoff;
@@ -170,7 +169,6 @@ impl ModelClient {
             .header("OpenAI-Beta", "responses=experimental")
             .header(reqwest::header::ACCEPT, "text/event-stream")
             .json(&payload)
-            .timeout(*OPENAI_TIMEOUT_MS)
             .send()
             .await;
         match res {
```


```diff
@@ -7,6 +7,7 @@ use std::time::Duration;
 use std::time::Instant;
 
 use serde::Deserialize;
+use tokio::io::AsyncRead;
 use tokio::io::AsyncReadExt;
 use tokio::io::BufReader;
 use tokio::process::Command;
@@ -17,9 +18,11 @@ use crate::error::Result;
 use crate::error::SandboxErr;
 use crate::protocol::SandboxPolicy;
 
-/// Maximum we keep for each stream (100 KiB).
-/// TODO(ragona) this should be reduced
-const MAX_STREAM_OUTPUT: usize = 100 * 1024;
+/// Maximum we send for each stream, which is either:
+/// - 10KiB OR
+/// - 256 lines
+const MAX_STREAM_OUTPUT: usize = 10 * 1024;
+const MAX_STREAM_OUTPUT_LINES: usize = 256;
 
 const DEFAULT_TIMEOUT_MS: u64 = 10_000;
@@ -222,10 +225,12 @@ pub async fn exec(
     let stdout_handle = tokio::spawn(read_capped(
         BufReader::new(child.stdout.take().expect("stdout is not piped")),
         MAX_STREAM_OUTPUT,
+        MAX_STREAM_OUTPUT_LINES,
     ));
     let stderr_handle = tokio::spawn(read_capped(
         BufReader::new(child.stderr.take().expect("stderr is not piped")),
         MAX_STREAM_OUTPUT,
+        MAX_STREAM_OUTPUT_LINES,
     ));
 
     let interrupted = ctrl_c.notified();
@@ -259,23 +264,41 @@ pub async fn exec(
     })
 }
 
-async fn read_capped<R: AsyncReadExt + Unpin>(
+async fn read_capped<R: AsyncRead + Unpin>(
     mut reader: R,
     max_output: usize,
+    max_lines: usize,
 ) -> io::Result<Vec<u8>> {
     let mut buf = Vec::with_capacity(max_output.min(8 * 1024));
     let mut tmp = [0u8; 8192];
 
+    let mut remaining_bytes = max_output;
+    let mut remaining_lines = max_lines;
+
     loop {
         let n = reader.read(&mut tmp).await?;
         if n == 0 {
             break;
         }
-        if buf.len() < max_output {
-            let remaining = max_output - buf.len();
-            buf.extend_from_slice(&tmp[..remaining.min(n)]);
+        // Copy into the buffer only while we still have byte and line budget.
+        if remaining_bytes > 0 && remaining_lines > 0 {
+            let mut copy_len = 0;
+            for &b in &tmp[..n] {
+                if remaining_bytes == 0 || remaining_lines == 0 {
+                    break;
+                }
+                copy_len += 1;
+                remaining_bytes -= 1;
+                if b == b'\n' {
+                    remaining_lines -= 1;
+                }
+            }
+            buf.extend_from_slice(&tmp[..copy_len]);
         }
+        // Continue reading to EOF to avoid back-pressure, but discard once caps are hit.
     }
     Ok(buf)
 }
```
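
As a sanity check on the new capping logic, a minimal, hypothetical in-module test (not part of this PR; it assumes tokio's test macro is available and uses a `&[u8]` reader) would look like this, showing that the copy stops at whichever budget runs out first, lines in this case:

```rust
#[cfg(test)]
mod read_capped_tests {
    use super::read_capped;

    #[tokio::test]
    async fn stops_at_line_cap_before_byte_cap() {
        // 300 two-byte lines ("x\n") is only 600 bytes, far under a 10 KiB byte cap,
        // so the 256-line cap is what stops the copy.
        let input = b"x\n".repeat(300);
        let out = read_capped(&input[..], 10 * 1024, 256).await.unwrap();
        // 256 lines * 2 bytes each are kept; the rest is read but discarded.
        assert_eq!(out.len(), 512);
    }
}
```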


```diff
@@ -9,16 +9,15 @@ env_flags! {
     pub OPENAI_DEFAULT_MODEL: &str = "o3";
     pub OPENAI_API_BASE: &str = "https://api.openai.com";
     pub OPENAI_API_KEY: Option<&str> = None;
 
-    pub OPENAI_TIMEOUT_MS: Duration = Duration::from_millis(30_000), |value| {
+    pub OPENAI_TIMEOUT_MS: Duration = Duration::from_millis(300_000), |value| {
         value.parse().map(Duration::from_millis)
     };
     pub OPENAI_REQUEST_MAX_RETRIES: u64 = 4;
     pub OPENAI_STREAM_MAX_RETRIES: u64 = 10;
 
-    /// Maximum idle time (no SSE events received) before the stream is treated as
-    /// disconnected and retried by the agent. The default of 75s is slightly
-    /// above OpenAI's documented 60s load-balancer timeout.
-    pub OPENAI_STREAM_IDLE_TIMEOUT_MS: Duration = Duration::from_millis(75_000), |value| {
+    // We generally don't want to disconnect; this updates the timeout to be five minutes
+    // which matches the upstream typescript codex impl.
+    pub OPENAI_STREAM_IDLE_TIMEOUT_MS: Duration = Duration::from_millis(300_000), |value| {
         value.parse().map(Duration::from_millis)
     };
```
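
For context on what `OPENAI_STREAM_IDLE_TIMEOUT_MS` governs: with the whole-request timeout removed, the remaining time bound applies per event, so a healthy stream can run as long as it keeps producing output. A minimal sketch of that pattern, written against a generic `futures`-style stream rather than the actual codex-rs client code:

```rust
use std::time::Duration;

use futures::{Stream, StreamExt};

/// Wait for the next SSE event, giving up only if nothing arrives within `idle_timeout`.
/// Returning `Err` here is the kind of condition that surfaces as a "stream disconnected" retry.
async fn next_event_or_idle<S, T>(
    stream: &mut S,
    idle_timeout: Duration,
) -> Result<Option<T>, &'static str>
where
    S: Stream<Item = T> + Unpin,
{
    match tokio::time::timeout(idle_timeout, stream.next()).await {
        Ok(event) => Ok(event), // Some(event), or None when the stream ends cleanly
        Err(_elapsed) => Err("idle timeout: no event received within the window"),
    }
}
```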


```diff
@@ -5,6 +5,9 @@ use rand::Rng;
 use tokio::sync::Notify;
 use tracing::debug;
 
+const INITIAL_DELAY_MS: u64 = 200;
+const BACKOFF_FACTOR: f64 = 1.3;
+
 /// Make a CancellationToken that is fulfilled when SIGINT occurs.
 pub fn notify_on_sigint() -> Arc<Notify> {
     let notify = Arc::new(Notify::new());
@@ -23,12 +26,11 @@ pub fn notify_on_sigint() -> Arc<Notify> {
     notify
 }
 
-/// Default exponential backoff schedule: 200ms → 400ms → 800ms → 1600ms.
 pub(crate) fn backoff(attempt: u64) -> Duration {
-    let base_delay_ms = 200u64 * (1u64 << (attempt - 1));
-    let jitter = rand::rng().random_range(0.8..1.2);
-    let delay_ms = (base_delay_ms as f64 * jitter) as u64;
-    Duration::from_millis(delay_ms)
+    let exp = BACKOFF_FACTOR.powi(attempt.saturating_sub(1) as i32);
+    let base = (INITIAL_DELAY_MS as f64 * exp) as u64;
+    let jitter = rand::rng().random_range(0.9..1.1);
+    Duration::from_millis((base as f64 * jitter) as u64)
 }
 
 /// Return `true` if the current working directory is inside a Git repository.
```