fix: parallelize logic in Session::new() (#2305)

#2291 made it so that `Session::new()` is on the critical path to
`Codex::spawn()`, which means it is on the hot path to CLI startup. This
refactors `Session::new()` to run a number of async tasks in parallel
that were previously run serially to try to reduce latency.
This commit is contained in:
Michael Bolin
2025-08-14 13:29:58 -07:00
committed by GitHub
parent b62c2d9552
commit 8f11652458

View File

@@ -137,9 +137,7 @@ impl Codex {
let config = Arc::new(config); let config = Arc::new(config);
let resume_path = config.experimental_resume.clone(); let resume_path = config.experimental_resume.clone();
let session_id = Uuid::new_v4();
let configure_session = ConfigureSession { let configure_session = ConfigureSession {
session_id,
provider: config.model_provider.clone(), provider: config.model_provider.clone(),
model: config.model.clone(), model: config.model.clone(),
model_reasoning_effort: config.model_reasoning_effort, model_reasoning_effort: config.model_reasoning_effort,
@@ -161,6 +159,7 @@ impl Codex {
error!("Failed to create session: {e:#}"); error!("Failed to create session: {e:#}");
CodexErr::InternalAgentDied CodexErr::InternalAgentDied
})?; })?;
let session_id = session.session_id;
// This task will run until Op::Shutdown is received. // This task will run until Op::Shutdown is received.
tokio::spawn(submission_loop(session, config, rx_sub)); tokio::spawn(submission_loop(session, config, rx_sub));
@@ -253,8 +252,6 @@ pub(crate) struct Session {
/// Configure the model session. /// Configure the model session.
struct ConfigureSession { struct ConfigureSession {
session_id: Uuid,
/// Provider identifier ("openai", "openrouter", ...). /// Provider identifier ("openai", "openrouter", ...).
provider: ModelProviderInfo, provider: ModelProviderInfo,
@@ -302,7 +299,6 @@ impl Session {
tx_event: Sender<Event>, tx_event: Sender<Event>,
) -> anyhow::Result<Arc<Self>> { ) -> anyhow::Result<Arc<Self>> {
let ConfigureSession { let ConfigureSession {
mut session_id,
provider, provider,
model, model,
model_reasoning_effort, model_reasoning_effort,
@@ -324,53 +320,85 @@ impl Session {
// Error messages to dispatch after SessionConfigured is sent. // Error messages to dispatch after SessionConfigured is sent.
let mut post_session_configured_error_events = Vec::<Event>::new(); let mut post_session_configured_error_events = Vec::<Event>::new();
// If `resume_path` is specified, then fail if we cannot resume the // Kick off independent async setup tasks in parallel to reduce startup latency.
// existing rollout. Though if `resume_path` is not specified and we //
// fail to create a `RolloutRecorder`, potentially due to some sort of // - initialize RolloutRecorder with new or resumed session info
// I/O error in attempting to create the log file, then we should still // - spin up MCP connection manager
// create the `Session`, but we do log an error. // - perform default shell discovery
let mut restored_items: Option<Vec<ResponseItem>> = None; // - load history metadata
let rollout_recorder: Option<RolloutRecorder> = match resume_path.as_ref() { let rollout_fut = async {
Some(path) => match RolloutRecorder::resume(path, cwd.clone()).await { match resume_path.as_ref() {
Ok((rec, saved)) => { Some(path) => RolloutRecorder::resume(path, cwd.clone())
session_id = saved.session_id; .await
if !saved.items.is_empty() { .map(|(rec, saved)| (saved.session_id, Some(saved), rec)),
restored_items = Some(saved.items); None => {
} let session_id = Uuid::new_v4();
Some(rec) RolloutRecorder::new(&config, session_id, user_instructions.clone())
} .await
Err(e) => { .map(|rec| (session_id, None, rec))
return Err(anyhow::anyhow!(
"failed to resume rollout from {path:?}: {e}"
));
}
},
None => {
match RolloutRecorder::new(&config, session_id, user_instructions.clone()).await {
Ok(r) => Some(r),
Err(e) => {
let message = format!("failed to initialise rollout recorder: {e}");
post_session_configured_error_events.push(Event {
id: INITIAL_SUBMIT_ID.to_owned(),
msg: EventMsg::Error(ErrorEvent {
message: message.clone(),
}),
});
warn!(message);
None
}
} }
} }
}; };
let client = ModelClient::new( let mcp_fut = McpConnectionManager::new(config.mcp_servers.clone());
config.clone(), let default_shell_fut = shell::default_user_shell();
auth.clone(), let history_meta_fut = crate::message_history::history_metadata(&config);
provider.clone(),
model_reasoning_effort, // Join all independent futures.
model_reasoning_summary, let (rollout_res, mcp_res, default_shell, (history_log_id, history_entry_count)) =
tokio::join!(rollout_fut, mcp_fut, default_shell_fut, history_meta_fut);
// Handle rollout result, which determines the session_id.
struct RolloutResult {
session_id: Uuid,
rollout_recorder: Option<RolloutRecorder>,
restored_items: Option<Vec<ResponseItem>>,
}
let rollout_result = match rollout_res {
Ok((session_id, maybe_saved, recorder)) => {
let restored_items: Option<Vec<ResponseItem>> =
maybe_saved.and_then(|saved_session| {
if saved_session.items.is_empty() {
None
} else {
Some(saved_session.items)
}
});
RolloutResult {
session_id,
rollout_recorder: Some(recorder),
restored_items,
}
}
Err(e) => {
if let Some(path) = resume_path.as_ref() {
return Err(anyhow::anyhow!(
"failed to resume rollout from {path:?}: {e}"
));
}
let message = format!("failed to initialize rollout recorder: {e}");
post_session_configured_error_events.push(Event {
id: INITIAL_SUBMIT_ID.to_owned(),
msg: EventMsg::Error(ErrorEvent {
message: message.clone(),
}),
});
warn!("{message}");
RolloutResult {
session_id: Uuid::new_v4(),
rollout_recorder: None,
restored_items: None,
}
}
};
let RolloutResult {
session_id, session_id,
); rollout_recorder,
restored_items,
} = rollout_result;
// Create the mutable state for the Session. // Create the mutable state for the Session.
let mut state = State { let mut state = State {
@@ -383,19 +411,19 @@ impl Session {
let writable_roots = get_writable_roots(&cwd); let writable_roots = get_writable_roots(&cwd);
let (mcp_connection_manager, failed_clients) = // Handle MCP manager result and record any startup failures.
match McpConnectionManager::new(config.mcp_servers.clone()).await { let (mcp_connection_manager, failed_clients) = match mcp_res {
Ok((mgr, failures)) => (mgr, failures), Ok((mgr, failures)) => (mgr, failures),
Err(e) => { Err(e) => {
let message = format!("Failed to create MCP connection manager: {e:#}"); let message = format!("Failed to create MCP connection manager: {e:#}");
error!("{message}"); error!("{message}");
post_session_configured_error_events.push(Event { post_session_configured_error_events.push(Event {
id: INITIAL_SUBMIT_ID.to_owned(), id: INITIAL_SUBMIT_ID.to_owned(),
msg: EventMsg::Error(ErrorEvent { message }), msg: EventMsg::Error(ErrorEvent { message }),
}); });
(McpConnectionManager::default(), Default::default()) (McpConnectionManager::default(), Default::default())
} }
}; };
// Surface individual client start-up failures to the user. // Surface individual client start-up failures to the user.
if !failed_clients.is_empty() { if !failed_clients.is_empty() {
@@ -409,7 +437,16 @@ impl Session {
} }
} }
let default_shell = shell::default_user_shell().await; // Now that `session_id` is final (may have been updated by resume),
// construct the model client.
let client = ModelClient::new(
config.clone(),
auth.clone(),
provider.clone(),
model_reasoning_effort,
model_reasoning_summary,
session_id,
);
let sess = Arc::new(Session { let sess = Arc::new(Session {
session_id, session_id,
client, client,
@@ -437,25 +474,20 @@ impl Session {
show_raw_agent_reasoning: config.show_raw_agent_reasoning, show_raw_agent_reasoning: config.show_raw_agent_reasoning,
}); });
// record the initial user instructions and environment context, regardless of whether we restored items. // record the initial user instructions and environment context,
if let Some(user_instructions) = sess.get_user_instructions().clone() { // regardless of whether we restored items.
sess.record_conversation_items(&[Prompt::format_user_instructions_message( let mut conversation_items = Vec::<ResponseItem>::with_capacity(2);
&user_instructions, if let Some(user_instructions) = sess.user_instructions.as_deref() {
)]) conversation_items.push(Prompt::format_user_instructions_message(user_instructions));
.await;
} }
sess.record_conversation_items(&[ResponseItem::from(EnvironmentContext::new( conversation_items.push(ResponseItem::from(EnvironmentContext::new(
sess.get_cwd().to_path_buf(), sess.get_cwd().to_path_buf(),
sess.get_approval_policy(), sess.get_approval_policy(),
sess.get_sandbox_policy().clone(), sess.sandbox_policy.clone(),
))]) )));
.await; sess.record_conversation_items(&conversation_items).await;
// Gather history metadata for SessionConfiguredEvent. // Dispatch the SessionConfiguredEvent first and then report any errors.
let (history_log_id, history_entry_count) =
crate::message_history::history_metadata(&config).await;
// ack
let events = std::iter::once(Event { let events = std::iter::once(Event {
id: INITIAL_SUBMIT_ID.to_owned(), id: INITIAL_SUBMIT_ID.to_owned(),
msg: EventMsg::SessionConfigured(SessionConfiguredEvent { msg: EventMsg::SessionConfigured(SessionConfiguredEvent {
@@ -487,14 +519,6 @@ impl Session {
&self.cwd &self.cwd
} }
pub(crate) fn get_user_instructions(&self) -> Option<String> {
self.user_instructions.clone()
}
pub(crate) fn get_sandbox_policy(&self) -> &SandboxPolicy {
&self.sandbox_policy
}
fn resolve_path(&self, path: Option<String>) -> PathBuf { fn resolve_path(&self, path: Option<String>) -> PathBuf {
path.as_ref() path.as_ref()
.map(PathBuf::from) .map(PathBuf::from)