From 311ad0ce267e109853dfb30a4ce93e3f7f011f17 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Fri, 22 Aug 2025 17:06:09 -0700 Subject: [PATCH] fork conversation from a previous message (#2575) This can be the underlying logic in order to start a conversation from a previous message. will need some love in the UI. Base for building this: #2588 --- codex-rs/core/src/codex.rs | 31 ++++- codex-rs/core/src/conversation_manager.rs | 129 +++++++++++++++++- .../src/event_processor_with_human_output.rs | 1 + codex-rs/mcp-server/src/codex_tool_runner.rs | 1 + codex-rs/protocol/src/protocol.rs | 15 ++ codex-rs/tui/src/chatwidget.rs | 1 + 6 files changed, 174 insertions(+), 4 deletions(-) diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index cdf94210..b08c8e4a 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -14,6 +14,7 @@ use codex_apply_patch::ApplyPatchAction; use codex_apply_patch::MaybeApplyPatchVerified; use codex_apply_patch::maybe_parse_apply_patch_verified; use codex_login::AuthManager; +use codex_protocol::protocol::ConversationHistoryResponseEvent; use codex_protocol::protocol::TurnAbortReason; use codex_protocol::protocol::TurnAbortedEvent; use futures::prelude::*; @@ -147,6 +148,7 @@ impl Codex { pub async fn spawn( config: Config, auth_manager: Arc, + initial_history: Option>, ) -> CodexResult { let (tx_sub, rx_sub) = async_channel::bounded(64); let (tx_event, rx_event) = async_channel::unbounded(); @@ -177,6 +179,7 @@ impl Codex { config.clone(), auth_manager.clone(), tx_event.clone(), + initial_history, ) .await .map_err(|e| { @@ -186,7 +189,12 @@ impl Codex { let session_id = session.session_id; // This task will run until Op::Shutdown is received. - tokio::spawn(submission_loop(session, turn_context, config, rx_sub)); + tokio::spawn(submission_loop( + session.clone(), + turn_context, + config, + rx_sub, + )); let codex = Codex { next_id: AtomicU64::new(0), tx_sub, @@ -332,6 +340,7 @@ impl Session { config: Arc, auth_manager: Arc, tx_event: Sender, + initial_history: Option>, ) -> anyhow::Result<(Arc, TurnContext)> { let ConfigureSession { provider, @@ -391,14 +400,15 @@ impl Session { } let rollout_result = match rollout_res { Ok((session_id, maybe_saved, recorder)) => { - let restored_items: Option> = + let restored_items: Option> = initial_history.or_else(|| { maybe_saved.and_then(|saved_session| { if saved_session.items.is_empty() { None } else { Some(saved_session.items) } - }); + }) + }); RolloutResult { session_id, rollout_recorder: Some(recorder), @@ -1286,6 +1296,21 @@ async fn submission_loop( } break; } + Op::GetHistory => { + let tx_event = sess.tx_event.clone(); + let sub_id = sub.id.clone(); + + let event = Event { + id: sub_id.clone(), + msg: EventMsg::ConversationHistory(ConversationHistoryResponseEvent { + conversation_id: sess.session_id, + entries: sess.state.lock_unchecked().history.contents(), + }), + }; + if let Err(e) = tx_event.send(event).await { + warn!("failed to send ConversationHistory event: {e}"); + } + } _ => { // Ignore unknown ops; enum is non_exhaustive to allow extensions. } diff --git a/codex-rs/core/src/conversation_manager.rs b/codex-rs/core/src/conversation_manager.rs index b5538431..fd90f546 100644 --- a/codex-rs/core/src/conversation_manager.rs +++ b/codex-rs/core/src/conversation_manager.rs @@ -16,6 +16,7 @@ use crate::error::Result as CodexResult; use crate::protocol::Event; use crate::protocol::EventMsg; use crate::protocol::SessionConfiguredEvent; +use codex_protocol::models::ResponseItem; /// Represents a newly created Codex conversation, including the first event /// (which is [`EventMsg::SessionConfigured`]). @@ -59,8 +60,18 @@ impl ConversationManager { let CodexSpawnOk { codex, session_id: conversation_id, - } = Codex::spawn(config, auth_manager).await?; + } = { + let initial_history = None; + Codex::spawn(config, auth_manager, initial_history).await? + }; + self.finalize_spawn(codex, conversation_id).await + } + async fn finalize_spawn( + &self, + codex: Codex, + conversation_id: Uuid, + ) -> CodexResult { // The first event must be `SessionInitialized`. Validate and forward it // to the caller so that they can display it in the conversation // history. @@ -98,4 +109,120 @@ impl ConversationManager { .cloned() .ok_or_else(|| CodexErr::ConversationNotFound(conversation_id)) } + + /// Fork an existing conversation by dropping the last `drop_last_messages` + /// user/assistant messages from its transcript and starting a new + /// conversation with identical configuration (unless overridden by the + /// caller's `config`). The new conversation will have a fresh id. + pub async fn fork_conversation( + &self, + conversation_history: Vec, + num_messages_to_drop: usize, + config: Config, + ) -> CodexResult { + // Compute the prefix up to the cut point. + let truncated_history = + truncate_after_dropping_last_messages(conversation_history, num_messages_to_drop); + + // Spawn a new conversation with the computed initial history. + let auth_manager = self.auth_manager.clone(); + let CodexSpawnOk { + codex, + session_id: conversation_id, + } = Codex::spawn(config, auth_manager, Some(truncated_history)).await?; + + self.finalize_spawn(codex, conversation_id).await + } +} + +/// Return a prefix of `items` obtained by dropping the last `n` user messages +/// and all items that follow them. +fn truncate_after_dropping_last_messages(items: Vec, n: usize) -> Vec { + if n == 0 || items.is_empty() { + return items; + } + + // Walk backwards counting only `user` Message items, find cut index. + let mut count = 0usize; + let mut cut_index = 0usize; + for (idx, item) in items.iter().enumerate().rev() { + if let ResponseItem::Message { role, .. } = item + && role == "user" + { + count += 1; + if count == n { + // Cut everything from this user message to the end. + cut_index = idx; + break; + } + } + } + if count < n { + // If fewer than n messages exist, drop everything. + Vec::new() + } else { + items.into_iter().take(cut_index).collect() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use codex_protocol::models::ContentItem; + use codex_protocol::models::ReasoningItemReasoningSummary; + use codex_protocol::models::ResponseItem; + + fn user_msg(text: &str) -> ResponseItem { + ResponseItem::Message { + id: None, + role: "user".to_string(), + content: vec![ContentItem::OutputText { + text: text.to_string(), + }], + } + } + fn assistant_msg(text: &str) -> ResponseItem { + ResponseItem::Message { + id: None, + role: "assistant".to_string(), + content: vec![ContentItem::OutputText { + text: text.to_string(), + }], + } + } + + #[test] + fn drops_from_last_user_only() { + let items = vec![ + user_msg("u1"), + assistant_msg("a1"), + assistant_msg("a2"), + user_msg("u2"), + assistant_msg("a3"), + ResponseItem::Reasoning { + id: "r1".to_string(), + summary: vec![ReasoningItemReasoningSummary::SummaryText { + text: "s".to_string(), + }], + content: None, + encrypted_content: None, + }, + ResponseItem::FunctionCall { + id: None, + name: "tool".to_string(), + arguments: "{}".to_string(), + call_id: "c1".to_string(), + }, + assistant_msg("a4"), + ]; + + let truncated = truncate_after_dropping_last_messages(items.clone(), 1); + assert_eq!( + truncated, + vec![items[0].clone(), items[1].clone(), items[2].clone()] + ); + + let truncated2 = truncate_after_dropping_last_messages(items, 2); + assert!(truncated2.is_empty()); + } } diff --git a/codex-rs/exec/src/event_processor_with_human_output.rs b/codex-rs/exec/src/event_processor_with_human_output.rs index 1a42fd0c..0f3b56b4 100644 --- a/codex-rs/exec/src/event_processor_with_human_output.rs +++ b/codex-rs/exec/src/event_processor_with_human_output.rs @@ -540,6 +540,7 @@ impl EventProcessor for EventProcessorWithHumanOutput { } }, EventMsg::ShutdownComplete => return CodexStatus::Shutdown, + EventMsg::ConversationHistory(_) => {} } CodexStatus::Running } diff --git a/codex-rs/mcp-server/src/codex_tool_runner.rs b/codex-rs/mcp-server/src/codex_tool_runner.rs index 36845d89..c6d65bc8 100644 --- a/codex-rs/mcp-server/src/codex_tool_runner.rs +++ b/codex-rs/mcp-server/src/codex_tool_runner.rs @@ -275,6 +275,7 @@ async fn run_codex_tool_session_inner( | EventMsg::GetHistoryEntryResponse(_) | EventMsg::PlanUpdate(_) | EventMsg::TurnAborted(_) + | EventMsg::ConversationHistory(_) | EventMsg::ShutdownComplete => { // For now, we do not do anything extra for these // events. Note that diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index f337f94f..e803324a 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -22,6 +22,7 @@ use uuid::Uuid; use crate::config_types::ReasoningEffort as ReasoningEffortConfig; use crate::config_types::ReasoningSummary as ReasoningSummaryConfig; use crate::message_history::HistoryEntry; +use crate::models::ResponseItem; use crate::parse_command::ParsedCommand; use crate::plan_tool::UpdatePlanArgs; @@ -137,6 +138,10 @@ pub enum Op { /// Request a single history entry identified by `log_id` + `offset`. GetHistoryEntryRequest { offset: usize, log_id: u64 }, + /// Request the full in-memory conversation transcript for the current session. + /// Reply is delivered via `EventMsg::ConversationHistory`. + GetHistory, + /// Request the list of MCP tools available across all configured servers. /// Reply is delivered via `EventMsg::McpListToolsResponse`. ListMcpTools, @@ -471,6 +476,8 @@ pub enum EventMsg { /// Notification that the agent is shutting down. ShutdownComplete, + + ConversationHistory(ConversationHistoryResponseEvent), } // Individual event payload types matching each `EventMsg` variant. @@ -651,6 +658,14 @@ impl McpToolCallEndEvent { } } +/// Response payload for `Op::GetHistory` containing the current session's +/// in-memory transcript. +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct ConversationHistoryResponseEvent { + pub conversation_id: Uuid, + pub entries: Vec, +} + #[derive(Debug, Clone, Deserialize, Serialize)] pub struct ExecCommandBeginEvent { /// Identifier so this can be paired with the ExecCommandEnd event. diff --git a/codex-rs/tui/src/chatwidget.rs b/codex-rs/tui/src/chatwidget.rs index e3097c91..3f24a017 100644 --- a/codex-rs/tui/src/chatwidget.rs +++ b/codex-rs/tui/src/chatwidget.rs @@ -839,6 +839,7 @@ impl ChatWidget { self.on_background_event(message) } EventMsg::StreamError(StreamErrorEvent { message }) => self.on_stream_error(message), + EventMsg::ConversationHistory(_) => {} } // Coalesce redraws: issue at most one after handling the event if self.needs_redraw {