From 8f11652458e0efc6e0a3969c087370ed958d3861 Mon Sep 17 00:00:00 2001
From: Michael Bolin
Date: Thu, 14 Aug 2025 13:29:58 -0700
Subject: [PATCH] fix: parallelize logic in Session::new() (#2305)

#2291 put `Session::new()` on the critical path to `Codex::spawn()`,
which means it is now on the hot path for CLI startup. This change
refactors `Session::new()` to run, in parallel, a number of async setup
tasks that were previously run serially, reducing startup latency.
---
 codex-rs/core/src/codex.rs | 192 +++++++++++++++++++++------------
 1 file changed, 108 insertions(+), 84 deletions(-)

diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs
index bca5af43..098ff115 100644
--- a/codex-rs/core/src/codex.rs
+++ b/codex-rs/core/src/codex.rs
@@ -137,9 +137,7 @@ impl Codex {
         let config = Arc::new(config);
         let resume_path = config.experimental_resume.clone();
 
-        let session_id = Uuid::new_v4();
         let configure_session = ConfigureSession {
-            session_id,
             provider: config.model_provider.clone(),
             model: config.model.clone(),
             model_reasoning_effort: config.model_reasoning_effort,
@@ -161,6 +159,7 @@
             error!("Failed to create session: {e:#}");
             CodexErr::InternalAgentDied
         })?;
+        let session_id = session.session_id;
 
         // This task will run until Op::Shutdown is received.
         tokio::spawn(submission_loop(session, config, rx_sub));
@@ -253,8 +252,6 @@ pub(crate) struct Session {
 
 /// Configure the model session.
 struct ConfigureSession {
-    session_id: Uuid,
-
     /// Provider identifier ("openai", "openrouter", ...).
     provider: ModelProviderInfo,
 
@@ -302,7 +299,6 @@ impl Session {
         tx_event: Sender<Event>,
     ) -> anyhow::Result<Arc<Session>> {
         let ConfigureSession {
-            mut session_id,
             provider,
             model,
             model_reasoning_effort,
@@ -324,53 +320,85 @@ impl Session {
         // Error messages to dispatch after SessionConfigured is sent.
         let mut post_session_configured_error_events = Vec::<Event>::new();
 
-        // If `resume_path` is specified, then fail if we cannot resume the
-        // existing rollout. Though if `resume_path` is not specified and we
-        // fail to create a `RolloutRecorder`, potentially due to some sort of
-        // I/O error in attempting to create the log file, then we should still
-        // create the `Session`, but we do log an error.
-        let mut restored_items: Option<Vec<ResponseItem>> = None;
-        let rollout_recorder: Option<RolloutRecorder> = match resume_path.as_ref() {
-            Some(path) => match RolloutRecorder::resume(path, cwd.clone()).await {
-                Ok((rec, saved)) => {
-                    session_id = saved.session_id;
-                    if !saved.items.is_empty() {
-                        restored_items = Some(saved.items);
-                    }
-                    Some(rec)
-                }
-                Err(e) => {
-                    return Err(anyhow::anyhow!(
-                        "failed to resume rollout from {path:?}: {e}"
-                    ));
-                }
-            },
-            None => {
-                match RolloutRecorder::new(&config, session_id, user_instructions.clone()).await {
-                    Ok(r) => Some(r),
-                    Err(e) => {
-                        let message = format!("failed to initialise rollout recorder: {e}");
-                        post_session_configured_error_events.push(Event {
-                            id: INITIAL_SUBMIT_ID.to_owned(),
-                            msg: EventMsg::Error(ErrorEvent {
-                                message: message.clone(),
-                            }),
-                        });
-                        warn!(message);
-                        None
-                    }
+        // Kick off independent async setup tasks in parallel to reduce startup latency.
+        //
+        // - initialize RolloutRecorder with new or resumed session info
+        // - spin up MCP connection manager
+        // - perform default shell discovery
+        // - load history metadata
+        let rollout_fut = async {
+            match resume_path.as_ref() {
+                Some(path) => RolloutRecorder::resume(path, cwd.clone())
+                    .await
+                    .map(|(rec, saved)| (saved.session_id, Some(saved), rec)),
+                None => {
+                    let session_id = Uuid::new_v4();
+                    RolloutRecorder::new(&config, session_id, user_instructions.clone())
+                        .await
+                        .map(|rec| (session_id, None, rec))
                 }
             }
         };
 
-        let client = ModelClient::new(
-            config.clone(),
-            auth.clone(),
-            provider.clone(),
-            model_reasoning_effort,
-            model_reasoning_summary,
+        let mcp_fut = McpConnectionManager::new(config.mcp_servers.clone());
+        let default_shell_fut = shell::default_user_shell();
+        let history_meta_fut = crate::message_history::history_metadata(&config);
+
+        // Join all independent futures.
+        let (rollout_res, mcp_res, default_shell, (history_log_id, history_entry_count)) =
+            tokio::join!(rollout_fut, mcp_fut, default_shell_fut, history_meta_fut);
+
+        // Handle rollout result, which determines the session_id.
+        struct RolloutResult {
+            session_id: Uuid,
+            rollout_recorder: Option<RolloutRecorder>,
+            restored_items: Option<Vec<ResponseItem>>,
+        }
+        let rollout_result = match rollout_res {
+            Ok((session_id, maybe_saved, recorder)) => {
+                let restored_items: Option<Vec<ResponseItem>> =
+                    maybe_saved.and_then(|saved_session| {
+                        if saved_session.items.is_empty() {
+                            None
+                        } else {
+                            Some(saved_session.items)
+                        }
+                    });
+                RolloutResult {
+                    session_id,
+                    rollout_recorder: Some(recorder),
+                    restored_items,
+                }
+            }
+            Err(e) => {
+                if let Some(path) = resume_path.as_ref() {
+                    return Err(anyhow::anyhow!(
+                        "failed to resume rollout from {path:?}: {e}"
+                    ));
+                }
+
+                let message = format!("failed to initialize rollout recorder: {e}");
+                post_session_configured_error_events.push(Event {
+                    id: INITIAL_SUBMIT_ID.to_owned(),
+                    msg: EventMsg::Error(ErrorEvent {
+                        message: message.clone(),
+                    }),
+                });
+                warn!("{message}");
+
+                RolloutResult {
+                    session_id: Uuid::new_v4(),
+                    rollout_recorder: None,
+                    restored_items: None,
+                }
+            }
+        };
+
+        let RolloutResult {
             session_id,
-        );
+            rollout_recorder,
+            restored_items,
+        } = rollout_result;
 
         // Create the mutable state for the Session.
         let mut state = State {
@@ -383,19 +411,19 @@ impl Session {
 
         let writable_roots = get_writable_roots(&cwd);
 
-        let (mcp_connection_manager, failed_clients) =
-            match McpConnectionManager::new(config.mcp_servers.clone()).await {
-                Ok((mgr, failures)) => (mgr, failures),
-                Err(e) => {
-                    let message = format!("Failed to create MCP connection manager: {e:#}");
-                    error!("{message}");
-                    post_session_configured_error_events.push(Event {
-                        id: INITIAL_SUBMIT_ID.to_owned(),
-                        msg: EventMsg::Error(ErrorEvent { message }),
-                    });
-                    (McpConnectionManager::default(), Default::default())
-                }
-            };
+        // Handle MCP manager result and record any startup failures.
+        let (mcp_connection_manager, failed_clients) = match mcp_res {
+            Ok((mgr, failures)) => (mgr, failures),
+            Err(e) => {
+                let message = format!("Failed to create MCP connection manager: {e:#}");
+                error!("{message}");
+                post_session_configured_error_events.push(Event {
+                    id: INITIAL_SUBMIT_ID.to_owned(),
+                    msg: EventMsg::Error(ErrorEvent { message }),
+                });
+                (McpConnectionManager::default(), Default::default())
+            }
+        };
 
         // Surface individual client start-up failures to the user.
         if !failed_clients.is_empty() {
@@ -409,7 +437,16 @@
             }
         }
 
-        let default_shell = shell::default_user_shell().await;
+        // Now that `session_id` is final (may have been updated by resume),
+        // construct the model client.
+        let client = ModelClient::new(
+            config.clone(),
+            auth.clone(),
+            provider.clone(),
+            model_reasoning_effort,
+            model_reasoning_summary,
+            session_id,
+        );
         let sess = Arc::new(Session {
             session_id,
             client,
@@ -437,25 +474,20 @@
             show_raw_agent_reasoning: config.show_raw_agent_reasoning,
         });
 
-        // record the initial user instructions and environment context, regardless of whether we restored items.
-        if let Some(user_instructions) = sess.get_user_instructions().clone() {
-            sess.record_conversation_items(&[Prompt::format_user_instructions_message(
-                &user_instructions,
-            )])
-            .await;
+        // record the initial user instructions and environment context,
+        // regardless of whether we restored items.
+        let mut conversation_items = Vec::<ResponseItem>::with_capacity(2);
+        if let Some(user_instructions) = sess.user_instructions.as_deref() {
+            conversation_items.push(Prompt::format_user_instructions_message(user_instructions));
         }
-        sess.record_conversation_items(&[ResponseItem::from(EnvironmentContext::new(
+        conversation_items.push(ResponseItem::from(EnvironmentContext::new(
             sess.get_cwd().to_path_buf(),
             sess.get_approval_policy(),
-            sess.get_sandbox_policy().clone(),
-        ))])
-        .await;
+            sess.sandbox_policy.clone(),
+        )));
+        sess.record_conversation_items(&conversation_items).await;
 
-        // Gather history metadata for SessionConfiguredEvent.
-        let (history_log_id, history_entry_count) =
-            crate::message_history::history_metadata(&config).await;
-
-        // ack
+        // Dispatch the SessionConfiguredEvent first and then report any errors.
         let events = std::iter::once(Event {
             id: INITIAL_SUBMIT_ID.to_owned(),
             msg: EventMsg::SessionConfigured(SessionConfiguredEvent {
@@ -487,14 +519,6 @@
         &self.cwd
     }
 
-    pub(crate) fn get_user_instructions(&self) -> Option<String> {
-        self.user_instructions.clone()
-    }
-
-    pub(crate) fn get_sandbox_policy(&self) -> &SandboxPolicy {
-        &self.sandbox_policy
-    }
-
     fn resolve_path(&self, path: Option<String>) -> PathBuf {
         path.as_ref()
             .map(PathBuf::from)
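
The heart of the patch is the `tokio::join!` over independent setup futures. The following is a minimal, self-contained sketch of that pattern, not part of the patch itself: `load_history` and `discover_shell` are made-up stand-ins for the real setup steps, and the sleeps simulate I/O latency. It assumes a tokio dependency with the `macros`, `rt-multi-thread`, and `time` features enabled.

// Illustrative sketch only (not from the codex-rs codebase): two independent
// async setup steps run concurrently with tokio::join!, mirroring how
// Session::new() now overlaps rollout, MCP, shell, and history setup.
use std::time::{Duration, Instant};

// Hypothetical stand-in for a setup step such as loading history metadata.
async fn load_history() -> usize {
    tokio::time::sleep(Duration::from_millis(50)).await;
    42
}

// Hypothetical stand-in for a setup step such as default shell discovery.
async fn discover_shell() -> String {
    tokio::time::sleep(Duration::from_millis(50)).await;
    "/bin/zsh".to_string()
}

#[tokio::main]
async fn main() {
    let start = Instant::now();
    // Both futures make progress concurrently on the same task, so total wall
    // time is roughly the longer of the two delays, not their sum.
    let (history_entries, shell) = tokio::join!(load_history(), discover_shell());
    println!(
        "history_entries={history_entries} shell={shell} elapsed={:?}",
        start.elapsed()
    );
}

Because the joined futures run on one task, this overlaps latency without any extra spawning or synchronization, which is why the patch can keep `Session::new()` single-threaded while still cutting its end-to-end startup time.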