llmx/codex-rs/core/src/conversation_manager.rs
Michael Bolin cf7a7e63a3 exploration: create Session as part of Codex::spawn() (#2291)
Historically, `Codex::spawn()` would create the instance of `Codex` and
enforce, by construction, that `Op::ConfigureSession` was the first `Op`
submitted via `submit()`. Then, over in `submission_loop()`, the handler
for `Op::ConfigureSession` would take that op's parameters and turn them
into a `Session`.

This approach has two challenges from a state management perspective:


f968a1327a/codex-rs/core/src/codex.rs (L718)

- The local `sess` variable in `submission_loop()` has to be `mut` and
`Option<Arc<Session>>` because a `Session` is not guaranteed to be
present for the lifetime of the loop, so there is a lot of logic to deal
with the case where `sess` is `None` (e.g., the `send_no_session_event`
function and all of its callsites).
- `submission_loop()` is written in such a way that
`Op::ConfigureSession` could be observed multiple times, but in
practice, it is observed exactly once, at the start of the loop.

In this PR, we try to simplify the state management by _removing_ the
`Op::ConfigureSession` enum variant and constructing the `Session` as
part of `Codex::spawn()` so that it can be passed to `submission_loop()`
as `Arc<Session>`. The original logic from the `Op::ConfigureSession`
handler has largely been moved to the new `Session::new()` constructor.
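
The shape of that change can be sketched with standard-library types only. Everything below is a hypothetical stand-in (`Session`, `Op`, `submission_loop`, and `spawn_and_run` here are simplified illustrations, not the real codex-rs definitions); the point is only that a loop handed a ready-made `Arc<Session>` has no `None` branch to handle:

```rust
use std::sync::mpsc;
use std::sync::Arc;

// Hypothetical stand-in for the real session type.
#[derive(Debug)]
struct Session {
    model: String,
}

impl Session {
    // Plays the role the commit message assigns to `Session::new()`:
    // all configuration happens here, before the loop starts.
    fn new(model: &str) -> Arc<Session> {
        Arc::new(Session {
            model: model.to_string(),
        })
    }
}

// Hypothetical op set; note there is no `ConfigureSession` variant.
#[derive(Debug)]
enum Op {
    UserInput(String),
    Shutdown,
}

// The loop takes `Arc<Session>` rather than `Option<Arc<Session>>`,
// so every arm can use `sess` unconditionally.
fn submission_loop(sess: Arc<Session>, rx: mpsc::Receiver<Op>) -> Vec<String> {
    let mut handled = Vec::new();
    for op in rx {
        match op {
            Op::UserInput(text) => handled.push(format!("[{}] {}", sess.model, text)),
            Op::Shutdown => break,
        }
    }
    handled
}

// Mirrors the idea of building the session inside `spawn()` and passing
// it to the loop. Runs synchronously to keep the sketch small.
fn spawn_and_run(model: &str, ops: Vec<Op>) -> Vec<String> {
    let sess = Session::new(model);
    let (tx, rx) = mpsc::channel();
    for op in ops {
        tx.send(op).expect("receiver is still alive");
    }
    drop(tx); // close the channel so the loop ends even without `Shutdown`
    submission_loop(sess, rx)
}
```

Because the session exists before the first op is read, the type system rules out the "no session yet" state instead of the loop checking for it at runtime.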

---

Incidentally, I also noticed that the handling of `Op::ConfigureSession`
can result in events being dispatched in addition to
`EventMsg::SessionConfigured`, as an `EventMsg::Error` is created for
every MCP initialization error, so it is important to preserve that
behavior:


f968a1327a/codex-rs/core/src/codex.rs (L901-L916)

Though admittedly, I believe this does not play well with #2264, as
these error messages will likely be dispatched before the client has a
chance to call `addConversationListener`. We likely need to make
`newConversation` create the subscription automatically, but we must
also guarantee that the "ack" from `newConversation` is returned before
any other conversation-related notifications are sent so the client
knows what `conversation_id` to match on.
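
One way to picture that ordering guarantee is an outbox that holds back notifications until the ack has been queued. This is only a sketch under assumptions: `Outgoing` and `ConversationOutbox` are hypothetical names invented for illustration, and nothing here reflects how the actual server implements `newConversation`:

```rust
use std::collections::VecDeque;

// Hypothetical wire messages for a single conversation.
#[derive(Debug, Clone, PartialEq)]
enum Outgoing {
    NewConversationAck { conversation_id: u64 },
    Notification { conversation_id: u64, message: String },
}

// Hypothetical outbox: notifications produced before the ack are
// buffered, then flushed in their original order right after it.
struct ConversationOutbox {
    conversation_id: u64,
    acked: bool,
    pending: VecDeque<Outgoing>,
    wire: Vec<Outgoing>, // messages in the order the client would see them
}

impl ConversationOutbox {
    fn new(conversation_id: u64) -> Self {
        Self {
            conversation_id,
            acked: false,
            pending: VecDeque::new(),
            wire: Vec::new(),
        }
    }

    // Queue a notification; it reaches the wire only once the ack is out.
    fn notify(&mut self, message: &str) {
        let notification = Outgoing::Notification {
            conversation_id: self.conversation_id,
            message: message.to_string(),
        };
        if self.acked {
            self.wire.push(notification);
        } else {
            self.pending.push_back(notification);
        }
    }

    // Emit the ack first, then flush everything that was held back.
    fn ack(&mut self) {
        if self.acked {
            return;
        }
        self.acked = true;
        self.wire.push(Outgoing::NewConversationAck {
            conversation_id: self.conversation_id,
        });
        self.wire.extend(self.pending.drain(..));
    }
}
```

With this shape, an MCP initialization error raised during session setup would still be delivered, but only after the client has learned which `conversation_id` to match on.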
2025-08-14 09:55:28 -07:00


use std::collections::HashMap;
use std::sync::Arc;

use codex_login::CodexAuth;
use tokio::sync::RwLock;
use uuid::Uuid;

use crate::codex::Codex;
use crate::codex::CodexSpawnOk;
use crate::codex::INITIAL_SUBMIT_ID;
use crate::codex_conversation::CodexConversation;
use crate::config::Config;
use crate::error::CodexErr;
use crate::error::Result as CodexResult;
use crate::protocol::Event;
use crate::protocol::EventMsg;
use crate::protocol::SessionConfiguredEvent;

/// Represents a newly created Codex conversation, including the first event
/// (which is [`EventMsg::SessionConfigured`]).
pub struct NewConversation {
    pub conversation_id: Uuid,
    pub conversation: Arc<CodexConversation>,
    pub session_configured: SessionConfiguredEvent,
}

/// [`ConversationManager`] is responsible for creating conversations and
/// maintaining them in memory.
pub struct ConversationManager {
    conversations: Arc<RwLock<HashMap<Uuid, Arc<CodexConversation>>>>,
}

impl Default for ConversationManager {
    fn default() -> Self {
        Self {
            conversations: Arc::new(RwLock::new(HashMap::new())),
        }
    }
}

impl ConversationManager {
    pub async fn new_conversation(&self, config: Config) -> CodexResult<NewConversation> {
        let auth = CodexAuth::from_codex_home(&config.codex_home)?;
        self.new_conversation_with_auth(config, auth).await
    }

    /// Used for integration tests: should not be used by ordinary business
    /// logic.
    pub async fn new_conversation_with_auth(
        &self,
        config: Config,
        auth: Option<CodexAuth>,
    ) -> CodexResult<NewConversation> {
        let CodexSpawnOk {
            codex,
            session_id: conversation_id,
        } = Codex::spawn(config, auth).await?;

        // The first event must be `SessionConfigured`. Validate and forward it
        // to the caller so that they can display it in the conversation
        // history.
        let event = codex.next_event().await?;
        let session_configured = match event {
            Event {
                id,
                msg: EventMsg::SessionConfigured(session_configured),
            } if id == INITIAL_SUBMIT_ID => session_configured,
            _ => {
                return Err(CodexErr::SessionConfiguredNotFirstEvent);
            }
        };

        let conversation = Arc::new(CodexConversation::new(codex));
        self.conversations
            .write()
            .await
            .insert(conversation_id, conversation.clone());

        Ok(NewConversation {
            conversation_id,
            conversation,
            session_configured,
        })
    }

    pub async fn get_conversation(
        &self,
        conversation_id: Uuid,
    ) -> CodexResult<Arc<CodexConversation>> {
        let conversations = self.conversations.read().await;
        conversations
            .get(&conversation_id)
            .cloned()
            .ok_or_else(|| CodexErr::ConversationNotFound(conversation_id))
    }
}