From cf7a7e63a376a90a51bd793319b5a323ab62834d Mon Sep 17 00:00:00 2001
From: Michael Bolin
Date: Thu, 14 Aug 2025 09:55:28 -0700
Subject: [PATCH] exploration: create Session as part of Codex::spawn() (#2291)

Historically, `Codex::spawn()` would create the `Codex` instance and enforce,
by construction, that `Op::ConfigureSession` was the first `Op` submitted via
`submit()`. Then, over in `submission_loop()`, the handler for
`Op::ConfigureSession` would take its parameters and turn them into a
`Session`.

This approach has two challenges from a state management perspective:

https://github.com/openai/codex/blob/f968a1327ad39a7786759ea8f1d1c088fe41e91b/codex-rs/core/src/codex.rs#L718

- The local `sess` variable in `submission_loop()` has to be `mut` and
  `Option<Arc<Session>>` because there is no invariant that a `Session` is
  present for the lifetime of the loop, so there is a lot of logic to deal
  with the case where `sess` is `None` (e.g., the `send_no_session_event`
  function and all of its callsites).
- `submission_loop()` is written as though `Op::ConfigureSession` could be
  observed multiple times, but in practice it is observed exactly once, at
  the start of the loop.

In this PR, we simplify the state management by _removing_ the
`Op::ConfigureSession` enum variant and constructing the `Session` as part of
`Codex::spawn()` so that it can be passed to `submission_loop()` as
`Arc<Session>`. The original logic from the `Op::ConfigureSession` handler has
largely been moved to the new `Session::new()` constructor.

---

Incidentally, I also noticed that the handling of `Op::ConfigureSession` can
result in events being dispatched in addition to
`EventMsg::SessionConfigured`: an `EventMsg::Error` is created for every MCP
initialization error, so it is important to preserve that behavior:

https://github.com/openai/codex/blob/f968a1327ad39a7786759ea8f1d1c088fe41e91b/codex-rs/core/src/codex.rs#L901-L916

Admittedly, I believe this does not play nicely with #2264: these error
messages will likely be dispatched before the client has a chance to call
`addConversationListener`. We likely need to make `newConversation` create the
subscription automatically, while also guaranteeing that the "ack" from
`newConversation` is returned before any other conversation-related
notifications are sent, so the client knows which `conversation_id` to match
on.
---
 codex-rs/core/src/codex.rs                | 547 ++++++++++------------
 codex-rs/core/src/conversation_manager.rs |   4 +-
 codex-rs/core/src/protocol.rs             |  49 --
 3 files changed, 260 insertions(+), 340 deletions(-)

diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs
index ff726a24..2ae1db5b 100644
--- a/codex-rs/core/src/codex.rs
+++ b/codex-rs/core/src/codex.rs
@@ -30,6 +30,7 @@ use tracing::trace;
 use tracing::warn;
 use uuid::Uuid;
 
+use crate::ModelProviderInfo;
 use crate::apply_patch::ApplyPatchExec;
 use crate::apply_patch::CODEX_APPLY_PATCH_ARG1;
 use crate::apply_patch::InternalApplyPatchInvocation;
@@ -41,6 +42,8 @@ use crate::client_common::EnvironmentContext;
 use crate::client_common::Prompt;
 use crate::client_common::ResponseEvent;
 use crate::config::Config;
+use crate::config_types::ReasoningEffort as ReasoningEffortConfig;
+use crate::config_types::ReasoningSummary as ReasoningSummaryConfig;
 use crate::config_types::ShellEnvironmentPolicy;
 use crate::conversation_history::ConversationHistory;
 use crate::error::CodexErr;
@@ -118,22 +121,25 @@ pub struct Codex {
     /// unique session id.
 pub struct CodexSpawnOk {
     pub codex: Codex,
-    pub init_id: String,
     pub session_id: Uuid,
 }
 
+pub(crate) const INITIAL_SUBMIT_ID: &str = "";
+
 impl Codex {
     /// Spawn a new [`Codex`] and initialize the session.
     pub async fn spawn(config: Config, auth: Option) -> CodexResult<CodexSpawnOk> {
-        // experimental resume path (undocumented)
-        let resume_path = config.experimental_resume.clone();
-        info!("resume_path: {resume_path:?}");
         let (tx_sub, rx_sub) = async_channel::bounded(64);
         let (tx_event, rx_event) = async_channel::unbounded();
 
         let user_instructions = get_user_instructions(&config).await;
 
-        let configure_session = Op::ConfigureSession {
+        let config = Arc::new(config);
+        let resume_path = config.experimental_resume.clone();
+
+        let session_id = Uuid::new_v4();
+        let configure_session = ConfigureSession {
+            session_id,
             provider: config.model_provider.clone(),
             model: config.model.clone(),
             model_reasoning_effort: config.model_reasoning_effort,
@@ -145,28 +151,26 @@ impl Codex {
             disable_response_storage: config.disable_response_storage,
             notify: config.notify.clone(),
             cwd: config.cwd.clone(),
-            resume_path: resume_path.clone(),
+            resume_path,
         };
 
-        let config = Arc::new(config);
-
-        // Generate a unique ID for the lifetime of this Codex session.
-        let session_id = Uuid::new_v4();
+        let session = Session::new(configure_session, config.clone(), auth, tx_event.clone())
+            .await
+            .map_err(|e| {
+                error!("Failed to create session: {e:#}");
+                CodexErr::InternalAgentDied
+            })?;
 
         // This task will run until Op::Shutdown is received.
-        tokio::spawn(submission_loop(session_id, config, auth, rx_sub, tx_event));
+        tokio::spawn(submission_loop(session, config, rx_sub));
+
         let codex = Codex {
             next_id: AtomicU64::new(0),
             tx_sub,
             rx_event,
         };
-        let init_id = codex.submit(configure_session).await?;
 
-        Ok(CodexSpawnOk {
-            codex,
-            init_id,
-            session_id,
-        })
+        Ok(CodexSpawnOk { codex, session_id })
     }
 
     /// Submit the `op` wrapped in a `Submission` with a unique ID.
@@ -214,6 +218,7 @@ struct State {
 ///
 /// A session has at most 1 running task at a time, and can be interrupted by user input.
 pub(crate) struct Session {
+    session_id: Uuid,
     client: ModelClient,
     tx_event: Sender<Event>,
 
@@ -246,7 +251,216 @@ pub(crate) struct Session {
     show_raw_agent_reasoning: bool,
 }
 
+/// Configure the model session.
+struct ConfigureSession {
+    session_id: Uuid,
+
+    /// Provider identifier ("openai", "openrouter", ...).
+    provider: ModelProviderInfo,
+
+    /// If not specified, server will use its default model.
+    model: String,
+
+    model_reasoning_effort: ReasoningEffortConfig,
+    model_reasoning_summary: ReasoningSummaryConfig,
+
+    /// Model instructions that are appended to the base instructions.
+    user_instructions: Option<String>,
+
+    /// Base instructions override.
+    base_instructions: Option<String>,
+
+    /// When to escalate for approval for execution
+    approval_policy: AskForApproval,
+    /// How to sandbox commands executed in the system
+    sandbox_policy: SandboxPolicy,
+    /// Disable server-side response storage (send full context each request)
+    disable_response_storage: bool,
+
+    /// Optional external notifier command tokens. Present only when the
+    /// client wants the agent to spawn a program after each completed
+    /// turn.
+    notify: Option<Vec<String>>,
+
+    /// Working directory that should be treated as the *root* of the
+    /// session. All relative paths supplied by the model as well as the
+    /// execution sandbox are resolved against this directory **instead**
+    /// of the process-wide current working directory.
CLI front-ends are + /// expected to expand this to an absolute path before sending the + /// `ConfigureSession` operation so that the business-logic layer can + /// operate deterministically. + cwd: PathBuf, + + resume_path: Option, +} + impl Session { + async fn new( + configure_session: ConfigureSession, + config: Arc, + auth: Option, + tx_event: Sender, + ) -> anyhow::Result> { + let ConfigureSession { + mut session_id, + provider, + model, + model_reasoning_effort, + model_reasoning_summary, + user_instructions, + base_instructions, + approval_policy, + sandbox_policy, + disable_response_storage, + notify, + cwd, + resume_path, + } = configure_session; + debug!("Configuring session: model={model}; provider={provider:?}"); + if !cwd.is_absolute() { + return Err(anyhow::anyhow!("cwd is not absolute: {cwd:?}")); + } + + // Error messages to dispatch after SessionConfigured is sent. + let mut post_session_configured_error_events = Vec::::new(); + + // If `resume_path` is specified, then fail if we cannot resume the + // existing rollout. Though if `resume_path` is not specified and we + // fail to create a `RolloutRecorder`, potentially due to some sort of + // I/O error in attempting to create the log file, then we should still + // create the `Session`, but we do log an error. + let mut restored_items: Option> = None; + let rollout_recorder: Option = match resume_path.as_ref() { + Some(path) => match RolloutRecorder::resume(path, cwd.clone()).await { + Ok((rec, saved)) => { + session_id = saved.session_id; + if !saved.items.is_empty() { + restored_items = Some(saved.items); + } + Some(rec) + } + Err(e) => { + return Err(anyhow::anyhow!( + "failed to resume rollout from {path:?}: {e}" + )); + } + }, + None => { + match RolloutRecorder::new(&config, session_id, user_instructions.clone()).await { + Ok(r) => Some(r), + Err(e) => { + let message = format!("failed to initialise rollout recorder: {e}"); + post_session_configured_error_events.push(Event { + id: INITIAL_SUBMIT_ID.to_owned(), + msg: EventMsg::Error(ErrorEvent { + message: message.clone(), + }), + }); + warn!(message); + None + } + } + } + }; + + let client = ModelClient::new( + config.clone(), + auth.clone(), + provider.clone(), + model_reasoning_effort, + model_reasoning_summary, + session_id, + ); + + // Create the mutable state for the Session. + let mut state = State { + history: ConversationHistory::new(), + ..Default::default() + }; + if let Some(restored_items) = restored_items { + state.history.record_items(&restored_items); + } + + let writable_roots = get_writable_roots(&cwd); + + let (mcp_connection_manager, failed_clients) = + match McpConnectionManager::new(config.mcp_servers.clone()).await { + Ok((mgr, failures)) => (mgr, failures), + Err(e) => { + let message = format!("Failed to create MCP connection manager: {e:#}"); + error!("{message}"); + post_session_configured_error_events.push(Event { + id: INITIAL_SUBMIT_ID.to_owned(), + msg: EventMsg::Error(ErrorEvent { message }), + }); + (McpConnectionManager::default(), Default::default()) + } + }; + + // Surface individual client start-up failures to the user. 
+ if !failed_clients.is_empty() { + for (server_name, err) in failed_clients { + let message = format!("MCP client for `{server_name}` failed to start: {err:#}"); + error!("{message}"); + post_session_configured_error_events.push(Event { + id: INITIAL_SUBMIT_ID.to_owned(), + msg: EventMsg::Error(ErrorEvent { message }), + }); + } + } + + let default_shell = shell::default_user_shell().await; + let sess = Arc::new(Session { + session_id, + client, + tools_config: ToolsConfig::new( + &config.model_family, + approval_policy, + sandbox_policy.clone(), + config.include_plan_tool, + ), + tx_event: tx_event.clone(), + user_instructions, + base_instructions, + approval_policy, + sandbox_policy, + shell_environment_policy: config.shell_environment_policy.clone(), + cwd, + writable_roots, + mcp_connection_manager, + notify, + state: Mutex::new(state), + rollout: Mutex::new(rollout_recorder), + codex_linux_sandbox_exe: config.codex_linux_sandbox_exe.clone(), + disable_response_storage, + user_shell: default_shell, + show_raw_agent_reasoning: config.show_raw_agent_reasoning, + }); + + // Gather history metadata for SessionConfiguredEvent. + let (history_log_id, history_entry_count) = + crate::message_history::history_metadata(&config).await; + + // ack + let events = std::iter::once(Event { + id: INITIAL_SUBMIT_ID.to_owned(), + msg: EventMsg::SessionConfigured(SessionConfiguredEvent { + session_id, + model, + history_log_id, + history_entry_count, + }), + }) + .chain(post_session_configured_error_events.into_iter()); + for event in events { + if let Err(e) = tx_event.send(event).await { + error!("failed to send event: {e:?}"); + } + } + + Ok(sess) + } + pub(crate) fn get_writable_roots(&self) -> &[PathBuf] { &self.writable_roots } @@ -628,16 +842,6 @@ impl Drop for Session { } } -impl State { - pub fn partial_clone(&self) -> Self { - Self { - approved_commands: self.approved_commands.clone(), - history: self.history.clone(), - ..Default::default() - } - } -} - #[derive(Clone, Debug)] pub(crate) struct ExecCommandContext { pub(crate) sub_id: String, @@ -708,262 +912,36 @@ impl AgentTask { } } -async fn submission_loop( - mut session_id: Uuid, - config: Arc, - auth: Option, - rx_sub: Receiver, - tx_event: Sender, -) { - let mut sess: Option> = None; - // shorthand - send an event when there is no active session - let send_no_session_event = |sub_id: String| async { - let event = Event { - id: sub_id, - msg: EventMsg::Error(ErrorEvent { - message: "No session initialized, expected 'ConfigureSession' as first Op" - .to_string(), - }), - }; - tx_event.send(event).await.ok(); - }; - +async fn submission_loop(sess: Arc, config: Arc, rx_sub: Receiver) { // To break out of this loop, send Op::Shutdown. 
while let Ok(sub) = rx_sub.recv().await { debug!(?sub, "Submission"); match sub.op { Op::Interrupt => { - let sess = match sess.as_ref() { - Some(sess) => sess, - None => { - send_no_session_event(sub.id).await; - continue; - } - }; sess.abort(); } - Op::ConfigureSession { - provider, - model, - model_reasoning_effort, - model_reasoning_summary, - user_instructions, - base_instructions, - approval_policy, - sandbox_policy, - disable_response_storage, - notify, - cwd, - resume_path, - } => { - debug!( - "Configuring session: model={model}; provider={provider:?}; resume={resume_path:?}" - ); - if !cwd.is_absolute() { - let message = format!("cwd is not absolute: {cwd:?}"); - error!(message); - let event = Event { - id: sub.id, - msg: EventMsg::Error(ErrorEvent { message }), - }; - if let Err(e) = tx_event.send(event).await { - error!("failed to send error message: {e:?}"); - } - return; - } - // Optionally resume an existing rollout. - let mut restored_items: Option> = None; - let rollout_recorder: Option = - if let Some(path) = resume_path.as_ref() { - match RolloutRecorder::resume(path, cwd.clone()).await { - Ok((rec, saved)) => { - session_id = saved.session_id; - if !saved.items.is_empty() { - restored_items = Some(saved.items); - } - Some(rec) - } - Err(e) => { - warn!("failed to resume rollout from {path:?}: {e}"); - None - } - } - } else { - None - }; - - let rollout_recorder = match rollout_recorder { - Some(rec) => Some(rec), - None => { - match RolloutRecorder::new(&config, session_id, user_instructions.clone()) - .await - { - Ok(r) => Some(r), - Err(e) => { - warn!("failed to initialise rollout recorder: {e}"); - None - } - } - } - }; - - let client = ModelClient::new( - config.clone(), - auth.clone(), - provider.clone(), - model_reasoning_effort, - model_reasoning_summary, - session_id, - ); - - // abort any current running session and clone its state - let state = match sess.take() { - Some(sess) => { - sess.abort(); - sess.state.lock().unwrap().partial_clone() - } - None => State { - history: ConversationHistory::new(), - ..Default::default() - }, - }; - - let writable_roots = get_writable_roots(&cwd); - - // Error messages to dispatch after SessionConfigured is sent. - let mut mcp_connection_errors = Vec::::new(); - let (mcp_connection_manager, failed_clients) = - match McpConnectionManager::new(config.mcp_servers.clone()).await { - Ok((mgr, failures)) => (mgr, failures), - Err(e) => { - let message = format!("Failed to create MCP connection manager: {e:#}"); - error!("{message}"); - mcp_connection_errors.push(Event { - id: sub.id.clone(), - msg: EventMsg::Error(ErrorEvent { message }), - }); - (McpConnectionManager::default(), Default::default()) - } - }; - - // Surface individual client start-up failures to the user. 
- if !failed_clients.is_empty() { - for (server_name, err) in failed_clients { - let message = - format!("MCP client for `{server_name}` failed to start: {err:#}"); - error!("{message}"); - mcp_connection_errors.push(Event { - id: sub.id.clone(), - msg: EventMsg::Error(ErrorEvent { message }), - }); - } - } - let default_shell = shell::default_user_shell().await; - sess = Some(Arc::new(Session { - client, - tools_config: ToolsConfig::new( - &config.model_family, - approval_policy, - sandbox_policy.clone(), - config.include_plan_tool, - ), - tx_event: tx_event.clone(), - user_instructions, - base_instructions, - approval_policy, - sandbox_policy, - shell_environment_policy: config.shell_environment_policy.clone(), - cwd, - writable_roots, - mcp_connection_manager, - notify, - state: Mutex::new(state), - rollout: Mutex::new(rollout_recorder), - codex_linux_sandbox_exe: config.codex_linux_sandbox_exe.clone(), - disable_response_storage, - user_shell: default_shell, - show_raw_agent_reasoning: config.show_raw_agent_reasoning, - })); - - // Patch restored state into the newly created session. - if let Some(sess_arc) = &sess { - if restored_items.is_some() { - let mut st = sess_arc.state.lock().unwrap(); - st.history.record_items(restored_items.unwrap().iter()); - } - } - - // Gather history metadata for SessionConfiguredEvent. - let (history_log_id, history_entry_count) = - crate::message_history::history_metadata(&config).await; - - // ack - let events = std::iter::once(Event { - id: sub.id.clone(), - msg: EventMsg::SessionConfigured(SessionConfiguredEvent { - session_id, - model, - history_log_id, - history_entry_count, - }), - }) - .chain(mcp_connection_errors.into_iter()); - for event in events { - if let Err(e) = tx_event.send(event).await { - error!("failed to send event: {e:?}"); - } - } - } Op::UserInput { items } => { - let sess = match sess.as_ref() { - Some(sess) => sess, - None => { - send_no_session_event(sub.id).await; - continue; - } - }; - // attempt to inject input into current task if let Err(items) = sess.inject_input(items) { // no current task, spawn a new one - let task = AgentTask::spawn(Arc::clone(sess), sub.id, items); + let task = AgentTask::spawn(sess.clone(), sub.id, items); sess.set_task(task); } } - Op::ExecApproval { id, decision } => { - let sess = match sess.as_ref() { - Some(sess) => sess, - None => { - send_no_session_event(sub.id).await; - continue; - } - }; - match decision { - ReviewDecision::Abort => { - sess.abort(); - } - other => sess.notify_approval(&id, other), + Op::ExecApproval { id, decision } => match decision { + ReviewDecision::Abort => { + sess.abort(); } - } - Op::PatchApproval { id, decision } => { - let sess = match sess.as_ref() { - Some(sess) => sess, - None => { - send_no_session_event(sub.id).await; - continue; - } - }; - match decision { - ReviewDecision::Abort => { - sess.abort(); - } - other => sess.notify_approval(&id, other), + other => sess.notify_approval(&id, other), + }, + Op::PatchApproval { id, decision } => match decision { + ReviewDecision::Abort => { + sess.abort(); } - } + other => sess.notify_approval(&id, other), + }, Op::AddToHistory { text } => { - // TODO: What should we do if we got AddToHistory before ConfigureSession? 
- // currently, if ConfigureSession has resume path, this history will be ignored - let id = session_id; + let id = sess.session_id; let config = config.clone(); tokio::spawn(async move { if let Err(e) = crate::message_history::append_entry(&text, &id, &config).await @@ -975,7 +953,7 @@ async fn submission_loop( Op::GetHistoryEntryRequest { offset, log_id } => { let config = config.clone(); - let tx_event = tx_event.clone(); + let tx_event = sess.tx_event.clone(); let sub_id = sub.id.clone(); tokio::spawn(async move { @@ -1003,14 +981,6 @@ async fn submission_loop( }); } Op::Compact => { - let sess = match sess.as_ref() { - Some(sess) => sess, - None => { - send_no_session_event(sub.id).await; - continue; - } - }; - // Create a summarization request as user input const SUMMARIZATION_PROMPT: &str = include_str!("prompt_for_compact_command.md"); @@ -1032,28 +1002,27 @@ async fn submission_loop( // Gracefully flush and shutdown rollout recorder on session end so tests // that inspect the rollout file do not race with the background writer. - if let Some(sess_arc) = sess { - let recorder_opt = sess_arc.rollout.lock().unwrap().take(); - if let Some(rec) = recorder_opt { - if let Err(e) = rec.shutdown().await { - warn!("failed to shutdown rollout recorder: {e}"); - let event = Event { - id: sub.id.clone(), - msg: EventMsg::Error(ErrorEvent { - message: "Failed to shutdown rollout recorder".to_string(), - }), - }; - if let Err(e) = tx_event.send(event).await { - warn!("failed to send error message: {e:?}"); - } + let recorder_opt = sess.rollout.lock().unwrap().take(); + if let Some(rec) = recorder_opt { + if let Err(e) = rec.shutdown().await { + warn!("failed to shutdown rollout recorder: {e}"); + let event = Event { + id: sub.id.clone(), + msg: EventMsg::Error(ErrorEvent { + message: "Failed to shutdown rollout recorder".to_string(), + }), + }; + if let Err(e) = sess.tx_event.send(event).await { + warn!("failed to send error message: {e:?}"); } } } + let event = Event { id: sub.id.clone(), msg: EventMsg::ShutdownComplete, }; - if let Err(e) = tx_event.send(event).await { + if let Err(e) = sess.tx_event.send(event).await { warn!("failed to send Shutdown event: {e}"); } break; diff --git a/codex-rs/core/src/conversation_manager.rs b/codex-rs/core/src/conversation_manager.rs index 4a6cdc1b..48ccdddf 100644 --- a/codex-rs/core/src/conversation_manager.rs +++ b/codex-rs/core/src/conversation_manager.rs @@ -7,6 +7,7 @@ use uuid::Uuid; use crate::codex::Codex; use crate::codex::CodexSpawnOk; +use crate::codex::INITIAL_SUBMIT_ID; use crate::codex_conversation::CodexConversation; use crate::config::Config; use crate::error::CodexErr; @@ -52,7 +53,6 @@ impl ConversationManager { ) -> CodexResult { let CodexSpawnOk { codex, - init_id, session_id: conversation_id, } = Codex::spawn(config, auth).await?; @@ -64,7 +64,7 @@ impl ConversationManager { Event { id, msg: EventMsg::SessionConfigured(session_configured), - } if id == init_id => session_configured, + } if id == INITIAL_SUBMIT_ID => session_configured, _ => { return Err(CodexErr::SessionConfiguredNotFirstEvent); } diff --git a/codex-rs/core/src/protocol.rs b/codex-rs/core/src/protocol.rs index a7e6b1ed..f32d9ccf 100644 --- a/codex-rs/core/src/protocol.rs +++ b/codex-rs/core/src/protocol.rs @@ -17,10 +17,7 @@ use serde_bytes::ByteBuf; use strum_macros::Display; use uuid::Uuid; -use crate::config_types::ReasoningEffort as ReasoningEffortConfig; -use crate::config_types::ReasoningSummary as ReasoningSummaryConfig; use crate::message_history::HistoryEntry; 
-use crate::model_provider_info::ModelProviderInfo;
 use crate::parse_command::ParsedCommand;
 use crate::plan_tool::UpdatePlanArgs;
 
@@ -39,52 +36,6 @@ pub struct Submission {
 #[allow(clippy::large_enum_variant)]
 #[non_exhaustive]
 pub enum Op {
-    /// Configure the model session.
-    ConfigureSession {
-        /// Provider identifier ("openai", "openrouter", ...).
-        provider: ModelProviderInfo,
-
-        /// If not specified, server will use its default model.
-        model: String,
-
-        model_reasoning_effort: ReasoningEffortConfig,
-        model_reasoning_summary: ReasoningSummaryConfig,
-
-        /// Model instructions that are appended to the base instructions.
-        user_instructions: Option<String>,
-
-        /// Base instructions override.
-        base_instructions: Option<String>,
-
-        /// When to escalate for approval for execution
-        approval_policy: AskForApproval,
-        /// How to sandbox commands executed in the system
-        sandbox_policy: SandboxPolicy,
-        /// Disable server-side response storage (send full context each request)
-        #[serde(default)]
-        disable_response_storage: bool,
-
-        /// Optional external notifier command tokens. Present only when the
-        /// client wants the agent to spawn a program after each completed
-        /// turn.
-        #[serde(skip_serializing_if = "Option::is_none")]
-        #[serde(default)]
-        notify: Option<Vec<String>>,
-
-        /// Working directory that should be treated as the *root* of the
-        /// session. All relative paths supplied by the model as well as the
-        /// execution sandbox are resolved against this directory **instead**
-        /// of the process-wide current working directory. CLI front-ends are
-        /// expected to expand this to an absolute path before sending the
-        /// `ConfigureSession` operation so that the business-logic layer can
-        /// operate deterministically.
-        cwd: std::path::PathBuf,
-
-        /// Path to a rollout file to resume from.
-        #[serde(skip_serializing_if = "Option::is_none")]
-        resume_path: Option<std::path::PathBuf>,
-    },
-
     /// Abort current task.
     /// This server sends no corresponding Event
     Interrupt,
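
For reviewers who want the consumer side of this change in one place, here is a minimal sketch, not part of the patch, of how a caller now obtains the initial ack. The helper name `expect_session_configured` is hypothetical, and `Codex::next_event()` is assumed to behave as it does in the existing `ConversationManager` code; every other name comes from the diff above.

```rust
use crate::codex::Codex;
use crate::codex::INITIAL_SUBMIT_ID;
use crate::error::CodexErr;
use crate::protocol::Event;
use crate::protocol::EventMsg;
use crate::protocol::SessionConfiguredEvent;

/// Wait for the `SessionConfigured` ack that `Session::new()` now emits
/// during `Codex::spawn()`. With this patch the ack always carries the
/// well-known `INITIAL_SUBMIT_ID` ("") instead of a per-spawn `init_id`.
async fn expect_session_configured(codex: &Codex) -> Result<SessionConfiguredEvent, CodexErr> {
    // `next_event()` is assumed here; it is how ConversationManager reads
    // events from the Codex channel but is not shown in this diff.
    match codex.next_event().await? {
        Event {
            id,
            msg: EventMsg::SessionConfigured(session_configured),
        } if id == INITIAL_SUBMIT_ID => Ok(session_configured),
        // Any other first event violates the contract that SessionConfigured
        // is sent before anything else (the MCP start-up errors are queued in
        // `post_session_configured_error_events` and dispatched after the ack).
        _ => Err(CodexErr::SessionConfiguredNotFirstEvent),
    }
}
```

Because `INITIAL_SUBMIT_ID` is a fixed constant rather than an id returned from `spawn()`, callers such as `ConversationManager` no longer need to thread `init_id` from `CodexSpawnOk` through to their event handling.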