Files
llmx/codex-rs/core/tests/live_agent.rs
Michael Bolin b323d10ea7 feat: add ZDR support to Rust implementation (#642)
This adds support for the `--disable-response-storage` flag across our
multiple Rust CLIs to support customers who have opted into Zero-Data
Retention (ZDR). The analogous changes to the TypeScript CLI were:

* https://github.com/openai/codex/pull/481
* https://github.com/openai/codex/pull/543

For a client using ZDR, `previous_response_id` will never be available,
so the `input` field of an API request must include the full transcript
of the conversation thus far. As such, this PR changes the type of
`Prompt.input` from `Vec<ResponseInputItem>` to `Vec<ResponseItem>`.

Practically speaking, `ResponseItem` was effectively a "superset" of
`ResponseInputItem` already. The main difference for us is that
`ResponseItem` includes the `FunctionCall` variant that we have to
include as part of the conversation history in the ZDR case.

Another key change in this PR is modifying `try_run_turn()` so that it
returns the `Vec<ResponseItem>` for the turn in addition to the
`Vec<ResponseInputItem>` produced by `try_run_turn()`. This is because
the caller of `run_turn()` needs to record the `Vec<ResponseItem>` when
ZDR is enabled.

To that end, this PR introduces `ZdrTranscript` (and adds
`zdr_transcript: Option<ZdrTranscript>` to `struct State` in `codex.rs`)
to take responsibility for maintaining the conversation transcript in
the ZDR case.
2025-04-25 12:08:18 -07:00

221 lines
7.2 KiB
Rust
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
//! Live integration tests that exercise the full [`Agent`] stack **against the real
//! OpenAI `/v1/responses` API**. These tests complement the lightweight mockbased
//! unit tests by verifying that the agent can drive an endtoend conversation,
//! stream incremental events, execute functioncall tool invocations and safely
//! chain multiple turns inside a single session the exact scenarios that have
//! historically been brittle.
//!
//! The live tests are **ignored by default** so CI remains deterministic and free
//! of external dependencies. Developers can optin locally with e.g.
//!
//! ```bash
//! OPENAI_API_KEY=sk... cargo test --test live_agent -- --ignored --nocapture
//! ```
//!
//! Make sure your key has access to the experimental *Responses* API and that
//! any billable usage is acceptable.
use std::time::Duration;
use codex_core::protocol::AskForApproval;
use codex_core::protocol::EventMsg;
use codex_core::protocol::InputItem;
use codex_core::protocol::Op;
use codex_core::protocol::SandboxPolicy;
use codex_core::protocol::Submission;
use codex_core::Codex;
use tokio::sync::Notify;
use tokio::time::timeout;
fn api_key_available() -> bool {
std::env::var("OPENAI_API_KEY").is_ok()
}
/// Helper that spawns a fresh Agent and sends the mandatory *ConfigureSession*
/// submission. The caller receives the constructed [`Agent`] plus the unique
/// submission id used for the initialization message.
async fn spawn_codex() -> Codex {
assert!(
api_key_available(),
"OPENAI_API_KEY must be set for live tests"
);
// Environment tweaks to keep the tests snappy and inexpensive while still
// exercising retry/robustness logic.
std::env::set_var("OPENAI_REQUEST_MAX_RETRIES", "2");
std::env::set_var("OPENAI_STREAM_MAX_RETRIES", "2");
let agent = Codex::spawn(std::sync::Arc::new(Notify::new())).unwrap();
agent
.submit(Submission {
id: "init".into(),
op: Op::ConfigureSession {
model: None,
instructions: None,
approval_policy: AskForApproval::OnFailure,
sandbox_policy: SandboxPolicy::NetworkAndFileWriteRestricted,
disable_response_storage: false,
},
})
.await
.expect("failed to submit init");
// Drain the SessionInitialized event so subsequent helper loops don't have
// to specialcase it.
loop {
let ev = timeout(Duration::from_secs(30), agent.next_event())
.await
.expect("timeout waiting for init event")
.expect("agent channel closed");
if matches!(ev.msg, EventMsg::SessionConfigured { .. }) {
break;
}
}
agent
}
/// Verifies that the agent streams incremental *AgentMessage* events **before**
/// emitting `TaskComplete` and that a second task inside the same session does
/// not get tripped up by a stale `previous_response_id`.
#[ignore]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn live_streaming_and_prev_id_reset() {
if !api_key_available() {
eprintln!("skipping live_streaming_and_prev_id_reset OPENAI_API_KEY not set");
return;
}
let codex = spawn_codex().await;
// ---------- Task 1 ----------
codex
.submit(Submission {
id: "task1".into(),
op: Op::UserInput {
items: vec![InputItem::Text {
text: "Say the words 'stream test'".into(),
}],
},
})
.await
.unwrap();
let mut saw_message_before_complete = false;
loop {
let ev = timeout(Duration::from_secs(60), codex.next_event())
.await
.expect("timeout waiting for task1 events")
.expect("agent closed");
match ev.msg {
EventMsg::AgentMessage { .. } => saw_message_before_complete = true,
EventMsg::TaskComplete => break,
EventMsg::Error { message } => panic!("agent reported error in task1: {message}"),
_ => (),
}
}
assert!(
saw_message_before_complete,
"Agent did not stream any AgentMessage before TaskComplete"
);
// ---------- Task 2 (same session) ----------
codex
.submit(Submission {
id: "task2".into(),
op: Op::UserInput {
items: vec![InputItem::Text {
text: "Respond with exactly: second turn succeeded".into(),
}],
},
})
.await
.unwrap();
let mut got_expected = false;
loop {
let ev = timeout(Duration::from_secs(60), codex.next_event())
.await
.expect("timeout waiting for task2 events")
.expect("agent closed");
match &ev.msg {
EventMsg::AgentMessage { message } if message.contains("second turn succeeded") => {
got_expected = true;
}
EventMsg::TaskComplete => break,
EventMsg::Error { message } => panic!("agent reported error in task2: {message}"),
_ => (),
}
}
assert!(got_expected, "second task did not receive expected answer");
}
/// Exercises a *functioncall → shell execution* roundtrip by instructing the
/// model to run a harmless `echo` command. The test asserts that:
/// 1. the function call is executed (we see `ExecCommandBegin`/`End` events)
/// 2. the captured stdout reaches the client unchanged.
#[ignore]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn live_shell_function_call() {
if !api_key_available() {
eprintln!("skipping live_shell_function_call OPENAI_API_KEY not set");
return;
}
let codex = spawn_codex().await;
const MARKER: &str = "codex_live_echo_ok";
codex
.submit(Submission {
id: "task_fn".into(),
op: Op::UserInput {
items: vec![InputItem::Text {
text: format!(
"Use the shell function to run the command `echo {MARKER}` and no other commands."
),
}],
},
})
.await
.unwrap();
let mut saw_begin = false;
let mut saw_end_with_output = false;
loop {
let ev = timeout(Duration::from_secs(60), codex.next_event())
.await
.expect("timeout waiting for functioncall events")
.expect("agent closed");
match ev.msg {
EventMsg::ExecCommandBegin { command, .. } => {
assert_eq!(command, vec!["echo", MARKER]);
saw_begin = true;
}
EventMsg::ExecCommandEnd {
stdout, exit_code, ..
} => {
assert_eq!(exit_code, 0, "echo returned nonzero exit code");
assert!(stdout.contains(MARKER));
saw_end_with_output = true;
}
EventMsg::TaskComplete => break,
EventMsg::Error { message } => panic!("agent error during shell test: {message}"),
_ => (),
}
}
assert!(saw_begin, "ExecCommandBegin event missing");
assert!(
saw_end_with_output,
"ExecCommandEnd with expected output missing"
);
}