exploration: create Session as part of Codex::spawn() (#2291)

Historically, `Codex::spawn()` would create the instance of `Codex` and
enforce, by construction, that `Op::ConfigureSession` was the first `Op`
submitted via `submit()`. Then over in `submission_loop()`, it would
handle the case for taking the parameters of `Op::ConfigureSession` and
turning it into a `Session`.

This approach has two challenges from a state management perspective:


f968a1327a/codex-rs/core/src/codex.rs (L718)

- The local `sess` variable in `submission_loop()` has to be `mut` and
`Option<Arc<Session>>` because a `Session` is not guaranteed to be
present for the lifetime of the loop, so there is a lot of logic to deal
with the case where `sess` is `None` (e.g., the `send_no_session_event`
function and all of its callsites).
- `submission_loop()` is written in such a way that
`Op::ConfigureSession` could be observed multiple times, but in
practice, it is only observed exactly once at the start of the loop.

In this PR, we try to simplify the state management by _removing_ the
`Op::ConfigureSession` enum variant and constructing the `Session` as
part of `Codex::spawn()` so that it can be passed to `submission_loop()`
as `Arc<Session>`. The original logic from the `Op::ConfigureSession`
has largely been moved to the new `Session::new()` constructor.

---

Incidentally, I also noticed that the handling of `Op::ConfigureSession`
can result in events being dispatched in addition to
`EventMsg::SessionConfigured`, as an `EventMsg::Error` is created for
every MCP initialization error, so it is important to preserve that
behavior:


f968a1327a/codex-rs/core/src/codex.rs (L901-L916)

Admittedly, I believe this does not play nicely with #2264: these error
messages will likely be dispatched before the client has a chance to
call `addConversationListener`. We likely need to make `newConversation`
create the subscription automatically, while also guaranteeing that the
"ack" from `newConversation` is returned before any other
conversation-related notifications are sent, so the client knows which
`conversation_id` to match on.
This commit is contained in:
Michael Bolin
2025-08-14 09:55:28 -07:00
committed by GitHub
parent f968a1327a
commit cf7a7e63a3
3 changed files with 260 additions and 340 deletions

View File

@@ -30,6 +30,7 @@ use tracing::trace;
use tracing::warn;
use uuid::Uuid;
use crate::ModelProviderInfo;
use crate::apply_patch::ApplyPatchExec;
use crate::apply_patch::CODEX_APPLY_PATCH_ARG1;
use crate::apply_patch::InternalApplyPatchInvocation;
@@ -41,6 +42,8 @@ use crate::client_common::EnvironmentContext;
use crate::client_common::Prompt;
use crate::client_common::ResponseEvent;
use crate::config::Config;
use crate::config_types::ReasoningEffort as ReasoningEffortConfig;
use crate::config_types::ReasoningSummary as ReasoningSummaryConfig;
use crate::config_types::ShellEnvironmentPolicy;
use crate::conversation_history::ConversationHistory;
use crate::error::CodexErr;
@@ -118,22 +121,25 @@ pub struct Codex {
/// unique session id.
pub struct CodexSpawnOk {
/// The spawned [`Codex`] handle used to submit operations.
pub codex: Codex,
/// Submission id returned by `submit()` for the initial session
/// configuration.
pub init_id: String,
/// Session id generated by `Codex::spawn()`.
// NOTE(review): `Session::new` replaces its own copy of the id with the
// saved one when resuming a rollout, so this value may differ from the
// id the session actually uses — confirm.
pub session_id: Uuid,
}
/// Event id attached to the events that `Session::new` emits while the
/// session is being set up (`SessionConfigured` and any start-up errors).
pub(crate) const INITIAL_SUBMIT_ID: &str = "";
impl Codex {
/// Spawn a new [`Codex`] and initialize the session.
pub async fn spawn(config: Config, auth: Option<CodexAuth>) -> CodexResult<CodexSpawnOk> {
// experimental resume path (undocumented)
let resume_path = config.experimental_resume.clone();
info!("resume_path: {resume_path:?}");
let (tx_sub, rx_sub) = async_channel::bounded(64);
let (tx_event, rx_event) = async_channel::unbounded();
let user_instructions = get_user_instructions(&config).await;
let configure_session = Op::ConfigureSession {
let config = Arc::new(config);
let resume_path = config.experimental_resume.clone();
let session_id = Uuid::new_v4();
let configure_session = ConfigureSession {
session_id,
provider: config.model_provider.clone(),
model: config.model.clone(),
model_reasoning_effort: config.model_reasoning_effort,
@@ -145,28 +151,26 @@ impl Codex {
disable_response_storage: config.disable_response_storage,
notify: config.notify.clone(),
cwd: config.cwd.clone(),
resume_path: resume_path.clone(),
resume_path,
};
let config = Arc::new(config);
// Generate a unique ID for the lifetime of this Codex session.
let session_id = Uuid::new_v4();
let session = Session::new(configure_session, config.clone(), auth, tx_event.clone())
.await
.map_err(|e| {
error!("Failed to create session: {e:#}");
CodexErr::InternalAgentDied
})?;
// This task will run until Op::Shutdown is received.
tokio::spawn(submission_loop(session_id, config, auth, rx_sub, tx_event));
tokio::spawn(submission_loop(session, config, rx_sub));
let codex = Codex {
next_id: AtomicU64::new(0),
tx_sub,
rx_event,
};
let init_id = codex.submit(configure_session).await?;
Ok(CodexSpawnOk {
codex,
init_id,
session_id,
})
Ok(CodexSpawnOk { codex, session_id })
}
/// Submit the `op` wrapped in a `Submission` with a unique ID.
@@ -214,6 +218,7 @@ struct State {
///
/// A session has at most 1 running task at a time, and can be interrupted by user input.
pub(crate) struct Session {
session_id: Uuid,
client: ModelClient,
tx_event: Sender<Event>,
@@ -246,7 +251,216 @@ pub(crate) struct Session {
show_raw_agent_reasoning: bool,
}
/// Parameters consumed by `Session::new` to configure the model session.
struct ConfigureSession {
/// Identifier for the session. When an existing rollout is resumed via
/// `resume_path`, `Session::new` replaces it with the saved session id.
session_id: Uuid,
/// Provider identifier ("openai", "openrouter", ...).
provider: ModelProviderInfo,
/// If not specified, server will use its default model.
model: String,
/// Reasoning effort setting forwarded to the model client.
model_reasoning_effort: ReasoningEffortConfig,
/// Reasoning summary setting forwarded to the model client.
model_reasoning_summary: ReasoningSummaryConfig,
/// Model instructions that are appended to the base instructions.
user_instructions: Option<String>,
/// Base instructions override.
base_instructions: Option<String>,
/// When to escalate for approval for execution
approval_policy: AskForApproval,
/// How to sandbox commands executed in the system
sandbox_policy: SandboxPolicy,
/// Disable server-side response storage (send full context each request)
disable_response_storage: bool,
/// Optional external notifier command tokens. Present only when the
/// client wants the agent to spawn a program after each completed
/// turn.
notify: Option<Vec<String>>,
/// Working directory that should be treated as the *root* of the
/// session. All relative paths supplied by the model as well as the
/// execution sandbox are resolved against this directory **instead**
/// of the process-wide current working directory. CLI front-ends are
/// expected to expand this to an absolute path before the session is
/// created so that the business-logic layer can operate
/// deterministically; `Session::new` returns an error if the path is
/// not absolute.
cwd: PathBuf,
/// Path handed to `RolloutRecorder::resume` when present. If resuming
/// from this path fails, `Session::new` returns an error.
resume_path: Option<PathBuf>,
}
impl Session {
/// Builds a [`Session`] from `configure_session` and announces it on
/// `tx_event`.
///
/// After the session is constructed, a `SessionConfigured` event is sent,
/// followed by one `Error` event for every non-fatal start-up problem
/// (rollout recorder creation failure, MCP connection manager failure,
/// individual MCP client failures). All of these events carry
/// [`INITIAL_SUBMIT_ID`] as their id. Failures to send an event are logged
/// and otherwise ignored.
///
/// # Errors
///
/// Returns an error if `cwd` is not absolute, or if `resume_path` is set
/// and the existing rollout cannot be resumed.
async fn new(
    configure_session: ConfigureSession,
    config: Arc<Config>,
    auth: Option<CodexAuth>,
    tx_event: Sender<Event>,
) -> anyhow::Result<Arc<Self>> {
    let ConfigureSession {
        mut session_id,
        provider,
        model,
        model_reasoning_effort,
        model_reasoning_summary,
        user_instructions,
        base_instructions,
        approval_policy,
        sandbox_policy,
        disable_response_storage,
        notify,
        cwd,
        resume_path,
    } = configure_session;
    debug!("Configuring session: model={model}; provider={provider:?}");
    if !cwd.is_absolute() {
        return Err(anyhow::anyhow!("cwd is not absolute: {cwd:?}"));
    }

    // Error messages to dispatch after SessionConfigured is sent.
    let mut post_session_configured_error_events = Vec::<Event>::new();

    // If `resume_path` is specified, then fail if we cannot resume the
    // existing rollout. Though if `resume_path` is not specified and we
    // fail to create a `RolloutRecorder`, potentially due to some sort of
    // I/O error in attempting to create the log file, then we should still
    // create the `Session`, but we do log an error.
    let mut restored_items: Option<Vec<ResponseItem>> = None;
    let rollout_recorder: Option<RolloutRecorder> = match resume_path.as_ref() {
        Some(path) => match RolloutRecorder::resume(path, cwd.clone()).await {
            Ok((rec, saved)) => {
                // A resumed session keeps the id it was originally given.
                session_id = saved.session_id;
                if !saved.items.is_empty() {
                    restored_items = Some(saved.items);
                }
                Some(rec)
            }
            Err(e) => {
                return Err(anyhow::anyhow!(
                    "failed to resume rollout from {path:?}: {e}"
                ));
            }
        },
        None => {
            match RolloutRecorder::new(&config, session_id, user_instructions.clone()).await {
                Ok(r) => Some(r),
                Err(e) => {
                    let message = format!("failed to initialise rollout recorder: {e}");
                    // Log first so the string can be moved into the event
                    // without cloning it.
                    warn!(message);
                    post_session_configured_error_events.push(Event {
                        id: INITIAL_SUBMIT_ID.to_owned(),
                        msg: EventMsg::Error(ErrorEvent { message }),
                    });
                    None
                }
            }
        }
    };

    // `auth` and `provider` are not needed after this point, so they are
    // moved into the client rather than cloned.
    let client = ModelClient::new(
        config.clone(),
        auth,
        provider,
        model_reasoning_effort,
        model_reasoning_summary,
        session_id,
    );

    // Create the mutable state for the Session.
    let mut state = State {
        history: ConversationHistory::new(),
        ..Default::default()
    };
    if let Some(restored_items) = restored_items {
        state.history.record_items(&restored_items);
    }

    let writable_roots = get_writable_roots(&cwd);

    let (mcp_connection_manager, failed_clients) =
        match McpConnectionManager::new(config.mcp_servers.clone()).await {
            Ok((mgr, failures)) => (mgr, failures),
            Err(e) => {
                let message = format!("Failed to create MCP connection manager: {e:#}");
                error!("{message}");
                post_session_configured_error_events.push(Event {
                    id: INITIAL_SUBMIT_ID.to_owned(),
                    msg: EventMsg::Error(ErrorEvent { message }),
                });
                (McpConnectionManager::default(), Default::default())
            }
        };

    // Surface individual client start-up failures to the user.
    for (server_name, err) in failed_clients {
        let message = format!("MCP client for `{server_name}` failed to start: {err:#}");
        error!("{message}");
        post_session_configured_error_events.push(Event {
            id: INITIAL_SUBMIT_ID.to_owned(),
            msg: EventMsg::Error(ErrorEvent { message }),
        });
    }

    let default_shell = shell::default_user_shell().await;
    let sess = Arc::new(Session {
        session_id,
        client,
        tools_config: ToolsConfig::new(
            &config.model_family,
            approval_policy,
            sandbox_policy.clone(),
            config.include_plan_tool,
        ),
        // Cloned because `tx_event` is still needed below to send the ack.
        tx_event: tx_event.clone(),
        user_instructions,
        base_instructions,
        approval_policy,
        sandbox_policy,
        shell_environment_policy: config.shell_environment_policy.clone(),
        cwd,
        writable_roots,
        mcp_connection_manager,
        notify,
        state: Mutex::new(state),
        rollout: Mutex::new(rollout_recorder),
        codex_linux_sandbox_exe: config.codex_linux_sandbox_exe.clone(),
        disable_response_storage,
        user_shell: default_shell,
        show_raw_agent_reasoning: config.show_raw_agent_reasoning,
    });

    // Gather history metadata for SessionConfiguredEvent.
    let (history_log_id, history_entry_count) =
        crate::message_history::history_metadata(&config).await;

    // ack: SessionConfigured goes out first, then any deferred errors.
    let events = std::iter::once(Event {
        id: INITIAL_SUBMIT_ID.to_owned(),
        msg: EventMsg::SessionConfigured(SessionConfiguredEvent {
            session_id,
            model,
            history_log_id,
            history_entry_count,
        }),
    })
    .chain(post_session_configured_error_events);
    for event in events {
        if let Err(e) = tx_event.send(event).await {
            error!("failed to send event: {e:?}");
        }
    }

    Ok(sess)
}
pub(crate) fn get_writable_roots(&self) -> &[PathBuf] {
&self.writable_roots
}
@@ -628,16 +842,6 @@ impl Drop for Session {
}
}
impl State {
    /// Returns a new `State` that carries over only `approved_commands` and
    /// `history`; every other field takes its `Default` value.
    pub fn partial_clone(&self) -> Self {
        let Self {
            approved_commands,
            history,
            ..
        } = self;
        Self {
            approved_commands: approved_commands.clone(),
            history: history.clone(),
            ..Self::default()
        }
    }
}
#[derive(Clone, Debug)]
pub(crate) struct ExecCommandContext {
pub(crate) sub_id: String,
@@ -708,262 +912,36 @@ impl AgentTask {
}
}
async fn submission_loop(
mut session_id: Uuid,
config: Arc<Config>,
auth: Option<CodexAuth>,
rx_sub: Receiver<Submission>,
tx_event: Sender<Event>,
) {
let mut sess: Option<Arc<Session>> = None;
// shorthand - send an event when there is no active session
let send_no_session_event = |sub_id: String| async {
let event = Event {
id: sub_id,
msg: EventMsg::Error(ErrorEvent {
message: "No session initialized, expected 'ConfigureSession' as first Op"
.to_string(),
}),
};
tx_event.send(event).await.ok();
};
async fn submission_loop(sess: Arc<Session>, config: Arc<Config>, rx_sub: Receiver<Submission>) {
// To break out of this loop, send Op::Shutdown.
while let Ok(sub) = rx_sub.recv().await {
debug!(?sub, "Submission");
match sub.op {
Op::Interrupt => {
let sess = match sess.as_ref() {
Some(sess) => sess,
None => {
send_no_session_event(sub.id).await;
continue;
}
};
sess.abort();
}
Op::ConfigureSession {
provider,
model,
model_reasoning_effort,
model_reasoning_summary,
user_instructions,
base_instructions,
approval_policy,
sandbox_policy,
disable_response_storage,
notify,
cwd,
resume_path,
} => {
debug!(
"Configuring session: model={model}; provider={provider:?}; resume={resume_path:?}"
);
if !cwd.is_absolute() {
let message = format!("cwd is not absolute: {cwd:?}");
error!(message);
let event = Event {
id: sub.id,
msg: EventMsg::Error(ErrorEvent { message }),
};
if let Err(e) = tx_event.send(event).await {
error!("failed to send error message: {e:?}");
}
return;
}
// Optionally resume an existing rollout.
let mut restored_items: Option<Vec<ResponseItem>> = None;
let rollout_recorder: Option<RolloutRecorder> =
if let Some(path) = resume_path.as_ref() {
match RolloutRecorder::resume(path, cwd.clone()).await {
Ok((rec, saved)) => {
session_id = saved.session_id;
if !saved.items.is_empty() {
restored_items = Some(saved.items);
}
Some(rec)
}
Err(e) => {
warn!("failed to resume rollout from {path:?}: {e}");
None
}
}
} else {
None
};
let rollout_recorder = match rollout_recorder {
Some(rec) => Some(rec),
None => {
match RolloutRecorder::new(&config, session_id, user_instructions.clone())
.await
{
Ok(r) => Some(r),
Err(e) => {
warn!("failed to initialise rollout recorder: {e}");
None
}
}
}
};
let client = ModelClient::new(
config.clone(),
auth.clone(),
provider.clone(),
model_reasoning_effort,
model_reasoning_summary,
session_id,
);
// abort any current running session and clone its state
let state = match sess.take() {
Some(sess) => {
sess.abort();
sess.state.lock().unwrap().partial_clone()
}
None => State {
history: ConversationHistory::new(),
..Default::default()
},
};
let writable_roots = get_writable_roots(&cwd);
// Error messages to dispatch after SessionConfigured is sent.
let mut mcp_connection_errors = Vec::<Event>::new();
let (mcp_connection_manager, failed_clients) =
match McpConnectionManager::new(config.mcp_servers.clone()).await {
Ok((mgr, failures)) => (mgr, failures),
Err(e) => {
let message = format!("Failed to create MCP connection manager: {e:#}");
error!("{message}");
mcp_connection_errors.push(Event {
id: sub.id.clone(),
msg: EventMsg::Error(ErrorEvent { message }),
});
(McpConnectionManager::default(), Default::default())
}
};
// Surface individual client start-up failures to the user.
if !failed_clients.is_empty() {
for (server_name, err) in failed_clients {
let message =
format!("MCP client for `{server_name}` failed to start: {err:#}");
error!("{message}");
mcp_connection_errors.push(Event {
id: sub.id.clone(),
msg: EventMsg::Error(ErrorEvent { message }),
});
}
}
let default_shell = shell::default_user_shell().await;
sess = Some(Arc::new(Session {
client,
tools_config: ToolsConfig::new(
&config.model_family,
approval_policy,
sandbox_policy.clone(),
config.include_plan_tool,
),
tx_event: tx_event.clone(),
user_instructions,
base_instructions,
approval_policy,
sandbox_policy,
shell_environment_policy: config.shell_environment_policy.clone(),
cwd,
writable_roots,
mcp_connection_manager,
notify,
state: Mutex::new(state),
rollout: Mutex::new(rollout_recorder),
codex_linux_sandbox_exe: config.codex_linux_sandbox_exe.clone(),
disable_response_storage,
user_shell: default_shell,
show_raw_agent_reasoning: config.show_raw_agent_reasoning,
}));
// Patch restored state into the newly created session.
if let Some(sess_arc) = &sess {
if restored_items.is_some() {
let mut st = sess_arc.state.lock().unwrap();
st.history.record_items(restored_items.unwrap().iter());
}
}
// Gather history metadata for SessionConfiguredEvent.
let (history_log_id, history_entry_count) =
crate::message_history::history_metadata(&config).await;
// ack
let events = std::iter::once(Event {
id: sub.id.clone(),
msg: EventMsg::SessionConfigured(SessionConfiguredEvent {
session_id,
model,
history_log_id,
history_entry_count,
}),
})
.chain(mcp_connection_errors.into_iter());
for event in events {
if let Err(e) = tx_event.send(event).await {
error!("failed to send event: {e:?}");
}
}
}
Op::UserInput { items } => {
let sess = match sess.as_ref() {
Some(sess) => sess,
None => {
send_no_session_event(sub.id).await;
continue;
}
};
// attempt to inject input into current task
if let Err(items) = sess.inject_input(items) {
// no current task, spawn a new one
let task = AgentTask::spawn(Arc::clone(sess), sub.id, items);
let task = AgentTask::spawn(sess.clone(), sub.id, items);
sess.set_task(task);
}
}
Op::ExecApproval { id, decision } => {
let sess = match sess.as_ref() {
Some(sess) => sess,
None => {
send_no_session_event(sub.id).await;
continue;
}
};
match decision {
ReviewDecision::Abort => {
sess.abort();
}
other => sess.notify_approval(&id, other),
Op::ExecApproval { id, decision } => match decision {
ReviewDecision::Abort => {
sess.abort();
}
}
Op::PatchApproval { id, decision } => {
let sess = match sess.as_ref() {
Some(sess) => sess,
None => {
send_no_session_event(sub.id).await;
continue;
}
};
match decision {
ReviewDecision::Abort => {
sess.abort();
}
other => sess.notify_approval(&id, other),
other => sess.notify_approval(&id, other),
},
Op::PatchApproval { id, decision } => match decision {
ReviewDecision::Abort => {
sess.abort();
}
}
other => sess.notify_approval(&id, other),
},
Op::AddToHistory { text } => {
// TODO: What should we do if we got AddToHistory before ConfigureSession?
// currently, if ConfigureSession has resume path, this history will be ignored
let id = session_id;
let id = sess.session_id;
let config = config.clone();
tokio::spawn(async move {
if let Err(e) = crate::message_history::append_entry(&text, &id, &config).await
@@ -975,7 +953,7 @@ async fn submission_loop(
Op::GetHistoryEntryRequest { offset, log_id } => {
let config = config.clone();
let tx_event = tx_event.clone();
let tx_event = sess.tx_event.clone();
let sub_id = sub.id.clone();
tokio::spawn(async move {
@@ -1003,14 +981,6 @@ async fn submission_loop(
});
}
Op::Compact => {
let sess = match sess.as_ref() {
Some(sess) => sess,
None => {
send_no_session_event(sub.id).await;
continue;
}
};
// Create a summarization request as user input
const SUMMARIZATION_PROMPT: &str = include_str!("prompt_for_compact_command.md");
@@ -1032,28 +1002,27 @@ async fn submission_loop(
// Gracefully flush and shutdown rollout recorder on session end so tests
// that inspect the rollout file do not race with the background writer.
if let Some(sess_arc) = sess {
let recorder_opt = sess_arc.rollout.lock().unwrap().take();
if let Some(rec) = recorder_opt {
if let Err(e) = rec.shutdown().await {
warn!("failed to shutdown rollout recorder: {e}");
let event = Event {
id: sub.id.clone(),
msg: EventMsg::Error(ErrorEvent {
message: "Failed to shutdown rollout recorder".to_string(),
}),
};
if let Err(e) = tx_event.send(event).await {
warn!("failed to send error message: {e:?}");
}
let recorder_opt = sess.rollout.lock().unwrap().take();
if let Some(rec) = recorder_opt {
if let Err(e) = rec.shutdown().await {
warn!("failed to shutdown rollout recorder: {e}");
let event = Event {
id: sub.id.clone(),
msg: EventMsg::Error(ErrorEvent {
message: "Failed to shutdown rollout recorder".to_string(),
}),
};
if let Err(e) = sess.tx_event.send(event).await {
warn!("failed to send error message: {e:?}");
}
}
}
let event = Event {
id: sub.id.clone(),
msg: EventMsg::ShutdownComplete,
};
if let Err(e) = tx_event.send(event).await {
if let Err(e) = sess.tx_event.send(event).await {
warn!("failed to send Shutdown event: {e}");
}
break;