From 162e1235a8e2db9d99b59eb0f3ef0b9da7ffe2e7 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Wed, 10 Sep 2025 17:42:54 -0700 Subject: [PATCH] Change forking to read the rollout from file (#3440) This PR changes get history op to get path. Then, forking will use a path. This will help us have one unified codepath for resuming/forking conversations. Will also help in having rollout history in order. It also fixes a bug where you won't see the UI when resuming after forking. --- codex-rs/core/src/codex.rs | 25 ++- codex-rs/core/src/conversation_manager.rs | 63 ++++---- codex-rs/core/src/rollout/policy.rs | 2 +- codex-rs/core/src/rollout/recorder.rs | 41 ++++- .../core/tests/suite/fork_conversation.rs | 143 +++++++++++------- .../src/event_processor_with_human_output.rs | 2 +- codex-rs/mcp-server/src/codex_tool_runner.rs | 2 +- codex-rs/protocol/src/protocol.rs | 8 +- codex-rs/tui/src/app_backtrack.rs | 18 +-- codex-rs/tui/src/app_event.rs | 4 +- codex-rs/tui/src/chatwidget.rs | 2 +- 11 files changed, 203 insertions(+), 107 deletions(-) diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 356eadd8..46d155be 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -19,7 +19,7 @@ use codex_apply_patch::ApplyPatchAction; use codex_apply_patch::MaybeApplyPatchVerified; use codex_apply_patch::maybe_parse_apply_patch_verified; use codex_protocol::mcp_protocol::ConversationId; -use codex_protocol::protocol::ConversationHistoryResponseEvent; +use codex_protocol::protocol::ConversationPathResponseEvent; use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::TaskStartedEvent; use codex_protocol::protocol::TurnAbortReason; @@ -1405,14 +1405,29 @@ async fn submission_loop( sess.send_event(event).await; break; } - Op::GetHistory => { + Op::GetPath => { let sub_id = sub.id.clone(); - + // Flush rollout writes before returning the path so readers observe a consistent file. 
+ let (path, rec_opt) = { + let guard = sess.rollout.lock_unchecked(); + match guard.as_ref() { + Some(rec) => (rec.get_rollout_path(), Some(rec.clone())), + None => { + error!("rollout recorder not found"); + continue; + } + } + }; + if let Some(rec) = rec_opt + && let Err(e) = rec.flush().await + { + warn!("failed to flush rollout recorder before GetHistory: {e}"); + } let event = Event { id: sub_id.clone(), - msg: EventMsg::ConversationHistory(ConversationHistoryResponseEvent { + msg: EventMsg::ConversationPath(ConversationPathResponseEvent { conversation_id: sess.conversation_id, - entries: sess.state.lock_unchecked().history.contents(), + path, }), }; sess.send_event(event).await; diff --git a/codex-rs/core/src/conversation_manager.rs b/codex-rs/core/src/conversation_manager.rs index c0695f16..a5f66d45 100644 --- a/codex-rs/core/src/conversation_manager.rs +++ b/codex-rs/core/src/conversation_manager.rs @@ -150,13 +150,13 @@ impl ConversationManager { /// caller's `config`). The new conversation will have a fresh id. pub async fn fork_conversation( &self, - conversation_history: Vec, num_messages_to_drop: usize, config: Config, + path: PathBuf, ) -> CodexResult { // Compute the prefix up to the cut point. - let history = - truncate_after_dropping_last_messages(conversation_history, num_messages_to_drop); + let history = RolloutRecorder::get_rollout_history(&path).await?; + let history = truncate_after_dropping_last_messages(history, num_messages_to_drop); // Spawn a new conversation with the computed initial history. let auth_manager = self.auth_manager.clone(); @@ -171,36 +171,36 @@ impl ConversationManager { /// Return a prefix of `items` obtained by dropping the last `n` user messages /// and all items that follow them. 
-fn truncate_after_dropping_last_messages(items: Vec, n: usize) -> InitialHistory { +fn truncate_after_dropping_last_messages(history: InitialHistory, n: usize) -> InitialHistory { if n == 0 { - let rolled: Vec = items.into_iter().map(RolloutItem::ResponseItem).collect(); - return InitialHistory::Forked(rolled); + return InitialHistory::Forked(history.get_rollout_items()); } - // Walk backwards counting only `user` Message items, find cut index. - let mut count = 0usize; - let mut cut_index = 0usize; - for (idx, item) in items.iter().enumerate().rev() { - if let ResponseItem::Message { role, .. } = item + // Work directly on rollout items, and cut the vector at the nth-from-last user message input. + let items: Vec = history.get_rollout_items(); + + // Find indices of user message inputs in rollout order. + let mut user_positions: Vec = Vec::new(); + for (idx, item) in items.iter().enumerate() { + if let RolloutItem::ResponseItem(ResponseItem::Message { role, .. }) = item && role == "user" { - count += 1; - if count == n { - // Cut everything from this user message to the end. - cut_index = idx; - break; - } + user_positions.push(idx); } } - if cut_index == 0 { - // No prefix remains after dropping; start a new conversation. + + // If fewer than n user messages exist, treat as empty. + if user_positions.len() < n { + return InitialHistory::New; + } + + // Cut strictly before the nth-from-last user message (do not keep the nth itself). + let cut_idx = user_positions[user_positions.len() - n]; + let rolled: Vec = items.into_iter().take(cut_idx).collect(); + + if rolled.is_empty() { InitialHistory::New } else { - let rolled: Vec = items - .into_iter() - .take(cut_index) - .map(RolloutItem::ResponseItem) - .collect(); InitialHistory::Forked(rolled) } } @@ -256,7 +256,13 @@ mod tests { assistant_msg("a4"), ]; - let truncated = truncate_after_dropping_last_messages(items.clone(), 1); + // Wrap as InitialHistory::Forked with response items only. 
+ let initial: Vec = items + .iter() + .cloned() + .map(RolloutItem::ResponseItem) + .collect(); + let truncated = truncate_after_dropping_last_messages(InitialHistory::Forked(initial), 1); let got_items = truncated.get_rollout_items(); let expected_items = vec![ RolloutItem::ResponseItem(items[0].clone()), @@ -268,7 +274,12 @@ mod tests { serde_json::to_value(&expected_items).unwrap() ); - let truncated2 = truncate_after_dropping_last_messages(items, 2); + let initial2: Vec = items + .iter() + .cloned() + .map(RolloutItem::ResponseItem) + .collect(); + let truncated2 = truncate_after_dropping_last_messages(InitialHistory::Forked(initial2), 2); assert!(matches!(truncated2, InitialHistory::New)); } } diff --git a/codex-rs/core/src/rollout/policy.rs b/codex-rs/core/src/rollout/policy.rs index 7c582604..c0d6d3af 100644 --- a/codex-rs/core/src/rollout/policy.rs +++ b/codex-rs/core/src/rollout/policy.rs @@ -65,6 +65,6 @@ pub(crate) fn should_persist_event_msg(ev: &EventMsg) -> bool { | EventMsg::PlanUpdate(_) | EventMsg::TurnAborted(_) | EventMsg::ShutdownComplete - | EventMsg::ConversationHistory(_) => false, + | EventMsg::ConversationPath(_) => false, } } diff --git a/codex-rs/core/src/rollout/recorder.rs b/codex-rs/core/src/rollout/recorder.rs index f5609f72..63495bd8 100644 --- a/codex-rs/core/src/rollout/recorder.rs +++ b/codex-rs/core/src/rollout/recorder.rs @@ -77,7 +77,13 @@ pub enum RolloutRecorderParams { enum RolloutCmd { AddItems(Vec), - Shutdown { ack: oneshot::Sender<()> }, + /// Ensure all prior writes are processed; respond when flushed. + Flush { + ack: oneshot::Sender<()>, + }, + Shutdown { + ack: oneshot::Sender<()>, + }, } impl RolloutRecorderParams { @@ -185,6 +191,17 @@ impl RolloutRecorder { .map_err(|e| IoError::other(format!("failed to queue rollout items: {e}"))) } + /// Flush all queued writes and wait until they are committed by the writer task. 
+ pub async fn flush(&self) -> std::io::Result<()> { + let (tx, rx) = oneshot::channel(); + self.tx + .send(RolloutCmd::Flush { ack: tx }) + .await + .map_err(|e| IoError::other(format!("failed to queue rollout flush: {e}")))?; + rx.await + .map_err(|e| IoError::other(format!("failed waiting for rollout flush: {e}"))) + } + pub(crate) async fn get_rollout_history(path: &Path) -> std::io::Result { info!("Resuming rollout from {path:?}"); tracing::error!("Resuming rollout from {path:?}"); @@ -211,11 +228,11 @@ impl RolloutRecorder { match serde_json::from_value::(v.clone()) { Ok(rollout_line) => match rollout_line.item { RolloutItem::SessionMeta(session_meta_line) => { - tracing::error!( - "Parsed conversation ID from rollout file: {:?}", - session_meta_line.meta.id - ); - conversation_id = Some(session_meta_line.meta.id); + // Use the FIRST SessionMeta encountered in the file as the canonical + // conversation id and main session information. Keep all items intact. + if conversation_id.is_none() { + conversation_id = Some(session_meta_line.meta.id); + } items.push(RolloutItem::SessionMeta(session_meta_line)); } RolloutItem::ResponseItem(item) => { @@ -251,6 +268,10 @@ impl RolloutRecorder { })) } + pub(crate) fn get_rollout_path(&self) -> PathBuf { + self.rollout_path.clone() + } + pub async fn shutdown(&self) -> std::io::Result<()> { let (tx_done, rx_done) = oneshot::channel(); match self.tx.send(RolloutCmd::Shutdown { ack: tx_done }).await { @@ -351,6 +372,14 @@ async fn rollout_writer( } } } + RolloutCmd::Flush { ack } => { + // Ensure underlying file is flushed and then ack. 
+ if let Err(e) = writer.file.flush().await { + let _ = ack.send(()); + return Err(e); + } + let _ = ack.send(()); + } RolloutCmd::Shutdown { ack } => { let _ = ack.send(()); } diff --git a/codex-rs/core/tests/suite/fork_conversation.rs b/codex-rs/core/tests/suite/fork_conversation.rs index 92e43dbf..08e3f29d 100644 --- a/codex-rs/core/tests/suite/fork_conversation.rs +++ b/codex-rs/core/tests/suite/fork_conversation.rs @@ -1,12 +1,16 @@ use codex_core::CodexAuth; +use codex_core::ContentItem; use codex_core::ConversationManager; use codex_core::ModelProviderInfo; use codex_core::NewConversation; +use codex_core::ResponseItem; use codex_core::built_in_model_providers; -use codex_core::protocol::ConversationHistoryResponseEvent; +use codex_core::protocol::ConversationPathResponseEvent; use codex_core::protocol::EventMsg; use codex_core::protocol::InputItem; use codex_core::protocol::Op; +use codex_core::protocol::RolloutItem; +use codex_core::protocol::RolloutLine; use core_test_support::load_default_config_for_test; use core_test_support::wait_for_event; use tempfile::TempDir; @@ -71,84 +75,121 @@ async fn fork_conversation_twice_drops_to_first_message() { let _ = wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; } - // Request history from the base conversation. - codex.submit(Op::GetHistory).await.unwrap(); + // Request history from the base conversation to obtain rollout path. + codex.submit(Op::GetPath).await.unwrap(); let base_history = - wait_for_event(&codex, |ev| matches!(ev, EventMsg::ConversationHistory(_))).await; - - // Capture entries from the base history and compute expected prefixes after each fork. - let entries_after_three = match &base_history { - EventMsg::ConversationHistory(ConversationHistoryResponseEvent { entries, .. 
}) => { - entries.clone() - } + wait_for_event(&codex, |ev| matches!(ev, EventMsg::ConversationPath(_))).await; + let base_path = match &base_history { + EventMsg::ConversationPath(ConversationPathResponseEvent { path, .. }) => path.clone(), _ => panic!("expected ConversationHistory event"), }; - // History layout for this test: - // [0] user instructions, - // [1] environment context, - // [2] "first" user message, - // [3] "second" user message, - // [4] "third" user message. - // Fork 1: drops the last user message and everything after. - let expected_after_first = vec![ - entries_after_three[0].clone(), - entries_after_three[1].clone(), - entries_after_three[2].clone(), - entries_after_three[3].clone(), - ]; + // GetPath flushes the rollout before returning the path; no wait needed. - // Fork 2: drops the last user message and everything after. - // [0] user instructions, - // [1] environment context, - // [2] "first" user message, - let expected_after_second = vec![ - entries_after_three[0].clone(), - entries_after_three[1].clone(), - entries_after_three[2].clone(), - ]; + // Helper: read rollout items (excluding SessionMeta) from a JSONL path. + let read_items = |p: &std::path::Path| -> Vec { + let text = std::fs::read_to_string(p).expect("read rollout file"); + let mut items: Vec = Vec::new(); + for line in text.lines() { + if line.trim().is_empty() { + continue; + } + let v: serde_json::Value = serde_json::from_str(line).expect("jsonl line"); + let rl: RolloutLine = serde_json::from_value(v).expect("rollout line"); + match rl.item { + RolloutItem::SessionMeta(_) => {} + other => items.push(other), + } + } + items + }; - // Fork once with n=1 → drops the last user message and everything after. + // Compute expected prefixes after each fork by truncating base rollout at nth-from-last user input. 
+ let base_items = read_items(&base_path); + let find_user_input_positions = |items: &[RolloutItem]| -> Vec { + let mut pos = Vec::new(); + for (i, it) in items.iter().enumerate() { + if let RolloutItem::ResponseItem(ResponseItem::Message { role, content, .. }) = it + && role == "user" + { + // Consider any user message as an input boundary; recorder stores both EventMsg and ResponseItem. + // We specifically look for input items, which are represented as ContentItem::InputText. + if content + .iter() + .any(|c| matches!(c, ContentItem::InputText { .. })) + { + pos.push(i); + } + } + } + pos + }; + let user_inputs = find_user_input_positions(&base_items); + + // After dropping last user input (n=1), cut strictly before that input if present, else empty. + let cut1 = user_inputs + .get(user_inputs.len().saturating_sub(1)) + .copied() + .unwrap_or(0); + let expected_after_first: Vec = base_items[..cut1].to_vec(); + + // After dropping again (n=1 on fork1), compute expected relative to fork1's rollout. + + // Fork once with n=1 → drops the last user input and everything after. let NewConversation { conversation: codex_fork1, .. } = conversation_manager - .fork_conversation(entries_after_three.clone(), 1, config_for_fork.clone()) + .fork_conversation(1, config_for_fork.clone(), base_path.clone()) .await .expect("fork 1"); - codex_fork1.submit(Op::GetHistory).await.unwrap(); + codex_fork1.submit(Op::GetPath).await.unwrap(); let fork1_history = wait_for_event(&codex_fork1, |ev| { - matches!(ev, EventMsg::ConversationHistory(_)) + matches!(ev, EventMsg::ConversationPath(_)) }) .await; - let entries_after_first_fork = match &fork1_history { - EventMsg::ConversationHistory(ConversationHistoryResponseEvent { entries, .. }) => { - assert!(matches!( - fork1_history, - EventMsg::ConversationHistory(ConversationHistoryResponseEvent { ref entries, .. 
}) if *entries == expected_after_first - )); - entries.clone() - } + let fork1_path = match &fork1_history { + EventMsg::ConversationPath(ConversationPathResponseEvent { path, .. }) => path.clone(), _ => panic!("expected ConversationHistory event after first fork"), }; + // GetPath on fork1 flushed the rollout; the file is ready. + let fork1_items = read_items(&fork1_path); + pretty_assertions::assert_eq!( + serde_json::to_value(&fork1_items).unwrap(), + serde_json::to_value(&expected_after_first).unwrap() + ); + // Fork again with n=1 → drops the (new) last user message, leaving only the first. let NewConversation { conversation: codex_fork2, .. } = conversation_manager - .fork_conversation(entries_after_first_fork.clone(), 1, config_for_fork.clone()) + .fork_conversation(1, config_for_fork.clone(), fork1_path.clone()) .await .expect("fork 2"); - codex_fork2.submit(Op::GetHistory).await.unwrap(); + codex_fork2.submit(Op::GetPath).await.unwrap(); let fork2_history = wait_for_event(&codex_fork2, |ev| { - matches!(ev, EventMsg::ConversationHistory(_)) + matches!(ev, EventMsg::ConversationPath(_)) }) .await; - assert!(matches!( - fork2_history, - EventMsg::ConversationHistory(ConversationHistoryResponseEvent { ref entries, .. }) if *entries == expected_after_second - )); + let fork2_path = match &fork2_history { + EventMsg::ConversationPath(ConversationPathResponseEvent { path, .. }) => path.clone(), + _ => panic!("expected ConversationHistory event after second fork"), + }; + // GetPath on fork2 flushed the rollout; the file is ready. 
+ let fork1_items = read_items(&fork1_path); + let fork1_user_inputs = find_user_input_positions(&fork1_items); + let cut_last_on_fork1 = fork1_user_inputs + .get(fork1_user_inputs.len().saturating_sub(1)) + .copied() + .unwrap_or(0); + let expected_after_second: Vec = fork1_items[..cut_last_on_fork1].to_vec(); + let fork2_items = read_items(&fork2_path); + pretty_assertions::assert_eq!( + serde_json::to_value(&fork2_items).unwrap(), + serde_json::to_value(&expected_after_second).unwrap() + ); } diff --git a/codex-rs/exec/src/event_processor_with_human_output.rs b/codex-rs/exec/src/event_processor_with_human_output.rs index ae470897..3c879397 100644 --- a/codex-rs/exec/src/event_processor_with_human_output.rs +++ b/codex-rs/exec/src/event_processor_with_human_output.rs @@ -559,7 +559,7 @@ impl EventProcessor for EventProcessorWithHumanOutput { } }, EventMsg::ShutdownComplete => return CodexStatus::Shutdown, - EventMsg::ConversationHistory(_) => {} + EventMsg::ConversationPath(_) => {} EventMsg::UserMessage(_) => {} } CodexStatus::Running diff --git a/codex-rs/mcp-server/src/codex_tool_runner.rs b/codex-rs/mcp-server/src/codex_tool_runner.rs index feeb7f01..d3351faa 100644 --- a/codex-rs/mcp-server/src/codex_tool_runner.rs +++ b/codex-rs/mcp-server/src/codex_tool_runner.rs @@ -277,7 +277,7 @@ async fn run_codex_tool_session_inner( | EventMsg::GetHistoryEntryResponse(_) | EventMsg::PlanUpdate(_) | EventMsg::TurnAborted(_) - | EventMsg::ConversationHistory(_) + | EventMsg::ConversationPath(_) | EventMsg::UserMessage(_) | EventMsg::ShutdownComplete => { // For now, we do not do anything extra for these diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index 7e358f12..be312285 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -149,7 +149,7 @@ pub enum Op { /// Request the full in-memory conversation transcript for the current session. /// Reply is delivered via `EventMsg::ConversationHistory`. 
- GetHistory, + GetPath, /// Request the list of MCP tools available across all configured servers. /// Reply is delivered via `EventMsg::McpListToolsResponse`. @@ -499,7 +499,7 @@ pub enum EventMsg { /// Notification that the agent is shutting down. ShutdownComplete, - ConversationHistory(ConversationHistoryResponseEvent), + ConversationPath(ConversationPathResponseEvent), } // Individual event payload types matching each `EventMsg` variant. @@ -801,9 +801,9 @@ pub struct WebSearchEndEvent { /// Response payload for `Op::GetHistory` containing the current session's /// in-memory transcript. #[derive(Debug, Clone, Deserialize, Serialize, TS)] -pub struct ConversationHistoryResponseEvent { +pub struct ConversationPathResponseEvent { pub conversation_id: ConversationId, - pub entries: Vec, + pub path: PathBuf, } #[derive(Debug, Clone, Deserialize, Serialize, TS)] diff --git a/codex-rs/tui/src/app_backtrack.rs b/codex-rs/tui/src/app_backtrack.rs index 039c9403..4445c939 100644 --- a/codex-rs/tui/src/app_backtrack.rs +++ b/codex-rs/tui/src/app_backtrack.rs @@ -1,9 +1,11 @@ +use std::path::PathBuf; + use crate::app::App; use crate::backtrack_helpers; use crate::pager_overlay::Overlay; use crate::tui; use crate::tui::TuiEvent; -use codex_core::protocol::ConversationHistoryResponseEvent; +use codex_core::protocol::ConversationPathResponseEvent; use codex_protocol::mcp_protocol::ConversationId; use color_eyre::eyre::Result; use crossterm::event::KeyCode; @@ -98,7 +100,7 @@ impl App { ) { self.backtrack.pending = Some((base_id, drop_last_messages, prefill)); self.app_event_tx.send(crate::app_event::AppEvent::CodexOp( - codex_core::protocol::Op::GetHistory, + codex_core::protocol::Op::GetPath, )); } @@ -265,7 +267,7 @@ impl App { pub(crate) async fn on_conversation_history_for_backtrack( &mut self, tui: &mut tui::Tui, - ev: ConversationHistoryResponseEvent, + ev: ConversationPathResponseEvent, ) -> Result<()> { if let Some((base_id, _, _)) = self.backtrack.pending.as_ref() 
&& ev.conversation_id == *base_id @@ -281,14 +283,14 @@ impl App { async fn fork_and_switch_to_new_conversation( &mut self, tui: &mut tui::Tui, - ev: ConversationHistoryResponseEvent, + ev: ConversationPathResponseEvent, drop_count: usize, prefill: String, ) { let cfg = self.chat_widget.config_ref().clone(); // Perform the fork via a thin wrapper for clarity/testability. let result = self - .perform_fork(ev.entries.clone(), drop_count, cfg.clone()) + .perform_fork(ev.path.clone(), drop_count, cfg.clone()) .await; match result { Ok(new_conv) => { @@ -301,13 +303,11 @@ impl App { /// Thin wrapper around ConversationManager::fork_conversation. async fn perform_fork( &self, - entries: Vec, + path: PathBuf, drop_count: usize, cfg: codex_core::config::Config, ) -> codex_core::error::Result { - self.server - .fork_conversation(entries, drop_count, cfg) - .await + self.server.fork_conversation(drop_count, cfg, path).await } /// Install a forked conversation into the ChatWidget and update UI to reflect selection. diff --git a/codex-rs/tui/src/app_event.rs b/codex-rs/tui/src/app_event.rs index 1896f0a4..cbd6f3d8 100644 --- a/codex-rs/tui/src/app_event.rs +++ b/codex-rs/tui/src/app_event.rs @@ -1,4 +1,4 @@ -use codex_core::protocol::ConversationHistoryResponseEvent; +use codex_core::protocol::ConversationPathResponseEvent; use codex_core::protocol::Event; use codex_file_search::FileMatch; @@ -58,5 +58,5 @@ pub(crate) enum AppEvent { UpdateSandboxPolicy(SandboxPolicy), /// Forwarded conversation history snapshot from the current conversation. 
- ConversationHistory(ConversationHistoryResponseEvent), + ConversationHistory(ConversationPathResponseEvent), } diff --git a/codex-rs/tui/src/chatwidget.rs b/codex-rs/tui/src/chatwidget.rs index dca821f2..1ccfddea 100644 --- a/codex-rs/tui/src/chatwidget.rs +++ b/codex-rs/tui/src/chatwidget.rs @@ -1083,7 +1083,7 @@ impl ChatWidget { self.on_user_message_event(ev); } } - EventMsg::ConversationHistory(ev) => { + EventMsg::ConversationPath(ev) => { self.app_event_tx .send(crate::app_event::AppEvent::ConversationHistory(ev)); }