diff --git a/codex-rs/core/src/client.rs b/codex-rs/core/src/client.rs
index 57f593a8..51e8b8e8 100644
--- a/codex-rs/core/src/client.rs
+++ b/codex-rs/core/src/client.rs
@@ -28,7 +28,6 @@ use crate::flags::CODEX_RS_SSE_FIXTURE;
 use crate::flags::OPENAI_API_BASE;
 use crate::flags::OPENAI_REQUEST_MAX_RETRIES;
 use crate::flags::OPENAI_STREAM_IDLE_TIMEOUT_MS;
-use crate::flags::OPENAI_TIMEOUT_MS;
 use crate::models::ResponseInputItem;
 use crate::models::ResponseItem;
 use crate::util::backoff;
@@ -170,7 +169,6 @@ impl ModelClient {
             .header("OpenAI-Beta", "responses=experimental")
             .header(reqwest::header::ACCEPT, "text/event-stream")
             .json(&payload)
-            .timeout(*OPENAI_TIMEOUT_MS)
             .send()
             .await;
         match res {
diff --git a/codex-rs/core/src/exec.rs b/codex-rs/core/src/exec.rs
index ae83dc84..6a9fd212 100644
--- a/codex-rs/core/src/exec.rs
+++ b/codex-rs/core/src/exec.rs
@@ -7,6 +7,7 @@ use std::time::Duration;
 use std::time::Instant;
 
 use serde::Deserialize;
+use tokio::io::AsyncRead;
 use tokio::io::AsyncReadExt;
 use tokio::io::BufReader;
 use tokio::process::Command;
@@ -17,9 +18,11 @@ use crate::error::Result;
 use crate::error::SandboxErr;
 use crate::protocol::SandboxPolicy;
 
-/// Maximum we keep for each stream (100 KiB).
-/// TODO(ragona) this should be reduced
-const MAX_STREAM_OUTPUT: usize = 100 * 1024;
+/// Maximum we send for each stream, which is either:
+/// - 10KiB OR
+/// - 256 lines
+const MAX_STREAM_OUTPUT: usize = 10 * 1024;
+const MAX_STREAM_OUTPUT_LINES: usize = 256;
 
 const DEFAULT_TIMEOUT_MS: u64 = 10_000;
 
@@ -222,10 +225,12 @@ pub async fn exec(
     let stdout_handle = tokio::spawn(read_capped(
         BufReader::new(child.stdout.take().expect("stdout is not piped")),
         MAX_STREAM_OUTPUT,
+        MAX_STREAM_OUTPUT_LINES,
     ));
     let stderr_handle = tokio::spawn(read_capped(
         BufReader::new(child.stderr.take().expect("stderr is not piped")),
         MAX_STREAM_OUTPUT,
+        MAX_STREAM_OUTPUT_LINES,
     ));
 
     let interrupted = ctrl_c.notified();
@@ -259,23 +264,41 @@ pub async fn exec(
     })
 }
 
-async fn read_capped<R: AsyncReadExt + Unpin>(
+async fn read_capped<R: AsyncRead + Unpin>(
     mut reader: R,
     max_output: usize,
+    max_lines: usize,
 ) -> io::Result<Vec<u8>> {
     let mut buf = Vec::with_capacity(max_output.min(8 * 1024));
     let mut tmp = [0u8; 8192];
+    let mut remaining_bytes = max_output;
+    let mut remaining_lines = max_lines;
+
     loop {
         let n = reader.read(&mut tmp).await?;
         if n == 0 {
             break;
         }
-        if buf.len() < max_output {
-            let remaining = max_output - buf.len();
-            buf.extend_from_slice(&tmp[..remaining.min(n)]);
+
+        // Copy into the buffer only while we still have byte and line budget.
+        if remaining_bytes > 0 && remaining_lines > 0 {
+            let mut copy_len = 0;
+            for &b in &tmp[..n] {
+                if remaining_bytes == 0 || remaining_lines == 0 {
+                    break;
+                }
+                copy_len += 1;
+                remaining_bytes -= 1;
+                if b == b'\n' {
+                    remaining_lines -= 1;
+                }
+            }
+            buf.extend_from_slice(&tmp[..copy_len]);
         }
+        // Continue reading to EOF to avoid back-pressure, but discard once caps are hit.
     }
+
     Ok(buf)
 }
 
diff --git a/codex-rs/core/src/flags.rs b/codex-rs/core/src/flags.rs
index 41572f1a..4d0d4bbe 100644
--- a/codex-rs/core/src/flags.rs
+++ b/codex-rs/core/src/flags.rs
@@ -9,16 +9,15 @@ env_flags! {
     pub OPENAI_DEFAULT_MODEL: &str = "o3";
     pub OPENAI_API_BASE: &str = "https://api.openai.com";
     pub OPENAI_API_KEY: Option<&str> = None;
-    pub OPENAI_TIMEOUT_MS: Duration = Duration::from_millis(30_000), |value| {
+    pub OPENAI_TIMEOUT_MS: Duration = Duration::from_millis(300_000), |value| {
         value.parse().map(Duration::from_millis)
     };
     pub OPENAI_REQUEST_MAX_RETRIES: u64 = 4;
     pub OPENAI_STREAM_MAX_RETRIES: u64 = 10;
 
-    /// Maximum idle time (no SSE events received) before the stream is treated as
-    /// disconnected and retried by the agent. The default of 75 s is slightly
-    /// above OpenAI’s documented 60 s load‑balancer timeout.
-    pub OPENAI_STREAM_IDLE_TIMEOUT_MS: Duration = Duration::from_millis(75_000), |value| {
+    // We generally don't want to disconnect; this updates the timeout to be five minutes
+    // which matches the upstream typescript codex impl.
+    pub OPENAI_STREAM_IDLE_TIMEOUT_MS: Duration = Duration::from_millis(300_000), |value| {
         value.parse().map(Duration::from_millis)
     };
 
diff --git a/codex-rs/core/src/util.rs b/codex-rs/core/src/util.rs
index 27241c77..14bcc16d 100644
--- a/codex-rs/core/src/util.rs
+++ b/codex-rs/core/src/util.rs
@@ -5,6 +5,9 @@
 use rand::Rng;
 use tokio::sync::Notify;
 use tracing::debug;
+const INITIAL_DELAY_MS: u64 = 200;
+const BACKOFF_FACTOR: f64 = 1.3;
+
 /// Make a CancellationToken that is fulfilled when SIGINT occurs.
 pub fn notify_on_sigint() -> Arc<Notify> {
     let notify = Arc::new(Notify::new());
@@ -23,12 +26,11 @@ pub fn notify_on_sigint() -> Arc<Notify> {
     notify
 }
 
-/// Default exponential back‑off schedule: 200ms → 400ms → 800ms → 1600ms.
 pub(crate) fn backoff(attempt: u64) -> Duration {
-    let base_delay_ms = 200u64 * (1u64 << (attempt - 1));
-    let jitter = rand::rng().random_range(0.8..1.2);
-    let delay_ms = (base_delay_ms as f64 * jitter) as u64;
-    Duration::from_millis(delay_ms)
+    let exp = BACKOFF_FACTOR.powi(attempt.saturating_sub(1) as i32);
+    let base = (INITIAL_DELAY_MS as f64 * exp) as u64;
+    let jitter = rand::rng().random_range(0.9..1.1);
+    Duration::from_millis((base as f64 * jitter) as u64)
 }
 
 /// Return `true` if the current working directory is inside a Git repository.