core(rollout): extract rollout module, add listing API, and return file heads (#1634)

- Move rollout persistence and listing into a dedicated module:
rollout/{recorder,list}.
- Expose lightweight conversation listing that returns file paths plus
the first 5 JSONL records for preview.
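
Illustrative use of the listing API, as a sketch (not part of the diff; `print_all_conversations`, the page size of 25, and the printing are placeholders):

```rust
use std::path::Path;

use crate::rollout::RolloutRecorder;

// Drain every page of conversation summaries, newest first.
async fn print_all_conversations(codex_home: &Path) -> std::io::Result<()> {
    let mut cursor = None;
    loop {
        let page = RolloutRecorder::list_conversations(codex_home, 25, cursor.as_ref()).await?;
        if page.items.is_empty() {
            break; // exhausted: an empty page carries no cursor
        }
        for item in &page.items {
            // `head` holds up to the first 5 JSONL records; the meta line comes first.
            println!("{} ({} head records)", item.path.display(), item.head.len());
        }
        cursor = page.next_cursor;
    }
    Ok(())
}
```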
Ahmed Ibrahim
2025-09-03 00:39:19 -07:00
committed by GitHub
parent 9ad2e726fc
commit d77b33ded7
6 changed files with 708 additions and 4 deletions

core/src/rollout/list.rs

@@ -0,0 +1,295 @@
use std::cmp::Reverse;
use std::io::{self};
use std::path::Path;
use std::path::PathBuf;
use time::OffsetDateTime;
use time::PrimitiveDateTime;
use time::format_description::FormatItem;
use time::macros::format_description;
use uuid::Uuid;
use super::SESSIONS_SUBDIR;
/// Returned page of conversation summaries.
#[derive(Debug, Default, PartialEq)]
pub struct ConversationsPage {
/// Conversation summaries ordered newest first.
pub items: Vec<ConversationItem>,
/// Opaque pagination token to resume after the last item, or `None` at the end of the listing.
pub next_cursor: Option<Cursor>,
/// Total number of files touched while scanning this request.
pub num_scanned_files: usize,
/// True if a hard scan cap was hit; consider resuming with `next_cursor`.
pub reached_scan_cap: bool,
}
/// Summary information for a conversation rollout file.
#[derive(Debug, PartialEq)]
pub struct ConversationItem {
/// Absolute path to the rollout file.
pub path: PathBuf,
/// Up to the first 5 JSONL records of the file, parsed as JSON (the meta line included).
pub head: Vec<serde_json::Value>,
}
/// Hard cap to bound worst-case work per request.
const MAX_SCAN_FILES: usize = 50_000;
/// Pagination cursor identifying a file by timestamp and UUID.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Cursor {
ts: OffsetDateTime,
id: Uuid,
}
impl Cursor {
fn new(ts: OffsetDateTime, id: Uuid) -> Self {
Self { ts, id }
}
}
impl serde::Serialize for Cursor {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let ts_str = self
.ts
.format(&format_description!(
"[year]-[month]-[day]T[hour]-[minute]-[second]"
))
.map_err(|e| serde::ser::Error::custom(format!("format error: {e}")))?;
serializer.serialize_str(&format!("{ts_str}|{}", self.id))
}
}
impl<'de> serde::Deserialize<'de> for Cursor {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
parse_cursor(&s).ok_or_else(|| serde::de::Error::custom("invalid cursor"))
}
}
/// Retrieve recorded conversation file paths with token pagination. The returned `next_cursor`
/// can be supplied on the next call to resume after the last returned item, resilient to
/// concurrent new sessions being appended. Ordering is stable by timestamp desc, then UUID desc.
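/// Illustrative pagination loop (a sketch; `codex_home` is assumed to be in scope):
///
/// ```ignore
/// let mut cursor = None;
/// loop {
///     let page = get_conversations(codex_home, 10, cursor.as_ref()).await?;
///     if page.items.is_empty() {
///         break;
///     }
///     cursor = page.next_cursor;
/// }
/// ```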
pub(crate) async fn get_conversations(
codex_home: &Path,
page_size: usize,
cursor: Option<&Cursor>,
) -> io::Result<ConversationsPage> {
let mut root = codex_home.to_path_buf();
root.push(SESSIONS_SUBDIR);
if !root.exists() {
return Ok(ConversationsPage {
items: Vec::new(),
next_cursor: None,
num_scanned_files: 0,
reached_scan_cap: false,
});
}
let anchor = cursor.cloned();
let result = traverse_directories_for_paths(root.clone(), page_size, anchor).await?;
Ok(result)
}
/// Load the full contents of a single conversation session file at `path`.
/// Returns the entire file contents as a String.
#[allow(dead_code)]
pub(crate) async fn get_conversation(path: &Path) -> io::Result<String> {
tokio::fs::read_to_string(path).await
}
/// Load conversation file paths from disk using directory traversal.
///
/// Directory layout: `~/.codex/sessions/YYYY/MM/DD/rollout-YYYY-MM-DDThh-mm-ss-<uuid>.jsonl`
/// Results are returned newest (latest) first.
async fn traverse_directories_for_paths(
root: PathBuf,
page_size: usize,
anchor: Option<Cursor>,
) -> io::Result<ConversationsPage> {
let mut items: Vec<ConversationItem> = Vec::with_capacity(page_size);
let mut scanned_files = 0usize;
let mut anchor_passed = anchor.is_none();
let (anchor_ts, anchor_id) = match anchor {
Some(c) => (c.ts, c.id),
None => (OffsetDateTime::UNIX_EPOCH, Uuid::nil()),
};
let year_dirs = collect_dirs_desc(&root, |s| s.parse::<u16>().ok()).await?;
'outer: for (_year, year_path) in year_dirs.iter() {
if scanned_files >= MAX_SCAN_FILES {
break;
}
let month_dirs = collect_dirs_desc(year_path, |s| s.parse::<u8>().ok()).await?;
for (_month, month_path) in month_dirs.iter() {
if scanned_files >= MAX_SCAN_FILES {
break 'outer;
}
let day_dirs = collect_dirs_desc(month_path, |s| s.parse::<u8>().ok()).await?;
for (_day, day_path) in day_dirs.iter() {
if scanned_files >= MAX_SCAN_FILES {
break 'outer;
}
let mut day_files = collect_files(day_path, |name_str, path| {
if !name_str.starts_with("rollout-") || !name_str.ends_with(".jsonl") {
return None;
}
parse_timestamp_uuid_from_filename(name_str)
.map(|(ts, id)| (ts, id, name_str.to_string(), path.to_path_buf()))
})
.await?;
// Stable ordering within the same second: (timestamp desc, uuid desc)
day_files.sort_by_key(|(ts, sid, _name_str, _path)| (Reverse(*ts), Reverse(*sid)));
for (ts, sid, _name_str, path) in day_files.into_iter() {
scanned_files += 1;
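// Once the scan cap is hit, stop as soon as the current page is full;
// the directory-level checks above stop any further descent.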
if scanned_files >= MAX_SCAN_FILES && items.len() >= page_size {
break 'outer;
}
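// Resume strictly after the anchor: skip files until one sorts below
// (ts, id) in descending order.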
if !anchor_passed {
if ts < anchor_ts || (ts == anchor_ts && sid < anchor_id) {
anchor_passed = true;
} else {
continue;
}
}
if items.len() == page_size {
break 'outer;
}
let head = read_first_jsonl_records(&path, 5).await.unwrap_or_default();
items.push(ConversationItem { path, head });
}
}
}
}
let next = build_next_cursor(&items);
Ok(ConversationsPage {
items,
next_cursor: next,
num_scanned_files: scanned_files,
reached_scan_cap: scanned_files >= MAX_SCAN_FILES,
})
}
/// Pagination cursor token format: "<file_ts>|<uuid>" where `file_ts` matches the
/// filename timestamp portion (YYYY-MM-DDThh-mm-ss) used in rollout filenames.
/// The cursor orders files by timestamp desc, then UUID desc.
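///
/// Example token (illustrative UUID): `2025-01-03T12-00-00|550e8400-e29b-41d4-a716-446655440000`.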
fn parse_cursor(token: &str) -> Option<Cursor> {
let (file_ts, uuid_str) = token.split_once('|')?;
let Ok(uuid) = Uuid::parse_str(uuid_str) else {
return None;
};
let format: &[FormatItem] =
format_description!("[year]-[month]-[day]T[hour]-[minute]-[second]");
let ts = PrimitiveDateTime::parse(file_ts, format).ok()?.assume_utc();
Some(Cursor::new(ts, uuid))
}
fn build_next_cursor(items: &[ConversationItem]) -> Option<Cursor> {
let last = items.last()?;
let file_name = last.path.file_name()?.to_string_lossy();
let (ts, id) = parse_timestamp_uuid_from_filename(&file_name)?;
Some(Cursor::new(ts, id))
}
/// Collects immediate subdirectories of `parent`, parses their (string) names with `parse`,
/// and returns them sorted descending by the parsed key.
async fn collect_dirs_desc<T, F>(parent: &Path, parse: F) -> io::Result<Vec<(T, PathBuf)>>
where
T: Ord + Copy,
F: Fn(&str) -> Option<T>,
{
let mut dir = tokio::fs::read_dir(parent).await?;
let mut vec: Vec<(T, PathBuf)> = Vec::new();
while let Some(entry) = dir.next_entry().await? {
if entry
.file_type()
.await
.map(|ft| ft.is_dir())
.unwrap_or(false)
&& let Some(s) = entry.file_name().to_str()
&& let Some(v) = parse(s)
{
vec.push((v, entry.path()));
}
}
vec.sort_by_key(|(v, _)| Reverse(*v));
Ok(vec)
}
/// Collects files in a directory and parses them with `parse`.
async fn collect_files<T, F>(parent: &Path, parse: F) -> io::Result<Vec<T>>
where
F: Fn(&str, &Path) -> Option<T>,
{
let mut dir = tokio::fs::read_dir(parent).await?;
let mut collected: Vec<T> = Vec::new();
while let Some(entry) = dir.next_entry().await? {
if entry
.file_type()
.await
.map(|ft| ft.is_file())
.unwrap_or(false)
&& let Some(s) = entry.file_name().to_str()
&& let Some(v) = parse(s, &entry.path())
{
collected.push(v);
}
}
Ok(collected)
}
fn parse_timestamp_uuid_from_filename(name: &str) -> Option<(OffsetDateTime, Uuid)> {
// Expected: rollout-YYYY-MM-DDThh-mm-ss-<uuid>.jsonl
let core = name.strip_prefix("rollout-")?.strip_suffix(".jsonl")?;
// Scan from the right for a '-' such that the suffix parses as a UUID.
let (sep_idx, uuid) = core
.match_indices('-')
.rev()
.find_map(|(i, _)| Uuid::parse_str(&core[i + 1..]).ok().map(|u| (i, u)))?;
let ts_str = &core[..sep_idx];
let format: &[FormatItem] =
format_description!("[year]-[month]-[day]T[hour]-[minute]-[second]");
let ts = PrimitiveDateTime::parse(ts_str, format).ok()?.assume_utc();
Some((ts, uuid))
}
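/// Read up to `max_records` JSON values from the head of the JSONL file at
/// `path`. Blank lines and lines that fail to parse are skipped and do not
/// count toward the limit.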
async fn read_first_jsonl_records(
path: &Path,
max_records: usize,
) -> io::Result<Vec<serde_json::Value>> {
use tokio::io::AsyncBufReadExt;
let file = tokio::fs::File::open(path).await?;
let reader = tokio::io::BufReader::new(file);
let mut lines = reader.lines();
let mut head: Vec<serde_json::Value> = Vec::new();
while head.len() < max_records {
let line_opt = lines.next_line().await?;
let Some(line) = line_opt else { break };
let trimmed = line.trim();
if trimmed.is_empty() {
continue;
}
if let Ok(v) = serde_json::from_str::<serde_json::Value>(trimmed) {
head.push(v);
}
}
Ok(head)
}

core/src/rollout/mod.rs

@@ -0,0 +1,12 @@
//! Rollout module: persistence and discovery of session rollout files.
pub(crate) const SESSIONS_SUBDIR: &str = "sessions";
pub mod list;
pub mod recorder;
pub use recorder::RolloutRecorder;
pub use recorder::SessionStateSnapshot;
#[cfg(test)]
pub mod tests;

core/src/rollout/recorder.rs

@@ -0,0 +1,357 @@
//! Persist Codex session rollouts (.jsonl) so sessions can be replayed or inspected later.
use std::fs::File;
use std::fs::{self};
use std::io::Error as IoError;
use std::path::Path;
use serde::Deserialize;
use serde::Serialize;
use serde_json::Value;
use time::OffsetDateTime;
use time::format_description::FormatItem;
use time::macros::format_description;
use tokio::io::AsyncWriteExt;
use tokio::sync::mpsc::Sender;
use tokio::sync::mpsc::{self};
use tokio::sync::oneshot;
use tracing::info;
use tracing::warn;
use uuid::Uuid;
use super::SESSIONS_SUBDIR;
use super::list::ConversationsPage;
use super::list::Cursor;
use super::list::get_conversations;
use crate::config::Config;
use crate::conversation_manager::InitialHistory;
use crate::git_info::GitInfo;
use crate::git_info::collect_git_info;
use codex_protocol::models::ResponseItem;
#[derive(Serialize, Deserialize, Clone, Default)]
pub struct SessionMeta {
pub id: Uuid,
pub timestamp: String,
pub instructions: Option<String>,
}
#[derive(Serialize)]
struct SessionMetaWithGit {
#[serde(flatten)]
meta: SessionMeta,
#[serde(skip_serializing_if = "Option::is_none")]
git: Option<GitInfo>,
}
#[derive(Serialize, Deserialize, Default, Clone)]
pub struct SessionStateSnapshot {}
#[derive(Serialize, Deserialize, Default, Clone)]
pub struct SavedSession {
pub session: SessionMeta,
#[serde(default)]
pub items: Vec<ResponseItem>,
#[serde(default)]
pub state: SessionStateSnapshot,
pub session_id: Uuid,
}
/// Records all [`ResponseItem`]s for a session and flushes them to disk after
/// every update.
///
/// Rollouts are recorded as JSONL and can be inspected with tools such as:
///
/// ```ignore
/// $ jq -C . ~/.codex/sessions/2025/05/07/rollout-2025-05-07T17-24-21-5973b6c0-94b8-487b-a530-2aeb6098ae0e.jsonl
/// $ fx ~/.codex/sessions/2025/05/07/rollout-2025-05-07T17-24-21-5973b6c0-94b8-487b-a530-2aeb6098ae0e.jsonl
/// ```
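///
/// Typical lifecycle, as a sketch (error handling elided; `config` and
/// `items` are assumed to be in scope):
///
/// ```ignore
/// let recorder = RolloutRecorder::new(&config, Uuid::new_v4(), None).await?;
/// recorder.record_items(&items).await?;
/// recorder.shutdown().await?; // waits for the writer task to acknowledge
/// ```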
#[derive(Clone)]
pub struct RolloutRecorder {
tx: Sender<RolloutCmd>,
}
enum RolloutCmd {
AddItems(Vec<ResponseItem>),
UpdateState(SessionStateSnapshot),
Shutdown { ack: oneshot::Sender<()> },
}
impl RolloutRecorder {
/// List conversations (rollout files) under the provided Codex home directory.
#[allow(dead_code)]
pub async fn list_conversations(
codex_home: &Path,
page_size: usize,
cursor: Option<&Cursor>,
) -> std::io::Result<ConversationsPage> {
get_conversations(codex_home, page_size, cursor).await
}
/// Attempt to create a new [`RolloutRecorder`]. If the sessions directory
/// cannot be created or the rollout file cannot be opened, we return the
/// error so the caller can decide whether to disable persistence.
pub async fn new(
config: &Config,
uuid: Uuid,
instructions: Option<String>,
) -> std::io::Result<Self> {
let LogFileInfo {
file,
session_id,
timestamp,
} = create_log_file(config, uuid)?;
let timestamp_format: &[FormatItem] = format_description!(
"[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond digits:3]Z"
);
let timestamp = timestamp
.format(timestamp_format)
.map_err(|e| IoError::other(format!("failed to format timestamp: {e}")))?;
// Clone the cwd for the spawned task to collect git info asynchronously
let cwd = config.cwd.clone();
// A reasonably sized bounded channel. If the buffer fills up, the send
// future will yield; that is fine, since we only need to ensure we do not
// perform *blocking* I/O on the caller's thread.
let (tx, rx) = mpsc::channel::<RolloutCmd>(256);
// Spawn a Tokio task that owns the file handle and performs async
// writes. Using `tokio::fs::File` keeps everything on the async I/O
// driver instead of blocking the runtime.
tokio::task::spawn(rollout_writer(
tokio::fs::File::from_std(file),
rx,
Some(SessionMeta {
timestamp,
id: session_id,
instructions,
}),
cwd,
));
Ok(Self { tx })
}
pub(crate) async fn record_items(&self, items: &[ResponseItem]) -> std::io::Result<()> {
let mut filtered = Vec::new();
for item in items {
match item {
// Note that function calls may look a bit strange if they are
// "fully qualified MCP tool calls," so we could consider
// reformatting them in that case.
ResponseItem::Message { .. }
| ResponseItem::LocalShellCall { .. }
| ResponseItem::FunctionCall { .. }
| ResponseItem::FunctionCallOutput { .. }
| ResponseItem::CustomToolCall { .. }
| ResponseItem::CustomToolCallOutput { .. }
| ResponseItem::Reasoning { .. } => filtered.push(item.clone()),
ResponseItem::WebSearchCall { .. } | ResponseItem::Other => {
// These should never be serialized.
continue;
}
}
}
if filtered.is_empty() {
return Ok(());
}
self.tx
.send(RolloutCmd::AddItems(filtered))
.await
.map_err(|e| IoError::other(format!("failed to queue rollout items: {e}")))
}
pub(crate) async fn record_state(&self, state: SessionStateSnapshot) -> std::io::Result<()> {
self.tx
.send(RolloutCmd::UpdateState(state))
.await
.map_err(|e| IoError::other(format!("failed to queue rollout state: {e}")))
}
pub async fn get_rollout_history(path: &Path) -> std::io::Result<InitialHistory> {
info!("Resuming rollout from {path:?}");
let text = tokio::fs::read_to_string(path).await?;
let mut lines = text.lines();
let _ = lines
.next()
.ok_or_else(|| IoError::other("empty session file"))?;
let mut items = Vec::new();
for line in lines {
if line.trim().is_empty() {
continue;
}
let v: Value = match serde_json::from_str(line) {
Ok(v) => v,
Err(_) => continue,
};
if v.get("record_type")
.and_then(|rt| rt.as_str())
.map(|s| s == "state")
.unwrap_or(false)
{
continue;
}
match serde_json::from_value::<ResponseItem>(v.clone()) {
Ok(item) => match item {
ResponseItem::Message { .. }
| ResponseItem::LocalShellCall { .. }
| ResponseItem::FunctionCall { .. }
| ResponseItem::FunctionCallOutput { .. }
| ResponseItem::CustomToolCall { .. }
| ResponseItem::CustomToolCallOutput { .. }
| ResponseItem::Reasoning { .. } => items.push(item),
ResponseItem::WebSearchCall { .. } | ResponseItem::Other => {}
},
Err(e) => {
warn!("failed to parse item: {v:?}, error: {e}");
}
}
}
info!("Resumed rollout successfully from {path:?}");
if items.is_empty() {
Ok(InitialHistory::New)
} else {
Ok(InitialHistory::Resumed(items))
}
}
pub async fn shutdown(&self) -> std::io::Result<()> {
let (tx_done, rx_done) = oneshot::channel();
match self.tx.send(RolloutCmd::Shutdown { ack: tx_done }).await {
Ok(_) => rx_done
.await
.map_err(|e| IoError::other(format!("failed waiting for rollout shutdown: {e}"))),
Err(e) => {
warn!("failed to send rollout shutdown command: {e}");
Err(IoError::other(format!(
"failed to send rollout shutdown command: {e}"
)))
}
}
}
}
struct LogFileInfo {
/// Opened file handle to the rollout file.
file: File,
/// Session ID (also embedded in filename).
session_id: Uuid,
/// Timestamp for the start of the session.
timestamp: OffsetDateTime,
}
fn create_log_file(config: &Config, session_id: Uuid) -> std::io::Result<LogFileInfo> {
// Resolve ~/.codex/sessions/YYYY/MM/DD and create it if missing.
let timestamp = OffsetDateTime::now_local()
.map_err(|e| IoError::other(format!("failed to get local time: {e}")))?;
let mut dir = config.codex_home.clone();
dir.push(SESSIONS_SUBDIR);
dir.push(timestamp.year().to_string());
dir.push(format!("{:02}", u8::from(timestamp.month())));
dir.push(format!("{:02}", timestamp.day()));
fs::create_dir_all(&dir)?;
// Custom format for YYYY-MM-DDThh-mm-ss. Use `-` instead of `:` for
// compatibility with filesystems that do not allow colons in filenames.
let format: &[FormatItem] =
format_description!("[year]-[month]-[day]T[hour]-[minute]-[second]");
let date_str = timestamp
.format(format)
.map_err(|e| IoError::other(format!("failed to format timestamp: {e}")))?;
let filename = format!("rollout-{date_str}-{session_id}.jsonl");
let path = dir.join(filename);
let file = std::fs::OpenOptions::new()
.append(true)
.create(true)
.open(&path)?;
Ok(LogFileInfo {
file,
session_id,
timestamp,
})
}
async fn rollout_writer(
file: tokio::fs::File,
mut rx: mpsc::Receiver<RolloutCmd>,
mut meta: Option<SessionMeta>,
cwd: std::path::PathBuf,
) -> std::io::Result<()> {
let mut writer = JsonlWriter { file };
// If we have a meta, collect git info asynchronously and write meta first
if let Some(session_meta) = meta.take() {
let git_info = collect_git_info(&cwd).await;
let session_meta_with_git = SessionMetaWithGit {
meta: session_meta,
git: git_info,
};
// Write the SessionMeta as the first item in the file
writer.write_line(&session_meta_with_git).await?;
}
// Process rollout commands
while let Some(cmd) = rx.recv().await {
match cmd {
RolloutCmd::AddItems(items) => {
for item in items {
match item {
ResponseItem::Message { .. }
| ResponseItem::LocalShellCall { .. }
| ResponseItem::FunctionCall { .. }
| ResponseItem::FunctionCallOutput { .. }
| ResponseItem::CustomToolCall { .. }
| ResponseItem::CustomToolCallOutput { .. }
| ResponseItem::Reasoning { .. } => {
writer.write_line(&item).await?;
}
ResponseItem::WebSearchCall { .. } | ResponseItem::Other => {}
}
}
}
RolloutCmd::UpdateState(state) => {
#[derive(Serialize)]
struct StateLine<'a> {
record_type: &'static str,
#[serde(flatten)]
state: &'a SessionStateSnapshot,
}
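// With the currently empty `SessionStateSnapshot` flattened in, this
// serializes as: {"record_type":"state"}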
writer
.write_line(&StateLine {
record_type: "state",
state: &state,
})
.await?;
}
RolloutCmd::Shutdown { ack } => {
let _ = ack.send(());
}
}
}
Ok(())
}
struct JsonlWriter {
file: tokio::fs::File,
}
impl JsonlWriter {
async fn write_line(&mut self, item: &impl serde::Serialize) -> std::io::Result<()> {
let mut json = serde_json::to_string(item)?;
json.push('\n');
self.file.write_all(json.as_bytes()).await?;
self.file.flush().await?;
Ok(())
}
}

core/src/rollout/tests.rs

@@ -0,0 +1,384 @@
#![allow(clippy::unwrap_used, clippy::expect_used)]
use std::fs::File;
use std::fs::{self};
use std::io::Write;
use std::path::Path;
use tempfile::TempDir;
use time::OffsetDateTime;
use time::PrimitiveDateTime;
use time::format_description::FormatItem;
use time::macros::format_description;
use uuid::Uuid;
use crate::rollout::list::ConversationItem;
use crate::rollout::list::ConversationsPage;
use crate::rollout::list::Cursor;
use crate::rollout::list::get_conversation;
use crate::rollout::list::get_conversations;
fn write_session_file(
root: &Path,
ts_str: &str,
uuid: Uuid,
num_records: usize,
) -> std::io::Result<(OffsetDateTime, Uuid)> {
let format: &[FormatItem] =
format_description!("[year]-[month]-[day]T[hour]-[minute]-[second]");
let dt = PrimitiveDateTime::parse(ts_str, format)
.unwrap()
.assume_utc();
let dir = root
.join("sessions")
.join(format!("{:04}", dt.year()))
.join(format!("{:02}", u8::from(dt.month())))
.join(format!("{:02}", dt.day()));
fs::create_dir_all(&dir)?;
let filename = format!("rollout-{ts_str}-{uuid}.jsonl");
let file_path = dir.join(filename);
let mut file = File::create(file_path)?;
let meta = serde_json::json!({
"timestamp": ts_str,
"id": uuid.to_string()
});
writeln!(file, "{meta}")?;
for i in 0..num_records {
let rec = serde_json::json!({
"record_type": "response",
"index": i
});
writeln!(file, "{rec}")?;
}
Ok((dt, uuid))
}
#[tokio::test]
async fn test_list_conversations_latest_first() {
let temp = TempDir::new().unwrap();
let home = temp.path();
// Fixed UUIDs for deterministic expectations
let u1 = Uuid::from_u128(1);
let u2 = Uuid::from_u128(2);
let u3 = Uuid::from_u128(3);
// Create three sessions across three days
write_session_file(home, "2025-01-01T12-00-00", u1, 3).unwrap();
write_session_file(home, "2025-01-02T12-00-00", u2, 3).unwrap();
write_session_file(home, "2025-01-03T12-00-00", u3, 3).unwrap();
let page = get_conversations(home, 10, None).await.unwrap();
// Build expected objects
let p1 = home
.join("sessions")
.join("2025")
.join("01")
.join("03")
.join(format!("rollout-2025-01-03T12-00-00-{u3}.jsonl"));
let p2 = home
.join("sessions")
.join("2025")
.join("01")
.join("02")
.join(format!("rollout-2025-01-02T12-00-00-{u2}.jsonl"));
let p3 = home
.join("sessions")
.join("2025")
.join("01")
.join("01")
.join(format!("rollout-2025-01-01T12-00-00-{u1}.jsonl"));
let head_3 = vec![
serde_json::json!({"timestamp": "2025-01-03T12-00-00", "id": u3.to_string()}),
serde_json::json!({"record_type": "response", "index": 0}),
serde_json::json!({"record_type": "response", "index": 1}),
serde_json::json!({"record_type": "response", "index": 2}),
];
let head_2 = vec![
serde_json::json!({"timestamp": "2025-01-02T12-00-00", "id": u2.to_string()}),
serde_json::json!({"record_type": "response", "index": 0}),
serde_json::json!({"record_type": "response", "index": 1}),
serde_json::json!({"record_type": "response", "index": 2}),
];
let head_1 = vec![
serde_json::json!({"timestamp": "2025-01-01T12-00-00", "id": u1.to_string()}),
serde_json::json!({"record_type": "response", "index": 0}),
serde_json::json!({"record_type": "response", "index": 1}),
serde_json::json!({"record_type": "response", "index": 2}),
];
let expected_cursor: Cursor =
serde_json::from_str(&format!("\"2025-01-01T12-00-00|{u1}\"")).unwrap();
let expected = ConversationsPage {
items: vec![
ConversationItem {
path: p1,
head: head_3,
},
ConversationItem {
path: p2,
head: head_2,
},
ConversationItem {
path: p3,
head: head_1,
},
],
next_cursor: Some(expected_cursor),
num_scanned_files: 3,
reached_scan_cap: false,
};
assert_eq!(page, expected);
}
#[tokio::test]
async fn test_pagination_cursor() {
let temp = TempDir::new().unwrap();
let home = temp.path();
// Fixed UUIDs for deterministic expectations
let u1 = Uuid::from_u128(11);
let u2 = Uuid::from_u128(22);
let u3 = Uuid::from_u128(33);
let u4 = Uuid::from_u128(44);
let u5 = Uuid::from_u128(55);
// Oldest to newest
write_session_file(home, "2025-03-01T09-00-00", u1, 1).unwrap();
write_session_file(home, "2025-03-02T09-00-00", u2, 1).unwrap();
write_session_file(home, "2025-03-03T09-00-00", u3, 1).unwrap();
write_session_file(home, "2025-03-04T09-00-00", u4, 1).unwrap();
write_session_file(home, "2025-03-05T09-00-00", u5, 1).unwrap();
let page1 = get_conversations(home, 2, None).await.unwrap();
let p5 = home
.join("sessions")
.join("2025")
.join("03")
.join("05")
.join(format!("rollout-2025-03-05T09-00-00-{u5}.jsonl"));
let p4 = home
.join("sessions")
.join("2025")
.join("03")
.join("04")
.join(format!("rollout-2025-03-04T09-00-00-{u4}.jsonl"));
let head_5 = vec![
serde_json::json!({"timestamp": "2025-03-05T09-00-00", "id": u5.to_string()}),
serde_json::json!({"record_type": "response", "index": 0}),
];
let head_4 = vec![
serde_json::json!({"timestamp": "2025-03-04T09-00-00", "id": u4.to_string()}),
serde_json::json!({"record_type": "response", "index": 0}),
];
let expected_cursor1: Cursor =
serde_json::from_str(&format!("\"2025-03-04T09-00-00|{u4}\"")).unwrap();
let expected_page1 = ConversationsPage {
items: vec![
ConversationItem {
path: p5,
head: head_5,
},
ConversationItem {
path: p4,
head: head_4,
},
],
next_cursor: Some(expected_cursor1.clone()),
num_scanned_files: 3, // scanned 05, 04, and peeked at 03 before breaking
reached_scan_cap: false,
};
assert_eq!(page1, expected_page1);
let page2 = get_conversations(home, 2, page1.next_cursor.as_ref())
.await
.unwrap();
let p3 = home
.join("sessions")
.join("2025")
.join("03")
.join("03")
.join(format!("rollout-2025-03-03T09-00-00-{u3}.jsonl"));
let p2 = home
.join("sessions")
.join("2025")
.join("03")
.join("02")
.join(format!("rollout-2025-03-02T09-00-00-{u2}.jsonl"));
let head_3 = vec![
serde_json::json!({"timestamp": "2025-03-03T09-00-00", "id": u3.to_string()}),
serde_json::json!({"record_type": "response", "index": 0}),
];
let head_2 = vec![
serde_json::json!({"timestamp": "2025-03-02T09-00-00", "id": u2.to_string()}),
serde_json::json!({"record_type": "response", "index": 0}),
];
let expected_cursor2: Cursor =
serde_json::from_str(&format!("\"2025-03-02T09-00-00|{u2}\"")).unwrap();
let expected_page2 = ConversationsPage {
items: vec![
ConversationItem {
path: p3,
head: head_3,
},
ConversationItem {
path: p2,
head: head_2,
},
],
next_cursor: Some(expected_cursor2.clone()),
num_scanned_files: 5, // scanned 05, 04 (anchor), 03, 02, and peeked at 01
reached_scan_cap: false,
};
assert_eq!(page2, expected_page2);
let page3 = get_conversations(home, 2, page2.next_cursor.as_ref())
.await
.unwrap();
let p1 = home
.join("sessions")
.join("2025")
.join("03")
.join("01")
.join(format!("rollout-2025-03-01T09-00-00-{u1}.jsonl"));
let head_1 = vec![
serde_json::json!({"timestamp": "2025-03-01T09-00-00", "id": u1.to_string()}),
serde_json::json!({"record_type": "response", "index": 0}),
];
let expected_cursor3: Cursor =
serde_json::from_str(&format!("\"2025-03-01T09-00-00|{u1}\"")).unwrap();
let expected_page3 = ConversationsPage {
items: vec![ConversationItem {
path: p1,
head: head_1,
}],
next_cursor: Some(expected_cursor3.clone()),
num_scanned_files: 5, // scanned 05, 04, 03, 02 (anchor), 01
reached_scan_cap: false,
};
assert_eq!(page3, expected_page3);
}
#[tokio::test]
async fn test_get_conversation_contents() {
let temp = TempDir::new().unwrap();
let home = temp.path();
let uuid = Uuid::new_v4();
let ts = "2025-04-01T10-30-00";
write_session_file(home, ts, uuid, 2).unwrap();
let page = get_conversations(home, 1, None).await.unwrap();
let path = &page.items[0].path;
let content = get_conversation(path).await.unwrap();
// Page equality (single item)
let expected_path = home
.join("sessions")
.join("2025")
.join("04")
.join("01")
.join(format!("rollout-2025-04-01T10-30-00-{uuid}.jsonl"));
let expected_head = vec![
serde_json::json!({"timestamp": ts, "id": uuid.to_string()}),
serde_json::json!({"record_type": "response", "index": 0}),
serde_json::json!({"record_type": "response", "index": 1}),
];
let expected_cursor: Cursor = serde_json::from_str(&format!("\"{ts}|{uuid}\"")).unwrap();
let expected_page = ConversationsPage {
items: vec![ConversationItem {
path: expected_path.clone(),
head: expected_head,
}],
next_cursor: Some(expected_cursor),
num_scanned_files: 1,
reached_scan_cap: false,
};
assert_eq!(page, expected_page);
// Entire file contents equality
let meta = serde_json::json!({"timestamp": ts, "id": uuid.to_string()});
let rec0 = serde_json::json!({"record_type": "response", "index": 0});
let rec1 = serde_json::json!({"record_type": "response", "index": 1});
let expected_content = format!("{meta}\n{rec0}\n{rec1}\n");
assert_eq!(content, expected_content);
}
#[tokio::test]
async fn test_stable_ordering_same_second_pagination() {
let temp = TempDir::new().unwrap();
let home = temp.path();
let ts = "2025-07-01T00-00-00";
let u1 = Uuid::from_u128(1);
let u2 = Uuid::from_u128(2);
let u3 = Uuid::from_u128(3);
write_session_file(home, ts, u1, 0).unwrap();
write_session_file(home, ts, u2, 0).unwrap();
write_session_file(home, ts, u3, 0).unwrap();
let page1 = get_conversations(home, 2, None).await.unwrap();
let p3 = home
.join("sessions")
.join("2025")
.join("07")
.join("01")
.join(format!("rollout-2025-07-01T00-00-00-{u3}.jsonl"));
let p2 = home
.join("sessions")
.join("2025")
.join("07")
.join("01")
.join(format!("rollout-2025-07-01T00-00-00-{u2}.jsonl"));
let head = |u: Uuid| -> Vec<serde_json::Value> {
vec![serde_json::json!({"timestamp": ts, "id": u.to_string()})]
};
let expected_cursor1: Cursor = serde_json::from_str(&format!("\"{ts}|{u2}\"")).unwrap();
let expected_page1 = ConversationsPage {
items: vec![
ConversationItem {
path: p3,
head: head(u3),
},
ConversationItem {
path: p2,
head: head(u2),
},
],
next_cursor: Some(expected_cursor1.clone()),
num_scanned_files: 3, // scanned u3, u2, peeked u1
reached_scan_cap: false,
};
assert_eq!(page1, expected_page1);
let page2 = get_conversations(home, 2, page1.next_cursor.as_ref())
.await
.unwrap();
let p1 = home
.join("sessions")
.join("2025")
.join("07")
.join("01")
.join(format!("rollout-2025-07-01T00-00-00-{u1}.jsonl"));
let expected_cursor2: Cursor = serde_json::from_str(&format!("\"{ts}|{u1}\"")).unwrap();
let expected_page2 = ConversationsPage {
items: vec![ConversationItem {
path: p1,
head: head(u1),
}],
next_cursor: Some(expected_cursor2.clone()),
num_scanned_files: 3, // scanned u3, u2 (anchor), u1
reached_scan_cap: false,
};
assert_eq!(page2, expected_page2);
}