diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index cc8768d7..5bd5c504 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -10,6 +10,7 @@ use std::time::Duration; use crate::AuthManager; use crate::event_mapping::map_response_item_to_event_messages; +use crate::rollout::recorder::RolloutItem; use async_channel::Receiver; use async_channel::Sender; use codex_apply_patch::ApplyPatchAction; @@ -203,9 +204,6 @@ impl Codex { error!("Failed to create session: {e:#}"); CodexErr::InternalAgentDied })?; - session - .record_initial_history(&turn_context, conversation_history) - .await; let conversation_id = session.conversation_id; // This task will run until Op::Shutdown is received. @@ -494,13 +492,9 @@ impl Session { // Dispatch the SessionConfiguredEvent first and then report any errors. // If resuming, include converted initial messages in the payload so UIs can render them immediately. - let initial_messages = match &initial_history { - InitialHistory::New => None, - InitialHistory::Forked(items) => Some(sess.build_initial_messages(items)), - InitialHistory::Resumed(resumed_history) => { - Some(sess.build_initial_messages(&resumed_history.history)) - } - }; + let initial_messages = initial_history.get_event_msgs(); + sess.record_initial_history(&turn_context, initial_history) + .await; let events = std::iter::once(Event { id: INITIAL_SUBMIT_ID.to_owned(), @@ -515,9 +509,7 @@ impl Session { }) .chain(post_session_configured_error_events.into_iter()); for event in events { - if let Err(e) = tx_event.send(event).await { - error!("failed to send event: {e:?}"); - } + sess.send_event(event).await; } Ok((sess, turn_context)) @@ -547,53 +539,33 @@ impl Session { ) { match conversation_history { InitialHistory::New => { - self.record_initial_history_new(turn_context).await; + // Build and record initial items (user instructions + environment context) + let items = self.build_initial_context(turn_context); + self.record_conversation_items(&items).await; } - InitialHistory::Forked(items) => { - self.record_initial_history_from_items(items).await; - } - InitialHistory::Resumed(resumed_history) => { - self.record_initial_history_from_items(resumed_history.history) - .await; + InitialHistory::Resumed(_) | InitialHistory::Forked(_) => { + let rollout_items = conversation_history.get_rollout_items(); + let persist = matches!(conversation_history, InitialHistory::Forked(_)); + + // Always add response items to conversation history + let response_items = conversation_history.get_response_items(); + if !response_items.is_empty() { + self.record_into_history(&response_items); + } + + // If persisting, persist all rollout items as-is (recorder filters) + if persist && !rollout_items.is_empty() { + self.persist_rollout_items(&rollout_items).await; + } } } } - async fn record_initial_history_new(&self, turn_context: &TurnContext) { - // record the initial user instructions and environment context, - // regardless of whether we restored items. - // TODO: Those items shouldn't be "user messages" IMO. Maybe developer messages. - let mut conversation_items = Vec::::with_capacity(2); - if let Some(user_instructions) = turn_context.user_instructions.as_deref() { - conversation_items.push(UserInstructions::new(user_instructions.to_string()).into()); - } - conversation_items.push(ResponseItem::from(EnvironmentContext::new( - Some(turn_context.cwd.clone()), - Some(turn_context.approval_policy), - Some(turn_context.sandbox_policy.clone()), - Some(self.user_shell.clone()), - ))); - self.record_conversation_items(&conversation_items).await; - } - - async fn record_initial_history_from_items(&self, items: Vec) { - self.record_conversation_items_internal(&items, false).await; - } - - /// build the initial messages vector for SessionConfigured by converting - /// ResponseItems into EventMsg. - fn build_initial_messages(&self, items: &[ResponseItem]) -> Vec { - items - .iter() - .flat_map(|item| { - map_response_item_to_event_messages(item, self.show_raw_agent_reasoning) - }) - .collect() - } - - /// Sends the given event to the client and swallows the send event, if - /// any, logging it as an error. + /// Persist the event to rollout and send it to clients. pub(crate) async fn send_event(&self, event: Event) { + // Persist the event into rollout (recorder filters as needed) + let rollout_items = vec![RolloutItem::EventMsg(event.msg.clone())]; + self.persist_rollout_items(&rollout_items).await; if let Err(e) = self.tx_event.send(event).await { error!("failed to send tool call event: {e}"); } @@ -627,7 +599,7 @@ impl Session { reason, }), }; - let _ = self.tx_event.send(event).await; + self.send_event(event).await; rx_approve } @@ -659,7 +631,7 @@ impl Session { grant_root, }), }; - let _ = self.tx_event.send(event).await; + self.send_event(event).await; rx_approve } @@ -683,36 +655,76 @@ impl Session { state.approved_commands.insert(cmd); } - /// Records items to both the rollout and the chat completions/ZDR - /// transcript, if enabled. + /// Records input items: always append to conversation history and + /// persist these response items to rollout. async fn record_conversation_items(&self, items: &[ResponseItem]) { - self.record_conversation_items_internal(items, true).await; + self.record_into_history(items); + self.persist_rollout_response_items(items).await; } - async fn record_conversation_items_internal(&self, items: &[ResponseItem], persist: bool) { - debug!("Recording items for conversation: {items:?}"); - if persist { - self.record_state_snapshot(items).await; + /// Append ResponseItems to the in-memory conversation history only. + fn record_into_history(&self, items: &[ResponseItem]) { + self.state + .lock_unchecked() + .history + .record_items(items.iter()); + } + + async fn persist_rollout_response_items(&self, items: &[ResponseItem]) { + let rollout_items: Vec = items + .iter() + .cloned() + .map(RolloutItem::ResponseItem) + .collect(); + self.persist_rollout_items(&rollout_items).await; + } + + fn build_initial_context(&self, turn_context: &TurnContext) -> Vec { + let mut items = Vec::::with_capacity(2); + if let Some(user_instructions) = turn_context.user_instructions.as_deref() { + items.push(UserInstructions::new(user_instructions.to_string()).into()); } - - self.state.lock_unchecked().history.record_items(items); + items.push(ResponseItem::from(EnvironmentContext::new( + Some(turn_context.cwd.clone()), + Some(turn_context.approval_policy), + Some(turn_context.sandbox_policy.clone()), + Some(self.user_shell.clone()), + ))); + items } - async fn record_state_snapshot(&self, items: &[ResponseItem]) { - let snapshot = { crate::rollout::SessionStateSnapshot {} }; - + async fn persist_rollout_items(&self, items: &[RolloutItem]) { let recorder = { let guard = self.rollout.lock_unchecked(); guard.as_ref().cloned() }; + if let Some(rec) = recorder + && let Err(e) = rec.record_items(items).await + { + error!("failed to record rollout items: {e:#}"); + } + } - if let Some(rec) = recorder { - if let Err(e) = rec.record_state(snapshot).await { - error!("failed to record rollout state: {e:#}"); - } - if let Err(e) = rec.record_items(items).await { - error!("failed to record rollout items: {e:#}"); - } + /// Record a user input item to conversation history and also persist a + /// corresponding UserMessage EventMsg to rollout. + async fn record_input_and_rollout_usermsg(&self, response_input: &ResponseInputItem) { + let response_item: ResponseItem = response_input.clone().into(); + // Add to conversation history and persist response item to rollout + self.record_conversation_items(std::slice::from_ref(&response_item)) + .await; + + // Derive user message events and persist only UserMessage to rollout + let msgs = + map_response_item_to_event_messages(&response_item, self.show_raw_agent_reasoning); + let user_msgs: Vec = msgs + .into_iter() + .filter_map(|m| match m { + EventMsg::UserMessage(ev) => Some(RolloutItem::EventMsg(EventMsg::UserMessage(ev))), + _ => None, + }) + .collect(); + if !user_msgs.is_empty() { + self.persist_rollout_items(&user_msgs).await; } } @@ -755,7 +767,7 @@ impl Session { id: sub_id.to_string(), msg, }; - let _ = self.tx_event.send(event).await; + self.send_event(event).await; } async fn on_exec_command_end( @@ -802,7 +814,7 @@ impl Session { id: sub_id.to_string(), msg, }; - let _ = self.tx_event.send(event).await; + self.send_event(event).await; // If this is an apply_patch, after we emit the end patch, emit a second event // with the full turn diff if there is one. @@ -814,7 +826,7 @@ impl Session { id: sub_id.into(), msg, }; - let _ = self.tx_event.send(event).await; + self.send_event(event).await; } } } @@ -880,7 +892,7 @@ impl Session { message: message.into(), }), }; - let _ = self.tx_event.send(event).await; + self.send_event(event).await; } async fn notify_stream_error(&self, sub_id: &str, message: impl Into) { @@ -890,7 +902,7 @@ impl Session { message: message.into(), }), }; - let _ = self.tx_event.send(event).await; + self.send_event(event).await; } /// Build the full turn input by concatenating the current conversation @@ -1053,9 +1065,9 @@ impl AgentTask { id: self.sub_id, msg: EventMsg::TurnAborted(TurnAbortedEvent { reason }), }; - let tx_event = self.sess.tx_event.clone(); + let sess = self.sess.clone(); tokio::spawn(async move { - tx_event.send(event).await.ok(); + sess.send_event(event).await; }); } } @@ -1260,7 +1272,7 @@ async fn submission_loop( Op::GetHistoryEntryRequest { offset, log_id } => { let config = config.clone(); - let tx_event = sess.tx_event.clone(); + let sess_clone = sess.clone(); let sub_id = sub.id.clone(); tokio::spawn(async move { @@ -1288,13 +1300,10 @@ async fn submission_loop( ), }; - if let Err(e) = tx_event.send(event).await { - warn!("failed to send GetHistoryEntryResponse event: {e}"); - } + sess_clone.send_event(event).await; }); } Op::ListMcpTools => { - let tx_event = sess.tx_event.clone(); let sub_id = sub.id.clone(); // This is a cheap lookup from the connection manager's cache. @@ -1305,12 +1314,9 @@ async fn submission_loop( crate::protocol::McpListToolsResponseEvent { tools }, ), }; - if let Err(e) = tx_event.send(event).await { - warn!("failed to send McpListToolsResponse event: {e}"); - } + sess.send_event(event).await; } Op::ListCustomPrompts => { - let tx_event = sess.tx_event.clone(); let sub_id = sub.id.clone(); let custom_prompts: Vec = @@ -1326,9 +1332,7 @@ async fn submission_loop( custom_prompts, }), }; - if let Err(e) = tx_event.send(event).await { - warn!("failed to send ListCustomPromptsResponse event: {e}"); - } + sess.send_event(event).await; } Op::Compact => { // Create a summarization request as user input @@ -1364,22 +1368,17 @@ async fn submission_loop( message: "Failed to shutdown rollout recorder".to_string(), }), }; - if let Err(e) = sess.tx_event.send(event).await { - warn!("failed to send error message: {e:?}"); - } + sess.send_event(event).await; } let event = Event { id: sub.id.clone(), msg: EventMsg::ShutdownComplete, }; - if let Err(e) = sess.tx_event.send(event).await { - warn!("failed to send Shutdown event: {e}"); - } + sess.send_event(event).await; break; } Op::GetHistory => { - let tx_event = sess.tx_event.clone(); let sub_id = sub.id.clone(); let event = Event { @@ -1389,9 +1388,7 @@ async fn submission_loop( entries: sess.state.lock_unchecked().history.contents(), }), }; - if let Err(e) = tx_event.send(event).await { - warn!("failed to send ConversationHistory event: {e}"); - } + sess.send_event(event).await; } _ => { // Ignore unknown ops; enum is non_exhaustive to allow extensions. @@ -1429,12 +1426,10 @@ async fn run_task( model_context_window: turn_context.client.get_model_context_window(), }), }; - if sess.tx_event.send(event).await.is_err() { - return; - } + sess.send_event(event).await; let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input); - sess.record_conversation_items(&[initial_input_for_turn.clone().into()]) + sess.record_input_and_rollout_usermsg(&initial_input_for_turn) .await; let mut last_agent_message: Option = None; @@ -1603,7 +1598,7 @@ async fn run_task( message: e.to_string(), }), }; - sess.tx_event.send(event).await.ok(); + sess.send_event(event).await; // let the user continue the conversation break; } @@ -1614,7 +1609,7 @@ async fn run_task( id: sub_id, msg: EventMsg::TaskComplete(TaskCompleteEvent { last_agent_message }), }; - sess.tx_event.send(event).await.ok(); + sess.send_event(event).await; } async fn run_turn( @@ -1812,13 +1807,12 @@ async fn try_run_turn( st.token_info = info.clone(); info }; - sess.tx_event - .send(Event { + let _ = sess + .send_event(Event { id: sub_id.to_string(), msg: EventMsg::TokenCount(crate::protocol::TokenCountEvent { info }), }) - .await - .ok(); + .await; let unified_diff = turn_diff_tracker.get_unified_diff(); if let Ok(Some(unified_diff)) = unified_diff { @@ -1827,7 +1821,7 @@ async fn try_run_turn( id: sub_id.to_string(), msg, }; - let _ = sess.tx_event.send(event).await; + sess.send_event(event).await; } return Ok(output); @@ -1837,21 +1831,21 @@ async fn try_run_turn( id: sub_id.to_string(), msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta }), }; - sess.tx_event.send(event).await.ok(); + sess.send_event(event).await; } ResponseEvent::ReasoningSummaryDelta(delta) => { let event = Event { id: sub_id.to_string(), msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta }), }; - sess.tx_event.send(event).await.ok(); + sess.send_event(event).await; } ResponseEvent::ReasoningSummaryPartAdded => { let event = Event { id: sub_id.to_string(), msg: EventMsg::AgentReasoningSectionBreak(AgentReasoningSectionBreakEvent {}), }; - sess.tx_event.send(event).await.ok(); + sess.send_event(event).await; } ResponseEvent::ReasoningContentDelta(delta) => { if sess.show_raw_agent_reasoning { @@ -1861,7 +1855,7 @@ async fn try_run_turn( AgentReasoningRawContentDeltaEvent { delta }, ), }; - sess.tx_event.send(event).await.ok(); + sess.send_event(event).await; } } } @@ -1882,9 +1876,7 @@ async fn run_compact_task( model_context_window, }), }; - if sess.tx_event.send(start_event).await.is_err() { - return; - } + sess.send_event(start_event).await; let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input); let turn_input: Vec = @@ -2062,7 +2054,7 @@ async fn handle_response_item( id: sub_id.to_string(), msg, }; - sess.tx_event.send(event).await.ok(); + sess.send_event(event).await; } None } diff --git a/codex-rs/core/src/conversation_manager.rs b/codex-rs/core/src/conversation_manager.rs index 87b60e01..e352680c 100644 --- a/codex-rs/core/src/conversation_manager.rs +++ b/codex-rs/core/src/conversation_manager.rs @@ -11,6 +11,7 @@ use crate::protocol::Event; use crate::protocol::EventMsg; use crate::protocol::SessionConfiguredEvent; use crate::rollout::RolloutRecorder; +use crate::rollout::recorder::RolloutItem; use codex_protocol::mcp_protocol::ConversationId; use codex_protocol::models::ResponseItem; use std::collections::HashMap; @@ -18,18 +19,72 @@ use std::path::PathBuf; use std::sync::Arc; use tokio::sync::RwLock; -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone)] pub struct ResumedHistory { pub conversation_id: ConversationId, - pub history: Vec, + pub history: Vec, pub rollout_path: PathBuf, } -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone)] pub enum InitialHistory { New, Resumed(ResumedHistory), - Forked(Vec), + Forked(Vec), +} + +impl InitialHistory { + pub(crate) fn get_rollout_items(&self) -> Vec { + match self { + InitialHistory::New => Vec::new(), + InitialHistory::Resumed(resumed) => resumed.history.clone(), + InitialHistory::Forked(items) => items.clone(), + } + } + pub fn get_response_items(&self) -> Vec { + match self { + InitialHistory::New => Vec::new(), + InitialHistory::Resumed(resumed) => resumed + .history + .iter() + .filter_map(|ri| match ri { + RolloutItem::ResponseItem(item) => Some(item.clone()), + _ => None, + }) + .collect(), + InitialHistory::Forked(items) => items + .iter() + .filter_map(|ri| match ri { + RolloutItem::ResponseItem(item) => Some(item.clone()), + _ => None, + }) + .collect(), + } + } + pub fn get_event_msgs(&self) -> Option> { + match self { + InitialHistory::New => None, + InitialHistory::Resumed(resumed) => Some( + resumed + .history + .iter() + .filter_map(|ri| match ri { + RolloutItem::EventMsg(ev) => Some(ev.clone()), + _ => None, + }) + .collect(), + ), + InitialHistory::Forked(items) => Some( + items + .iter() + .filter_map(|ri| match ri { + RolloutItem::EventMsg(ev) => Some(ev.clone()), + _ => None, + }) + .collect(), + ), + } + } } /// Represents a newly created Codex conversation, including the first event @@ -185,7 +240,8 @@ impl ConversationManager { /// and all items that follow them. fn truncate_after_dropping_last_messages(items: Vec, n: usize) -> InitialHistory { if n == 0 { - return InitialHistory::Forked(items); + let rolled: Vec = items.into_iter().map(RolloutItem::ResponseItem).collect(); + return InitialHistory::Forked(rolled); } // Walk backwards counting only `user` Message items, find cut index. @@ -207,7 +263,12 @@ fn truncate_after_dropping_last_messages(items: Vec, n: usize) -> // No prefix remains after dropping; start a new conversation. InitialHistory::New } else { - InitialHistory::Forked(items.into_iter().take(cut_index).collect()) + let rolled: Vec = items + .into_iter() + .take(cut_index) + .map(RolloutItem::ResponseItem) + .collect(); + InitialHistory::Forked(rolled) } } @@ -263,12 +324,18 @@ mod tests { ]; let truncated = truncate_after_dropping_last_messages(items.clone(), 1); + let got_items = truncated.get_rollout_items(); + let expected_items = vec![ + RolloutItem::ResponseItem(items[0].clone()), + RolloutItem::ResponseItem(items[1].clone()), + RolloutItem::ResponseItem(items[2].clone()), + ]; assert_eq!( - truncated, - InitialHistory::Forked(vec![items[0].clone(), items[1].clone(), items[2].clone(),]) + serde_json::to_value(&got_items).unwrap(), + serde_json::to_value(&expected_items).unwrap() ); let truncated2 = truncate_after_dropping_last_messages(items, 2); - assert_eq!(truncated2, InitialHistory::New); + assert!(matches!(truncated2, InitialHistory::New)); } } diff --git a/codex-rs/core/src/rollout/list.rs b/codex-rs/core/src/rollout/list.rs index 44f08e67..c4feb629 100644 --- a/codex-rs/core/src/rollout/list.rs +++ b/codex-rs/core/src/rollout/list.rs @@ -10,6 +10,9 @@ use time::macros::format_description; use uuid::Uuid; use super::SESSIONS_SUBDIR; +use super::recorder::RolloutItem; +use super::recorder::RolloutLine; +use crate::protocol::EventMsg; /// Returned page of conversation summaries. #[derive(Debug, Default, PartialEq)] @@ -34,7 +37,7 @@ pub struct ConversationItem { } /// Hard cap to bound worst‑case work per request. -const MAX_SCAN_FILES: usize = 10_000; +const MAX_SCAN_FILES: usize = 100; const HEAD_RECORD_LIMIT: usize = 10; /// Pagination cursor identifying a file by timestamp and UUID. @@ -167,10 +170,16 @@ async fn traverse_directories_for_paths( if items.len() == page_size { break 'outer; } - let head = read_first_jsonl_records(&path, HEAD_RECORD_LIMIT) - .await - .unwrap_or_default(); - items.push(ConversationItem { path, head }); + // Read head and simultaneously detect message events within the same + // first N JSONL records to avoid a second file read. + let (head, saw_session_meta, saw_user_event) = + read_head_and_flags(&path, HEAD_RECORD_LIMIT) + .await + .unwrap_or((Vec::new(), false, false)); + // Apply filters: must have session meta and at least one user message event + if saw_session_meta && saw_user_event { + items.push(ConversationItem { path, head }); + } } } } @@ -273,16 +282,19 @@ fn parse_timestamp_uuid_from_filename(name: &str) -> Option<(OffsetDateTime, Uui Some((ts, uuid)) } -async fn read_first_jsonl_records( +async fn read_head_and_flags( path: &Path, max_records: usize, -) -> io::Result> { +) -> io::Result<(Vec, bool, bool)> { use tokio::io::AsyncBufReadExt; let file = tokio::fs::File::open(path).await?; let reader = tokio::io::BufReader::new(file); let mut lines = reader.lines(); let mut head: Vec = Vec::new(); + let mut saw_session_meta = false; + let mut saw_user_event = false; + while head.len() < max_records { let line_opt = lines.next_line().await?; let Some(line) = line_opt else { break }; @@ -290,9 +302,29 @@ async fn read_first_jsonl_records( if trimmed.is_empty() { continue; } - if let Ok(v) = serde_json::from_str::(trimmed) { - head.push(v); + + let parsed: Result = serde_json::from_str(trimmed); + let Ok(rollout_line) = parsed else { continue }; + + match rollout_line.item { + RolloutItem::SessionMeta(session_meta_line) => { + if let Ok(val) = serde_json::to_value(session_meta_line) { + head.push(val); + saw_session_meta = true; + } + } + RolloutItem::ResponseItem(item) => { + if let Ok(val) = serde_json::to_value(item) { + head.push(val); + } + } + RolloutItem::EventMsg(ev) => { + if matches!(ev, EventMsg::UserMessage(_)) { + saw_user_event = true; + } + } } } - Ok(head) + + Ok((head, saw_session_meta, saw_user_event)) } diff --git a/codex-rs/core/src/rollout/mod.rs b/codex-rs/core/src/rollout/mod.rs index a78d7609..341d39ae 100644 --- a/codex-rs/core/src/rollout/mod.rs +++ b/codex-rs/core/src/rollout/mod.rs @@ -10,7 +10,6 @@ pub mod recorder; pub use recorder::RolloutRecorder; pub use recorder::RolloutRecorderParams; pub use recorder::SessionMeta; -pub use recorder::SessionStateSnapshot; #[cfg(test)] pub mod tests; diff --git a/codex-rs/core/src/rollout/policy.rs b/codex-rs/core/src/rollout/policy.rs index 3eb52903..676f8e90 100644 --- a/codex-rs/core/src/rollout/policy.rs +++ b/codex-rs/core/src/rollout/policy.rs @@ -1,8 +1,21 @@ +use crate::protocol::EventMsg; +use crate::rollout::recorder::RolloutItem; use codex_protocol::models::ResponseItem; +/// Whether a rollout `item` should be persisted in rollout files. +#[inline] +pub(crate) fn is_persisted_response_item(item: &RolloutItem) -> bool { + match item { + RolloutItem::ResponseItem(item) => should_persist_response_item(item), + RolloutItem::EventMsg(ev) => should_persist_event_msg(ev), + // Always persist session meta + RolloutItem::SessionMeta(_) => true, + } +} + /// Whether a `ResponseItem` should be persisted in rollout files. #[inline] -pub(crate) fn is_persisted_response_item(item: &ResponseItem) -> bool { +pub(crate) fn should_persist_response_item(item: &ResponseItem) -> bool { match item { ResponseItem::Message { .. } | ResponseItem::Reasoning { .. } @@ -14,3 +27,44 @@ pub(crate) fn is_persisted_response_item(item: &ResponseItem) -> bool { ResponseItem::WebSearchCall { .. } | ResponseItem::Other => false, } } + +/// Whether an `EventMsg` should be persisted in rollout files. +#[inline] +pub(crate) fn should_persist_event_msg(ev: &EventMsg) -> bool { + match ev { + EventMsg::UserMessage(_) + | EventMsg::AgentMessage(_) + | EventMsg::AgentReasoning(_) + | EventMsg::AgentReasoningRawContent(_) + | EventMsg::TokenCount(_) => true, + EventMsg::Error(_) + | EventMsg::TaskStarted(_) + | EventMsg::TaskComplete(_) + | EventMsg::AgentMessageDelta(_) + | EventMsg::AgentReasoningDelta(_) + | EventMsg::AgentReasoningRawContentDelta(_) + | EventMsg::AgentReasoningSectionBreak(_) + | EventMsg::SessionConfigured(_) + | EventMsg::McpToolCallBegin(_) + | EventMsg::McpToolCallEnd(_) + | EventMsg::WebSearchBegin(_) + | EventMsg::WebSearchEnd(_) + | EventMsg::ExecCommandBegin(_) + | EventMsg::ExecCommandOutputDelta(_) + | EventMsg::ExecCommandEnd(_) + | EventMsg::ExecApprovalRequest(_) + | EventMsg::ApplyPatchApprovalRequest(_) + | EventMsg::BackgroundEvent(_) + | EventMsg::StreamError(_) + | EventMsg::PatchApplyBegin(_) + | EventMsg::PatchApplyEnd(_) + | EventMsg::TurnDiff(_) + | EventMsg::GetHistoryEntryResponse(_) + | EventMsg::McpListToolsResponse(_) + | EventMsg::ListCustomPromptsResponse(_) + | EventMsg::PlanUpdate(_) + | EventMsg::TurnAborted(_) + | EventMsg::ShutdownComplete + | EventMsg::ConversationHistory(_) => false, + } +} diff --git a/codex-rs/core/src/rollout/recorder.rs b/codex-rs/core/src/rollout/recorder.rs index 0e0af899..9954c4b1 100644 --- a/codex-rs/core/src/rollout/recorder.rs +++ b/codex-rs/core/src/rollout/recorder.rs @@ -28,25 +28,45 @@ use super::policy::is_persisted_response_item; use crate::config::Config; use crate::conversation_manager::InitialHistory; use crate::conversation_manager::ResumedHistory; +use crate::default_client::ORIGINATOR; use crate::git_info::GitInfo; use crate::git_info::collect_git_info; +use crate::protocol::EventMsg; use codex_protocol::models::ResponseItem; -#[derive(Serialize, Deserialize, Clone, Default)] +#[derive(Serialize, Deserialize, Clone, Default, Debug)] pub struct SessionMeta { pub id: ConversationId, pub timestamp: String, + pub cwd: PathBuf, + pub originator: String, + pub cli_version: String, pub instructions: Option, } -#[derive(Serialize)] -struct SessionMetaWithGit { +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct SessionMetaLine { #[serde(flatten)] meta: SessionMeta, #[serde(skip_serializing_if = "Option::is_none")] git: Option, } +#[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(tag = "type", content = "payload", rename_all = "snake_case")] +pub enum RolloutItem { + SessionMeta(SessionMetaLine), + ResponseItem(ResponseItem), + EventMsg(EventMsg), +} + +#[derive(Serialize, Deserialize, Clone)] +pub(crate) struct RolloutLine { + pub(crate) timestamp: String, + #[serde(flatten)] + pub(crate) item: RolloutItem, +} + #[derive(Serialize, Deserialize, Default, Clone)] pub struct SessionStateSnapshot {} @@ -87,8 +107,7 @@ pub enum RolloutRecorderParams { } enum RolloutCmd { - AddItems(Vec), - UpdateState(SessionStateSnapshot), + AddItems(Vec), Shutdown { ack: oneshot::Sender<()> }, } @@ -144,8 +163,11 @@ impl RolloutRecorder { tokio::fs::File::from_std(file), path, Some(SessionMeta { - timestamp, id: session_id, + timestamp, + cwd: config.cwd.clone(), + originator: ORIGINATOR.value.clone(), + cli_version: env!("CARGO_PKG_VERSION").to_string(), instructions, }), ) @@ -176,7 +198,7 @@ impl RolloutRecorder { Ok(Self { tx, rollout_path }) } - pub(crate) async fn record_items(&self, items: &[ResponseItem]) -> std::io::Result<()> { + pub(crate) async fn record_items(&self, items: &[RolloutItem]) -> std::io::Result<()> { let mut filtered = Vec::new(); for item in items { // Note that function calls may look a bit strange if they are @@ -195,60 +217,48 @@ impl RolloutRecorder { .map_err(|e| IoError::other(format!("failed to queue rollout items: {e}"))) } - pub(crate) async fn record_state(&self, state: SessionStateSnapshot) -> std::io::Result<()> { - self.tx - .send(RolloutCmd::UpdateState(state)) - .await - .map_err(|e| IoError::other(format!("failed to queue rollout state: {e}"))) - } - - pub async fn get_rollout_history(path: &Path) -> std::io::Result { + pub(crate) async fn get_rollout_history(path: &Path) -> std::io::Result { info!("Resuming rollout from {path:?}"); tracing::error!("Resuming rollout from {path:?}"); let text = tokio::fs::read_to_string(path).await?; - let mut lines = text.lines(); - let first_line = lines - .next() - .ok_or_else(|| IoError::other("empty session file"))?; - let conversation_id = match serde_json::from_str::(first_line) { - Ok(rollout_session_meta) => { - tracing::error!( - "Parsed conversation ID from rollout file: {:?}", - rollout_session_meta.id - ); - Some(rollout_session_meta.id) - } - Err(e) => { - return Err(IoError::other(format!( - "failed to parse first line of rollout file as SessionMeta: {e}" - ))); - } - }; + if text.trim().is_empty() { + return Err(IoError::other("empty session file")); + } - let mut items = Vec::new(); - for line in lines { + let mut items: Vec = Vec::new(); + let mut conversation_id: Option = None; + for line in text.lines() { if line.trim().is_empty() { continue; } let v: Value = match serde_json::from_str(line) { Ok(v) => v, - Err(_) => continue, - }; - if v.get("record_type") - .and_then(|rt| rt.as_str()) - .map(|s| s == "state") - .unwrap_or(false) - { - continue; - } - match serde_json::from_value::(v.clone()) { - Ok(item) => { - if is_persisted_response_item(&item) { - items.push(item); - } - } Err(e) => { - warn!("failed to parse item: {v:?}, error: {e}"); + warn!("failed to parse line as JSON: {line:?}, error: {e}"); + continue; + } + }; + + // Parse the rollout line structure + match serde_json::from_value::(v.clone()) { + Ok(rollout_line) => match rollout_line.item { + RolloutItem::SessionMeta(session_meta_line) => { + tracing::error!( + "Parsed conversation ID from rollout file: {:?}", + session_meta_line.meta.id + ); + conversation_id = Some(session_meta_line.meta.id); + items.push(RolloutItem::SessionMeta(session_meta_line)); + } + RolloutItem::ResponseItem(item) => { + items.push(RolloutItem::ResponseItem(item)); + } + RolloutItem::EventMsg(_ev) => { + items.push(RolloutItem::EventMsg(_ev)); + } + }, + Err(e) => { + warn!("failed to parse rollout line: {v:?}, error: {e}"); } } } @@ -352,13 +362,15 @@ async fn rollout_writer( // If we have a meta, collect git info asynchronously and write meta first if let Some(session_meta) = meta.take() { let git_info = collect_git_info(&cwd).await; - let session_meta_with_git = SessionMetaWithGit { + let session_meta_line = SessionMetaLine { meta: session_meta, git: git_info, }; - // Write the SessionMeta as the first item in the file - writer.write_line(&session_meta_with_git).await?; + // Write the SessionMeta as the first item in the file, wrapped in a rollout line + writer + .write_rollout_item(RolloutItem::SessionMeta(session_meta_line)) + .await?; } // Process rollout commands @@ -367,24 +379,10 @@ async fn rollout_writer( RolloutCmd::AddItems(items) => { for item in items { if is_persisted_response_item(&item) { - writer.write_line(&item).await?; + writer.write_rollout_item(item).await?; } } } - RolloutCmd::UpdateState(state) => { - #[derive(Serialize)] - struct StateLine<'a> { - record_type: &'static str, - #[serde(flatten)] - state: &'a SessionStateSnapshot, - } - writer - .write_line(&StateLine { - record_type: "state", - state: &state, - }) - .await?; - } RolloutCmd::Shutdown { ack } => { let _ = ack.send(()); } @@ -399,6 +397,20 @@ struct JsonlWriter { } impl JsonlWriter { + async fn write_rollout_item(&mut self, rollout_item: RolloutItem) -> std::io::Result<()> { + let timestamp_format: &[FormatItem] = format_description!( + "[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond digits:3]Z" + ); + let timestamp = OffsetDateTime::now_utc() + .format(timestamp_format) + .map_err(|e| IoError::other(format!("failed to format timestamp: {e}")))?; + + let line = RolloutLine { + timestamp, + item: rollout_item, + }; + self.write_line(&line).await + } async fn write_line(&mut self, item: &impl serde::Serialize) -> std::io::Result<()> { let mut json = serde_json::to_string(item)?; json.push('\n'); diff --git a/codex-rs/core/src/rollout/tests.rs b/codex-rs/core/src/rollout/tests.rs index 52c121e7..ecff658d 100644 --- a/codex-rs/core/src/rollout/tests.rs +++ b/codex-rs/core/src/rollout/tests.rs @@ -42,10 +42,30 @@ fn write_session_file( let meta = serde_json::json!({ "timestamp": ts_str, - "id": uuid.to_string() + "type": "session_meta", + "payload": { + "id": uuid, + "timestamp": ts_str, + "instructions": null, + "cwd": ".", + "originator": "test_originator", + "cli_version": "test_version" + } }); writeln!(file, "{meta}")?; + // Include at least one user message event to satisfy listing filters + let user_event = serde_json::json!({ + "timestamp": ts_str, + "type": "event_msg", + "payload": { + "type": "user_message", + "message": "Hello from user", + "kind": "plain" + } + }); + writeln!(file, "{user_event}")?; + for i in 0..num_records { let rec = serde_json::json!({ "record_type": "response", @@ -93,24 +113,30 @@ async fn test_list_conversations_latest_first() { .join("01") .join(format!("rollout-2025-01-01T12-00-00-{u1}.jsonl")); - let head_3 = vec![ - serde_json::json!({"timestamp": "2025-01-03T12-00-00", "id": u3.to_string()}), - serde_json::json!({"record_type": "response", "index": 0}), - serde_json::json!({"record_type": "response", "index": 1}), - serde_json::json!({"record_type": "response", "index": 2}), - ]; - let head_2 = vec![ - serde_json::json!({"timestamp": "2025-01-02T12-00-00", "id": u2.to_string()}), - serde_json::json!({"record_type": "response", "index": 0}), - serde_json::json!({"record_type": "response", "index": 1}), - serde_json::json!({"record_type": "response", "index": 2}), - ]; - let head_1 = vec![ - serde_json::json!({"timestamp": "2025-01-01T12-00-00", "id": u1.to_string()}), - serde_json::json!({"record_type": "response", "index": 0}), - serde_json::json!({"record_type": "response", "index": 1}), - serde_json::json!({"record_type": "response", "index": 2}), - ]; + let head_3 = vec![serde_json::json!({ + "id": u3, + "timestamp": "2025-01-03T12-00-00", + "instructions": null, + "cwd": ".", + "originator": "test_originator", + "cli_version": "test_version" + })]; + let head_2 = vec![serde_json::json!({ + "id": u2, + "timestamp": "2025-01-02T12-00-00", + "instructions": null, + "cwd": ".", + "originator": "test_originator", + "cli_version": "test_version" + })]; + let head_1 = vec![serde_json::json!({ + "id": u1, + "timestamp": "2025-01-01T12-00-00", + "instructions": null, + "cwd": ".", + "originator": "test_originator", + "cli_version": "test_version" + })]; let expected_cursor: Cursor = serde_json::from_str(&format!("\"2025-01-01T12-00-00|{u1}\"")).unwrap(); @@ -170,14 +196,22 @@ async fn test_pagination_cursor() { .join("03") .join("04") .join(format!("rollout-2025-03-04T09-00-00-{u4}.jsonl")); - let head_5 = vec![ - serde_json::json!({"timestamp": "2025-03-05T09-00-00", "id": u5.to_string()}), - serde_json::json!({"record_type": "response", "index": 0}), - ]; - let head_4 = vec![ - serde_json::json!({"timestamp": "2025-03-04T09-00-00", "id": u4.to_string()}), - serde_json::json!({"record_type": "response", "index": 0}), - ]; + let head_5 = vec![serde_json::json!({ + "id": u5, + "timestamp": "2025-03-05T09-00-00", + "instructions": null, + "cwd": ".", + "originator": "test_originator", + "cli_version": "test_version" + })]; + let head_4 = vec![serde_json::json!({ + "id": u4, + "timestamp": "2025-03-04T09-00-00", + "instructions": null, + "cwd": ".", + "originator": "test_originator", + "cli_version": "test_version" + })]; let expected_cursor1: Cursor = serde_json::from_str(&format!("\"2025-03-04T09-00-00|{u4}\"")).unwrap(); let expected_page1 = ConversationsPage { @@ -212,14 +246,22 @@ async fn test_pagination_cursor() { .join("03") .join("02") .join(format!("rollout-2025-03-02T09-00-00-{u2}.jsonl")); - let head_3 = vec![ - serde_json::json!({"timestamp": "2025-03-03T09-00-00", "id": u3.to_string()}), - serde_json::json!({"record_type": "response", "index": 0}), - ]; - let head_2 = vec![ - serde_json::json!({"timestamp": "2025-03-02T09-00-00", "id": u2.to_string()}), - serde_json::json!({"record_type": "response", "index": 0}), - ]; + let head_3 = vec![serde_json::json!({ + "id": u3, + "timestamp": "2025-03-03T09-00-00", + "instructions": null, + "cwd": ".", + "originator": "test_originator", + "cli_version": "test_version" + })]; + let head_2 = vec![serde_json::json!({ + "id": u2, + "timestamp": "2025-03-02T09-00-00", + "instructions": null, + "cwd": ".", + "originator": "test_originator", + "cli_version": "test_version" + })]; let expected_cursor2: Cursor = serde_json::from_str(&format!("\"2025-03-02T09-00-00|{u2}\"")).unwrap(); let expected_page2 = ConversationsPage { @@ -248,10 +290,14 @@ async fn test_pagination_cursor() { .join("03") .join("01") .join(format!("rollout-2025-03-01T09-00-00-{u1}.jsonl")); - let head_1 = vec![ - serde_json::json!({"timestamp": "2025-03-01T09-00-00", "id": u1.to_string()}), - serde_json::json!({"record_type": "response", "index": 0}), - ]; + let head_1 = vec![serde_json::json!({ + "id": u1, + "timestamp": "2025-03-01T09-00-00", + "instructions": null, + "cwd": ".", + "originator": "test_originator", + "cli_version": "test_version" + })]; let expected_cursor3: Cursor = serde_json::from_str(&format!("\"2025-03-01T09-00-00|{u1}\"")).unwrap(); let expected_page3 = ConversationsPage { @@ -287,11 +333,14 @@ async fn test_get_conversation_contents() { .join("04") .join("01") .join(format!("rollout-2025-04-01T10-30-00-{uuid}.jsonl")); - let expected_head = vec![ - serde_json::json!({"timestamp": ts, "id": uuid.to_string()}), - serde_json::json!({"record_type": "response", "index": 0}), - serde_json::json!({"record_type": "response", "index": 1}), - ]; + let expected_head = vec![serde_json::json!({ + "id": uuid, + "timestamp": ts, + "instructions": null, + "cwd": ".", + "originator": "test_originator", + "cli_version": "test_version" + })]; let expected_cursor: Cursor = serde_json::from_str(&format!("\"{ts}|{uuid}\"")).unwrap(); let expected_page = ConversationsPage { items: vec![ConversationItem { @@ -305,10 +354,15 @@ async fn test_get_conversation_contents() { assert_eq!(page, expected_page); // Entire file contents equality - let meta = serde_json::json!({"timestamp": ts, "id": uuid.to_string()}); + let meta = serde_json::json!({"timestamp": ts, "type": "session_meta", "payload": {"id": uuid, "timestamp": ts, "instructions": null, "cwd": ".", "originator": "test_originator", "cli_version": "test_version"}}); + let user_event = serde_json::json!({ + "timestamp": ts, + "type": "event_msg", + "payload": {"type": "user_message", "message": "Hello from user", "kind": "plain"} + }); let rec0 = serde_json::json!({"record_type": "response", "index": 0}); let rec1 = serde_json::json!({"record_type": "response", "index": 1}); - let expected_content = format!("{meta}\n{rec0}\n{rec1}\n"); + let expected_content = format!("{meta}\n{user_event}\n{rec0}\n{rec1}\n"); assert_eq!(content, expected_content); } @@ -341,7 +395,14 @@ async fn test_stable_ordering_same_second_pagination() { .join("01") .join(format!("rollout-2025-07-01T00-00-00-{u2}.jsonl")); let head = |u: Uuid| -> Vec { - vec![serde_json::json!({"timestamp": ts, "id": u.to_string()})] + vec![serde_json::json!({ + "id": u, + "timestamp": ts, + "instructions": null, + "cwd": ".", + "originator": "test_originator", + "cli_version": "test_version" + })] }; let expected_cursor1: Cursor = serde_json::from_str(&format!("\"{ts}|{u2}\"")).unwrap(); let expected_page1 = ConversationsPage { diff --git a/codex-rs/core/tests/suite/cli_stream.rs b/codex-rs/core/tests/suite/cli_stream.rs index ee4eef70..71624739 100644 --- a/codex-rs/core/tests/suite/cli_stream.rs +++ b/codex-rs/core/tests/suite/cli_stream.rs @@ -1,4 +1,5 @@ use assert_cmd::Command as AssertCommand; +use codex_core::RolloutRecorder; use codex_core::spawn::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR; use std::time::Duration; use std::time::Instant; @@ -77,6 +78,22 @@ async fn chat_mode_stream_cli() { assert_eq!(hi_lines, 1, "Expected exactly one line with 'hi'"); server.verify().await; + + // Verify a new session rollout was created and is discoverable via list_conversations + let page = RolloutRecorder::list_conversations(home.path(), 10, None) + .await + .expect("list conversations"); + assert!( + !page.items.is_empty(), + "expected at least one session to be listed" + ); + // First line of head must be the SessionMeta payload (id/timestamp) + let head0 = page.items[0].head.first().expect("missing head record"); + assert!(head0.get("id").is_some(), "head[0] missing id"); + assert!( + head0.get("timestamp").is_some(), + "head[0] missing timestamp" + ); } /// Verify that passing `-c experimental_instructions_file=...` to the CLI @@ -297,8 +314,10 @@ async fn integration_creates_and_checks_session_file() { Ok(v) => v, Err(_) => continue, }; - if item.get("type").and_then(|t| t.as_str()) == Some("message") - && let Some(c) = item.get("content") + if item.get("type").and_then(|t| t.as_str()) == Some("response_item") + && let Some(payload) = item.get("payload") + && payload.get("type").and_then(|t| t.as_str()) == Some("message") + && let Some(c) = payload.get("content") && c.to_string().contains(&marker) { matching_path = Some(path.to_path_buf()); @@ -361,9 +380,16 @@ async fn integration_creates_and_checks_session_file() { .unwrap_or_else(|_| panic!("missing session meta line")); let meta: serde_json::Value = serde_json::from_str(meta_line) .unwrap_or_else(|_| panic!("Failed to parse session meta line as JSON")); - assert!(meta.get("id").is_some(), "SessionMeta missing id"); + assert_eq!( + meta.get("type").and_then(|v| v.as_str()), + Some("session_meta") + ); + let payload = meta + .get("payload") + .unwrap_or_else(|| panic!("Missing payload in meta line")); + assert!(payload.get("id").is_some(), "SessionMeta missing id"); assert!( - meta.get("timestamp").is_some(), + payload.get("timestamp").is_some(), "SessionMeta missing timestamp" ); @@ -375,8 +401,10 @@ async fn integration_creates_and_checks_session_file() { let Ok(item) = serde_json::from_str::(line) else { continue; }; - if item.get("type").and_then(|t| t.as_str()) == Some("message") - && let Some(c) = item.get("content") + if item.get("type").and_then(|t| t.as_str()) == Some("response_item") + && let Some(payload) = item.get("payload") + && payload.get("type").and_then(|t| t.as_str()) == Some("message") + && let Some(c) = payload.get("content") && c.to_string().contains(&marker) { found_message = true; diff --git a/codex-rs/core/tests/suite/client.rs b/codex-rs/core/tests/suite/client.rs index ed609c21..32466295 100644 --- a/codex-rs/core/tests/suite/client.rs +++ b/codex-rs/core/tests/suite/client.rs @@ -123,10 +123,22 @@ async fn resume_includes_initial_messages_and_sends_prior_items() { let tmpdir = TempDir::new().unwrap(); let session_path = tmpdir.path().join("resume-session.jsonl"); let mut f = std::fs::File::create(&session_path).unwrap(); + let convo_id = Uuid::new_v4(); writeln!( f, "{}", - json!({"meta":"test","instructions":"be nice", "id": Uuid::new_v4(), "timestamp": "2024-01-01T00:00:00Z"}) + json!({ + "timestamp": "2024-01-01T00:00:00.000Z", + "type": "session_meta", + "payload": { + "id": convo_id, + "timestamp": "2024-01-01T00:00:00Z", + "instructions": "be nice", + "cwd": ".", + "originator": "test_originator", + "cli_version": "test_version" + } + }) ) .unwrap(); @@ -138,7 +150,17 @@ async fn resume_includes_initial_messages_and_sends_prior_items() { text: "resumed user message".to_string(), }], }; - writeln!(f, "{}", serde_json::to_string(&prior_user).unwrap()).unwrap(); + let prior_user_json = serde_json::to_value(&prior_user).unwrap(); + writeln!( + f, + "{}", + json!({ + "timestamp": "2024-01-01T00:00:01.000Z", + "type": "response_item", + "payload": prior_user_json + }) + ) + .unwrap(); // Prior item: system message (excluded from API history) let prior_system = codex_protocol::models::ResponseItem::Message { @@ -148,7 +170,17 @@ async fn resume_includes_initial_messages_and_sends_prior_items() { text: "resumed system instruction".to_string(), }], }; - writeln!(f, "{}", serde_json::to_string(&prior_system).unwrap()).unwrap(); + let prior_system_json = serde_json::to_value(&prior_system).unwrap(); + writeln!( + f, + "{}", + json!({ + "timestamp": "2024-01-01T00:00:02.000Z", + "type": "response_item", + "payload": prior_system_json + }) + ) + .unwrap(); // Prior item: assistant message let prior_item = codex_protocol::models::ResponseItem::Message { @@ -158,7 +190,17 @@ async fn resume_includes_initial_messages_and_sends_prior_items() { text: "resumed assistant message".to_string(), }], }; - writeln!(f, "{}", serde_json::to_string(&prior_item).unwrap()).unwrap(); + let prior_item_json = serde_json::to_value(&prior_item).unwrap(); + writeln!( + f, + "{}", + json!({ + "timestamp": "2024-01-01T00:00:03.000Z", + "type": "response_item", + "payload": prior_item_json + }) + ) + .unwrap(); drop(f); // Mock server that will receive the resumed request @@ -196,16 +238,13 @@ async fn resume_includes_initial_messages_and_sends_prior_items() { .await .expect("create new conversation"); - // 1) Assert initial_messages contains the prior user + assistant messages as EventMsg entries + // 1) Assert initial_messages only includes existing EventMsg entries; response items are not converted let initial_msgs = session_configured .initial_messages .clone() - .expect("expected initial messages for resumed session"); + .expect("expected initial messages option for resumed session"); let initial_json = serde_json::to_value(&initial_msgs).unwrap(); - let expected_initial_json = json!([ - { "type": "user_message", "message": "resumed user message", "kind": "plain" }, - { "type": "agent_message", "message": "resumed assistant message" } - ]); + let expected_initial_json = json!([]); assert_eq!(initial_json, expected_initial_json); // 2) Submit new input; the request body must include the prior item followed by the new user input. diff --git a/codex-rs/mcp-server/src/codex_message_processor.rs b/codex-rs/mcp-server/src/codex_message_processor.rs index fa84bf90..53e0b9d5 100644 --- a/codex-rs/mcp-server/src/codex_message_processor.rs +++ b/codex-rs/mcp-server/src/codex_message_processor.rs @@ -1251,10 +1251,7 @@ fn extract_conversation_summary( head: &[serde_json::Value], ) -> Option { let session_meta = match head.first() { - Some(first_line) => match serde_json::from_value::(first_line.clone()) { - Ok(session_meta) => session_meta, - Err(..) => return None, - }, + Some(first_line) => serde_json::from_value::(first_line.clone()).ok()?, None => return None, }; @@ -1312,6 +1309,10 @@ mod tests { json!({ "id": conversation_id.0, "timestamp": timestamp, + "cwd": "/", + "originator": "codex", + "cli_version": "0.0.0", + "instructions": null }), json!({ "type": "message", diff --git a/codex-rs/mcp-server/tests/suite/list_resume.rs b/codex-rs/mcp-server/tests/suite/list_resume.rs index eef69c6d..9b5748ca 100644 --- a/codex-rs/mcp-server/tests/suite/list_resume.rs +++ b/codex-rs/mcp-server/tests/suite/list_resume.rs @@ -156,14 +156,45 @@ fn create_fake_rollout(codex_home: &Path, filename_ts: &str, meta_rfc3339: &str, let file_path = dir.join(format!("rollout-{filename_ts}-{uuid}.jsonl")); let mut lines = Vec::new(); - // Meta line with timestamp - lines.push(json!({"timestamp": meta_rfc3339, "id": uuid}).to_string()); - // Minimal user message entry as a persisted response item + // Meta line with timestamp (flattened meta in payload for new schema) lines.push( json!({ - "type":"message", - "role":"user", - "content":[{"type":"input_text","text": preview}] + "timestamp": meta_rfc3339, + "type": "session_meta", + "payload": { + "id": uuid, + "timestamp": meta_rfc3339, + "cwd": "/", + "originator": "codex", + "cli_version": "0.0.0", + "instructions": null + } + }) + .to_string(), + ); + // Minimal user message entry as a persisted response item (with envelope timestamp) + lines.push( + json!({ + "timestamp": meta_rfc3339, + "type":"response_item", + "payload": { + "type":"message", + "role":"user", + "content":[{"type":"input_text","text": preview}] + } + }) + .to_string(), + ); + // Add a matching user message event line to satisfy filters + lines.push( + json!({ + "timestamp": meta_rfc3339, + "type":"event_msg", + "payload": { + "type":"user_message", + "message": preview, + "kind": "plain" + } }) .to_string(), ); diff --git a/codex-rs/tui/src/resume_picker.rs b/codex-rs/tui/src/resume_picker.rs index f34a9a55..8609ab0c 100644 --- a/codex-rs/tui/src/resume_picker.rs +++ b/codex-rs/tui/src/resume_picker.rs @@ -2,7 +2,6 @@ use std::path::Path; use std::path::PathBuf; use chrono::DateTime; -use chrono::TimeZone; use chrono::Utc; use codex_core::ConversationItem; use codex_core::ConversationsPage; @@ -255,19 +254,10 @@ impl PickerState { } fn to_rows(page: ConversationsPage) -> Vec { - use std::cmp::Reverse; - let mut rows: Vec = page - .items - .into_iter() - .filter_map(|it| head_to_row(&it)) - .collect(); - // Ensure newest-first ordering within the page by timestamp when available. - let epoch = Utc.timestamp_opt(0, 0).single().unwrap_or_else(Utc::now); - rows.sort_by_key(|r| Reverse(r.ts.unwrap_or(epoch))); - rows + page.items.into_iter().map(|it| head_to_row(&it)).collect() } -fn head_to_row(item: &ConversationItem) -> Option { +fn head_to_row(item: &ConversationItem) -> Row { let mut ts: Option> = None; if let Some(first) = item.head.first() && let Some(t) = first.get("timestamp").and_then(|v| v.as_str()) @@ -276,16 +266,16 @@ fn head_to_row(item: &ConversationItem) -> Option { ts = Some(parsed.with_timezone(&Utc)); } - let preview = preview_from_head(&item.head)?; - let preview = preview.trim().to_string(); - if preview.is_empty() { - return None; - } - Some(Row { + let preview = preview_from_head(&item.head) + .map(|s| s.trim().to_string()) + .filter(|s| !s.is_empty()) + .unwrap_or_else(|| String::from("(no message yet)")); + + Row { path: item.path.clone(), preview, ts, - }) + } } fn preview_from_head(head: &[serde_json::Value]) -> Option { @@ -483,7 +473,7 @@ mod tests { } #[test] - fn to_rows_sorts_descending_by_timestamp() { + fn to_rows_preserves_backend_order() { // Construct two items with different timestamps and real user text. let a = ConversationItem { path: PathBuf::from("/tmp/a.jsonl"), @@ -500,8 +490,8 @@ mod tests { reached_scan_cap: false, }); assert_eq!(rows.len(), 2); - // Expect the newer timestamp (B) first - assert!(rows[0].preview.contains('B')); - assert!(rows[1].preview.contains('A')); + // Preserve the given order; backend already provides newest-first + assert!(rows[0].preview.contains('A')); + assert!(rows[1].preview.contains('B')); } }