feat: initial import of Rust implementation of Codex CLI in codex-rs/ (#629)

As stated in `codex-rs/README.md`:

Today, Codex CLI is written in TypeScript and requires Node.js 22+ to
run it. For a number of users, this runtime requirement inhibits
adoption: they would be better served by a standalone executable. As
maintainers, we want Codex to run efficiently in a wide range of
environments with minimal overhead. We also want to take advantage of
operating system-specific APIs to provide better sandboxing, where
possible.

To that end, we are moving forward with a Rust implementation of Codex
CLI contained in this folder, which has the following benefits:

- The CLI compiles to small, standalone, platform-specific binaries.
- Can make direct, native calls to
[seccomp](https://man7.org/linux/man-pages/man2/seccomp.2.html) and
[landlock](https://man7.org/linux/man-pages/man7/landlock.7.html) in
order to support sandboxing on Linux.
- No runtime garbage collection, resulting in lower memory consumption
and better, more predictable performance.

Currently, the Rust implementation is materially behind the TypeScript
implementation in functionality, so continue to use the TypeScript
implmentation for the time being. We will publish native executables via
GitHub Releases as soon as we feel the Rust version is usable.
This commit is contained in:
Michael Bolin
2025-04-24 13:31:40 -07:00
committed by GitHub
parent acc4acc81e
commit 31d0d7a305
71 changed files with 14099 additions and 0 deletions

View File

@@ -0,0 +1,219 @@
//! Live integration tests that exercise the full [`Agent`] stack **against the real
//! OpenAI `/v1/responses` API**. These tests complement the lightweight mockbased
//! unit tests by verifying that the agent can drive an endtoend conversation,
//! stream incremental events, execute functioncall tool invocations and safely
//! chain multiple turns inside a single session the exact scenarios that have
//! historically been brittle.
//!
//! The live tests are **ignored by default** so CI remains deterministic and free
//! of external dependencies. Developers can optin locally with e.g.
//!
//! ```bash
//! OPENAI_API_KEY=sk... cargo test --test live_agent -- --ignored --nocapture
//! ```
//!
//! Make sure your key has access to the experimental *Responses* API and that
//! any billable usage is acceptable.
use std::time::Duration;
use codex_core::protocol::AskForApproval;
use codex_core::protocol::EventMsg;
use codex_core::protocol::InputItem;
use codex_core::protocol::Op;
use codex_core::protocol::SandboxPolicy;
use codex_core::protocol::Submission;
use codex_core::Codex;
use tokio::sync::Notify;
use tokio::time::timeout;
fn api_key_available() -> bool {
std::env::var("OPENAI_API_KEY").is_ok()
}
/// Helper that spawns a fresh Agent and sends the mandatory *ConfigureSession*
/// submission. The caller receives the constructed [`Agent`] plus the unique
/// submission id used for the initialization message.
async fn spawn_codex() -> Codex {
assert!(
api_key_available(),
"OPENAI_API_KEY must be set for live tests"
);
// Environment tweaks to keep the tests snappy and inexpensive while still
// exercising retry/robustness logic.
std::env::set_var("OPENAI_REQUEST_MAX_RETRIES", "2");
std::env::set_var("OPENAI_STREAM_MAX_RETRIES", "2");
let agent = Codex::spawn(std::sync::Arc::new(Notify::new())).unwrap();
agent
.submit(Submission {
id: "init".into(),
op: Op::ConfigureSession {
model: None,
instructions: None,
approval_policy: AskForApproval::OnFailure,
sandbox_policy: SandboxPolicy::NetworkAndFileWriteRestricted,
},
})
.await
.expect("failed to submit init");
// Drain the SessionInitialized event so subsequent helper loops don't have
// to specialcase it.
loop {
let ev = timeout(Duration::from_secs(30), agent.next_event())
.await
.expect("timeout waiting for init event")
.expect("agent channel closed");
if matches!(ev.msg, EventMsg::SessionConfigured { .. }) {
break;
}
}
agent
}
/// Verifies that the agent streams incremental *AgentMessage* events **before**
/// emitting `TaskComplete` and that a second task inside the same session does
/// not get tripped up by a stale `previous_response_id`.
#[ignore]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn live_streaming_and_prev_id_reset() {
if !api_key_available() {
eprintln!("skipping live_streaming_and_prev_id_reset OPENAI_API_KEY not set");
return;
}
let codex = spawn_codex().await;
// ---------- Task 1 ----------
codex
.submit(Submission {
id: "task1".into(),
op: Op::UserInput {
items: vec![InputItem::Text {
text: "Say the words 'stream test'".into(),
}],
},
})
.await
.unwrap();
let mut saw_message_before_complete = false;
loop {
let ev = timeout(Duration::from_secs(60), codex.next_event())
.await
.expect("timeout waiting for task1 events")
.expect("agent closed");
match ev.msg {
EventMsg::AgentMessage { .. } => saw_message_before_complete = true,
EventMsg::TaskComplete => break,
EventMsg::Error { message } => panic!("agent reported error in task1: {message}"),
_ => (),
}
}
assert!(
saw_message_before_complete,
"Agent did not stream any AgentMessage before TaskComplete"
);
// ---------- Task 2 (same session) ----------
codex
.submit(Submission {
id: "task2".into(),
op: Op::UserInput {
items: vec![InputItem::Text {
text: "Respond with exactly: second turn succeeded".into(),
}],
},
})
.await
.unwrap();
let mut got_expected = false;
loop {
let ev = timeout(Duration::from_secs(60), codex.next_event())
.await
.expect("timeout waiting for task2 events")
.expect("agent closed");
match &ev.msg {
EventMsg::AgentMessage { message } if message.contains("second turn succeeded") => {
got_expected = true;
}
EventMsg::TaskComplete => break,
EventMsg::Error { message } => panic!("agent reported error in task2: {message}"),
_ => (),
}
}
assert!(got_expected, "second task did not receive expected answer");
}
/// Exercises a *functioncall → shell execution* roundtrip by instructing the
/// model to run a harmless `echo` command. The test asserts that:
/// 1. the function call is executed (we see `ExecCommandBegin`/`End` events)
/// 2. the captured stdout reaches the client unchanged.
#[ignore]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn live_shell_function_call() {
if !api_key_available() {
eprintln!("skipping live_shell_function_call OPENAI_API_KEY not set");
return;
}
let codex = spawn_codex().await;
const MARKER: &str = "codex_live_echo_ok";
codex
.submit(Submission {
id: "task_fn".into(),
op: Op::UserInput {
items: vec![InputItem::Text {
text: format!(
"Use the shell function to run the command `echo {MARKER}` and no other commands."
),
}],
},
})
.await
.unwrap();
let mut saw_begin = false;
let mut saw_end_with_output = false;
loop {
let ev = timeout(Duration::from_secs(60), codex.next_event())
.await
.expect("timeout waiting for functioncall events")
.expect("agent closed");
match ev.msg {
EventMsg::ExecCommandBegin { command, .. } => {
assert_eq!(command, vec!["echo", MARKER]);
saw_begin = true;
}
EventMsg::ExecCommandEnd {
stdout, exit_code, ..
} => {
assert_eq!(exit_code, 0, "echo returned nonzero exit code");
assert!(stdout.contains(MARKER));
saw_end_with_output = true;
}
EventMsg::TaskComplete => break,
EventMsg::Error { message } => panic!("agent error during shell test: {message}"),
_ => (),
}
}
assert!(saw_begin, "ExecCommandBegin event missing");
assert!(
saw_end_with_output,
"ExecCommandEnd with expected output missing"
);
}

View File

@@ -0,0 +1,143 @@
//! Optional smoke tests that hit the real OpenAI /v1/responses endpoint. They are `#[ignore]` by
//! default so CI stays deterministic and free. Developers can run them locally with
//! `cargo test --test live_cli -- --ignored` provided they set a valid `OPENAI_API_KEY`.
use assert_cmd::prelude::*;
use predicates::prelude::*;
use std::process::Command;
use std::process::Stdio;
use tempfile::TempDir;
fn require_api_key() -> String {
std::env::var("OPENAI_API_KEY")
.expect("OPENAI_API_KEY env var not set — skip running live tests")
}
/// Helper that spawns the binary inside a TempDir with minimal flags. Returns (Assert, TempDir).
fn run_live(prompt: &str) -> (assert_cmd::assert::Assert, TempDir) {
use std::io::Read;
use std::io::Write;
use std::thread;
let dir = TempDir::new().unwrap();
// Build a plain `std::process::Command` so we have full control over the underlying stdio
// handles. `assert_cmd`s own `Command` wrapper always forces stdout/stderr to be piped
// internally which prevents us from streaming them live to the terminal (see its `spawn`
// implementation). Instead we configure the std `Command` ourselves, then later hand the
// resulting `Output` to `assert_cmd` for the familiar assertions.
let mut cmd = Command::cargo_bin("codex-rs").unwrap();
cmd.current_dir(dir.path());
cmd.env("OPENAI_API_KEY", require_api_key());
// We want three things at once:
// 1. live streaming of the childs stdout/stderr while the test is running
// 2. captured output so we can keep using assert_cmds `Assert` helpers
// 3. crossplatform behavior (best effort)
//
// To get that we:
// • set both stdout and stderr to `piped()` so we can read them programmatically
// • spawn a thread for each stream that copies bytes into two sinks:
// the parent process stdout/stderr for live visibility
// an inmemory buffer so we can pass it to `assert_cmd` later
// Pass the prompt through the `--` separator so the CLI knows when user input ends.
cmd.arg("--allow-no-git-exec")
.arg("-v")
.arg("--")
.arg(prompt);
cmd.stdin(Stdio::piped());
cmd.stdout(Stdio::piped());
cmd.stderr(Stdio::piped());
let mut child = cmd.spawn().expect("failed to spawn codex-rs");
// Send the terminating newline so Session::run exits after the first turn.
child
.stdin
.as_mut()
.expect("child stdin unavailable")
.write_all(b"\n")
.expect("failed to write to child stdin");
// Helper that tees a ChildStdout/ChildStderr into both the parents stdio and a Vec<u8>.
fn tee<R: Read + Send + 'static>(
mut reader: R,
mut writer: impl Write + Send + 'static,
) -> thread::JoinHandle<Vec<u8>> {
thread::spawn(move || {
let mut buf = Vec::new();
let mut chunk = [0u8; 4096];
loop {
match reader.read(&mut chunk) {
Ok(0) => break,
Ok(n) => {
writer.write_all(&chunk[..n]).ok();
writer.flush().ok();
buf.extend_from_slice(&chunk[..n]);
}
Err(_) => break,
}
}
buf
})
}
let stdout_handle = tee(
child.stdout.take().expect("child stdout"),
std::io::stdout(),
);
let stderr_handle = tee(
child.stderr.take().expect("child stderr"),
std::io::stderr(),
);
let status = child.wait().expect("failed to wait on child");
let stdout = stdout_handle.join().expect("stdout thread panicked");
let stderr = stderr_handle.join().expect("stderr thread panicked");
let output = std::process::Output {
status,
stdout,
stderr,
};
(output.assert(), dir)
}
#[ignore]
#[test]
fn live_create_file_hello_txt() {
if std::env::var("OPENAI_API_KEY").is_err() {
eprintln!("skipping live_create_file_hello_txt OPENAI_API_KEY not set");
return;
}
let (assert, dir) = run_live("Use the shell tool with the apply_patch command to create a file named hello.txt containing the text 'hello'.");
assert.success();
let path = dir.path().join("hello.txt");
assert!(path.exists(), "hello.txt was not created by the model");
let contents = std::fs::read_to_string(path).unwrap();
assert_eq!(contents.trim(), "hello");
}
#[ignore]
#[test]
fn live_print_working_directory() {
if std::env::var("OPENAI_API_KEY").is_err() {
eprintln!("skipping live_print_working_directory OPENAI_API_KEY not set");
return;
}
let (assert, dir) = run_live("Print the current working directory using the shell function.");
assert
.success()
.stdout(predicate::str::contains(dir.path().to_string_lossy()));
}

