From 45bd5ca4b90c636b0717178cfdcb862a3f145773 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Wed, 10 Sep 2025 10:17:24 -0700 Subject: [PATCH] Move initial history to protocol (#3422) To fix an edge case of forking then resuming #3419 --- codex-rs/core/src/codex.rs | 4 +- codex-rs/core/src/conversation_manager.rs | 71 +------------- codex-rs/core/src/git_info.rs | 14 +-- codex-rs/core/src/lib.rs | 2 +- codex-rs/core/src/rollout/list.rs | 4 +- codex-rs/core/src/rollout/mod.rs | 2 +- codex-rs/core/src/rollout/policy.rs | 2 +- codex-rs/core/src/rollout/recorder.rs | 43 ++------ codex-rs/core/tests/suite/cli_stream.rs | 3 +- codex-rs/protocol/src/protocol.rs | 114 ++++++++++++++++++++++ 10 files changed, 132 insertions(+), 127 deletions(-) diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 5bd5c504..dcdad6c1 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -10,7 +10,6 @@ use std::time::Duration; use crate::AuthManager; use crate::event_mapping::map_response_item_to_event_messages; -use crate::rollout::recorder::RolloutItem; use async_channel::Receiver; use async_channel::Sender; use codex_apply_patch::ApplyPatchAction; @@ -18,6 +17,7 @@ use codex_apply_patch::MaybeApplyPatchVerified; use codex_apply_patch::maybe_parse_apply_patch_verified; use codex_protocol::mcp_protocol::ConversationId; use codex_protocol::protocol::ConversationHistoryResponseEvent; +use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::TaskStartedEvent; use codex_protocol::protocol::TurnAbortReason; use codex_protocol::protocol::TurnAbortedEvent; @@ -45,7 +45,6 @@ use crate::client_common::ResponseEvent; use crate::config::Config; use crate::config_types::ShellEnvironmentPolicy; use crate::conversation_history::ConversationHistory; -use crate::conversation_manager::InitialHistory; use crate::environment_context::EnvironmentContext; use crate::error::CodexErr; use crate::error::Result as CodexResult; @@ -122,6 +121,7 @@ use codex_protocol::models::LocalShellAction; use codex_protocol::models::ResponseInputItem; use codex_protocol::models::ResponseItem; use codex_protocol::models::ShellToolCallParams; +use codex_protocol::protocol::InitialHistory; // A convenience extension trait for acquiring mutex locks where poisoning is // unrecoverable and should abort the program. This avoids scattered `.unwrap()` diff --git a/codex-rs/core/src/conversation_manager.rs b/codex-rs/core/src/conversation_manager.rs index e352680c..c0695f16 100644 --- a/codex-rs/core/src/conversation_manager.rs +++ b/codex-rs/core/src/conversation_manager.rs @@ -11,82 +11,15 @@ use crate::protocol::Event; use crate::protocol::EventMsg; use crate::protocol::SessionConfiguredEvent; use crate::rollout::RolloutRecorder; -use crate::rollout::recorder::RolloutItem; use codex_protocol::mcp_protocol::ConversationId; use codex_protocol::models::ResponseItem; +use codex_protocol::protocol::InitialHistory; +use codex_protocol::protocol::RolloutItem; use std::collections::HashMap; use std::path::PathBuf; use std::sync::Arc; use tokio::sync::RwLock; -#[derive(Debug, Clone)] -pub struct ResumedHistory { - pub conversation_id: ConversationId, - pub history: Vec, - pub rollout_path: PathBuf, -} - -#[derive(Debug, Clone)] -pub enum InitialHistory { - New, - Resumed(ResumedHistory), - Forked(Vec), -} - -impl InitialHistory { - pub(crate) fn get_rollout_items(&self) -> Vec { - match self { - InitialHistory::New => Vec::new(), - InitialHistory::Resumed(resumed) => resumed.history.clone(), - InitialHistory::Forked(items) => items.clone(), - } - } - pub fn get_response_items(&self) -> Vec { - match self { - InitialHistory::New => Vec::new(), - InitialHistory::Resumed(resumed) => resumed - .history - .iter() - .filter_map(|ri| match ri { - RolloutItem::ResponseItem(item) => Some(item.clone()), - _ => None, - }) - .collect(), - InitialHistory::Forked(items) => items - .iter() - .filter_map(|ri| match ri { - RolloutItem::ResponseItem(item) => Some(item.clone()), - _ => None, - }) - .collect(), - } - } - pub fn get_event_msgs(&self) -> Option> { - match self { - InitialHistory::New => None, - InitialHistory::Resumed(resumed) => Some( - resumed - .history - .iter() - .filter_map(|ri| match ri { - RolloutItem::EventMsg(ev) => Some(ev.clone()), - _ => None, - }) - .collect(), - ), - InitialHistory::Forked(items) => Some( - items - .iter() - .filter_map(|ri| match ri { - RolloutItem::EventMsg(ev) => Some(ev.clone()), - _ => None, - }) - .collect(), - ), - } - } -} - /// Represents a newly created Codex conversation, including the first event /// (which is [`EventMsg::SessionConfigured`]). pub struct NewConversation { diff --git a/codex-rs/core/src/git_info.rs b/codex-rs/core/src/git_info.rs index 79a8647a..619f4f52 100644 --- a/codex-rs/core/src/git_info.rs +++ b/codex-rs/core/src/git_info.rs @@ -3,6 +3,7 @@ use std::path::Path; use std::path::PathBuf; use codex_protocol::mcp_protocol::GitSha; +use codex_protocol::protocol::GitInfo; use futures::future::join_all; use serde::Deserialize; use serde::Serialize; @@ -43,19 +44,6 @@ pub fn get_git_repo_root(base_dir: &Path) -> Option { /// Timeout for git commands to prevent freezing on large repositories const GIT_COMMAND_TIMEOUT: TokioDuration = TokioDuration::from_secs(5); -#[derive(Serialize, Deserialize, Clone, Debug)] -pub struct GitInfo { - /// Current commit hash (SHA) - #[serde(skip_serializing_if = "Option::is_none")] - pub commit_hash: Option, - /// Current branch name - #[serde(skip_serializing_if = "Option::is_none")] - pub branch: Option, - /// Repository URL (if available from remote) - #[serde(skip_serializing_if = "Option::is_none")] - pub repository_url: Option, -} - #[derive(Serialize, Deserialize, Clone, Debug)] pub struct GitDiffToRemote { pub sha: GitSha, diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs index 352bd482..c0531530 100644 --- a/codex-rs/core/src/lib.rs +++ b/codex-rs/core/src/lib.rs @@ -42,8 +42,8 @@ pub use model_provider_info::built_in_model_providers; pub use model_provider_info::create_oss_provider_with_base_url; mod conversation_manager; mod event_mapping; +pub use codex_protocol::protocol::InitialHistory; pub use conversation_manager::ConversationManager; -pub use conversation_manager::InitialHistory; pub use conversation_manager::NewConversation; // Re-export common auth types for workspace consumers pub use auth::AuthManager; diff --git a/codex-rs/core/src/rollout/list.rs b/codex-rs/core/src/rollout/list.rs index c4feb629..417638e7 100644 --- a/codex-rs/core/src/rollout/list.rs +++ b/codex-rs/core/src/rollout/list.rs @@ -10,9 +10,9 @@ use time::macros::format_description; use uuid::Uuid; use super::SESSIONS_SUBDIR; -use super::recorder::RolloutItem; -use super::recorder::RolloutLine; use crate::protocol::EventMsg; +use codex_protocol::protocol::RolloutItem; +use codex_protocol::protocol::RolloutLine; /// Returned page of conversation summaries. #[derive(Debug, Default, PartialEq)] diff --git a/codex-rs/core/src/rollout/mod.rs b/codex-rs/core/src/rollout/mod.rs index 341d39ae..6bf1cf94 100644 --- a/codex-rs/core/src/rollout/mod.rs +++ b/codex-rs/core/src/rollout/mod.rs @@ -7,9 +7,9 @@ pub mod list; pub(crate) mod policy; pub mod recorder; +pub use codex_protocol::protocol::SessionMeta; pub use recorder::RolloutRecorder; pub use recorder::RolloutRecorderParams; -pub use recorder::SessionMeta; #[cfg(test)] pub mod tests; diff --git a/codex-rs/core/src/rollout/policy.rs b/codex-rs/core/src/rollout/policy.rs index 676f8e90..7c582604 100644 --- a/codex-rs/core/src/rollout/policy.rs +++ b/codex-rs/core/src/rollout/policy.rs @@ -1,5 +1,5 @@ use crate::protocol::EventMsg; -use crate::rollout::recorder::RolloutItem; +use crate::protocol::RolloutItem; use codex_protocol::models::ResponseItem; /// Whether a rollout `item` should be persisted in rollout files. diff --git a/codex-rs/core/src/rollout/recorder.rs b/codex-rs/core/src/rollout/recorder.rs index 5451af0d..f5609f72 100644 --- a/codex-rs/core/src/rollout/recorder.rs +++ b/codex-rs/core/src/rollout/recorder.rs @@ -26,46 +26,15 @@ use super::list::Cursor; use super::list::get_conversations; use super::policy::is_persisted_response_item; use crate::config::Config; -use crate::conversation_manager::InitialHistory; -use crate::conversation_manager::ResumedHistory; use crate::default_client::ORIGINATOR; -use crate::git_info::GitInfo; use crate::git_info::collect_git_info; -use crate::protocol::EventMsg; use codex_protocol::models::ResponseItem; - -#[derive(Serialize, Deserialize, Clone, Default, Debug)] -pub struct SessionMeta { - pub id: ConversationId, - pub timestamp: String, - pub cwd: PathBuf, - pub originator: String, - pub cli_version: String, - pub instructions: Option, -} - -#[derive(Serialize, Deserialize, Debug, Clone)] -pub struct SessionMetaLine { - #[serde(flatten)] - meta: SessionMeta, - #[serde(skip_serializing_if = "Option::is_none")] - git: Option, -} - -#[derive(Serialize, Deserialize, Debug, Clone)] -#[serde(tag = "type", content = "payload", rename_all = "snake_case")] -pub enum RolloutItem { - SessionMeta(SessionMetaLine), - ResponseItem(ResponseItem), - EventMsg(EventMsg), -} - -#[derive(Serialize, Deserialize, Clone)] -pub(crate) struct RolloutLine { - pub(crate) timestamp: String, - #[serde(flatten)] - pub(crate) item: RolloutItem, -} +use codex_protocol::protocol::InitialHistory; +use codex_protocol::protocol::ResumedHistory; +use codex_protocol::protocol::RolloutItem; +use codex_protocol::protocol::RolloutLine; +use codex_protocol::protocol::SessionMeta; +use codex_protocol::protocol::SessionMetaLine; #[derive(Serialize, Deserialize, Default, Clone)] pub struct SessionStateSnapshot {} diff --git a/codex-rs/core/tests/suite/cli_stream.rs b/codex-rs/core/tests/suite/cli_stream.rs index 71624739..01b9d6bf 100644 --- a/codex-rs/core/tests/suite/cli_stream.rs +++ b/codex-rs/core/tests/suite/cli_stream.rs @@ -1,5 +1,6 @@ use assert_cmd::Command as AssertCommand; use codex_core::RolloutRecorder; +use codex_core::protocol::GitInfo; use codex_core::spawn::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR; use std::time::Duration; use std::time::Instant; @@ -617,7 +618,7 @@ async fn integration_git_info_unit_test() { // 5. Test serialization to ensure it works in SessionMeta let serialized = serde_json::to_string(&git_info).unwrap(); - let deserialized: codex_core::git_info::GitInfo = serde_json::from_str(&serialized).unwrap(); + let deserialized: GitInfo = serde_json::from_str(&serialized).unwrap(); assert_eq!(git_info.commit_hash, deserialized.commit_hash); assert_eq!(git_info.branch, deserialized.branch); diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index 2f1c4636..860bb59f 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -804,6 +804,120 @@ pub struct ConversationHistoryResponseEvent { pub entries: Vec, } +#[derive(Debug, Clone, Deserialize, Serialize, TS)] +pub struct ResumedHistory { + pub conversation_id: ConversationId, + pub history: Vec, + pub rollout_path: PathBuf, +} + +#[derive(Debug, Clone, Deserialize, Serialize, TS)] +pub enum InitialHistory { + New, + Resumed(ResumedHistory), + Forked(Vec), +} + +impl InitialHistory { + pub fn get_rollout_items(&self) -> Vec { + match self { + InitialHistory::New => Vec::new(), + InitialHistory::Resumed(resumed) => resumed.history.clone(), + InitialHistory::Forked(items) => items.clone(), + } + } + pub fn get_response_items(&self) -> Vec { + match self { + InitialHistory::New => Vec::new(), + InitialHistory::Resumed(resumed) => resumed + .history + .iter() + .filter_map(|ri| match ri { + RolloutItem::ResponseItem(item) => Some(item.clone()), + _ => None, + }) + .collect(), + InitialHistory::Forked(items) => items + .iter() + .filter_map(|ri| match ri { + RolloutItem::ResponseItem(item) => Some(item.clone()), + _ => None, + }) + .collect(), + } + } + pub fn get_event_msgs(&self) -> Option> { + match self { + InitialHistory::New => None, + InitialHistory::Resumed(resumed) => Some( + resumed + .history + .iter() + .filter_map(|ri| match ri { + RolloutItem::EventMsg(ev) => Some(ev.clone()), + _ => None, + }) + .collect(), + ), + InitialHistory::Forked(items) => Some( + items + .iter() + .filter_map(|ri| match ri { + RolloutItem::EventMsg(ev) => Some(ev.clone()), + _ => None, + }) + .collect(), + ), + } + } +} + +#[derive(Serialize, Deserialize, Clone, Default, Debug, TS)] +pub struct SessionMeta { + pub id: ConversationId, + pub timestamp: String, + pub cwd: PathBuf, + pub originator: String, + pub cli_version: String, + pub instructions: Option, +} + +#[derive(Serialize, Deserialize, Debug, Clone, TS)] +pub struct SessionMetaLine { + #[serde(flatten)] + pub meta: SessionMeta, + #[serde(skip_serializing_if = "Option::is_none")] + pub git: Option, +} + +#[derive(Serialize, Deserialize, Debug, Clone, TS)] +#[serde(tag = "type", content = "payload", rename_all = "snake_case")] +pub enum RolloutItem { + SessionMeta(SessionMetaLine), + ResponseItem(ResponseItem), + EventMsg(EventMsg), +} + +#[derive(Serialize, Deserialize, Clone)] +pub struct RolloutLine { + pub timestamp: String, + #[serde(flatten)] + pub item: RolloutItem, +} + +#[derive(Serialize, Deserialize, Clone, Debug, TS)] +pub struct GitInfo { + /// Current commit hash (SHA) + #[serde(skip_serializing_if = "Option::is_none")] + pub commit_hash: Option, + /// Current branch name + #[serde(skip_serializing_if = "Option::is_none")] + pub branch: Option, + /// Repository URL (if available from remote) + #[serde(skip_serializing_if = "Option::is_none")] + pub repository_url: Option, +} + #[derive(Debug, Clone, Deserialize, Serialize, TS)] pub struct ExecCommandBeginEvent { /// Identifier so this can be paired with the ExecCommandEnd event.