diff --git a/codex-rs/cli/src/proto.rs b/codex-rs/cli/src/proto.rs index 7c48b013..c1dbce8e 100644 --- a/codex-rs/cli/src/proto.rs +++ b/codex-rs/cli/src/proto.rs @@ -1,7 +1,10 @@ use std::io::IsTerminal; +use std::sync::Arc; use clap::Parser; use codex_core::Codex; +use codex_core::config::Config; +use codex_core::config::ConfigOverrides; use codex_core::protocol::Submission; use codex_core::util::notify_on_sigint; use tokio::io::AsyncBufReadExt; @@ -21,8 +24,10 @@ pub async fn run_main(_opts: ProtoCli) -> anyhow::Result<()> { .with_writer(std::io::stderr) .init(); + let config = Config::load_with_overrides(ConfigOverrides::default())?; let ctrl_c = notify_on_sigint(); - let codex = Codex::spawn(ctrl_c.clone())?; + let (codex, _init_id) = Codex::spawn(config, ctrl_c.clone()).await?; + let codex = Arc::new(codex); // Task that reads JSON lines from stdin and forwards to Submission Queue let sq_fut = { @@ -48,7 +53,7 @@ pub async fn run_main(_opts: ProtoCli) -> anyhow::Result<()> { } match serde_json::from_str::(line) { Ok(sub) => { - if let Err(e) = codex.submit(sub).await { + if let Err(e) = codex.submit_with_id(sub).await { error!("{e:#}"); break; } diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 3a1ce6fb..7749ee7d 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -4,6 +4,7 @@ use std::path::Path; use std::path::PathBuf; use std::sync::Arc; use std::sync::Mutex; +use std::sync::atomic::AtomicU64; use std::time::Duration; use anyhow::Context; @@ -31,7 +32,6 @@ use crate::client::ModelClient; use crate::client::Prompt; use crate::client::ResponseEvent; use crate::config::Config; -use crate::config::ConfigOverrides; use crate::error::CodexErr; use crate::error::Result as CodexResult; use crate::exec::ExecParams; @@ -66,25 +66,59 @@ use crate::zdr_transcript::ZdrTranscript; /// The high-level interface to the Codex system. /// It operates as a queue pair where you send submissions and receive events. -#[derive(Clone)] pub struct Codex { + next_id: AtomicU64, tx_sub: Sender, rx_event: Receiver, } impl Codex { - pub fn spawn(ctrl_c: Arc) -> CodexResult { + /// Spawn a new [`Codex`] and initialize the session. Returns the instance + /// of `Codex` and the ID of the `SessionInitialized` event that was + /// submitted to start the session. + pub async fn spawn(config: Config, ctrl_c: Arc) -> CodexResult<(Codex, String)> { let (tx_sub, rx_sub) = async_channel::bounded(64); let (tx_event, rx_event) = async_channel::bounded(64); - tokio::spawn(submission_loop(rx_sub, tx_event, ctrl_c)); - Ok(Self { tx_sub, rx_event }) + let configure_session = Op::ConfigureSession { + model: config.model.clone(), + instructions: config.instructions.clone(), + approval_policy: config.approval_policy, + sandbox_policy: config.sandbox_policy.clone(), + disable_response_storage: config.disable_response_storage, + notify: config.notify.clone(), + cwd: config.cwd.clone(), + }; + + tokio::spawn(submission_loop(config, rx_sub, tx_event, ctrl_c)); + let codex = Codex { + next_id: AtomicU64::new(0), + tx_sub, + rx_event, + }; + let init_id = codex.submit(configure_session).await?; + + Ok((codex, init_id)) } - pub async fn submit(&self, sub: Submission) -> CodexResult<()> { + /// Submit the `op` wrapped in a `Submission` with a unique ID. + pub async fn submit(&self, op: Op) -> CodexResult { + let id = self + .next_id + .fetch_add(1, std::sync::atomic::Ordering::SeqCst) + .to_string(); + let sub = Submission { id: id.clone(), op }; + self.submit_with_id(sub).await?; + Ok(id) + } + + /// Use sparingly: prefer `submit()` so Codex is responsible for generating + /// unique IDs for each submission. + pub async fn submit_with_id(&self, sub: Submission) -> CodexResult<()> { self.tx_sub .send(sub) .await - .map_err(|_| CodexErr::InternalAgentDied) + .map_err(|_| CodexErr::InternalAgentDied)?; + Ok(()) } pub async fn next_event(&self) -> CodexResult { @@ -424,6 +458,7 @@ impl AgentTask { } async fn submission_loop( + config: Config, rx_sub: Receiver, tx_event: Sender, ctrl_c: Arc, @@ -511,16 +546,6 @@ async fn submission_loop( let writable_roots = Mutex::new(get_writable_roots(&cwd)); - // Load config to initialize the MCP connection manager. - let config = match Config::load_with_overrides(ConfigOverrides::default()) { - Ok(cfg) => cfg, - Err(e) => { - error!("Failed to load config for MCP servers: {e:#}"); - // Fall back to empty server map so the session can still proceed. - Config::load_default_config_for_test() - } - }; - let mcp_connection_manager = match McpConnectionManager::new(config.mcp_servers.clone()).await { Ok(mgr) => mgr, diff --git a/codex-rs/core/src/codex_wrapper.rs b/codex-rs/core/src/codex_wrapper.rs index b27cab71..431b580c 100644 --- a/codex-rs/core/src/codex_wrapper.rs +++ b/codex-rs/core/src/codex_wrapper.rs @@ -1,34 +1,20 @@ use std::sync::Arc; -use std::sync::atomic::AtomicU64; use crate::Codex; use crate::config::Config; use crate::protocol::Event; use crate::protocol::EventMsg; -use crate::protocol::Op; -use crate::protocol::Submission; use crate::util::notify_on_sigint; use tokio::sync::Notify; -/// Spawn a new [`Codex`] and initialise the session. +/// Spawn a new [`Codex`] and initialize the session. /// /// Returns the wrapped [`Codex`] **and** the `SessionInitialized` event that /// is received as a response to the initial `ConfigureSession` submission so /// that callers can surface the information to the UI. -pub async fn init_codex(config: Config) -> anyhow::Result<(CodexWrapper, Event, Arc)> { +pub async fn init_codex(config: Config) -> anyhow::Result<(Codex, Event, Arc)> { let ctrl_c = notify_on_sigint(); - let codex = CodexWrapper::new(Codex::spawn(ctrl_c.clone())?); - let init_id = codex - .submit(Op::ConfigureSession { - model: config.model.clone(), - instructions: config.instructions.clone(), - approval_policy: config.approval_policy, - sandbox_policy: config.sandbox_policy, - disable_response_storage: config.disable_response_storage, - notify: config.notify.clone(), - cwd: config.cwd.clone(), - }) - .await?; + let (codex, init_id) = Codex::spawn(config, ctrl_c.clone()).await?; // The first event must be `SessionInitialized`. Validate and forward it to // the caller so that they can display it in the conversation history. @@ -49,31 +35,3 @@ pub async fn init_codex(config: Config) -> anyhow::Result<(CodexWrapper, Event, Ok((codex, event, ctrl_c)) } - -pub struct CodexWrapper { - next_id: AtomicU64, - codex: Codex, -} - -impl CodexWrapper { - fn new(codex: Codex) -> Self { - Self { - next_id: AtomicU64::new(0), - codex, - } - } - - /// Returns the id of the Submission. - pub async fn submit(&self, op: Op) -> crate::error::Result { - let id = self - .next_id - .fetch_add(1, std::sync::atomic::Ordering::SeqCst) - .to_string(); - self.codex.submit(Submission { id: id.clone(), op }).await?; - Ok(id) - } - - pub async fn next_event(&self) -> crate::error::Result { - self.codex.next_event().await - } -} diff --git a/codex-rs/core/tests/live_agent.rs b/codex-rs/core/tests/live_agent.rs index 55476ecf..6d7d6085 100644 --- a/codex-rs/core/tests/live_agent.rs +++ b/codex-rs/core/tests/live_agent.rs @@ -22,8 +22,6 @@ use codex_core::config::Config; use codex_core::protocol::EventMsg; use codex_core::protocol::InputItem; use codex_core::protocol::Op; -use codex_core::protocol::SandboxPolicy; -use codex_core::protocol::Submission; use tokio::sync::Notify; use tokio::time::timeout; @@ -54,36 +52,10 @@ async fn spawn_codex() -> Codex { std::env::set_var("OPENAI_STREAM_MAX_RETRIES", "2"); } - let agent = Codex::spawn(std::sync::Arc::new(Notify::new())).unwrap(); - let config = Config::load_default_config_for_test(); - agent - .submit(Submission { - id: "init".into(), - op: Op::ConfigureSession { - model: config.model, - instructions: None, - approval_policy: config.approval_policy, - sandbox_policy: SandboxPolicy::new_read_only_policy(), - disable_response_storage: false, - notify: None, - cwd: std::env::current_dir().unwrap(), - }, - }) + let (agent, _init_id) = Codex::spawn(config, std::sync::Arc::new(Notify::new())) .await - .expect("failed to submit init"); - - // Drain the SessionInitialized event so subsequent helper loops don't have - // to special‑case it. - loop { - let ev = timeout(Duration::from_secs(30), agent.next_event()) - .await - .expect("timeout waiting for init event") - .expect("agent channel closed"); - if matches!(ev.msg, EventMsg::SessionConfigured { .. }) { - break; - } - } + .unwrap(); agent } @@ -103,13 +75,10 @@ async fn live_streaming_and_prev_id_reset() { // ---------- Task 1 ---------- codex - .submit(Submission { - id: "task1".into(), - op: Op::UserInput { - items: vec![InputItem::Text { - text: "Say the words 'stream test'".into(), - }], - }, + .submit(Op::UserInput { + items: vec![InputItem::Text { + text: "Say the words 'stream test'".into(), + }], }) .await .unwrap(); @@ -136,13 +105,10 @@ async fn live_streaming_and_prev_id_reset() { // ---------- Task 2 (same session) ---------- codex - .submit(Submission { - id: "task2".into(), - op: Op::UserInput { - items: vec![InputItem::Text { - text: "Respond with exactly: second turn succeeded".into(), - }], - }, + .submit(Op::UserInput { + items: vec![InputItem::Text { + text: "Respond with exactly: second turn succeeded".into(), + }], }) .await .unwrap(); @@ -184,15 +150,12 @@ async fn live_shell_function_call() { const MARKER: &str = "codex_live_echo_ok"; codex - .submit(Submission { - id: "task_fn".into(), - op: Op::UserInput { - items: vec![InputItem::Text { - text: format!( - "Use the shell function to run the command `echo {MARKER}` and no other commands." - ), - }], - }, + .submit(Op::UserInput { + items: vec![InputItem::Text { + text: format!( + "Use the shell function to run the command `echo {MARKER}` and no other commands." + ), + }], }) .await .unwrap(); diff --git a/codex-rs/core/tests/previous_response_id.rs b/codex-rs/core/tests/previous_response_id.rs index 5487b5e3..de1b1b2b 100644 --- a/codex-rs/core/tests/previous_response_id.rs +++ b/codex-rs/core/tests/previous_response_id.rs @@ -4,8 +4,6 @@ use codex_core::Codex; use codex_core::config::Config; use codex_core::protocol::InputItem; use codex_core::protocol::Op; -use codex_core::protocol::SandboxPolicy; -use codex_core::protocol::Submission; use serde_json::Value; use tokio::time::timeout; use wiremock::Match; @@ -88,37 +86,17 @@ async fn keeps_previous_response_id_between_tasks() { std::env::set_var("OPENAI_STREAM_MAX_RETRIES", "0"); } - let codex = Codex::spawn(std::sync::Arc::new(tokio::sync::Notify::new())).unwrap(); - // Init session let config = Config::load_default_config_for_test(); - codex - .submit(Submission { - id: "init".into(), - op: Op::ConfigureSession { - model: config.model, - instructions: None, - approval_policy: config.approval_policy, - sandbox_policy: SandboxPolicy::new_read_only_policy(), - disable_response_storage: false, - notify: None, - cwd: std::env::current_dir().unwrap(), - }, - }) - .await - .unwrap(); - // drain init event - let _ = codex.next_event().await.unwrap(); + let ctrl_c = std::sync::Arc::new(tokio::sync::Notify::new()); + let (codex, _init_id) = Codex::spawn(config, ctrl_c.clone()).await.unwrap(); // Task 1 – triggers first request (no previous_response_id) codex - .submit(Submission { - id: "task1".into(), - op: Op::UserInput { - items: vec![InputItem::Text { - text: "hello".into(), - }], - }, + .submit(Op::UserInput { + items: vec![InputItem::Text { + text: "hello".into(), + }], }) .await .unwrap(); @@ -136,13 +114,10 @@ async fn keeps_previous_response_id_between_tasks() { // Task 2 – should include `previous_response_id` (triggers second request) codex - .submit(Submission { - id: "task2".into(), - op: Op::UserInput { - items: vec![InputItem::Text { - text: "again".into(), - }], - }, + .submit(Op::UserInput { + items: vec![InputItem::Text { + text: "again".into(), + }], }) .await .unwrap(); diff --git a/codex-rs/core/tests/stream_no_completed.rs b/codex-rs/core/tests/stream_no_completed.rs index 608516a0..061f9b2f 100644 --- a/codex-rs/core/tests/stream_no_completed.rs +++ b/codex-rs/core/tests/stream_no_completed.rs @@ -7,8 +7,6 @@ use codex_core::Codex; use codex_core::config::Config; use codex_core::protocol::InputItem; use codex_core::protocol::Op; -use codex_core::protocol::SandboxPolicy; -use codex_core::protocol::Submission; use tokio::time::timeout; use wiremock::Mock; use wiremock::MockServer; @@ -77,34 +75,15 @@ async fn retries_on_early_close() { std::env::set_var("OPENAI_STREAM_IDLE_TIMEOUT_MS", "2000"); } - let codex = Codex::spawn(std::sync::Arc::new(tokio::sync::Notify::new())).unwrap(); - + let ctrl_c = std::sync::Arc::new(tokio::sync::Notify::new()); let config = Config::load_default_config_for_test(); - codex - .submit(Submission { - id: "init".into(), - op: Op::ConfigureSession { - model: config.model, - instructions: None, - approval_policy: config.approval_policy, - sandbox_policy: SandboxPolicy::new_read_only_policy(), - disable_response_storage: false, - notify: None, - cwd: std::env::current_dir().unwrap(), - }, - }) - .await - .unwrap(); - let _ = codex.next_event().await.unwrap(); + let (codex, _init_id) = Codex::spawn(config, ctrl_c).await.unwrap(); codex - .submit(Submission { - id: "task".into(), - op: Op::UserInput { - items: vec![InputItem::Text { - text: "hello".into(), - }], - }, + .submit(Op::UserInput { + items: vec![InputItem::Text { + text: "hello".into(), + }], }) .await .unwrap();