Files
llmx/codex-rs/core/tests/stream_no_completed.rs
Michael Bolin 31d0d7a305 feat: initial import of Rust implementation of Codex CLI in codex-rs/ (#629)
As stated in `codex-rs/README.md`:

Today, Codex CLI is written in TypeScript and requires Node.js 22+ to
run it. For a number of users, this runtime requirement inhibits
adoption: they would be better served by a standalone executable. As
maintainers, we want Codex to run efficiently in a wide range of
environments with minimal overhead. We also want to take advantage of
operating system-specific APIs to provide better sandboxing, where
possible.

To that end, we are moving forward with a Rust implementation of Codex
CLI contained in this folder, which has the following benefits:

- The CLI compiles to small, standalone, platform-specific binaries.
- Can make direct, native calls to
[seccomp](https://man7.org/linux/man-pages/man2/seccomp.2.html) and
[landlock](https://man7.org/linux/man-pages/man7/landlock.7.html) in
order to support sandboxing on Linux.
- No runtime garbage collection, resulting in lower memory consumption
and better, more predictable performance.

Currently, the Rust implementation is materially behind the TypeScript
implementation in functionality, so continue to use the TypeScript
implmentation for the time being. We will publish native executables via
GitHub Releases as soon as we feel the Rust version is usable.
2025-04-24 13:31:40 -07:00

110 lines
3.3 KiB
Rust

//! Verifies that the agent retries when the SSE stream terminates before
//! delivering a `response.completed` event.
use std::time::Duration;
use codex_core::protocol::AskForApproval;
use codex_core::protocol::InputItem;
use codex_core::protocol::Op;
use codex_core::protocol::SandboxPolicy;
use codex_core::protocol::Submission;
use codex_core::Codex;
use tokio::time::timeout;
use wiremock::matchers::method;
use wiremock::matchers::path;
use wiremock::Mock;
use wiremock::MockServer;
use wiremock::Request;
use wiremock::Respond;
use wiremock::ResponseTemplate;
fn sse_incomplete() -> String {
// Only a single line; missing the completed event.
"event: response.output_item.done\n\n".to_string()
}
fn sse_completed(id: &str) -> String {
format!(
"event: response.completed\n\
data: {{\"type\":\"response.completed\",\"response\":{{\"id\":\"{}\",\"output\":[]}}}}\n\n\n",
id
)
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn retries_on_early_close() {
let server = MockServer::start().await;
struct SeqResponder;
impl Respond for SeqResponder {
fn respond(&self, _: &Request) -> ResponseTemplate {
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
static CALLS: AtomicUsize = AtomicUsize::new(0);
let n = CALLS.fetch_add(1, Ordering::SeqCst);
if n == 0 {
ResponseTemplate::new(200)
.insert_header("content-type", "text/event-stream")
.set_body_raw(sse_incomplete(), "text/event-stream")
} else {
ResponseTemplate::new(200)
.insert_header("content-type", "text/event-stream")
.set_body_raw(sse_completed("resp_ok"), "text/event-stream")
}
}
}
Mock::given(method("POST"))
.and(path("/v1/responses"))
.respond_with(SeqResponder {})
.expect(2)
.mount(&server)
.await;
// Environment
std::env::set_var("OPENAI_API_KEY", "test-key");
std::env::set_var("OPENAI_API_BASE", server.uri());
std::env::set_var("OPENAI_REQUEST_MAX_RETRIES", "0");
std::env::set_var("OPENAI_STREAM_MAX_RETRIES", "1");
std::env::set_var("OPENAI_STREAM_IDLE_TIMEOUT_MS", "2000");
let codex = Codex::spawn(std::sync::Arc::new(tokio::sync::Notify::new())).unwrap();
codex
.submit(Submission {
id: "init".into(),
op: Op::ConfigureSession {
model: None,
instructions: None,
approval_policy: AskForApproval::OnFailure,
sandbox_policy: SandboxPolicy::NetworkAndFileWriteRestricted,
},
})
.await
.unwrap();
let _ = codex.next_event().await.unwrap();
codex
.submit(Submission {
id: "task".into(),
op: Op::UserInput {
items: vec![InputItem::Text {
text: "hello".into(),
}],
},
})
.await
.unwrap();
// Wait until TaskComplete (should succeed after retry).
loop {
let ev = timeout(Duration::from_secs(10), codex.next_event())
.await
.unwrap()
.unwrap();
if matches!(ev.msg, codex_core::protocol::EventMsg::TaskComplete) {
break;
}
}
}