diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index ff726a24..2ae1db5b 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -30,6 +30,7 @@ use tracing::trace; use tracing::warn; use uuid::Uuid; +use crate::ModelProviderInfo; use crate::apply_patch::ApplyPatchExec; use crate::apply_patch::CODEX_APPLY_PATCH_ARG1; use crate::apply_patch::InternalApplyPatchInvocation; @@ -41,6 +42,8 @@ use crate::client_common::EnvironmentContext; use crate::client_common::Prompt; use crate::client_common::ResponseEvent; use crate::config::Config; +use crate::config_types::ReasoningEffort as ReasoningEffortConfig; +use crate::config_types::ReasoningSummary as ReasoningSummaryConfig; use crate::config_types::ShellEnvironmentPolicy; use crate::conversation_history::ConversationHistory; use crate::error::CodexErr; @@ -118,22 +121,25 @@ pub struct Codex { /// unique session id. pub struct CodexSpawnOk { pub codex: Codex, - pub init_id: String, pub session_id: Uuid, } +pub(crate) const INITIAL_SUBMIT_ID: &str = ""; + impl Codex { /// Spawn a new [`Codex`] and initialize the session. pub async fn spawn(config: Config, auth: Option) -> CodexResult { - // experimental resume path (undocumented) - let resume_path = config.experimental_resume.clone(); - info!("resume_path: {resume_path:?}"); let (tx_sub, rx_sub) = async_channel::bounded(64); let (tx_event, rx_event) = async_channel::unbounded(); let user_instructions = get_user_instructions(&config).await; - let configure_session = Op::ConfigureSession { + let config = Arc::new(config); + let resume_path = config.experimental_resume.clone(); + + let session_id = Uuid::new_v4(); + let configure_session = ConfigureSession { + session_id, provider: config.model_provider.clone(), model: config.model.clone(), model_reasoning_effort: config.model_reasoning_effort, @@ -145,28 +151,26 @@ impl Codex { disable_response_storage: config.disable_response_storage, notify: config.notify.clone(), cwd: config.cwd.clone(), - resume_path: resume_path.clone(), + resume_path, }; - let config = Arc::new(config); - // Generate a unique ID for the lifetime of this Codex session. - let session_id = Uuid::new_v4(); + let session = Session::new(configure_session, config.clone(), auth, tx_event.clone()) + .await + .map_err(|e| { + error!("Failed to create session: {e:#}"); + CodexErr::InternalAgentDied + })?; // This task will run until Op::Shutdown is received. - tokio::spawn(submission_loop(session_id, config, auth, rx_sub, tx_event)); + tokio::spawn(submission_loop(session, config, rx_sub)); let codex = Codex { next_id: AtomicU64::new(0), tx_sub, rx_event, }; - let init_id = codex.submit(configure_session).await?; - Ok(CodexSpawnOk { - codex, - init_id, - session_id, - }) + Ok(CodexSpawnOk { codex, session_id }) } /// Submit the `op` wrapped in a `Submission` with a unique ID. @@ -214,6 +218,7 @@ struct State { /// /// A session has at most 1 running task at a time, and can be interrupted by user input. pub(crate) struct Session { + session_id: Uuid, client: ModelClient, tx_event: Sender, @@ -246,7 +251,216 @@ pub(crate) struct Session { show_raw_agent_reasoning: bool, } +/// Configure the model session. +struct ConfigureSession { + session_id: Uuid, + + /// Provider identifier ("openai", "openrouter", ...). + provider: ModelProviderInfo, + + /// If not specified, server will use its default model. + model: String, + + model_reasoning_effort: ReasoningEffortConfig, + model_reasoning_summary: ReasoningSummaryConfig, + + /// Model instructions that are appended to the base instructions. + user_instructions: Option, + + /// Base instructions override. + base_instructions: Option, + + /// When to escalate for approval for execution + approval_policy: AskForApproval, + /// How to sandbox commands executed in the system + sandbox_policy: SandboxPolicy, + /// Disable server-side response storage (send full context each request) + disable_response_storage: bool, + + /// Optional external notifier command tokens. Present only when the + /// client wants the agent to spawn a program after each completed + /// turn. + notify: Option>, + + /// Working directory that should be treated as the *root* of the + /// session. All relative paths supplied by the model as well as the + /// execution sandbox are resolved against this directory **instead** + /// of the process-wide current working directory. CLI front-ends are + /// expected to expand this to an absolute path before sending the + /// `ConfigureSession` operation so that the business-logic layer can + /// operate deterministically. + cwd: PathBuf, + + resume_path: Option, +} + impl Session { + async fn new( + configure_session: ConfigureSession, + config: Arc, + auth: Option, + tx_event: Sender, + ) -> anyhow::Result> { + let ConfigureSession { + mut session_id, + provider, + model, + model_reasoning_effort, + model_reasoning_summary, + user_instructions, + base_instructions, + approval_policy, + sandbox_policy, + disable_response_storage, + notify, + cwd, + resume_path, + } = configure_session; + debug!("Configuring session: model={model}; provider={provider:?}"); + if !cwd.is_absolute() { + return Err(anyhow::anyhow!("cwd is not absolute: {cwd:?}")); + } + + // Error messages to dispatch after SessionConfigured is sent. + let mut post_session_configured_error_events = Vec::::new(); + + // If `resume_path` is specified, then fail if we cannot resume the + // existing rollout. Though if `resume_path` is not specified and we + // fail to create a `RolloutRecorder`, potentially due to some sort of + // I/O error in attempting to create the log file, then we should still + // create the `Session`, but we do log an error. + let mut restored_items: Option> = None; + let rollout_recorder: Option = match resume_path.as_ref() { + Some(path) => match RolloutRecorder::resume(path, cwd.clone()).await { + Ok((rec, saved)) => { + session_id = saved.session_id; + if !saved.items.is_empty() { + restored_items = Some(saved.items); + } + Some(rec) + } + Err(e) => { + return Err(anyhow::anyhow!( + "failed to resume rollout from {path:?}: {e}" + )); + } + }, + None => { + match RolloutRecorder::new(&config, session_id, user_instructions.clone()).await { + Ok(r) => Some(r), + Err(e) => { + let message = format!("failed to initialise rollout recorder: {e}"); + post_session_configured_error_events.push(Event { + id: INITIAL_SUBMIT_ID.to_owned(), + msg: EventMsg::Error(ErrorEvent { + message: message.clone(), + }), + }); + warn!(message); + None + } + } + } + }; + + let client = ModelClient::new( + config.clone(), + auth.clone(), + provider.clone(), + model_reasoning_effort, + model_reasoning_summary, + session_id, + ); + + // Create the mutable state for the Session. + let mut state = State { + history: ConversationHistory::new(), + ..Default::default() + }; + if let Some(restored_items) = restored_items { + state.history.record_items(&restored_items); + } + + let writable_roots = get_writable_roots(&cwd); + + let (mcp_connection_manager, failed_clients) = + match McpConnectionManager::new(config.mcp_servers.clone()).await { + Ok((mgr, failures)) => (mgr, failures), + Err(e) => { + let message = format!("Failed to create MCP connection manager: {e:#}"); + error!("{message}"); + post_session_configured_error_events.push(Event { + id: INITIAL_SUBMIT_ID.to_owned(), + msg: EventMsg::Error(ErrorEvent { message }), + }); + (McpConnectionManager::default(), Default::default()) + } + }; + + // Surface individual client start-up failures to the user. + if !failed_clients.is_empty() { + for (server_name, err) in failed_clients { + let message = format!("MCP client for `{server_name}` failed to start: {err:#}"); + error!("{message}"); + post_session_configured_error_events.push(Event { + id: INITIAL_SUBMIT_ID.to_owned(), + msg: EventMsg::Error(ErrorEvent { message }), + }); + } + } + + let default_shell = shell::default_user_shell().await; + let sess = Arc::new(Session { + session_id, + client, + tools_config: ToolsConfig::new( + &config.model_family, + approval_policy, + sandbox_policy.clone(), + config.include_plan_tool, + ), + tx_event: tx_event.clone(), + user_instructions, + base_instructions, + approval_policy, + sandbox_policy, + shell_environment_policy: config.shell_environment_policy.clone(), + cwd, + writable_roots, + mcp_connection_manager, + notify, + state: Mutex::new(state), + rollout: Mutex::new(rollout_recorder), + codex_linux_sandbox_exe: config.codex_linux_sandbox_exe.clone(), + disable_response_storage, + user_shell: default_shell, + show_raw_agent_reasoning: config.show_raw_agent_reasoning, + }); + + // Gather history metadata for SessionConfiguredEvent. + let (history_log_id, history_entry_count) = + crate::message_history::history_metadata(&config).await; + + // ack + let events = std::iter::once(Event { + id: INITIAL_SUBMIT_ID.to_owned(), + msg: EventMsg::SessionConfigured(SessionConfiguredEvent { + session_id, + model, + history_log_id, + history_entry_count, + }), + }) + .chain(post_session_configured_error_events.into_iter()); + for event in events { + if let Err(e) = tx_event.send(event).await { + error!("failed to send event: {e:?}"); + } + } + + Ok(sess) + } + pub(crate) fn get_writable_roots(&self) -> &[PathBuf] { &self.writable_roots } @@ -628,16 +842,6 @@ impl Drop for Session { } } -impl State { - pub fn partial_clone(&self) -> Self { - Self { - approved_commands: self.approved_commands.clone(), - history: self.history.clone(), - ..Default::default() - } - } -} - #[derive(Clone, Debug)] pub(crate) struct ExecCommandContext { pub(crate) sub_id: String, @@ -708,262 +912,36 @@ impl AgentTask { } } -async fn submission_loop( - mut session_id: Uuid, - config: Arc, - auth: Option, - rx_sub: Receiver, - tx_event: Sender, -) { - let mut sess: Option> = None; - // shorthand - send an event when there is no active session - let send_no_session_event = |sub_id: String| async { - let event = Event { - id: sub_id, - msg: EventMsg::Error(ErrorEvent { - message: "No session initialized, expected 'ConfigureSession' as first Op" - .to_string(), - }), - }; - tx_event.send(event).await.ok(); - }; - +async fn submission_loop(sess: Arc, config: Arc, rx_sub: Receiver) { // To break out of this loop, send Op::Shutdown. while let Ok(sub) = rx_sub.recv().await { debug!(?sub, "Submission"); match sub.op { Op::Interrupt => { - let sess = match sess.as_ref() { - Some(sess) => sess, - None => { - send_no_session_event(sub.id).await; - continue; - } - }; sess.abort(); } - Op::ConfigureSession { - provider, - model, - model_reasoning_effort, - model_reasoning_summary, - user_instructions, - base_instructions, - approval_policy, - sandbox_policy, - disable_response_storage, - notify, - cwd, - resume_path, - } => { - debug!( - "Configuring session: model={model}; provider={provider:?}; resume={resume_path:?}" - ); - if !cwd.is_absolute() { - let message = format!("cwd is not absolute: {cwd:?}"); - error!(message); - let event = Event { - id: sub.id, - msg: EventMsg::Error(ErrorEvent { message }), - }; - if let Err(e) = tx_event.send(event).await { - error!("failed to send error message: {e:?}"); - } - return; - } - // Optionally resume an existing rollout. - let mut restored_items: Option> = None; - let rollout_recorder: Option = - if let Some(path) = resume_path.as_ref() { - match RolloutRecorder::resume(path, cwd.clone()).await { - Ok((rec, saved)) => { - session_id = saved.session_id; - if !saved.items.is_empty() { - restored_items = Some(saved.items); - } - Some(rec) - } - Err(e) => { - warn!("failed to resume rollout from {path:?}: {e}"); - None - } - } - } else { - None - }; - - let rollout_recorder = match rollout_recorder { - Some(rec) => Some(rec), - None => { - match RolloutRecorder::new(&config, session_id, user_instructions.clone()) - .await - { - Ok(r) => Some(r), - Err(e) => { - warn!("failed to initialise rollout recorder: {e}"); - None - } - } - } - }; - - let client = ModelClient::new( - config.clone(), - auth.clone(), - provider.clone(), - model_reasoning_effort, - model_reasoning_summary, - session_id, - ); - - // abort any current running session and clone its state - let state = match sess.take() { - Some(sess) => { - sess.abort(); - sess.state.lock().unwrap().partial_clone() - } - None => State { - history: ConversationHistory::new(), - ..Default::default() - }, - }; - - let writable_roots = get_writable_roots(&cwd); - - // Error messages to dispatch after SessionConfigured is sent. - let mut mcp_connection_errors = Vec::::new(); - let (mcp_connection_manager, failed_clients) = - match McpConnectionManager::new(config.mcp_servers.clone()).await { - Ok((mgr, failures)) => (mgr, failures), - Err(e) => { - let message = format!("Failed to create MCP connection manager: {e:#}"); - error!("{message}"); - mcp_connection_errors.push(Event { - id: sub.id.clone(), - msg: EventMsg::Error(ErrorEvent { message }), - }); - (McpConnectionManager::default(), Default::default()) - } - }; - - // Surface individual client start-up failures to the user. - if !failed_clients.is_empty() { - for (server_name, err) in failed_clients { - let message = - format!("MCP client for `{server_name}` failed to start: {err:#}"); - error!("{message}"); - mcp_connection_errors.push(Event { - id: sub.id.clone(), - msg: EventMsg::Error(ErrorEvent { message }), - }); - } - } - let default_shell = shell::default_user_shell().await; - sess = Some(Arc::new(Session { - client, - tools_config: ToolsConfig::new( - &config.model_family, - approval_policy, - sandbox_policy.clone(), - config.include_plan_tool, - ), - tx_event: tx_event.clone(), - user_instructions, - base_instructions, - approval_policy, - sandbox_policy, - shell_environment_policy: config.shell_environment_policy.clone(), - cwd, - writable_roots, - mcp_connection_manager, - notify, - state: Mutex::new(state), - rollout: Mutex::new(rollout_recorder), - codex_linux_sandbox_exe: config.codex_linux_sandbox_exe.clone(), - disable_response_storage, - user_shell: default_shell, - show_raw_agent_reasoning: config.show_raw_agent_reasoning, - })); - - // Patch restored state into the newly created session. - if let Some(sess_arc) = &sess { - if restored_items.is_some() { - let mut st = sess_arc.state.lock().unwrap(); - st.history.record_items(restored_items.unwrap().iter()); - } - } - - // Gather history metadata for SessionConfiguredEvent. - let (history_log_id, history_entry_count) = - crate::message_history::history_metadata(&config).await; - - // ack - let events = std::iter::once(Event { - id: sub.id.clone(), - msg: EventMsg::SessionConfigured(SessionConfiguredEvent { - session_id, - model, - history_log_id, - history_entry_count, - }), - }) - .chain(mcp_connection_errors.into_iter()); - for event in events { - if let Err(e) = tx_event.send(event).await { - error!("failed to send event: {e:?}"); - } - } - } Op::UserInput { items } => { - let sess = match sess.as_ref() { - Some(sess) => sess, - None => { - send_no_session_event(sub.id).await; - continue; - } - }; - // attempt to inject input into current task if let Err(items) = sess.inject_input(items) { // no current task, spawn a new one - let task = AgentTask::spawn(Arc::clone(sess), sub.id, items); + let task = AgentTask::spawn(sess.clone(), sub.id, items); sess.set_task(task); } } - Op::ExecApproval { id, decision } => { - let sess = match sess.as_ref() { - Some(sess) => sess, - None => { - send_no_session_event(sub.id).await; - continue; - } - }; - match decision { - ReviewDecision::Abort => { - sess.abort(); - } - other => sess.notify_approval(&id, other), + Op::ExecApproval { id, decision } => match decision { + ReviewDecision::Abort => { + sess.abort(); } - } - Op::PatchApproval { id, decision } => { - let sess = match sess.as_ref() { - Some(sess) => sess, - None => { - send_no_session_event(sub.id).await; - continue; - } - }; - match decision { - ReviewDecision::Abort => { - sess.abort(); - } - other => sess.notify_approval(&id, other), + other => sess.notify_approval(&id, other), + }, + Op::PatchApproval { id, decision } => match decision { + ReviewDecision::Abort => { + sess.abort(); } - } + other => sess.notify_approval(&id, other), + }, Op::AddToHistory { text } => { - // TODO: What should we do if we got AddToHistory before ConfigureSession? - // currently, if ConfigureSession has resume path, this history will be ignored - let id = session_id; + let id = sess.session_id; let config = config.clone(); tokio::spawn(async move { if let Err(e) = crate::message_history::append_entry(&text, &id, &config).await @@ -975,7 +953,7 @@ async fn submission_loop( Op::GetHistoryEntryRequest { offset, log_id } => { let config = config.clone(); - let tx_event = tx_event.clone(); + let tx_event = sess.tx_event.clone(); let sub_id = sub.id.clone(); tokio::spawn(async move { @@ -1003,14 +981,6 @@ async fn submission_loop( }); } Op::Compact => { - let sess = match sess.as_ref() { - Some(sess) => sess, - None => { - send_no_session_event(sub.id).await; - continue; - } - }; - // Create a summarization request as user input const SUMMARIZATION_PROMPT: &str = include_str!("prompt_for_compact_command.md"); @@ -1032,28 +1002,27 @@ async fn submission_loop( // Gracefully flush and shutdown rollout recorder on session end so tests // that inspect the rollout file do not race with the background writer. - if let Some(sess_arc) = sess { - let recorder_opt = sess_arc.rollout.lock().unwrap().take(); - if let Some(rec) = recorder_opt { - if let Err(e) = rec.shutdown().await { - warn!("failed to shutdown rollout recorder: {e}"); - let event = Event { - id: sub.id.clone(), - msg: EventMsg::Error(ErrorEvent { - message: "Failed to shutdown rollout recorder".to_string(), - }), - }; - if let Err(e) = tx_event.send(event).await { - warn!("failed to send error message: {e:?}"); - } + let recorder_opt = sess.rollout.lock().unwrap().take(); + if let Some(rec) = recorder_opt { + if let Err(e) = rec.shutdown().await { + warn!("failed to shutdown rollout recorder: {e}"); + let event = Event { + id: sub.id.clone(), + msg: EventMsg::Error(ErrorEvent { + message: "Failed to shutdown rollout recorder".to_string(), + }), + }; + if let Err(e) = sess.tx_event.send(event).await { + warn!("failed to send error message: {e:?}"); } } } + let event = Event { id: sub.id.clone(), msg: EventMsg::ShutdownComplete, }; - if let Err(e) = tx_event.send(event).await { + if let Err(e) = sess.tx_event.send(event).await { warn!("failed to send Shutdown event: {e}"); } break; diff --git a/codex-rs/core/src/conversation_manager.rs b/codex-rs/core/src/conversation_manager.rs index 4a6cdc1b..48ccdddf 100644 --- a/codex-rs/core/src/conversation_manager.rs +++ b/codex-rs/core/src/conversation_manager.rs @@ -7,6 +7,7 @@ use uuid::Uuid; use crate::codex::Codex; use crate::codex::CodexSpawnOk; +use crate::codex::INITIAL_SUBMIT_ID; use crate::codex_conversation::CodexConversation; use crate::config::Config; use crate::error::CodexErr; @@ -52,7 +53,6 @@ impl ConversationManager { ) -> CodexResult { let CodexSpawnOk { codex, - init_id, session_id: conversation_id, } = Codex::spawn(config, auth).await?; @@ -64,7 +64,7 @@ impl ConversationManager { Event { id, msg: EventMsg::SessionConfigured(session_configured), - } if id == init_id => session_configured, + } if id == INITIAL_SUBMIT_ID => session_configured, _ => { return Err(CodexErr::SessionConfiguredNotFirstEvent); } diff --git a/codex-rs/core/src/protocol.rs b/codex-rs/core/src/protocol.rs index a7e6b1ed..f32d9ccf 100644 --- a/codex-rs/core/src/protocol.rs +++ b/codex-rs/core/src/protocol.rs @@ -17,10 +17,7 @@ use serde_bytes::ByteBuf; use strum_macros::Display; use uuid::Uuid; -use crate::config_types::ReasoningEffort as ReasoningEffortConfig; -use crate::config_types::ReasoningSummary as ReasoningSummaryConfig; use crate::message_history::HistoryEntry; -use crate::model_provider_info::ModelProviderInfo; use crate::parse_command::ParsedCommand; use crate::plan_tool::UpdatePlanArgs; @@ -39,52 +36,6 @@ pub struct Submission { #[allow(clippy::large_enum_variant)] #[non_exhaustive] pub enum Op { - /// Configure the model session. - ConfigureSession { - /// Provider identifier ("openai", "openrouter", ...). - provider: ModelProviderInfo, - - /// If not specified, server will use its default model. - model: String, - - model_reasoning_effort: ReasoningEffortConfig, - model_reasoning_summary: ReasoningSummaryConfig, - - /// Model instructions that are appended to the base instructions. - user_instructions: Option, - - /// Base instructions override. - base_instructions: Option, - - /// When to escalate for approval for execution - approval_policy: AskForApproval, - /// How to sandbox commands executed in the system - sandbox_policy: SandboxPolicy, - /// Disable server-side response storage (send full context each request) - #[serde(default)] - disable_response_storage: bool, - - /// Optional external notifier command tokens. Present only when the - /// client wants the agent to spawn a program after each completed - /// turn. - #[serde(skip_serializing_if = "Option::is_none")] - #[serde(default)] - notify: Option>, - - /// Working directory that should be treated as the *root* of the - /// session. All relative paths supplied by the model as well as the - /// execution sandbox are resolved against this directory **instead** - /// of the process-wide current working directory. CLI front-ends are - /// expected to expand this to an absolute path before sending the - /// `ConfigureSession` operation so that the business-logic layer can - /// operate deterministically. - cwd: std::path::PathBuf, - - /// Path to a rollout file to resume from. - #[serde(skip_serializing_if = "Option::is_none")] - resume_path: Option, - }, - /// Abort current task. /// This server sends no corresponding Event Interrupt,