From d7a40195e635d645119364b0a77b100946eea852 Mon Sep 17 00:00:00 2001
From: oai-ragona <144164704+oai-ragona@users.noreply.github.com>
Date: Fri, 25 Apr 2025 11:44:22 -0700
Subject: [PATCH] [codex-rs] Reliability pass on networking (#658)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

We currently see a behavior that looks like this:

```
2025-04-25T16:52:24.552789Z  WARN codex_core::codex: stream disconnected - retrying turn (1/10 in 232ms)...
codex> event: BackgroundEvent { message: "stream error: stream disconnected before completion: Transport error: error decoding response body; retrying 1/10 in 232ms…" }
2025-04-25T16:52:54.789885Z  WARN codex_core::codex: stream disconnected - retrying turn (2/10 in 418ms)...
codex> event: BackgroundEvent { message: "stream error: stream disconnected before completion: Transport error: error decoding response body; retrying 2/10 in 418ms…" }
```

This PR contains a few different fixes that attempt to resolve/improve this:

1. **Remove the overall client timeout.** I think [this](https://github.com/openai/codex/pull/658/files#diff-c39945d3c42f29b506ff54b7fa2be0795b06d7ad97f1bf33956f60e3c6f19c19L173) is perhaps the big fix -- it looks like the request was timing out even while events were still coming through, and that was causing a disconnect right in the middle of a healthy stream.
2. **Cap response sizes.** We were frequently sending MUCH larger responses than the upstream TypeScript `codex`, and that was definitely not helping. [Fix here](https://github.com/openai/codex/pull/658/files#diff-d792bef59aa3ee8cb0cbad8b176dbfefe451c227ac89919da7c3e536a9d6cdc0R21-R26) for that one.
3. **Much higher idle timeout.** Our idle timeout value was much lower than the TypeScript implementation's; it is now five minutes to match.
4. **Slower backoff growth.** We were backing off far too aggressively; [this](https://github.com/openai/codex/pull/658/files#diff-5d5959b95c6239e6188516da5c6b7eb78154cd9cfedfb9f753d30a7b6d6b8b06R30-R33) lowers the exponential growth factor from 2.0x to 1.3x per attempt while keeping the jitter (a sketch of the resulting schedule is included below).

I was seeing that `stream error: stream disconnected` behavior constantly, and anecdotally I can no longer reproduce it. It feels much snappier.
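For fix 4, here is a minimal, self-contained sketch (not part of the patch) of the delay schedule the new `backoff` in `codex-rs/core/src/util.rs` produces, using the same constants but with the random jitter left out; `backoff_ms` is a hypothetical helper name used only for this illustration:

```rust
/// Illustrative sketch only: mirrors the new schedule (200ms * 1.3^(attempt-1)),
/// omitting the ±10% jitter that the real `backoff` applies via `rand`.
fn backoff_ms(attempt: u64) -> u64 {
    const INITIAL_DELAY_MS: u64 = 200;
    const BACKOFF_FACTOR: f64 = 1.3;
    let exp = BACKOFF_FACTOR.powi(attempt.saturating_sub(1) as i32);
    (INITIAL_DELAY_MS as f64 * exp) as u64
}

fn main() {
    // Old schedule doubled each time: 200, 400, 800, 1600, 3200, ...
    // New schedule grows by 1.3x:     200, 260, 338,  439,  571, ...
    for attempt in 1..=5 {
        println!("attempt {attempt}: ~{}ms", backoff_ms(attempt));
    }
}
```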
---
 codex-rs/core/src/client.rs |  2 --
 codex-rs/core/src/exec.rs   | 37 ++++++++++++++++++++++++++++++-------
 codex-rs/core/src/flags.rs  |  9 ++++-----
 codex-rs/core/src/util.rs   | 12 +++++++-----
 4 files changed, 41 insertions(+), 19 deletions(-)

diff --git a/codex-rs/core/src/client.rs b/codex-rs/core/src/client.rs
index 57f593a8..51e8b8e8 100644
--- a/codex-rs/core/src/client.rs
+++ b/codex-rs/core/src/client.rs
@@ -28,7 +28,6 @@ use crate::flags::CODEX_RS_SSE_FIXTURE;
 use crate::flags::OPENAI_API_BASE;
 use crate::flags::OPENAI_REQUEST_MAX_RETRIES;
 use crate::flags::OPENAI_STREAM_IDLE_TIMEOUT_MS;
-use crate::flags::OPENAI_TIMEOUT_MS;
 use crate::models::ResponseInputItem;
 use crate::models::ResponseItem;
 use crate::util::backoff;
@@ -170,7 +169,6 @@ impl ModelClient {
             .header("OpenAI-Beta", "responses=experimental")
             .header(reqwest::header::ACCEPT, "text/event-stream")
             .json(&payload)
-            .timeout(*OPENAI_TIMEOUT_MS)
             .send()
             .await;
         match res {
diff --git a/codex-rs/core/src/exec.rs b/codex-rs/core/src/exec.rs
index ae83dc84..6a9fd212 100644
--- a/codex-rs/core/src/exec.rs
+++ b/codex-rs/core/src/exec.rs
@@ -7,6 +7,7 @@ use std::time::Duration;
 use std::time::Instant;
 
 use serde::Deserialize;
+use tokio::io::AsyncRead;
 use tokio::io::AsyncReadExt;
 use tokio::io::BufReader;
 use tokio::process::Command;
@@ -17,9 +18,11 @@ use crate::error::Result;
 use crate::error::SandboxErr;
 use crate::protocol::SandboxPolicy;
 
-/// Maximum we keep for each stream (100 KiB).
-/// TODO(ragona) this should be reduced
-const MAX_STREAM_OUTPUT: usize = 100 * 1024;
+/// Maximum we send for each stream, which is either:
+/// - 10KiB OR
+/// - 256 lines
+const MAX_STREAM_OUTPUT: usize = 10 * 1024;
+const MAX_STREAM_OUTPUT_LINES: usize = 256;
 
 const DEFAULT_TIMEOUT_MS: u64 = 10_000;
 
@@ -222,10 +225,12 @@ pub async fn exec(
     let stdout_handle = tokio::spawn(read_capped(
         BufReader::new(child.stdout.take().expect("stdout is not piped")),
         MAX_STREAM_OUTPUT,
+        MAX_STREAM_OUTPUT_LINES,
     ));
     let stderr_handle = tokio::spawn(read_capped(
         BufReader::new(child.stderr.take().expect("stderr is not piped")),
         MAX_STREAM_OUTPUT,
+        MAX_STREAM_OUTPUT_LINES,
     ));
 
     let interrupted = ctrl_c.notified();
@@ -259,23 +264,41 @@ pub async fn exec(
     })
 }
 
-async fn read_capped<R: AsyncReadExt + Unpin>(
+async fn read_capped<R: AsyncRead + Unpin>(
     mut reader: R,
     max_output: usize,
+    max_lines: usize,
 ) -> io::Result<Vec<u8>> {
     let mut buf = Vec::with_capacity(max_output.min(8 * 1024));
     let mut tmp = [0u8; 8192];
+    let mut remaining_bytes = max_output;
+    let mut remaining_lines = max_lines;
+
     loop {
         let n = reader.read(&mut tmp).await?;
         if n == 0 {
             break;
         }
-        if buf.len() < max_output {
-            let remaining = max_output - buf.len();
-            buf.extend_from_slice(&tmp[..remaining.min(n)]);
+
+        // Copy into the buffer only while we still have byte and line budget.
+        if remaining_bytes > 0 && remaining_lines > 0 {
+            let mut copy_len = 0;
+            for &b in &tmp[..n] {
+                if remaining_bytes == 0 || remaining_lines == 0 {
+                    break;
+                }
+                copy_len += 1;
+                remaining_bytes -= 1;
+                if b == b'\n' {
+                    remaining_lines -= 1;
+                }
+            }
+            buf.extend_from_slice(&tmp[..copy_len]);
         }
+        // Continue reading to EOF to avoid back-pressure, but discard once caps are hit.
     }
+
     Ok(buf)
 }
diff --git a/codex-rs/core/src/flags.rs b/codex-rs/core/src/flags.rs
index 41572f1a..4d0d4bbe 100644
--- a/codex-rs/core/src/flags.rs
+++ b/codex-rs/core/src/flags.rs
@@ -9,16 +9,15 @@ env_flags! {
     pub OPENAI_DEFAULT_MODEL: &str = "o3";
     pub OPENAI_API_BASE: &str = "https://api.openai.com";
     pub OPENAI_API_KEY: Option<&str> = None;
 
-    pub OPENAI_TIMEOUT_MS: Duration = Duration::from_millis(30_000), |value| {
+    pub OPENAI_TIMEOUT_MS: Duration = Duration::from_millis(300_000), |value| {
         value.parse().map(Duration::from_millis)
     };
     pub OPENAI_REQUEST_MAX_RETRIES: u64 = 4;
     pub OPENAI_STREAM_MAX_RETRIES: u64 = 10;
 
-    /// Maximum idle time (no SSE events received) before the stream is treated as
-    /// disconnected and retried by the agent. The default of 75 s is slightly
-    /// above OpenAI’s documented 60 s load‑balancer timeout.
-    pub OPENAI_STREAM_IDLE_TIMEOUT_MS: Duration = Duration::from_millis(75_000), |value| {
+    // We generally don't want to disconnect; this updates the timeout to be five minutes
+    // which matches the upstream typescript codex impl.
+    pub OPENAI_STREAM_IDLE_TIMEOUT_MS: Duration = Duration::from_millis(300_000), |value| {
         value.parse().map(Duration::from_millis)
     };
diff --git a/codex-rs/core/src/util.rs b/codex-rs/core/src/util.rs
index 27241c77..14bcc16d 100644
--- a/codex-rs/core/src/util.rs
+++ b/codex-rs/core/src/util.rs
@@ -5,6 +5,9 @@ use rand::Rng;
 use tokio::sync::Notify;
 use tracing::debug;
 
+const INITIAL_DELAY_MS: u64 = 200;
+const BACKOFF_FACTOR: f64 = 1.3;
+
 /// Make a CancellationToken that is fulfilled when SIGINT occurs.
 pub fn notify_on_sigint() -> Arc<Notify> {
     let notify = Arc::new(Notify::new());
@@ -23,12 +26,11 @@ pub fn notify_on_sigint() -> Arc<Notify> {
     notify
 }
 
-/// Default exponential back‑off schedule: 200ms → 400ms → 800ms → 1600ms.
 pub(crate) fn backoff(attempt: u64) -> Duration {
-    let base_delay_ms = 200u64 * (1u64 << (attempt - 1));
-    let jitter = rand::rng().random_range(0.8..1.2);
-    let delay_ms = (base_delay_ms as f64 * jitter) as u64;
-    Duration::from_millis(delay_ms)
+    let exp = BACKOFF_FACTOR.powi(attempt.saturating_sub(1) as i32);
+    let base = (INITIAL_DELAY_MS as f64 * exp) as u64;
+    let jitter = rand::rng().random_range(0.9..1.1);
+    Duration::from_millis((base as f64 * jitter) as u64)
 }
 
 /// Return `true` if the current working directory is inside a Git repository.
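The `exec.rs` change above swaps the single 100 KiB byte cap for a combined byte-and-line budget while still draining the child's output to EOF. Below is a minimal, standalone sketch (not part of the patch) of that budgeting applied to an in-memory byte slice, written synchronously for brevity; `cap_output` is a hypothetical name introduced only for this illustration:

```rust
/// Illustrative sketch only: the same byte-and-line budgeting that the patched
/// `read_capped` applies per 8 KiB chunk, collapsed into a single pass.
fn cap_output(input: &[u8], max_bytes: usize, max_lines: usize) -> Vec<u8> {
    let mut buf = Vec::new();
    let mut remaining_bytes = max_bytes;
    let mut remaining_lines = max_lines;
    for &b in input {
        if remaining_bytes == 0 || remaining_lines == 0 {
            // Caps hit: the real code keeps reading to EOF but discards the rest,
            // so the child process never blocks on a full pipe.
            break;
        }
        buf.push(b);
        remaining_bytes -= 1;
        if b == b'\n' {
            remaining_lines -= 1;
        }
    }
    buf
}

fn main() {
    // 300 one-character lines: the 256-line cap triggers before the 10 KiB byte cap.
    let noisy = "x\n".repeat(300);
    let capped = cap_output(noisy.as_bytes(), 10 * 1024, 256);
    assert_eq!(capped.len(), 2 * 256); // 256 lines of "x\n" kept
    println!("kept {} of {} bytes", capped.len(), noisy.len());
}
```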