This introduces an experimental `--output-last-message` flag that can be used to identify a file where the final message from the agent will be written. Two use cases: - Ultimately, we will likely add a `--quiet` option to `exec`, but even if the user does not want any output written to the terminal, they probably want to know what the agent did. Writing the output to a file makes it possible to get that information in a clean way. - Relatedly, when using `exec` in CI, it is easier to review the transcript written "normally," (i.e., not as JSON or something with extra escapes), but getting programmatic access to the last message is likely helpful, so writing the last message to a file gets the best of both worlds. I am calling this "experimental" because it is possible that we are overfitting and will want a more general solution to this problem that would justify removing this flag.
127 lines
4.1 KiB
Rust
127 lines
4.1 KiB
Rust
//! Verifies that the agent retries when the SSE stream terminates before
|
|
//! delivering a `response.completed` event.
|
|
|
|
use std::time::Duration;
|
|
|
|
use codex_core::Codex;
|
|
use codex_core::ModelProviderInfo;
|
|
use codex_core::exec::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR;
|
|
use codex_core::protocol::EventMsg;
|
|
use codex_core::protocol::InputItem;
|
|
use codex_core::protocol::Op;
|
|
mod test_support;
|
|
use tempfile::TempDir;
|
|
use test_support::load_default_config_for_test;
|
|
use tokio::time::timeout;
|
|
use wiremock::Mock;
|
|
use wiremock::MockServer;
|
|
use wiremock::Request;
|
|
use wiremock::Respond;
|
|
use wiremock::ResponseTemplate;
|
|
use wiremock::matchers::method;
|
|
use wiremock::matchers::path;
|
|
|
|
fn sse_incomplete() -> String {
|
|
// Only a single line; missing the completed event.
|
|
"event: response.output_item.done\n\n".to_string()
|
|
}
|
|
|
|
fn sse_completed(id: &str) -> String {
|
|
format!(
|
|
"event: response.completed\n\
|
|
data: {{\"type\":\"response.completed\",\"response\":{{\"id\":\"{}\",\"output\":[]}}}}\n\n\n",
|
|
id
|
|
)
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn retries_on_early_close() {
|
|
#![allow(clippy::unwrap_used)]
|
|
|
|
if std::env::var(CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR).is_ok() {
|
|
println!(
|
|
"Skipping test because it cannot execute when network is disabled in a Codex sandbox."
|
|
);
|
|
return;
|
|
}
|
|
|
|
let server = MockServer::start().await;
|
|
|
|
struct SeqResponder;
|
|
impl Respond for SeqResponder {
|
|
fn respond(&self, _: &Request) -> ResponseTemplate {
|
|
use std::sync::atomic::AtomicUsize;
|
|
use std::sync::atomic::Ordering;
|
|
static CALLS: AtomicUsize = AtomicUsize::new(0);
|
|
let n = CALLS.fetch_add(1, Ordering::SeqCst);
|
|
if n == 0 {
|
|
ResponseTemplate::new(200)
|
|
.insert_header("content-type", "text/event-stream")
|
|
.set_body_raw(sse_incomplete(), "text/event-stream")
|
|
} else {
|
|
ResponseTemplate::new(200)
|
|
.insert_header("content-type", "text/event-stream")
|
|
.set_body_raw(sse_completed("resp_ok"), "text/event-stream")
|
|
}
|
|
}
|
|
}
|
|
|
|
Mock::given(method("POST"))
|
|
.and(path("/v1/responses"))
|
|
.respond_with(SeqResponder {})
|
|
.expect(2)
|
|
.mount(&server)
|
|
.await;
|
|
|
|
// Environment
|
|
//
|
|
// As of Rust 2024 `std::env::set_var` has been made `unsafe` because
|
|
// mutating the process environment is inherently racy when other threads
|
|
// are running. We therefore have to wrap every call in an explicit
|
|
// `unsafe` block. These are limited to the test-setup section so the
|
|
// scope is very small and clearly delineated.
|
|
|
|
unsafe {
|
|
std::env::set_var("OPENAI_REQUEST_MAX_RETRIES", "0");
|
|
std::env::set_var("OPENAI_STREAM_MAX_RETRIES", "1");
|
|
std::env::set_var("OPENAI_STREAM_IDLE_TIMEOUT_MS", "2000");
|
|
}
|
|
|
|
let model_provider = ModelProviderInfo {
|
|
name: "openai".into(),
|
|
base_url: format!("{}/v1", server.uri()),
|
|
// Environment variable that should exist in the test environment.
|
|
// ModelClient will return an error if the environment variable for the
|
|
// provider is not set.
|
|
env_key: Some("PATH".into()),
|
|
env_key_instructions: None,
|
|
wire_api: codex_core::WireApi::Responses,
|
|
};
|
|
|
|
let ctrl_c = std::sync::Arc::new(tokio::sync::Notify::new());
|
|
let codex_home = TempDir::new().unwrap();
|
|
let mut config = load_default_config_for_test(&codex_home);
|
|
config.model_provider = model_provider;
|
|
let (codex, _init_id) = Codex::spawn(config, ctrl_c).await.unwrap();
|
|
|
|
codex
|
|
.submit(Op::UserInput {
|
|
items: vec![InputItem::Text {
|
|
text: "hello".into(),
|
|
}],
|
|
})
|
|
.await
|
|
.unwrap();
|
|
|
|
// Wait until TaskComplete (should succeed after retry).
|
|
loop {
|
|
let ev = timeout(Duration::from_secs(10), codex.next_event())
|
|
.await
|
|
.unwrap()
|
|
.unwrap();
|
|
if matches!(ev.msg, EventMsg::TaskComplete(_)) {
|
|
break;
|
|
}
|
|
}
|
|
}
|