Change forking to read the rollout from file (#3440)
This PR changes get history op to get path. Then, forking will use a path. This will help us have one unified codepath for resuming/forking conversations. Will also help in having rollout history in order. It also fixes a bug where you won't see the UI when resuming after forking.
This commit is contained in:
@@ -19,7 +19,7 @@ use codex_apply_patch::ApplyPatchAction;
|
|||||||
use codex_apply_patch::MaybeApplyPatchVerified;
|
use codex_apply_patch::MaybeApplyPatchVerified;
|
||||||
use codex_apply_patch::maybe_parse_apply_patch_verified;
|
use codex_apply_patch::maybe_parse_apply_patch_verified;
|
||||||
use codex_protocol::mcp_protocol::ConversationId;
|
use codex_protocol::mcp_protocol::ConversationId;
|
||||||
use codex_protocol::protocol::ConversationHistoryResponseEvent;
|
use codex_protocol::protocol::ConversationPathResponseEvent;
|
||||||
use codex_protocol::protocol::RolloutItem;
|
use codex_protocol::protocol::RolloutItem;
|
||||||
use codex_protocol::protocol::TaskStartedEvent;
|
use codex_protocol::protocol::TaskStartedEvent;
|
||||||
use codex_protocol::protocol::TurnAbortReason;
|
use codex_protocol::protocol::TurnAbortReason;
|
||||||
@@ -1405,14 +1405,29 @@ async fn submission_loop(
|
|||||||
sess.send_event(event).await;
|
sess.send_event(event).await;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
Op::GetHistory => {
|
Op::GetPath => {
|
||||||
let sub_id = sub.id.clone();
|
let sub_id = sub.id.clone();
|
||||||
|
// Flush rollout writes before returning the path so readers observe a consistent file.
|
||||||
|
let (path, rec_opt) = {
|
||||||
|
let guard = sess.rollout.lock_unchecked();
|
||||||
|
match guard.as_ref() {
|
||||||
|
Some(rec) => (rec.get_rollout_path(), Some(rec.clone())),
|
||||||
|
None => {
|
||||||
|
error!("rollout recorder not found");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
if let Some(rec) = rec_opt
|
||||||
|
&& let Err(e) = rec.flush().await
|
||||||
|
{
|
||||||
|
warn!("failed to flush rollout recorder before GetHistory: {e}");
|
||||||
|
}
|
||||||
let event = Event {
|
let event = Event {
|
||||||
id: sub_id.clone(),
|
id: sub_id.clone(),
|
||||||
msg: EventMsg::ConversationHistory(ConversationHistoryResponseEvent {
|
msg: EventMsg::ConversationPath(ConversationPathResponseEvent {
|
||||||
conversation_id: sess.conversation_id,
|
conversation_id: sess.conversation_id,
|
||||||
entries: sess.state.lock_unchecked().history.contents(),
|
path,
|
||||||
}),
|
}),
|
||||||
};
|
};
|
||||||
sess.send_event(event).await;
|
sess.send_event(event).await;
|
||||||
|
|||||||
@@ -150,13 +150,13 @@ impl ConversationManager {
|
|||||||
/// caller's `config`). The new conversation will have a fresh id.
|
/// caller's `config`). The new conversation will have a fresh id.
|
||||||
pub async fn fork_conversation(
|
pub async fn fork_conversation(
|
||||||
&self,
|
&self,
|
||||||
conversation_history: Vec<ResponseItem>,
|
|
||||||
num_messages_to_drop: usize,
|
num_messages_to_drop: usize,
|
||||||
config: Config,
|
config: Config,
|
||||||
|
path: PathBuf,
|
||||||
) -> CodexResult<NewConversation> {
|
) -> CodexResult<NewConversation> {
|
||||||
// Compute the prefix up to the cut point.
|
// Compute the prefix up to the cut point.
|
||||||
let history =
|
let history = RolloutRecorder::get_rollout_history(&path).await?;
|
||||||
truncate_after_dropping_last_messages(conversation_history, num_messages_to_drop);
|
let history = truncate_after_dropping_last_messages(history, num_messages_to_drop);
|
||||||
|
|
||||||
// Spawn a new conversation with the computed initial history.
|
// Spawn a new conversation with the computed initial history.
|
||||||
let auth_manager = self.auth_manager.clone();
|
let auth_manager = self.auth_manager.clone();
|
||||||
@@ -171,36 +171,36 @@ impl ConversationManager {
|
|||||||
|
|
||||||
/// Return a prefix of `items` obtained by dropping the last `n` user messages
|
/// Return a prefix of `items` obtained by dropping the last `n` user messages
|
||||||
/// and all items that follow them.
|
/// and all items that follow them.
|
||||||
fn truncate_after_dropping_last_messages(items: Vec<ResponseItem>, n: usize) -> InitialHistory {
|
fn truncate_after_dropping_last_messages(history: InitialHistory, n: usize) -> InitialHistory {
|
||||||
if n == 0 {
|
if n == 0 {
|
||||||
let rolled: Vec<RolloutItem> = items.into_iter().map(RolloutItem::ResponseItem).collect();
|
return InitialHistory::Forked(history.get_rollout_items());
|
||||||
return InitialHistory::Forked(rolled);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Walk backwards counting only `user` Message items, find cut index.
|
// Work directly on rollout items, and cut the vector at the nth-from-last user message input.
|
||||||
let mut count = 0usize;
|
let items: Vec<RolloutItem> = history.get_rollout_items();
|
||||||
let mut cut_index = 0usize;
|
|
||||||
for (idx, item) in items.iter().enumerate().rev() {
|
// Find indices of user message inputs in rollout order.
|
||||||
if let ResponseItem::Message { role, .. } = item
|
let mut user_positions: Vec<usize> = Vec::new();
|
||||||
|
for (idx, item) in items.iter().enumerate() {
|
||||||
|
if let RolloutItem::ResponseItem(ResponseItem::Message { role, .. }) = item
|
||||||
&& role == "user"
|
&& role == "user"
|
||||||
{
|
{
|
||||||
count += 1;
|
user_positions.push(idx);
|
||||||
if count == n {
|
|
||||||
// Cut everything from this user message to the end.
|
|
||||||
cut_index = idx;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if cut_index == 0 {
|
|
||||||
// No prefix remains after dropping; start a new conversation.
|
// If fewer than n user messages exist, treat as empty.
|
||||||
|
if user_positions.len() < n {
|
||||||
|
return InitialHistory::New;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Cut strictly before the nth-from-last user message (do not keep the nth itself).
|
||||||
|
let cut_idx = user_positions[user_positions.len() - n];
|
||||||
|
let rolled: Vec<RolloutItem> = items.into_iter().take(cut_idx).collect();
|
||||||
|
|
||||||
|
if rolled.is_empty() {
|
||||||
InitialHistory::New
|
InitialHistory::New
|
||||||
} else {
|
} else {
|
||||||
let rolled: Vec<RolloutItem> = items
|
|
||||||
.into_iter()
|
|
||||||
.take(cut_index)
|
|
||||||
.map(RolloutItem::ResponseItem)
|
|
||||||
.collect();
|
|
||||||
InitialHistory::Forked(rolled)
|
InitialHistory::Forked(rolled)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -256,7 +256,13 @@ mod tests {
|
|||||||
assistant_msg("a4"),
|
assistant_msg("a4"),
|
||||||
];
|
];
|
||||||
|
|
||||||
let truncated = truncate_after_dropping_last_messages(items.clone(), 1);
|
// Wrap as InitialHistory::Forked with response items only.
|
||||||
|
let initial: Vec<RolloutItem> = items
|
||||||
|
.iter()
|
||||||
|
.cloned()
|
||||||
|
.map(RolloutItem::ResponseItem)
|
||||||
|
.collect();
|
||||||
|
let truncated = truncate_after_dropping_last_messages(InitialHistory::Forked(initial), 1);
|
||||||
let got_items = truncated.get_rollout_items();
|
let got_items = truncated.get_rollout_items();
|
||||||
let expected_items = vec![
|
let expected_items = vec![
|
||||||
RolloutItem::ResponseItem(items[0].clone()),
|
RolloutItem::ResponseItem(items[0].clone()),
|
||||||
@@ -268,7 +274,12 @@ mod tests {
|
|||||||
serde_json::to_value(&expected_items).unwrap()
|
serde_json::to_value(&expected_items).unwrap()
|
||||||
);
|
);
|
||||||
|
|
||||||
let truncated2 = truncate_after_dropping_last_messages(items, 2);
|
let initial2: Vec<RolloutItem> = items
|
||||||
|
.iter()
|
||||||
|
.cloned()
|
||||||
|
.map(RolloutItem::ResponseItem)
|
||||||
|
.collect();
|
||||||
|
let truncated2 = truncate_after_dropping_last_messages(InitialHistory::Forked(initial2), 2);
|
||||||
assert!(matches!(truncated2, InitialHistory::New));
|
assert!(matches!(truncated2, InitialHistory::New));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -65,6 +65,6 @@ pub(crate) fn should_persist_event_msg(ev: &EventMsg) -> bool {
|
|||||||
| EventMsg::PlanUpdate(_)
|
| EventMsg::PlanUpdate(_)
|
||||||
| EventMsg::TurnAborted(_)
|
| EventMsg::TurnAborted(_)
|
||||||
| EventMsg::ShutdownComplete
|
| EventMsg::ShutdownComplete
|
||||||
| EventMsg::ConversationHistory(_) => false,
|
| EventMsg::ConversationPath(_) => false,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -77,7 +77,13 @@ pub enum RolloutRecorderParams {
|
|||||||
|
|
||||||
enum RolloutCmd {
|
enum RolloutCmd {
|
||||||
AddItems(Vec<RolloutItem>),
|
AddItems(Vec<RolloutItem>),
|
||||||
Shutdown { ack: oneshot::Sender<()> },
|
/// Ensure all prior writes are processed; respond when flushed.
|
||||||
|
Flush {
|
||||||
|
ack: oneshot::Sender<()>,
|
||||||
|
},
|
||||||
|
Shutdown {
|
||||||
|
ack: oneshot::Sender<()>,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
impl RolloutRecorderParams {
|
impl RolloutRecorderParams {
|
||||||
@@ -185,6 +191,17 @@ impl RolloutRecorder {
|
|||||||
.map_err(|e| IoError::other(format!("failed to queue rollout items: {e}")))
|
.map_err(|e| IoError::other(format!("failed to queue rollout items: {e}")))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Flush all queued writes and wait until they are committed by the writer task.
|
||||||
|
pub async fn flush(&self) -> std::io::Result<()> {
|
||||||
|
let (tx, rx) = oneshot::channel();
|
||||||
|
self.tx
|
||||||
|
.send(RolloutCmd::Flush { ack: tx })
|
||||||
|
.await
|
||||||
|
.map_err(|e| IoError::other(format!("failed to queue rollout flush: {e}")))?;
|
||||||
|
rx.await
|
||||||
|
.map_err(|e| IoError::other(format!("failed waiting for rollout flush: {e}")))
|
||||||
|
}
|
||||||
|
|
||||||
pub(crate) async fn get_rollout_history(path: &Path) -> std::io::Result<InitialHistory> {
|
pub(crate) async fn get_rollout_history(path: &Path) -> std::io::Result<InitialHistory> {
|
||||||
info!("Resuming rollout from {path:?}");
|
info!("Resuming rollout from {path:?}");
|
||||||
tracing::error!("Resuming rollout from {path:?}");
|
tracing::error!("Resuming rollout from {path:?}");
|
||||||
@@ -211,11 +228,11 @@ impl RolloutRecorder {
|
|||||||
match serde_json::from_value::<RolloutLine>(v.clone()) {
|
match serde_json::from_value::<RolloutLine>(v.clone()) {
|
||||||
Ok(rollout_line) => match rollout_line.item {
|
Ok(rollout_line) => match rollout_line.item {
|
||||||
RolloutItem::SessionMeta(session_meta_line) => {
|
RolloutItem::SessionMeta(session_meta_line) => {
|
||||||
tracing::error!(
|
// Use the FIRST SessionMeta encountered in the file as the canonical
|
||||||
"Parsed conversation ID from rollout file: {:?}",
|
// conversation id and main session information. Keep all items intact.
|
||||||
session_meta_line.meta.id
|
if conversation_id.is_none() {
|
||||||
);
|
conversation_id = Some(session_meta_line.meta.id);
|
||||||
conversation_id = Some(session_meta_line.meta.id);
|
}
|
||||||
items.push(RolloutItem::SessionMeta(session_meta_line));
|
items.push(RolloutItem::SessionMeta(session_meta_line));
|
||||||
}
|
}
|
||||||
RolloutItem::ResponseItem(item) => {
|
RolloutItem::ResponseItem(item) => {
|
||||||
@@ -251,6 +268,10 @@ impl RolloutRecorder {
|
|||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) fn get_rollout_path(&self) -> PathBuf {
|
||||||
|
self.rollout_path.clone()
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn shutdown(&self) -> std::io::Result<()> {
|
pub async fn shutdown(&self) -> std::io::Result<()> {
|
||||||
let (tx_done, rx_done) = oneshot::channel();
|
let (tx_done, rx_done) = oneshot::channel();
|
||||||
match self.tx.send(RolloutCmd::Shutdown { ack: tx_done }).await {
|
match self.tx.send(RolloutCmd::Shutdown { ack: tx_done }).await {
|
||||||
@@ -351,6 +372,14 @@ async fn rollout_writer(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
RolloutCmd::Flush { ack } => {
|
||||||
|
// Ensure underlying file is flushed and then ack.
|
||||||
|
if let Err(e) = writer.file.flush().await {
|
||||||
|
let _ = ack.send(());
|
||||||
|
return Err(e);
|
||||||
|
}
|
||||||
|
let _ = ack.send(());
|
||||||
|
}
|
||||||
RolloutCmd::Shutdown { ack } => {
|
RolloutCmd::Shutdown { ack } => {
|
||||||
let _ = ack.send(());
|
let _ = ack.send(());
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,12 +1,16 @@
|
|||||||
use codex_core::CodexAuth;
|
use codex_core::CodexAuth;
|
||||||
|
use codex_core::ContentItem;
|
||||||
use codex_core::ConversationManager;
|
use codex_core::ConversationManager;
|
||||||
use codex_core::ModelProviderInfo;
|
use codex_core::ModelProviderInfo;
|
||||||
use codex_core::NewConversation;
|
use codex_core::NewConversation;
|
||||||
|
use codex_core::ResponseItem;
|
||||||
use codex_core::built_in_model_providers;
|
use codex_core::built_in_model_providers;
|
||||||
use codex_core::protocol::ConversationHistoryResponseEvent;
|
use codex_core::protocol::ConversationPathResponseEvent;
|
||||||
use codex_core::protocol::EventMsg;
|
use codex_core::protocol::EventMsg;
|
||||||
use codex_core::protocol::InputItem;
|
use codex_core::protocol::InputItem;
|
||||||
use codex_core::protocol::Op;
|
use codex_core::protocol::Op;
|
||||||
|
use codex_core::protocol::RolloutItem;
|
||||||
|
use codex_core::protocol::RolloutLine;
|
||||||
use core_test_support::load_default_config_for_test;
|
use core_test_support::load_default_config_for_test;
|
||||||
use core_test_support::wait_for_event;
|
use core_test_support::wait_for_event;
|
||||||
use tempfile::TempDir;
|
use tempfile::TempDir;
|
||||||
@@ -71,84 +75,121 @@ async fn fork_conversation_twice_drops_to_first_message() {
|
|||||||
let _ = wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
|
let _ = wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Request history from the base conversation.
|
// Request history from the base conversation to obtain rollout path.
|
||||||
codex.submit(Op::GetHistory).await.unwrap();
|
codex.submit(Op::GetPath).await.unwrap();
|
||||||
let base_history =
|
let base_history =
|
||||||
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ConversationHistory(_))).await;
|
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ConversationPath(_))).await;
|
||||||
|
let base_path = match &base_history {
|
||||||
// Capture entries from the base history and compute expected prefixes after each fork.
|
EventMsg::ConversationPath(ConversationPathResponseEvent { path, .. }) => path.clone(),
|
||||||
let entries_after_three = match &base_history {
|
|
||||||
EventMsg::ConversationHistory(ConversationHistoryResponseEvent { entries, .. }) => {
|
|
||||||
entries.clone()
|
|
||||||
}
|
|
||||||
_ => panic!("expected ConversationHistory event"),
|
_ => panic!("expected ConversationHistory event"),
|
||||||
};
|
};
|
||||||
// History layout for this test:
|
|
||||||
// [0] user instructions,
|
|
||||||
// [1] environment context,
|
|
||||||
// [2] "first" user message,
|
|
||||||
// [3] "second" user message,
|
|
||||||
// [4] "third" user message.
|
|
||||||
|
|
||||||
// Fork 1: drops the last user message and everything after.
|
// GetHistory flushes before returning the path; no wait needed.
|
||||||
let expected_after_first = vec![
|
|
||||||
entries_after_three[0].clone(),
|
|
||||||
entries_after_three[1].clone(),
|
|
||||||
entries_after_three[2].clone(),
|
|
||||||
entries_after_three[3].clone(),
|
|
||||||
];
|
|
||||||
|
|
||||||
// Fork 2: drops the last user message and everything after.
|
// Helper: read rollout items (excluding SessionMeta) from a JSONL path.
|
||||||
// [0] user instructions,
|
let read_items = |p: &std::path::Path| -> Vec<RolloutItem> {
|
||||||
// [1] environment context,
|
let text = std::fs::read_to_string(p).expect("read rollout file");
|
||||||
// [2] "first" user message,
|
let mut items: Vec<RolloutItem> = Vec::new();
|
||||||
let expected_after_second = vec![
|
for line in text.lines() {
|
||||||
entries_after_three[0].clone(),
|
if line.trim().is_empty() {
|
||||||
entries_after_three[1].clone(),
|
continue;
|
||||||
entries_after_three[2].clone(),
|
}
|
||||||
];
|
let v: serde_json::Value = serde_json::from_str(line).expect("jsonl line");
|
||||||
|
let rl: RolloutLine = serde_json::from_value(v).expect("rollout line");
|
||||||
|
match rl.item {
|
||||||
|
RolloutItem::SessionMeta(_) => {}
|
||||||
|
other => items.push(other),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
items
|
||||||
|
};
|
||||||
|
|
||||||
// Fork once with n=1 → drops the last user message and everything after.
|
// Compute expected prefixes after each fork by truncating base rollout at nth-from-last user input.
|
||||||
|
let base_items = read_items(&base_path);
|
||||||
|
let find_user_input_positions = |items: &[RolloutItem]| -> Vec<usize> {
|
||||||
|
let mut pos = Vec::new();
|
||||||
|
for (i, it) in items.iter().enumerate() {
|
||||||
|
if let RolloutItem::ResponseItem(ResponseItem::Message { role, content, .. }) = it
|
||||||
|
&& role == "user"
|
||||||
|
{
|
||||||
|
// Consider any user message as an input boundary; recorder stores both EventMsg and ResponseItem.
|
||||||
|
// We specifically look for input items, which are represented as ContentItem::InputText.
|
||||||
|
if content
|
||||||
|
.iter()
|
||||||
|
.any(|c| matches!(c, ContentItem::InputText { .. }))
|
||||||
|
{
|
||||||
|
pos.push(i);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pos
|
||||||
|
};
|
||||||
|
let user_inputs = find_user_input_positions(&base_items);
|
||||||
|
|
||||||
|
// After dropping last user input (n=1), cut strictly before that input if present, else empty.
|
||||||
|
let cut1 = user_inputs
|
||||||
|
.get(user_inputs.len().saturating_sub(1))
|
||||||
|
.copied()
|
||||||
|
.unwrap_or(0);
|
||||||
|
let expected_after_first: Vec<RolloutItem> = base_items[..cut1].to_vec();
|
||||||
|
|
||||||
|
// After dropping again (n=1 on fork1), compute expected relative to fork1's rollout.
|
||||||
|
|
||||||
|
// Fork once with n=1 → drops the last user input and everything after.
|
||||||
let NewConversation {
|
let NewConversation {
|
||||||
conversation: codex_fork1,
|
conversation: codex_fork1,
|
||||||
..
|
..
|
||||||
} = conversation_manager
|
} = conversation_manager
|
||||||
.fork_conversation(entries_after_three.clone(), 1, config_for_fork.clone())
|
.fork_conversation(1, config_for_fork.clone(), base_path.clone())
|
||||||
.await
|
.await
|
||||||
.expect("fork 1");
|
.expect("fork 1");
|
||||||
|
|
||||||
codex_fork1.submit(Op::GetHistory).await.unwrap();
|
codex_fork1.submit(Op::GetPath).await.unwrap();
|
||||||
let fork1_history = wait_for_event(&codex_fork1, |ev| {
|
let fork1_history = wait_for_event(&codex_fork1, |ev| {
|
||||||
matches!(ev, EventMsg::ConversationHistory(_))
|
matches!(ev, EventMsg::ConversationPath(_))
|
||||||
})
|
})
|
||||||
.await;
|
.await;
|
||||||
let entries_after_first_fork = match &fork1_history {
|
let fork1_path = match &fork1_history {
|
||||||
EventMsg::ConversationHistory(ConversationHistoryResponseEvent { entries, .. }) => {
|
EventMsg::ConversationPath(ConversationPathResponseEvent { path, .. }) => path.clone(),
|
||||||
assert!(matches!(
|
|
||||||
fork1_history,
|
|
||||||
EventMsg::ConversationHistory(ConversationHistoryResponseEvent { ref entries, .. }) if *entries == expected_after_first
|
|
||||||
));
|
|
||||||
entries.clone()
|
|
||||||
}
|
|
||||||
_ => panic!("expected ConversationHistory event after first fork"),
|
_ => panic!("expected ConversationHistory event after first fork"),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// GetHistory on fork1 flushed; the file is ready.
|
||||||
|
let fork1_items = read_items(&fork1_path);
|
||||||
|
pretty_assertions::assert_eq!(
|
||||||
|
serde_json::to_value(&fork1_items).unwrap(),
|
||||||
|
serde_json::to_value(&expected_after_first).unwrap()
|
||||||
|
);
|
||||||
|
|
||||||
// Fork again with n=1 → drops the (new) last user message, leaving only the first.
|
// Fork again with n=1 → drops the (new) last user message, leaving only the first.
|
||||||
let NewConversation {
|
let NewConversation {
|
||||||
conversation: codex_fork2,
|
conversation: codex_fork2,
|
||||||
..
|
..
|
||||||
} = conversation_manager
|
} = conversation_manager
|
||||||
.fork_conversation(entries_after_first_fork.clone(), 1, config_for_fork.clone())
|
.fork_conversation(1, config_for_fork.clone(), fork1_path.clone())
|
||||||
.await
|
.await
|
||||||
.expect("fork 2");
|
.expect("fork 2");
|
||||||
|
|
||||||
codex_fork2.submit(Op::GetHistory).await.unwrap();
|
codex_fork2.submit(Op::GetPath).await.unwrap();
|
||||||
let fork2_history = wait_for_event(&codex_fork2, |ev| {
|
let fork2_history = wait_for_event(&codex_fork2, |ev| {
|
||||||
matches!(ev, EventMsg::ConversationHistory(_))
|
matches!(ev, EventMsg::ConversationPath(_))
|
||||||
})
|
})
|
||||||
.await;
|
.await;
|
||||||
assert!(matches!(
|
let fork2_path = match &fork2_history {
|
||||||
fork2_history,
|
EventMsg::ConversationPath(ConversationPathResponseEvent { path, .. }) => path.clone(),
|
||||||
EventMsg::ConversationHistory(ConversationHistoryResponseEvent { ref entries, .. }) if *entries == expected_after_second
|
_ => panic!("expected ConversationHistory event after second fork"),
|
||||||
));
|
};
|
||||||
|
// GetHistory on fork2 flushed; the file is ready.
|
||||||
|
let fork1_items = read_items(&fork1_path);
|
||||||
|
let fork1_user_inputs = find_user_input_positions(&fork1_items);
|
||||||
|
let cut_last_on_fork1 = fork1_user_inputs
|
||||||
|
.get(fork1_user_inputs.len().saturating_sub(1))
|
||||||
|
.copied()
|
||||||
|
.unwrap_or(0);
|
||||||
|
let expected_after_second: Vec<RolloutItem> = fork1_items[..cut_last_on_fork1].to_vec();
|
||||||
|
let fork2_items = read_items(&fork2_path);
|
||||||
|
pretty_assertions::assert_eq!(
|
||||||
|
serde_json::to_value(&fork2_items).unwrap(),
|
||||||
|
serde_json::to_value(&expected_after_second).unwrap()
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -559,7 +559,7 @@ impl EventProcessor for EventProcessorWithHumanOutput {
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
EventMsg::ShutdownComplete => return CodexStatus::Shutdown,
|
EventMsg::ShutdownComplete => return CodexStatus::Shutdown,
|
||||||
EventMsg::ConversationHistory(_) => {}
|
EventMsg::ConversationPath(_) => {}
|
||||||
EventMsg::UserMessage(_) => {}
|
EventMsg::UserMessage(_) => {}
|
||||||
}
|
}
|
||||||
CodexStatus::Running
|
CodexStatus::Running
|
||||||
|
|||||||
@@ -277,7 +277,7 @@ async fn run_codex_tool_session_inner(
|
|||||||
| EventMsg::GetHistoryEntryResponse(_)
|
| EventMsg::GetHistoryEntryResponse(_)
|
||||||
| EventMsg::PlanUpdate(_)
|
| EventMsg::PlanUpdate(_)
|
||||||
| EventMsg::TurnAborted(_)
|
| EventMsg::TurnAborted(_)
|
||||||
| EventMsg::ConversationHistory(_)
|
| EventMsg::ConversationPath(_)
|
||||||
| EventMsg::UserMessage(_)
|
| EventMsg::UserMessage(_)
|
||||||
| EventMsg::ShutdownComplete => {
|
| EventMsg::ShutdownComplete => {
|
||||||
// For now, we do not do anything extra for these
|
// For now, we do not do anything extra for these
|
||||||
|
|||||||
@@ -149,7 +149,7 @@ pub enum Op {
|
|||||||
|
|
||||||
/// Request the full in-memory conversation transcript for the current session.
|
/// Request the full in-memory conversation transcript for the current session.
|
||||||
/// Reply is delivered via `EventMsg::ConversationHistory`.
|
/// Reply is delivered via `EventMsg::ConversationHistory`.
|
||||||
GetHistory,
|
GetPath,
|
||||||
|
|
||||||
/// Request the list of MCP tools available across all configured servers.
|
/// Request the list of MCP tools available across all configured servers.
|
||||||
/// Reply is delivered via `EventMsg::McpListToolsResponse`.
|
/// Reply is delivered via `EventMsg::McpListToolsResponse`.
|
||||||
@@ -499,7 +499,7 @@ pub enum EventMsg {
|
|||||||
/// Notification that the agent is shutting down.
|
/// Notification that the agent is shutting down.
|
||||||
ShutdownComplete,
|
ShutdownComplete,
|
||||||
|
|
||||||
ConversationHistory(ConversationHistoryResponseEvent),
|
ConversationPath(ConversationPathResponseEvent),
|
||||||
}
|
}
|
||||||
|
|
||||||
// Individual event payload types matching each `EventMsg` variant.
|
// Individual event payload types matching each `EventMsg` variant.
|
||||||
@@ -801,9 +801,9 @@ pub struct WebSearchEndEvent {
|
|||||||
/// Response payload for `Op::GetHistory` containing the current session's
|
/// Response payload for `Op::GetHistory` containing the current session's
|
||||||
/// in-memory transcript.
|
/// in-memory transcript.
|
||||||
#[derive(Debug, Clone, Deserialize, Serialize, TS)]
|
#[derive(Debug, Clone, Deserialize, Serialize, TS)]
|
||||||
pub struct ConversationHistoryResponseEvent {
|
pub struct ConversationPathResponseEvent {
|
||||||
pub conversation_id: ConversationId,
|
pub conversation_id: ConversationId,
|
||||||
pub entries: Vec<ResponseItem>,
|
pub path: PathBuf,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Deserialize, Serialize, TS)]
|
#[derive(Debug, Clone, Deserialize, Serialize, TS)]
|
||||||
|
|||||||
@@ -1,9 +1,11 @@
|
|||||||
|
use std::path::PathBuf;
|
||||||
|
|
||||||
use crate::app::App;
|
use crate::app::App;
|
||||||
use crate::backtrack_helpers;
|
use crate::backtrack_helpers;
|
||||||
use crate::pager_overlay::Overlay;
|
use crate::pager_overlay::Overlay;
|
||||||
use crate::tui;
|
use crate::tui;
|
||||||
use crate::tui::TuiEvent;
|
use crate::tui::TuiEvent;
|
||||||
use codex_core::protocol::ConversationHistoryResponseEvent;
|
use codex_core::protocol::ConversationPathResponseEvent;
|
||||||
use codex_protocol::mcp_protocol::ConversationId;
|
use codex_protocol::mcp_protocol::ConversationId;
|
||||||
use color_eyre::eyre::Result;
|
use color_eyre::eyre::Result;
|
||||||
use crossterm::event::KeyCode;
|
use crossterm::event::KeyCode;
|
||||||
@@ -98,7 +100,7 @@ impl App {
|
|||||||
) {
|
) {
|
||||||
self.backtrack.pending = Some((base_id, drop_last_messages, prefill));
|
self.backtrack.pending = Some((base_id, drop_last_messages, prefill));
|
||||||
self.app_event_tx.send(crate::app_event::AppEvent::CodexOp(
|
self.app_event_tx.send(crate::app_event::AppEvent::CodexOp(
|
||||||
codex_core::protocol::Op::GetHistory,
|
codex_core::protocol::Op::GetPath,
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -265,7 +267,7 @@ impl App {
|
|||||||
pub(crate) async fn on_conversation_history_for_backtrack(
|
pub(crate) async fn on_conversation_history_for_backtrack(
|
||||||
&mut self,
|
&mut self,
|
||||||
tui: &mut tui::Tui,
|
tui: &mut tui::Tui,
|
||||||
ev: ConversationHistoryResponseEvent,
|
ev: ConversationPathResponseEvent,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
if let Some((base_id, _, _)) = self.backtrack.pending.as_ref()
|
if let Some((base_id, _, _)) = self.backtrack.pending.as_ref()
|
||||||
&& ev.conversation_id == *base_id
|
&& ev.conversation_id == *base_id
|
||||||
@@ -281,14 +283,14 @@ impl App {
|
|||||||
async fn fork_and_switch_to_new_conversation(
|
async fn fork_and_switch_to_new_conversation(
|
||||||
&mut self,
|
&mut self,
|
||||||
tui: &mut tui::Tui,
|
tui: &mut tui::Tui,
|
||||||
ev: ConversationHistoryResponseEvent,
|
ev: ConversationPathResponseEvent,
|
||||||
drop_count: usize,
|
drop_count: usize,
|
||||||
prefill: String,
|
prefill: String,
|
||||||
) {
|
) {
|
||||||
let cfg = self.chat_widget.config_ref().clone();
|
let cfg = self.chat_widget.config_ref().clone();
|
||||||
// Perform the fork via a thin wrapper for clarity/testability.
|
// Perform the fork via a thin wrapper for clarity/testability.
|
||||||
let result = self
|
let result = self
|
||||||
.perform_fork(ev.entries.clone(), drop_count, cfg.clone())
|
.perform_fork(ev.path.clone(), drop_count, cfg.clone())
|
||||||
.await;
|
.await;
|
||||||
match result {
|
match result {
|
||||||
Ok(new_conv) => {
|
Ok(new_conv) => {
|
||||||
@@ -301,13 +303,11 @@ impl App {
|
|||||||
/// Thin wrapper around ConversationManager::fork_conversation.
|
/// Thin wrapper around ConversationManager::fork_conversation.
|
||||||
async fn perform_fork(
|
async fn perform_fork(
|
||||||
&self,
|
&self,
|
||||||
entries: Vec<codex_protocol::models::ResponseItem>,
|
path: PathBuf,
|
||||||
drop_count: usize,
|
drop_count: usize,
|
||||||
cfg: codex_core::config::Config,
|
cfg: codex_core::config::Config,
|
||||||
) -> codex_core::error::Result<codex_core::NewConversation> {
|
) -> codex_core::error::Result<codex_core::NewConversation> {
|
||||||
self.server
|
self.server.fork_conversation(drop_count, cfg, path).await
|
||||||
.fork_conversation(entries, drop_count, cfg)
|
|
||||||
.await
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Install a forked conversation into the ChatWidget and update UI to reflect selection.
|
/// Install a forked conversation into the ChatWidget and update UI to reflect selection.
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
use codex_core::protocol::ConversationHistoryResponseEvent;
|
use codex_core::protocol::ConversationPathResponseEvent;
|
||||||
use codex_core::protocol::Event;
|
use codex_core::protocol::Event;
|
||||||
use codex_file_search::FileMatch;
|
use codex_file_search::FileMatch;
|
||||||
|
|
||||||
@@ -58,5 +58,5 @@ pub(crate) enum AppEvent {
|
|||||||
UpdateSandboxPolicy(SandboxPolicy),
|
UpdateSandboxPolicy(SandboxPolicy),
|
||||||
|
|
||||||
/// Forwarded conversation history snapshot from the current conversation.
|
/// Forwarded conversation history snapshot from the current conversation.
|
||||||
ConversationHistory(ConversationHistoryResponseEvent),
|
ConversationHistory(ConversationPathResponseEvent),
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1083,7 +1083,7 @@ impl ChatWidget {
|
|||||||
self.on_user_message_event(ev);
|
self.on_user_message_event(ev);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
EventMsg::ConversationHistory(ev) => {
|
EventMsg::ConversationPath(ev) => {
|
||||||
self.app_event_tx
|
self.app_event_tx
|
||||||
.send(crate::app_event::AppEvent::ConversationHistory(ev));
|
.send(crate::app_event::AppEvent::ConversationHistory(ev));
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user