From d77b33ded77542cb66915fb96de083c79ce63c7e Mon Sep 17 00:00:00 2001
From: Ahmed Ibrahim
Date: Wed, 3 Sep 2025 00:39:19 -0700
Subject: [PATCH] core(rollout): extract rollout module, add listing API, and
 return file heads (#1634)

- Move rollout persistence and listing into a dedicated module:
  rollout/{recorder,list}.
- Expose lightweight conversation listing that returns file paths plus the
  first 5 JSONL records for preview.
---
 codex-rs/core/Cargo.toml                    |   2 +-
 codex-rs/core/src/lib.rs                    |   1 +
 codex-rs/core/src/rollout/list.rs           | 295 ++++++++++++++
 codex-rs/core/src/rollout/mod.rs            |  12 +
 .../src/{rollout.rs => rollout/recorder.rs} |  18 +-
 codex-rs/core/src/rollout/tests.rs          | 384 ++++++++++++++++++
 6 files changed, 708 insertions(+), 4 deletions(-)
 create mode 100644 codex-rs/core/src/rollout/list.rs
 create mode 100644 codex-rs/core/src/rollout/mod.rs
 rename codex-rs/core/src/{rollout.rs => rollout/recorder.rs} (95%)
 create mode 100644 codex-rs/core/src/rollout/tests.rs

diff --git a/codex-rs/core/Cargo.toml b/codex-rs/core/Cargo.toml
index 12f0bc62..3d2cf4a6 100644
--- a/codex-rs/core/Cargo.toml
+++ b/codex-rs/core/Cargo.toml
@@ -41,7 +41,7 @@ similar = "2.7.0"
 strum_macros = "0.27.2"
 tempfile = "3"
 thiserror = "2.0.16"
-time = { version = "0.3", features = ["formatting", "local-offset", "macros"] }
+time = { version = "0.3", features = ["formatting", "parsing", "local-offset", "macros"] }
 tokio = { version = "1", features = [
     "io-std",
     "macros",
diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs
index b4966e79..8ae3e335 100644
--- a/codex-rs/core/src/lib.rs
+++ b/codex-rs/core/src/lib.rs
@@ -59,6 +59,7 @@ pub mod terminal;
 mod tool_apply_patch;
 pub mod turn_diff_tracker;
 pub mod user_agent;
+pub use rollout::list::ConversationsPage;
 mod user_notification;
 pub mod util;
 pub use apply_patch::CODEX_APPLY_PATCH_ARG1;
diff --git a/codex-rs/core/src/rollout/list.rs b/codex-rs/core/src/rollout/list.rs
new file mode 100644
index 00000000..375140c4
--- /dev/null
+++ b/codex-rs/core/src/rollout/list.rs
@@ -0,0 +1,295 @@
+use std::cmp::Reverse;
+use std::io::{self};
+use std::path::Path;
+use std::path::PathBuf;
+
+use time::OffsetDateTime;
+use time::PrimitiveDateTime;
+use time::format_description::FormatItem;
+use time::macros::format_description;
+use uuid::Uuid;
+
+use super::SESSIONS_SUBDIR;
+
+/// Returned page of conversation summaries.
+#[derive(Debug, Default, PartialEq)]
+pub struct ConversationsPage {
+    /// Conversation summaries ordered newest first.
+    pub items: Vec<ConversationItem>,
+    /// Opaque pagination token to resume after the last item, or `None` if end.
+    pub next_cursor: Option<Cursor>,
+    /// Total number of files touched while scanning this request.
+    pub num_scanned_files: usize,
+    /// True if a hard scan cap was hit; consider resuming with `next_cursor`.
+    pub reached_scan_cap: bool,
+}
+
+/// Summary information for a conversation rollout file.
+#[derive(Debug, PartialEq)]
+pub struct ConversationItem {
+    /// Absolute path to the rollout file.
+    pub path: PathBuf,
+    /// First up to 5 JSONL records parsed as JSON (includes meta line).
+    pub head: Vec<serde_json::Value>,
+}
+
+/// Hard cap to bound worst‑case work per request.
+const MAX_SCAN_FILES: usize = 50_000;
+
+/// Pagination cursor identifying a file by timestamp and UUID.
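+///
+/// A cursor serializes to `"<file_ts>|<uuid>"`, reusing the timestamp and
+/// UUID embedded in the rollout filename; `parse_cursor` below documents the
+/// exact format accepted on deserialization.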
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct Cursor {
+    ts: OffsetDateTime,
+    id: Uuid,
+}
+
+impl Cursor {
+    fn new(ts: OffsetDateTime, id: Uuid) -> Self {
+        Self { ts, id }
+    }
+}
+
+impl serde::Serialize for Cursor {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: serde::Serializer,
+    {
+        let ts_str = self
+            .ts
+            .format(&format_description!(
+                "[year]-[month]-[day]T[hour]-[minute]-[second]"
+            ))
+            .map_err(|e| serde::ser::Error::custom(format!("format error: {e}")))?;
+        serializer.serialize_str(&format!("{ts_str}|{}", self.id))
+    }
+}
+
+impl<'de> serde::Deserialize<'de> for Cursor {
+    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        let s = String::deserialize(deserializer)?;
+        parse_cursor(&s).ok_or_else(|| serde::de::Error::custom("invalid cursor"))
+    }
+}
+
+/// Retrieve recorded conversation file paths with token pagination. The returned `next_cursor`
+/// can be supplied on the next call to resume after the last returned item, resilient to
+/// concurrent new sessions being appended. Ordering is stable by timestamp desc, then UUID desc.
+pub(crate) async fn get_conversations(
+    codex_home: &Path,
+    page_size: usize,
+    cursor: Option<&Cursor>,
+) -> io::Result<ConversationsPage> {
+    let mut root = codex_home.to_path_buf();
+    root.push(SESSIONS_SUBDIR);
+
+    if !root.exists() {
+        return Ok(ConversationsPage {
+            items: Vec::new(),
+            next_cursor: None,
+            num_scanned_files: 0,
+            reached_scan_cap: false,
+        });
+    }
+
+    let anchor = cursor.cloned();
+
+    let result = traverse_directories_for_paths(root.clone(), page_size, anchor).await?;
+    Ok(result)
+}
+
+/// Load the full contents of a single conversation session file at `path`.
+/// Returns the entire file contents as a String.
+#[allow(dead_code)]
+pub(crate) async fn get_conversation(path: &Path) -> io::Result<String> {
+    tokio::fs::read_to_string(path).await
+}
+
+/// Load conversation file paths from disk using directory traversal.
+///
+/// Directory layout: `~/.codex/sessions/YYYY/MM/DD/rollout-YYYY-MM-DDThh-mm-ss-<uuid>.jsonl`
+/// Returned newest (latest) first.
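+///
+/// When an anchor cursor is supplied, traversal still walks newest-first but
+/// skips entries until it passes the anchor (strictly older timestamp, or the
+/// same timestamp with a smaller UUID), so the next page resumes exactly
+/// after the previous one even if new sessions appear concurrently.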
+async fn traverse_directories_for_paths(
+    root: PathBuf,
+    page_size: usize,
+    anchor: Option<Cursor>,
+) -> io::Result<ConversationsPage> {
+    let mut items: Vec<ConversationItem> = Vec::with_capacity(page_size);
+    let mut scanned_files = 0usize;
+    let mut anchor_passed = anchor.is_none();
+    let (anchor_ts, anchor_id) = match anchor {
+        Some(c) => (c.ts, c.id),
+        None => (OffsetDateTime::UNIX_EPOCH, Uuid::nil()),
+    };
+
+    let year_dirs = collect_dirs_desc(&root, |s| s.parse::<u16>().ok()).await?;
+
+    'outer: for (_year, year_path) in year_dirs.iter() {
+        if scanned_files >= MAX_SCAN_FILES {
+            break;
+        }
+        let month_dirs = collect_dirs_desc(year_path, |s| s.parse::<u8>().ok()).await?;
+        for (_month, month_path) in month_dirs.iter() {
+            if scanned_files >= MAX_SCAN_FILES {
+                break 'outer;
+            }
+            let day_dirs = collect_dirs_desc(month_path, |s| s.parse::<u8>().ok()).await?;
+            for (_day, day_path) in day_dirs.iter() {
+                if scanned_files >= MAX_SCAN_FILES {
+                    break 'outer;
+                }
+                let mut day_files = collect_files(day_path, |name_str, path| {
+                    if !name_str.starts_with("rollout-") || !name_str.ends_with(".jsonl") {
+                        return None;
+                    }
+
+                    parse_timestamp_uuid_from_filename(name_str)
+                        .map(|(ts, id)| (ts, id, name_str.to_string(), path.to_path_buf()))
+                })
+                .await?;
+                // Stable ordering within the same second: (timestamp desc, uuid desc)
+                day_files.sort_by_key(|(ts, sid, _name_str, _path)| (Reverse(*ts), Reverse(*sid)));
+                for (ts, sid, _name_str, path) in day_files.into_iter() {
+                    scanned_files += 1;
+                    if scanned_files >= MAX_SCAN_FILES && items.len() >= page_size {
+                        break 'outer;
+                    }
+                    if !anchor_passed {
+                        if ts < anchor_ts || (ts == anchor_ts && sid < anchor_id) {
+                            anchor_passed = true;
+                        } else {
+                            continue;
+                        }
+                    }
+                    if items.len() == page_size {
+                        break 'outer;
+                    }
+                    let head = read_first_jsonl_records(&path, 5).await.unwrap_or_default();
+                    items.push(ConversationItem { path, head });
+                }
+            }
+        }
+    }
+
+    let next = build_next_cursor(&items);
+    Ok(ConversationsPage {
+        items,
+        next_cursor: next,
+        num_scanned_files: scanned_files,
+        reached_scan_cap: scanned_files >= MAX_SCAN_FILES,
+    })
+}
+
+/// Pagination cursor token format: "<file_ts>|<uuid>" where `file_ts` matches the
+/// filename timestamp portion (YYYY-MM-DDThh-mm-ss) used in rollout filenames.
+/// The cursor orders files by timestamp desc, then UUID desc.
+fn parse_cursor(token: &str) -> Option<Cursor> {
+    let (file_ts, uuid_str) = token.split_once('|')?;
+
+    let Ok(uuid) = Uuid::parse_str(uuid_str) else {
+        return None;
+    };
+
+    let format: &[FormatItem] =
+        format_description!("[year]-[month]-[day]T[hour]-[minute]-[second]");
+    let ts = PrimitiveDateTime::parse(file_ts, format).ok()?.assume_utc();
+
+    Some(Cursor::new(ts, uuid))
+}
+
+fn build_next_cursor(items: &[ConversationItem]) -> Option<Cursor> {
+    let last = items.last()?;
+    let file_name = last.path.file_name()?.to_string_lossy();
+    let (ts, id) = parse_timestamp_uuid_from_filename(&file_name)?;
+    Some(Cursor::new(ts, id))
+}
+
+/// Collects immediate subdirectories of `parent`, parses their (string) names with `parse`,
+/// and returns them sorted descending by the parsed key.
+async fn collect_dirs_desc<T, F>(parent: &Path, parse: F) -> io::Result<Vec<(T, PathBuf)>>
+where
+    T: Ord + Copy,
+    F: Fn(&str) -> Option<T>,
+{
+    let mut dir = tokio::fs::read_dir(parent).await?;
+    let mut vec: Vec<(T, PathBuf)> = Vec::new();
+    while let Some(entry) = dir.next_entry().await? {
+        if entry
+            .file_type()
+            .await
+            .map(|ft| ft.is_dir())
+            .unwrap_or(false)
+            && let Some(s) = entry.file_name().to_str()
+            && let Some(v) = parse(s)
+        {
+            vec.push((v, entry.path()));
+        }
+    }
+    vec.sort_by_key(|(v, _)| Reverse(*v));
+    Ok(vec)
+}
+
+/// Collects files in a directory and parses them with `parse`.
+async fn collect_files<T, F>(parent: &Path, parse: F) -> io::Result<Vec<T>>
+where
+    F: Fn(&str, &Path) -> Option<T>,
+{
+    let mut dir = tokio::fs::read_dir(parent).await?;
+    let mut collected: Vec<T> = Vec::new();
+    while let Some(entry) = dir.next_entry().await? {
+        if entry
+            .file_type()
+            .await
+            .map(|ft| ft.is_file())
+            .unwrap_or(false)
+            && let Some(s) = entry.file_name().to_str()
+            && let Some(v) = parse(s, &entry.path())
+        {
+            collected.push(v);
+        }
+    }
+    Ok(collected)
+}
+
+fn parse_timestamp_uuid_from_filename(name: &str) -> Option<(OffsetDateTime, Uuid)> {
+    // Expected: rollout-YYYY-MM-DDThh-mm-ss-<uuid>.jsonl
+    let core = name.strip_prefix("rollout-")?.strip_suffix(".jsonl")?;
+
+    // Scan from the right for a '-' such that the suffix parses as a UUID.
+    let (sep_idx, uuid) = core
+        .match_indices('-')
+        .rev()
+        .find_map(|(i, _)| Uuid::parse_str(&core[i + 1..]).ok().map(|u| (i, u)))?;
+
+    let ts_str = &core[..sep_idx];
+    let format: &[FormatItem] =
+        format_description!("[year]-[month]-[day]T[hour]-[minute]-[second]");
+    let ts = PrimitiveDateTime::parse(ts_str, format).ok()?.assume_utc();
+    Some((ts, uuid))
+}
+
+async fn read_first_jsonl_records(
+    path: &Path,
+    max_records: usize,
+) -> io::Result<Vec<serde_json::Value>> {
+    use tokio::io::AsyncBufReadExt;
+
+    let file = tokio::fs::File::open(path).await?;
+    let reader = tokio::io::BufReader::new(file);
+    let mut lines = reader.lines();
+    let mut head: Vec<serde_json::Value> = Vec::new();
+    while head.len() < max_records {
+        let line_opt = lines.next_line().await?;
+        let Some(line) = line_opt else { break };
+        let trimmed = line.trim();
+        if trimmed.is_empty() {
+            continue;
+        }
+        if let Ok(v) = serde_json::from_str::<serde_json::Value>(trimmed) {
+            head.push(v);
+        }
+    }
+    Ok(head)
+}
diff --git a/codex-rs/core/src/rollout/mod.rs b/codex-rs/core/src/rollout/mod.rs
new file mode 100644
index 00000000..19c66a19
--- /dev/null
+++ b/codex-rs/core/src/rollout/mod.rs
@@ -0,0 +1,12 @@
+//! Rollout module: persistence and discovery of session rollout files.
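+//!
+//! `recorder` appends JSONL records for live sessions; `list` provides
+//! paginated, newest-first discovery of recorded sessions, each with a small
+//! JSON `head` preview.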
+
+pub(crate) const SESSIONS_SUBDIR: &str = "sessions";
+
+pub mod list;
+pub mod recorder;
+
+pub use recorder::RolloutRecorder;
+pub use recorder::SessionStateSnapshot;
+
+#[cfg(test)]
+pub mod tests;
diff --git a/codex-rs/core/src/rollout.rs b/codex-rs/core/src/rollout/recorder.rs
similarity index 95%
rename from codex-rs/core/src/rollout.rs
rename to codex-rs/core/src/rollout/recorder.rs
index 98d576cc..20d30df6 100644
--- a/codex-rs/core/src/rollout.rs
+++ b/codex-rs/core/src/rollout/recorder.rs
@@ -19,14 +19,16 @@ use tracing::info;
 use tracing::warn;
 use uuid::Uuid;
 
+use super::SESSIONS_SUBDIR;
+use super::list::ConversationsPage;
+use super::list::Cursor;
+use super::list::get_conversations;
 use crate::config::Config;
 use crate::conversation_manager::InitialHistory;
 use crate::git_info::GitInfo;
 use crate::git_info::collect_git_info;
 use codex_protocol::models::ResponseItem;
 
-const SESSIONS_SUBDIR: &str = "sessions";
-
 #[derive(Serialize, Deserialize, Clone, Default)]
 pub struct SessionMeta {
     pub id: Uuid,
@@ -65,7 +67,7 @@ pub struct SavedSession {
 /// $ fx ~/.codex/sessions/rollout-2025-05-07T17-24-21-5973b6c0-94b8-487b-a530-2aeb6098ae0e.jsonl
 /// ```
 #[derive(Clone)]
-pub(crate) struct RolloutRecorder {
+pub struct RolloutRecorder {
     tx: Sender<RolloutCmd>,
 }
 
@@ -76,6 +78,16 @@ enum RolloutCmd {
 }
 
 impl RolloutRecorder {
+    #[allow(dead_code)]
+    /// List conversations (rollout files) under the provided Codex home directory.
+    pub async fn list_conversations(
+        codex_home: &Path,
+        page_size: usize,
+        cursor: Option<&Cursor>,
+    ) -> std::io::Result<ConversationsPage> {
+        get_conversations(codex_home, page_size, cursor).await
+    }
+
     /// Attempt to create a new [`RolloutRecorder`]. If the sessions directory
     /// cannot be created or the rollout file cannot be opened we return the
     /// error so the caller can decide whether to disable persistence.
diff --git a/codex-rs/core/src/rollout/tests.rs b/codex-rs/core/src/rollout/tests.rs
new file mode 100644
index 00000000..52c121e7
--- /dev/null
+++ b/codex-rs/core/src/rollout/tests.rs
@@ -0,0 +1,384 @@
+#![allow(clippy::unwrap_used, clippy::expect_used)]
+
+use std::fs::File;
+use std::fs::{self};
+use std::io::Write;
+use std::path::Path;
+
+use tempfile::TempDir;
+use time::OffsetDateTime;
+use time::PrimitiveDateTime;
+use time::format_description::FormatItem;
+use time::macros::format_description;
+use uuid::Uuid;
+
+use crate::rollout::list::ConversationItem;
+use crate::rollout::list::ConversationsPage;
+use crate::rollout::list::Cursor;
+use crate::rollout::list::get_conversation;
+use crate::rollout::list::get_conversations;
+
+fn write_session_file(
+    root: &Path,
+    ts_str: &str,
+    uuid: Uuid,
+    num_records: usize,
+) -> std::io::Result<(OffsetDateTime, Uuid)> {
+    let format: &[FormatItem] =
+        format_description!("[year]-[month]-[day]T[hour]-[minute]-[second]");
+    let dt = PrimitiveDateTime::parse(ts_str, format)
+        .unwrap()
+        .assume_utc();
+    let dir = root
+        .join("sessions")
+        .join(format!("{:04}", dt.year()))
+        .join(format!("{:02}", u8::from(dt.month())))
+        .join(format!("{:02}", dt.day()));
+    fs::create_dir_all(&dir)?;
+
+    let filename = format!("rollout-{ts_str}-{uuid}.jsonl");
+    let file_path = dir.join(filename);
+    let mut file = File::create(file_path)?;
+
+    let meta = serde_json::json!({
+        "timestamp": ts_str,
+        "id": uuid.to_string()
+    });
+    writeln!(file, "{meta}")?;
+
+    for i in 0..num_records {
+        let rec = serde_json::json!({
+            "record_type": "response",
+            "index": i
+        });
+        writeln!(file, "{rec}")?;
+    }
+    Ok((dt, uuid))
+}
+
+#[tokio::test]
+async fn test_list_conversations_latest_first() {
+    let temp = TempDir::new().unwrap();
+    let home = temp.path();
+
+    // Fixed UUIDs for deterministic expectations
+    let u1 = Uuid::from_u128(1);
+    let u2 = Uuid::from_u128(2);
+    let u3 = Uuid::from_u128(3);
+
+    // Create three sessions across three days
+    write_session_file(home, "2025-01-01T12-00-00", u1, 3).unwrap();
+    write_session_file(home, "2025-01-02T12-00-00", u2, 3).unwrap();
+    write_session_file(home, "2025-01-03T12-00-00", u3, 3).unwrap();
+
+    let page = get_conversations(home, 10, None).await.unwrap();
+
+    // Build expected objects
+    let p1 = home
+        .join("sessions")
+        .join("2025")
+        .join("01")
+        .join("03")
+        .join(format!("rollout-2025-01-03T12-00-00-{u3}.jsonl"));
+    let p2 = home
+        .join("sessions")
+        .join("2025")
+        .join("01")
+        .join("02")
+        .join(format!("rollout-2025-01-02T12-00-00-{u2}.jsonl"));
+    let p3 = home
+        .join("sessions")
+        .join("2025")
+        .join("01")
+        .join("01")
+        .join(format!("rollout-2025-01-01T12-00-00-{u1}.jsonl"));
+
+    let head_3 = vec![
+        serde_json::json!({"timestamp": "2025-01-03T12-00-00", "id": u3.to_string()}),
+        serde_json::json!({"record_type": "response", "index": 0}),
+        serde_json::json!({"record_type": "response", "index": 1}),
+        serde_json::json!({"record_type": "response", "index": 2}),
+    ];
+    let head_2 = vec![
+        serde_json::json!({"timestamp": "2025-01-02T12-00-00", "id": u2.to_string()}),
+        serde_json::json!({"record_type": "response", "index": 0}),
+        serde_json::json!({"record_type": "response", "index": 1}),
+        serde_json::json!({"record_type": "response", "index": 2}),
+    ];
+    let head_1 = vec![
+        serde_json::json!({"timestamp": "2025-01-01T12-00-00", "id": u1.to_string()}),
+        serde_json::json!({"record_type": "response", "index": 0}),
+        serde_json::json!({"record_type": "response", "index": 1}),
+        serde_json::json!({"record_type": "response", "index": 2}),
+    ];
+
+    let expected_cursor: Cursor =
+        serde_json::from_str(&format!("\"2025-01-01T12-00-00|{u1}\"")).unwrap();
+
+    let expected = ConversationsPage {
+        items: vec![
+            ConversationItem {
+                path: p1,
+                head: head_3,
+            },
+            ConversationItem {
+                path: p2,
+                head: head_2,
+            },
+            ConversationItem {
+                path: p3,
+                head: head_1,
+            },
+        ],
+        next_cursor: Some(expected_cursor),
+        num_scanned_files: 3,
+        reached_scan_cap: false,
+    };
+
+    assert_eq!(page, expected);
+}
+
+#[tokio::test]
+async fn test_pagination_cursor() {
+    let temp = TempDir::new().unwrap();
+    let home = temp.path();
+
+    // Fixed UUIDs for deterministic expectations
+    let u1 = Uuid::from_u128(11);
+    let u2 = Uuid::from_u128(22);
+    let u3 = Uuid::from_u128(33);
+    let u4 = Uuid::from_u128(44);
+    let u5 = Uuid::from_u128(55);
+
+    // Oldest to newest
+    write_session_file(home, "2025-03-01T09-00-00", u1, 1).unwrap();
+    write_session_file(home, "2025-03-02T09-00-00", u2, 1).unwrap();
+    write_session_file(home, "2025-03-03T09-00-00", u3, 1).unwrap();
+    write_session_file(home, "2025-03-04T09-00-00", u4, 1).unwrap();
+    write_session_file(home, "2025-03-05T09-00-00", u5, 1).unwrap();
+
+    let page1 = get_conversations(home, 2, None).await.unwrap();
+    let p5 = home
+        .join("sessions")
+        .join("2025")
+        .join("03")
+        .join("05")
+        .join(format!("rollout-2025-03-05T09-00-00-{u5}.jsonl"));
+    let p4 = home
+        .join("sessions")
+        .join("2025")
+        .join("03")
+        .join("04")
+        .join(format!("rollout-2025-03-04T09-00-00-{u4}.jsonl"));
+    let head_5 = vec![
+        serde_json::json!({"timestamp": "2025-03-05T09-00-00", "id": u5.to_string()}),
+        serde_json::json!({"record_type": "response", "index": 0}),
+    ];
+    let head_4 = vec![
+        serde_json::json!({"timestamp": "2025-03-04T09-00-00", "id": u4.to_string()}),
+        serde_json::json!({"record_type": "response", "index": 0}),
+    ];
+    let expected_cursor1: Cursor =
+        serde_json::from_str(&format!("\"2025-03-04T09-00-00|{u4}\"")).unwrap();
+    let expected_page1 = ConversationsPage {
+        items: vec![
+            ConversationItem {
+                path: p5,
+                head: head_5,
+            },
+            ConversationItem {
+                path: p4,
+                head: head_4,
+            },
+        ],
+        next_cursor: Some(expected_cursor1.clone()),
+        num_scanned_files: 3, // scanned 05, 04, and peeked at 03 before breaking
+        reached_scan_cap: false,
+    };
+    assert_eq!(page1, expected_page1);
+
+    let page2 = get_conversations(home, 2, page1.next_cursor.as_ref())
+        .await
+        .unwrap();
+    let p3 = home
+        .join("sessions")
+        .join("2025")
+        .join("03")
+        .join("03")
+        .join(format!("rollout-2025-03-03T09-00-00-{u3}.jsonl"));
+    let p2 = home
+        .join("sessions")
+        .join("2025")
+        .join("03")
+        .join("02")
+        .join(format!("rollout-2025-03-02T09-00-00-{u2}.jsonl"));
+    let head_3 = vec![
+        serde_json::json!({"timestamp": "2025-03-03T09-00-00", "id": u3.to_string()}),
+        serde_json::json!({"record_type": "response", "index": 0}),
+    ];
+    let head_2 = vec![
+        serde_json::json!({"timestamp": "2025-03-02T09-00-00", "id": u2.to_string()}),
+        serde_json::json!({"record_type": "response", "index": 0}),
+    ];
+    let expected_cursor2: Cursor =
+        serde_json::from_str(&format!("\"2025-03-02T09-00-00|{u2}\"")).unwrap();
+    let expected_page2 = ConversationsPage {
+        items: vec![
+            ConversationItem {
+                path: p3,
+                head: head_3,
+            },
+            ConversationItem {
+                path: p2,
+                head: head_2,
+            },
+        ],
+        next_cursor: Some(expected_cursor2.clone()),
+        num_scanned_files: 5, // scanned 05, 04 (anchor), 03, 02, and peeked at 01
+        reached_scan_cap: false,
+    };
+    assert_eq!(page2, expected_page2);
+
+    let page3 = get_conversations(home, 2, page2.next_cursor.as_ref())
+        .await
+        .unwrap();
+    let p1 = home
+        .join("sessions")
+        .join("2025")
+        .join("03")
+        .join("01")
+        .join(format!("rollout-2025-03-01T09-00-00-{u1}.jsonl"));
+    let head_1 = vec![
+        serde_json::json!({"timestamp": "2025-03-01T09-00-00", "id": u1.to_string()}),
+        serde_json::json!({"record_type": "response", "index": 0}),
+    ];
+    let expected_cursor3: Cursor =
+        serde_json::from_str(&format!("\"2025-03-01T09-00-00|{u1}\"")).unwrap();
+    let expected_page3 = ConversationsPage {
+        items: vec![ConversationItem {
+            path: p1,
+            head: head_1,
+        }],
+        next_cursor: Some(expected_cursor3.clone()),
+        num_scanned_files: 5, // scanned 05, 04 (anchor), 03, 02 (anchor), 01
+        reached_scan_cap: false,
+    };
+    assert_eq!(page3, expected_page3);
+}
+
+#[tokio::test]
+async fn test_get_conversation_contents() {
+    let temp = TempDir::new().unwrap();
+    let home = temp.path();
+
+    let uuid = Uuid::new_v4();
+    let ts = "2025-04-01T10-30-00";
+    write_session_file(home, ts, uuid, 2).unwrap();
+
+    let page = get_conversations(home, 1, None).await.unwrap();
+    let path = &page.items[0].path;
+
+    let content = get_conversation(path).await.unwrap();
+
+    // Page equality (single item)
+    let expected_path = home
+        .join("sessions")
+        .join("2025")
+        .join("04")
+        .join("01")
+        .join(format!("rollout-2025-04-01T10-30-00-{uuid}.jsonl"));
+    let expected_head = vec![
+        serde_json::json!({"timestamp": ts, "id": uuid.to_string()}),
+        serde_json::json!({"record_type": "response", "index": 0}),
+        serde_json::json!({"record_type": "response", "index": 1}),
+    ];
+    let expected_cursor: Cursor = serde_json::from_str(&format!("\"{ts}|{uuid}\"")).unwrap();
+    let expected_page = ConversationsPage {
+        items: vec![ConversationItem {
+            path: expected_path.clone(),
+            head: expected_head,
+        }],
+        next_cursor: Some(expected_cursor),
+        num_scanned_files: 1,
+        reached_scan_cap: false,
+    };
+    assert_eq!(page, expected_page);
+
+    // Entire file contents equality
+    let meta = serde_json::json!({"timestamp": ts, "id": uuid.to_string()});
+    let rec0 = serde_json::json!({"record_type": "response", "index": 0});
+    let rec1 = serde_json::json!({"record_type": "response", "index": 1});
+    let expected_content = format!("{meta}\n{rec0}\n{rec1}\n");
+    assert_eq!(content, expected_content);
+}
+
+#[tokio::test]
+async fn test_stable_ordering_same_second_pagination() {
+    let temp = TempDir::new().unwrap();
+    let home = temp.path();
+
+    let ts = "2025-07-01T00-00-00";
+    let u1 = Uuid::from_u128(1);
+    let u2 = Uuid::from_u128(2);
+    let u3 = Uuid::from_u128(3);
+
+    write_session_file(home, ts, u1, 0).unwrap();
+    write_session_file(home, ts, u2, 0).unwrap();
+    write_session_file(home, ts, u3, 0).unwrap();
+
+    let page1 = get_conversations(home, 2, None).await.unwrap();
+
+    let p3 = home
+        .join("sessions")
+        .join("2025")
+        .join("07")
+        .join("01")
+        .join(format!("rollout-2025-07-01T00-00-00-{u3}.jsonl"));
+    let p2 = home
+        .join("sessions")
+        .join("2025")
+        .join("07")
+        .join("01")
+        .join(format!("rollout-2025-07-01T00-00-00-{u2}.jsonl"));
+    let head = |u: Uuid| -> Vec<serde_json::Value> {
+        vec![serde_json::json!({"timestamp": ts, "id": u.to_string()})]
+    };
+    let expected_cursor1: Cursor = serde_json::from_str(&format!("\"{ts}|{u2}\"")).unwrap();
+    let expected_page1 = ConversationsPage {
+        items: vec![
+            ConversationItem {
+                path: p3,
+                head: head(u3),
+            },
+            ConversationItem {
+                path: p2,
+                head: head(u2),
+            },
+        ],
+        next_cursor: Some(expected_cursor1.clone()),
+        num_scanned_files: 3, // scanned u3, u2, peeked u1
+        reached_scan_cap: false,
+    };
+    assert_eq!(page1, expected_page1);
+
+    let page2 = get_conversations(home, 2, page1.next_cursor.as_ref())
+        .await
+        .unwrap();
+    let p1 = home
+        .join("sessions")
+        .join("2025")
+        .join("07")
+        .join("01")
+        .join(format!("rollout-2025-07-01T00-00-00-{u1}.jsonl"));
+    let expected_cursor2: Cursor = serde_json::from_str(&format!("\"{ts}|{u1}\"")).unwrap();
+    let expected_page2 = ConversationsPage {
+        items: vec![ConversationItem {
+            path: p1,
+            head: head(u1),
+        }],
+        next_cursor: Some(expected_cursor2.clone()),
+        num_scanned_files: 3, // scanned u3, u2 (anchor), u1
+        reached_scan_cap: false,
+    };
+    assert_eq!(page2, expected_page2);
+}
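+
+// Usage sketch (not a test): external callers would go through the public
+// wrapper on `RolloutRecorder` rather than `get_conversations` directly,
+// e.g. with some `codex_home: PathBuf` in hand:
+//
+//   let page = RolloutRecorder::list_conversations(&codex_home, 25, None).await?;
+//   let more = RolloutRecorder::list_conversations(&codex_home, 25, page.next_cursor.as_ref()).await?;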