View File

@@ -0,0 +1,156 @@
use std::time::Duration;
use codex_core::protocol::AskForApproval;
use codex_core::protocol::InputItem;
use codex_core::protocol::Op;
use codex_core::protocol::SandboxPolicy;
use codex_core::protocol::Submission;
use codex_core::Codex;
use serde_json::Value;
use tokio::time::timeout;
use wiremock::matchers::method;
use wiremock::matchers::path;
use wiremock::Match;
use wiremock::Mock;
use wiremock::MockServer;
use wiremock::Request;
use wiremock::ResponseTemplate;
/// Matcher asserting that JSON body has NO `previous_response_id` field.
struct NoPrevId;
impl Match for NoPrevId {
fn matches(&self, req: &Request) -> bool {
serde_json::from_slice::<Value>(&req.body)
.map(|v| v.get("previous_response_id").is_none())
.unwrap_or(false)
}
}
/// Matcher asserting that JSON body HAS a `previous_response_id` field.
struct HasPrevId;
impl Match for HasPrevId {
fn matches(&self, req: &Request) -> bool {
serde_json::from_slice::<Value>(&req.body)
.map(|v| v.get("previous_response_id").is_some())
.unwrap_or(false)
}
}
/// Build minimal SSE stream with completed marker.
fn sse_completed(id: &str) -> String {
format!(
"event: response.completed\n\
data: {{\"type\":\"response.completed\",\"response\":{{\"id\":\"{}\",\"output\":[]}}}}\n\n\n",
id
)
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn keeps_previous_response_id_between_tasks() {
// Mock server
let server = MockServer::start().await;
// First request must NOT include `previous_response_id`.
let first = ResponseTemplate::new(200)
.insert_header("content-type", "text/event-stream")
.set_body_raw(sse_completed("resp1"), "text/event-stream");
Mock::given(method("POST"))
.and(path("/v1/responses"))
.and(NoPrevId)
.respond_with(first)
.expect(1)
.mount(&server)
.await;
// Second request MUST include `previous_response_id`.
let second = ResponseTemplate::new(200)
.insert_header("content-type", "text/event-stream")
.set_body_raw(sse_completed("resp2"), "text/event-stream");
Mock::given(method("POST"))
.and(path("/v1/responses"))
.and(HasPrevId)
.respond_with(second)
.expect(1)
.mount(&server)
.await;
// Environment
std::env::set_var("OPENAI_API_KEY", "test-key");
std::env::set_var("OPENAI_API_BASE", server.uri());
std::env::set_var("OPENAI_REQUEST_MAX_RETRIES", "0");
std::env::set_var("OPENAI_STREAM_MAX_RETRIES", "0");
let codex = Codex::spawn(std::sync::Arc::new(tokio::sync::Notify::new())).unwrap();
// Init session
codex
.submit(Submission {
id: "init".into(),
op: Op::ConfigureSession {
model: None,
instructions: None,
approval_policy: AskForApproval::OnFailure,
sandbox_policy: SandboxPolicy::NetworkAndFileWriteRestricted,
},
})
.await
.unwrap();
// drain init event
let _ = codex.next_event().await.unwrap();
// Task 1 triggers first request (no previous_response_id)
codex
.submit(Submission {
id: "task1".into(),
op: Op::UserInput {
items: vec![InputItem::Text {
text: "hello".into(),
}],
},
})
.await
.unwrap();
// Wait for TaskComplete
loop {
let ev = timeout(Duration::from_secs(1), codex.next_event())
.await
.unwrap()
.unwrap();
if matches!(ev.msg, codex_core::protocol::EventMsg::TaskComplete) {
break;
}
}
// Task 2 should include `previous_response_id` (triggers second request)
codex
.submit(Submission {
id: "task2".into(),
op: Op::UserInput {
items: vec![InputItem::Text {
text: "again".into(),
}],
},
})
.await
.unwrap();
// Wait for TaskComplete or error
loop {
let ev = timeout(Duration::from_secs(1), codex.next_event())
.await
.unwrap()
.unwrap();
match ev.msg {
codex_core::protocol::EventMsg::TaskComplete => break,
codex_core::protocol::EventMsg::Error { message } => {
panic!("unexpected error: {message}")
}
_ => (),
}
}
}

View File

@@ -0,0 +1,109 @@
//! Verifies that the agent retries when the SSE stream terminates before
//! delivering a `response.completed` event.
use std::time::Duration;
use codex_core::protocol::AskForApproval;
use codex_core::protocol::InputItem;
use codex_core::protocol::Op;
use codex_core::protocol::SandboxPolicy;
use codex_core::protocol::Submission;
use codex_core::Codex;
use tokio::time::timeout;
use wiremock::matchers::method;
use wiremock::matchers::path;
use wiremock::Mock;
use wiremock::MockServer;
use wiremock::Request;
use wiremock::Respond;
use wiremock::ResponseTemplate;
fn sse_incomplete() -> String {
// Only a single line; missing the completed event.
"event: response.output_item.done\n\n".to_string()
}
fn sse_completed(id: &str) -> String {
format!(
"event: response.completed\n\
data: {{\"type\":\"response.completed\",\"response\":{{\"id\":\"{}\",\"output\":[]}}}}\n\n\n",
id
)
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn retries_on_early_close() {
let server = MockServer::start().await;
struct SeqResponder;
impl Respond for SeqResponder {
fn respond(&self, _: &Request) -> ResponseTemplate {
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
static CALLS: AtomicUsize = AtomicUsize::new(0);
let n = CALLS.fetch_add(1, Ordering::SeqCst);
if n == 0 {
ResponseTemplate::new(200)
.insert_header("content-type", "text/event-stream")
.set_body_raw(sse_incomplete(), "text/event-stream")
} else {
ResponseTemplate::new(200)
.insert_header("content-type", "text/event-stream")
.set_body_raw(sse_completed("resp_ok"), "text/event-stream")
}
}
}
Mock::given(method("POST"))
.and(path("/v1/responses"))
.respond_with(SeqResponder {})
.expect(2)
.mount(&server)
.await;
// Environment
std::env::set_var("OPENAI_API_KEY", "test-key");
std::env::set_var("OPENAI_API_BASE", server.uri());
std::env::set_var("OPENAI_REQUEST_MAX_RETRIES", "0");
std::env::set_var("OPENAI_STREAM_MAX_RETRIES", "1");
std::env::set_var("OPENAI_STREAM_IDLE_TIMEOUT_MS", "2000");
let codex = Codex::spawn(std::sync::Arc::new(tokio::sync::Notify::new())).unwrap();
codex
.submit(Submission {
id: "init".into(),
op: Op::ConfigureSession {
model: None,
instructions: None,
approval_policy: AskForApproval::OnFailure,
sandbox_policy: SandboxPolicy::NetworkAndFileWriteRestricted,
},
})
.await
.unwrap();
let _ = codex.next_event().await.unwrap();
codex
.submit(Submission {
id: "task".into(),
op: Op::UserInput {
items: vec![InputItem::Text {
text: "hello".into(),
}],
},
})
.await
.unwrap();
// Wait until TaskComplete (should succeed after retry).
loop {
let ev = timeout(Duration::from_secs(10), codex.next_event())
.await
.unwrap()
.unwrap();
if matches!(ev.msg, codex_core::protocol::EventMsg::TaskComplete) {
break;
}
}
}