fix: creating an instance of Codex requires a Config (#859)

I discovered that I accidentally introduced a change in
https://github.com/openai/codex/pull/829 where we load a fresh `Config`
in the middle of `codex.rs`:


c3e10e180a/codex-rs/core/src/codex.rs (L515-L522)

This is not good because the `Config` could differ from the one that has
the user's overrides specified from the CLI. Also, in unit tests, it
means the `Config` was picking up my personal settings as opposed to
using a vanilla config, which was problematic.

This PR cleans things up by moving the common case where
`Op::ConfigureSession` is derived from `Config` (originally done in
`codex_wrapper.rs`) and making it the standard way to initialize `Codex`
by putting it in `Codex::spawn()`. Note this also eliminates quite a bit
of boilerplate from the tests and relieves the caller of the
responsibility of minting out unique IDs when invoking `submit()`.
This commit is contained in:
Michael Bolin
2025-05-07 16:33:28 -07:00
committed by GitHub
parent c3e10e180a
commit cfe50c7107
6 changed files with 84 additions and 179 deletions

View File

@@ -1,7 +1,10 @@
use std::io::IsTerminal;
use std::sync::Arc;
use clap::Parser;
use codex_core::Codex;
use codex_core::config::Config;
use codex_core::config::ConfigOverrides;
use codex_core::protocol::Submission;
use codex_core::util::notify_on_sigint;
use tokio::io::AsyncBufReadExt;
@@ -21,8 +24,10 @@ pub async fn run_main(_opts: ProtoCli) -> anyhow::Result<()> {
.with_writer(std::io::stderr)
.init();
let config = Config::load_with_overrides(ConfigOverrides::default())?;
let ctrl_c = notify_on_sigint();
let codex = Codex::spawn(ctrl_c.clone())?;
let (codex, _init_id) = Codex::spawn(config, ctrl_c.clone()).await?;
let codex = Arc::new(codex);
// Task that reads JSON lines from stdin and forwards to Submission Queue
let sq_fut = {
@@ -48,7 +53,7 @@ pub async fn run_main(_opts: ProtoCli) -> anyhow::Result<()> {
}
match serde_json::from_str::<Submission>(line) {
Ok(sub) => {
if let Err(e) = codex.submit(sub).await {
if let Err(e) = codex.submit_with_id(sub).await {
error!("{e:#}");
break;
}

View File

@@ -4,6 +4,7 @@ use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::atomic::AtomicU64;
use std::time::Duration;
use anyhow::Context;
@@ -31,7 +32,6 @@ use crate::client::ModelClient;
use crate::client::Prompt;
use crate::client::ResponseEvent;
use crate::config::Config;
use crate::config::ConfigOverrides;
use crate::error::CodexErr;
use crate::error::Result as CodexResult;
use crate::exec::ExecParams;
@@ -66,25 +66,59 @@ use crate::zdr_transcript::ZdrTranscript;
/// The high-level interface to the Codex system.
/// It operates as a queue pair where you send submissions and receive events.
#[derive(Clone)]
pub struct Codex {
next_id: AtomicU64,
tx_sub: Sender<Submission>,
rx_event: Receiver<Event>,
}
impl Codex {
pub fn spawn(ctrl_c: Arc<Notify>) -> CodexResult<Self> {
/// Spawn a new [`Codex`] and initialize the session. Returns the instance
/// of `Codex` and the ID of the `SessionInitialized` event that was
/// submitted to start the session.
pub async fn spawn(config: Config, ctrl_c: Arc<Notify>) -> CodexResult<(Codex, String)> {
let (tx_sub, rx_sub) = async_channel::bounded(64);
let (tx_event, rx_event) = async_channel::bounded(64);
tokio::spawn(submission_loop(rx_sub, tx_event, ctrl_c));
Ok(Self { tx_sub, rx_event })
let configure_session = Op::ConfigureSession {
model: config.model.clone(),
instructions: config.instructions.clone(),
approval_policy: config.approval_policy,
sandbox_policy: config.sandbox_policy.clone(),
disable_response_storage: config.disable_response_storage,
notify: config.notify.clone(),
cwd: config.cwd.clone(),
};
tokio::spawn(submission_loop(config, rx_sub, tx_event, ctrl_c));
let codex = Codex {
next_id: AtomicU64::new(0),
tx_sub,
rx_event,
};
let init_id = codex.submit(configure_session).await?;
Ok((codex, init_id))
}
pub async fn submit(&self, sub: Submission) -> CodexResult<()> {
/// Submit the `op` wrapped in a `Submission` with a unique ID.
pub async fn submit(&self, op: Op) -> CodexResult<String> {
let id = self
.next_id
.fetch_add(1, std::sync::atomic::Ordering::SeqCst)
.to_string();
let sub = Submission { id: id.clone(), op };
self.submit_with_id(sub).await?;
Ok(id)
}
/// Use sparingly: prefer `submit()` so Codex is responsible for generating
/// unique IDs for each submission.
pub async fn submit_with_id(&self, sub: Submission) -> CodexResult<()> {
self.tx_sub
.send(sub)
.await
.map_err(|_| CodexErr::InternalAgentDied)
.map_err(|_| CodexErr::InternalAgentDied)?;
Ok(())
}
pub async fn next_event(&self) -> CodexResult<Event> {
@@ -424,6 +458,7 @@ impl AgentTask {
}
async fn submission_loop(
config: Config,
rx_sub: Receiver<Submission>,
tx_event: Sender<Event>,
ctrl_c: Arc<Notify>,
@@ -511,16 +546,6 @@ async fn submission_loop(
let writable_roots = Mutex::new(get_writable_roots(&cwd));
// Load config to initialize the MCP connection manager.
let config = match Config::load_with_overrides(ConfigOverrides::default()) {
Ok(cfg) => cfg,
Err(e) => {
error!("Failed to load config for MCP servers: {e:#}");
// Fall back to empty server map so the session can still proceed.
Config::load_default_config_for_test()
}
};
let mcp_connection_manager =
match McpConnectionManager::new(config.mcp_servers.clone()).await {
Ok(mgr) => mgr,

View File

@@ -1,34 +1,20 @@
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use crate::Codex;
use crate::config::Config;
use crate::protocol::Event;
use crate::protocol::EventMsg;
use crate::protocol::Op;
use crate::protocol::Submission;
use crate::util::notify_on_sigint;
use tokio::sync::Notify;
/// Spawn a new [`Codex`] and initialise the session.
/// Spawn a new [`Codex`] and initialize the session.
///
/// Returns the wrapped [`Codex`] **and** the `SessionInitialized` event that
/// is received as a response to the initial `ConfigureSession` submission so
/// that callers can surface the information to the UI.
pub async fn init_codex(config: Config) -> anyhow::Result<(CodexWrapper, Event, Arc<Notify>)> {
pub async fn init_codex(config: Config) -> anyhow::Result<(Codex, Event, Arc<Notify>)> {
let ctrl_c = notify_on_sigint();
let codex = CodexWrapper::new(Codex::spawn(ctrl_c.clone())?);
let init_id = codex
.submit(Op::ConfigureSession {
model: config.model.clone(),
instructions: config.instructions.clone(),
approval_policy: config.approval_policy,
sandbox_policy: config.sandbox_policy,
disable_response_storage: config.disable_response_storage,
notify: config.notify.clone(),
cwd: config.cwd.clone(),
})
.await?;
let (codex, init_id) = Codex::spawn(config, ctrl_c.clone()).await?;
// The first event must be `SessionInitialized`. Validate and forward it to
// the caller so that they can display it in the conversation history.
@@ -49,31 +35,3 @@ pub async fn init_codex(config: Config) -> anyhow::Result<(CodexWrapper, Event,
Ok((codex, event, ctrl_c))
}
pub struct CodexWrapper {
next_id: AtomicU64,
codex: Codex,
}
impl CodexWrapper {
fn new(codex: Codex) -> Self {
Self {
next_id: AtomicU64::new(0),
codex,
}
}
/// Returns the id of the Submission.
pub async fn submit(&self, op: Op) -> crate::error::Result<String> {
let id = self
.next_id
.fetch_add(1, std::sync::atomic::Ordering::SeqCst)
.to_string();
self.codex.submit(Submission { id: id.clone(), op }).await?;
Ok(id)
}
pub async fn next_event(&self) -> crate::error::Result<Event> {
self.codex.next_event().await
}
}

View File

@@ -22,8 +22,6 @@ use codex_core::config::Config;
use codex_core::protocol::EventMsg;
use codex_core::protocol::InputItem;
use codex_core::protocol::Op;
use codex_core::protocol::SandboxPolicy;
use codex_core::protocol::Submission;
use tokio::sync::Notify;
use tokio::time::timeout;
@@ -54,36 +52,10 @@ async fn spawn_codex() -> Codex {
std::env::set_var("OPENAI_STREAM_MAX_RETRIES", "2");
}
let agent = Codex::spawn(std::sync::Arc::new(Notify::new())).unwrap();
let config = Config::load_default_config_for_test();
agent
.submit(Submission {
id: "init".into(),
op: Op::ConfigureSession {
model: config.model,
instructions: None,
approval_policy: config.approval_policy,
sandbox_policy: SandboxPolicy::new_read_only_policy(),
disable_response_storage: false,
notify: None,
cwd: std::env::current_dir().unwrap(),
},
})
let (agent, _init_id) = Codex::spawn(config, std::sync::Arc::new(Notify::new()))
.await
.expect("failed to submit init");
// Drain the SessionInitialized event so subsequent helper loops don't have
// to specialcase it.
loop {
let ev = timeout(Duration::from_secs(30), agent.next_event())
.await
.expect("timeout waiting for init event")
.expect("agent channel closed");
if matches!(ev.msg, EventMsg::SessionConfigured { .. }) {
break;
}
}
.unwrap();
agent
}
@@ -103,13 +75,10 @@ async fn live_streaming_and_prev_id_reset() {
// ---------- Task 1 ----------
codex
.submit(Submission {
id: "task1".into(),
op: Op::UserInput {
items: vec![InputItem::Text {
text: "Say the words 'stream test'".into(),
}],
},
.submit(Op::UserInput {
items: vec![InputItem::Text {
text: "Say the words 'stream test'".into(),
}],
})
.await
.unwrap();
@@ -136,13 +105,10 @@ async fn live_streaming_and_prev_id_reset() {
// ---------- Task 2 (same session) ----------
codex
.submit(Submission {
id: "task2".into(),
op: Op::UserInput {
items: vec![InputItem::Text {
text: "Respond with exactly: second turn succeeded".into(),
}],
},
.submit(Op::UserInput {
items: vec![InputItem::Text {
text: "Respond with exactly: second turn succeeded".into(),
}],
})
.await
.unwrap();
@@ -184,15 +150,12 @@ async fn live_shell_function_call() {
const MARKER: &str = "codex_live_echo_ok";
codex
.submit(Submission {
id: "task_fn".into(),
op: Op::UserInput {
items: vec![InputItem::Text {
text: format!(
"Use the shell function to run the command `echo {MARKER}` and no other commands."
),
}],
},
.submit(Op::UserInput {
items: vec![InputItem::Text {
text: format!(
"Use the shell function to run the command `echo {MARKER}` and no other commands."
),
}],
})
.await
.unwrap();

View File

@@ -4,8 +4,6 @@ use codex_core::Codex;
use codex_core::config::Config;
use codex_core::protocol::InputItem;
use codex_core::protocol::Op;
use codex_core::protocol::SandboxPolicy;
use codex_core::protocol::Submission;
use serde_json::Value;
use tokio::time::timeout;
use wiremock::Match;
@@ -88,37 +86,17 @@ async fn keeps_previous_response_id_between_tasks() {
std::env::set_var("OPENAI_STREAM_MAX_RETRIES", "0");
}
let codex = Codex::spawn(std::sync::Arc::new(tokio::sync::Notify::new())).unwrap();
// Init session
let config = Config::load_default_config_for_test();
codex
.submit(Submission {
id: "init".into(),
op: Op::ConfigureSession {
model: config.model,
instructions: None,
approval_policy: config.approval_policy,
sandbox_policy: SandboxPolicy::new_read_only_policy(),
disable_response_storage: false,
notify: None,
cwd: std::env::current_dir().unwrap(),
},
})
.await
.unwrap();
// drain init event
let _ = codex.next_event().await.unwrap();
let ctrl_c = std::sync::Arc::new(tokio::sync::Notify::new());
let (codex, _init_id) = Codex::spawn(config, ctrl_c.clone()).await.unwrap();
// Task 1 triggers first request (no previous_response_id)
codex
.submit(Submission {
id: "task1".into(),
op: Op::UserInput {
items: vec![InputItem::Text {
text: "hello".into(),
}],
},
.submit(Op::UserInput {
items: vec![InputItem::Text {
text: "hello".into(),
}],
})
.await
.unwrap();
@@ -136,13 +114,10 @@ async fn keeps_previous_response_id_between_tasks() {
// Task 2 should include `previous_response_id` (triggers second request)
codex
.submit(Submission {
id: "task2".into(),
op: Op::UserInput {
items: vec![InputItem::Text {
text: "again".into(),
}],
},
.submit(Op::UserInput {
items: vec![InputItem::Text {
text: "again".into(),
}],
})
.await
.unwrap();

View File

@@ -7,8 +7,6 @@ use codex_core::Codex;
use codex_core::config::Config;
use codex_core::protocol::InputItem;
use codex_core::protocol::Op;
use codex_core::protocol::SandboxPolicy;
use codex_core::protocol::Submission;
use tokio::time::timeout;
use wiremock::Mock;
use wiremock::MockServer;
@@ -77,34 +75,15 @@ async fn retries_on_early_close() {
std::env::set_var("OPENAI_STREAM_IDLE_TIMEOUT_MS", "2000");
}
let codex = Codex::spawn(std::sync::Arc::new(tokio::sync::Notify::new())).unwrap();
let ctrl_c = std::sync::Arc::new(tokio::sync::Notify::new());
let config = Config::load_default_config_for_test();
codex
.submit(Submission {
id: "init".into(),
op: Op::ConfigureSession {
model: config.model,
instructions: None,
approval_policy: config.approval_policy,
sandbox_policy: SandboxPolicy::new_read_only_policy(),
disable_response_storage: false,
notify: None,
cwd: std::env::current_dir().unwrap(),
},
})
.await
.unwrap();
let _ = codex.next_event().await.unwrap();
let (codex, _init_id) = Codex::spawn(config, ctrl_c).await.unwrap();
codex
.submit(Submission {
id: "task".into(),
op: Op::UserInput {
items: vec![InputItem::Text {
text: "hello".into(),
}],
},
.submit(Op::UserInput {
items: vec![InputItem::Text {
text: "hello".into(),
}],
})
.await
.unwrap();