[Core]: add tail in the rollout data (#4461)
This will help us show the conversation tail and last updated timestamp.
This commit is contained in:
@@ -36,13 +36,16 @@ pub struct ConversationsPage {
|
||||
pub struct ConversationItem {
|
||||
/// Absolute path to the rollout file.
|
||||
pub path: PathBuf,
|
||||
/// First up to 5 JSONL records parsed as JSON (includes meta line).
|
||||
/// First up to `HEAD_RECORD_LIMIT` JSONL records parsed as JSON (includes meta line).
|
||||
pub head: Vec<serde_json::Value>,
|
||||
/// Last up to `TAIL_RECORD_LIMIT` JSONL response records parsed as JSON.
|
||||
pub tail: Vec<serde_json::Value>,
|
||||
}
|
||||
|
||||
/// Hard cap to bound worst‑case work per request.
|
||||
const MAX_SCAN_FILES: usize = 100;
|
||||
const HEAD_RECORD_LIMIT: usize = 10;
|
||||
const TAIL_RECORD_LIMIT: usize = 10;
|
||||
|
||||
/// Pagination cursor identifying a file by timestamp and UUID.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
@@ -176,13 +179,13 @@ async fn traverse_directories_for_paths(
|
||||
}
|
||||
// Read head and simultaneously detect message events within the same
|
||||
// first N JSONL records to avoid a second file read.
|
||||
let (head, saw_session_meta, saw_user_event) =
|
||||
read_head_and_flags(&path, HEAD_RECORD_LIMIT)
|
||||
let (head, tail, saw_session_meta, saw_user_event) =
|
||||
read_head_and_tail(&path, HEAD_RECORD_LIMIT, TAIL_RECORD_LIMIT)
|
||||
.await
|
||||
.unwrap_or((Vec::new(), false, false));
|
||||
.unwrap_or((Vec::new(), Vec::new(), false, false));
|
||||
// Apply filters: must have session meta and at least one user message event
|
||||
if saw_session_meta && saw_user_event {
|
||||
items.push(ConversationItem { path, head });
|
||||
items.push(ConversationItem { path, head, tail });
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -286,10 +289,11 @@ fn parse_timestamp_uuid_from_filename(name: &str) -> Option<(OffsetDateTime, Uui
|
||||
Some((ts, uuid))
|
||||
}
|
||||
|
||||
async fn read_head_and_flags(
|
||||
async fn read_head_and_tail(
|
||||
path: &Path,
|
||||
max_records: usize,
|
||||
) -> io::Result<(Vec<serde_json::Value>, bool, bool)> {
|
||||
head_limit: usize,
|
||||
tail_limit: usize,
|
||||
) -> io::Result<(Vec<serde_json::Value>, Vec<serde_json::Value>, bool, bool)> {
|
||||
use tokio::io::AsyncBufReadExt;
|
||||
|
||||
let file = tokio::fs::File::open(path).await?;
|
||||
@@ -299,7 +303,7 @@ async fn read_head_and_flags(
|
||||
let mut saw_session_meta = false;
|
||||
let mut saw_user_event = false;
|
||||
|
||||
while head.len() < max_records {
|
||||
while head.len() < head_limit {
|
||||
let line_opt = lines.next_line().await?;
|
||||
let Some(line) = line_opt else { break };
|
||||
let trimmed = line.trim();
|
||||
@@ -336,7 +340,84 @@ async fn read_head_and_flags(
|
||||
}
|
||||
}
|
||||
|
||||
Ok((head, saw_session_meta, saw_user_event))
|
||||
let tail = if tail_limit == 0 {
|
||||
Vec::new()
|
||||
} else {
|
||||
read_tail_records(path, tail_limit).await?
|
||||
};
|
||||
|
||||
Ok((head, tail, saw_session_meta, saw_user_event))
|
||||
}
|
||||
|
||||
async fn read_tail_records(path: &Path, max_records: usize) -> io::Result<Vec<serde_json::Value>> {
|
||||
use std::io::SeekFrom;
|
||||
use tokio::io::AsyncReadExt;
|
||||
use tokio::io::AsyncSeekExt;
|
||||
|
||||
if max_records == 0 {
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
|
||||
const CHUNK_SIZE: usize = 8192;
|
||||
|
||||
let mut file = tokio::fs::File::open(path).await?;
|
||||
let mut pos = file.seek(SeekFrom::End(0)).await?;
|
||||
if pos == 0 {
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
|
||||
let mut buffer: Vec<u8> = Vec::new();
|
||||
|
||||
loop {
|
||||
let slice_start = match (pos > 0, buffer.iter().position(|&b| b == b'\n')) {
|
||||
(true, Some(idx)) => idx + 1,
|
||||
_ => 0,
|
||||
};
|
||||
let tail = collect_last_response_values(&buffer[slice_start..], max_records);
|
||||
if tail.len() >= max_records || pos == 0 {
|
||||
return Ok(tail);
|
||||
}
|
||||
|
||||
let read_size = CHUNK_SIZE.min(pos as usize);
|
||||
if read_size == 0 {
|
||||
return Ok(tail);
|
||||
}
|
||||
pos -= read_size as u64;
|
||||
file.seek(SeekFrom::Start(pos)).await?;
|
||||
let mut chunk = vec![0; read_size];
|
||||
file.read_exact(&mut chunk).await?;
|
||||
chunk.extend_from_slice(&buffer);
|
||||
buffer = chunk;
|
||||
}
|
||||
}
|
||||
|
||||
fn collect_last_response_values(buffer: &[u8], max_records: usize) -> Vec<serde_json::Value> {
|
||||
use std::borrow::Cow;
|
||||
|
||||
if buffer.is_empty() || max_records == 0 {
|
||||
return Vec::new();
|
||||
}
|
||||
|
||||
let text: Cow<'_, str> = String::from_utf8_lossy(buffer);
|
||||
let mut collected_rev: Vec<serde_json::Value> = Vec::new();
|
||||
for line in text.lines().rev() {
|
||||
let trimmed = line.trim();
|
||||
if trimmed.is_empty() {
|
||||
continue;
|
||||
}
|
||||
let parsed: serde_json::Result<RolloutLine> = serde_json::from_str(trimmed);
|
||||
let Ok(rollout_line) = parsed else { continue };
|
||||
if let RolloutItem::ResponseItem(item) = rollout_line.item
|
||||
&& let Ok(val) = serde_json::to_value(item)
|
||||
{
|
||||
collected_rev.push(val);
|
||||
if collected_rev.len() == max_records {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
collected_rev.reverse();
|
||||
collected_rev
|
||||
}
|
||||
|
||||
/// Locate a recorded conversation rollout file by its UUID string using the existing
|
||||
|
||||
@@ -17,6 +17,18 @@ use crate::rollout::list::ConversationsPage;
|
||||
use crate::rollout::list::Cursor;
|
||||
use crate::rollout::list::get_conversation;
|
||||
use crate::rollout::list::get_conversations;
|
||||
use anyhow::Result;
|
||||
use codex_protocol::mcp_protocol::ConversationId;
|
||||
use codex_protocol::models::ContentItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::protocol::CompactedItem;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::InputMessageKind;
|
||||
use codex_protocol::protocol::RolloutItem;
|
||||
use codex_protocol::protocol::RolloutLine;
|
||||
use codex_protocol::protocol::SessionMeta;
|
||||
use codex_protocol::protocol::SessionMetaLine;
|
||||
use codex_protocol::protocol::UserMessageEvent;
|
||||
|
||||
fn write_session_file(
|
||||
root: &Path,
|
||||
@@ -146,14 +158,17 @@ async fn test_list_conversations_latest_first() {
|
||||
ConversationItem {
|
||||
path: p1,
|
||||
head: head_3,
|
||||
tail: Vec::new(),
|
||||
},
|
||||
ConversationItem {
|
||||
path: p2,
|
||||
head: head_2,
|
||||
tail: Vec::new(),
|
||||
},
|
||||
ConversationItem {
|
||||
path: p3,
|
||||
head: head_1,
|
||||
tail: Vec::new(),
|
||||
},
|
||||
],
|
||||
next_cursor: Some(expected_cursor),
|
||||
@@ -219,10 +234,12 @@ async fn test_pagination_cursor() {
|
||||
ConversationItem {
|
||||
path: p5,
|
||||
head: head_5,
|
||||
tail: Vec::new(),
|
||||
},
|
||||
ConversationItem {
|
||||
path: p4,
|
||||
head: head_4,
|
||||
tail: Vec::new(),
|
||||
},
|
||||
],
|
||||
next_cursor: Some(expected_cursor1.clone()),
|
||||
@@ -269,10 +286,12 @@ async fn test_pagination_cursor() {
|
||||
ConversationItem {
|
||||
path: p3,
|
||||
head: head_3,
|
||||
tail: Vec::new(),
|
||||
},
|
||||
ConversationItem {
|
||||
path: p2,
|
||||
head: head_2,
|
||||
tail: Vec::new(),
|
||||
},
|
||||
],
|
||||
next_cursor: Some(expected_cursor2.clone()),
|
||||
@@ -304,6 +323,7 @@ async fn test_pagination_cursor() {
|
||||
items: vec![ConversationItem {
|
||||
path: p1,
|
||||
head: head_1,
|
||||
tail: Vec::new(),
|
||||
}],
|
||||
next_cursor: Some(expected_cursor3),
|
||||
num_scanned_files: 5, // scanned 05, 04 (anchor), 03, 02 (anchor), 01
|
||||
@@ -346,6 +366,7 @@ async fn test_get_conversation_contents() {
|
||||
items: vec![ConversationItem {
|
||||
path: expected_path,
|
||||
head: expected_head,
|
||||
tail: Vec::new(),
|
||||
}],
|
||||
next_cursor: Some(expected_cursor),
|
||||
num_scanned_files: 1,
|
||||
@@ -366,6 +387,250 @@ async fn test_get_conversation_contents() {
|
||||
assert_eq!(content, expected_content);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_tail_includes_last_response_items() -> Result<()> {
|
||||
let temp = TempDir::new().unwrap();
|
||||
let home = temp.path();
|
||||
|
||||
let ts = "2025-06-01T08-00-00";
|
||||
let uuid = Uuid::from_u128(42);
|
||||
let day_dir = home.join("sessions").join("2025").join("06").join("01");
|
||||
fs::create_dir_all(&day_dir)?;
|
||||
let file_path = day_dir.join(format!("rollout-{ts}-{uuid}.jsonl"));
|
||||
let mut file = File::create(&file_path)?;
|
||||
|
||||
let conversation_id = ConversationId::from_string(&uuid.to_string())?;
|
||||
let meta_line = RolloutLine {
|
||||
timestamp: ts.to_string(),
|
||||
item: RolloutItem::SessionMeta(SessionMetaLine {
|
||||
meta: SessionMeta {
|
||||
id: conversation_id,
|
||||
timestamp: ts.to_string(),
|
||||
instructions: None,
|
||||
cwd: ".".into(),
|
||||
originator: "test_originator".into(),
|
||||
cli_version: "test_version".into(),
|
||||
},
|
||||
git: None,
|
||||
}),
|
||||
};
|
||||
writeln!(file, "{}", serde_json::to_string(&meta_line)?)?;
|
||||
|
||||
let user_event_line = RolloutLine {
|
||||
timestamp: ts.to_string(),
|
||||
item: RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
|
||||
message: "hello".into(),
|
||||
kind: Some(InputMessageKind::Plain),
|
||||
images: None,
|
||||
})),
|
||||
};
|
||||
writeln!(file, "{}", serde_json::to_string(&user_event_line)?)?;
|
||||
|
||||
let total_messages = 12usize;
|
||||
for idx in 0..total_messages {
|
||||
let response_line = RolloutLine {
|
||||
timestamp: format!("{ts}-{idx:02}"),
|
||||
item: RolloutItem::ResponseItem(ResponseItem::Message {
|
||||
id: None,
|
||||
role: "assistant".into(),
|
||||
content: vec![ContentItem::OutputText {
|
||||
text: format!("reply-{idx}"),
|
||||
}],
|
||||
}),
|
||||
};
|
||||
writeln!(file, "{}", serde_json::to_string(&response_line)?)?;
|
||||
}
|
||||
drop(file);
|
||||
|
||||
let page = get_conversations(home, 1, None).await?;
|
||||
let item = page.items.first().expect("conversation item");
|
||||
let tail_len = item.tail.len();
|
||||
assert_eq!(tail_len, 10usize.min(total_messages));
|
||||
|
||||
let expected: Vec<serde_json::Value> = (total_messages - tail_len..total_messages)
|
||||
.map(|idx| {
|
||||
serde_json::to_value(ResponseItem::Message {
|
||||
id: None,
|
||||
role: "assistant".into(),
|
||||
content: vec![ContentItem::OutputText {
|
||||
text: format!("reply-{idx}"),
|
||||
}],
|
||||
})
|
||||
.expect("serialize response item")
|
||||
})
|
||||
.collect();
|
||||
|
||||
assert_eq!(item.tail, expected);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_tail_handles_short_sessions() -> Result<()> {
|
||||
let temp = TempDir::new().unwrap();
|
||||
let home = temp.path();
|
||||
|
||||
let ts = "2025-06-02T08-30-00";
|
||||
let uuid = Uuid::from_u128(7);
|
||||
let day_dir = home.join("sessions").join("2025").join("06").join("02");
|
||||
fs::create_dir_all(&day_dir)?;
|
||||
let file_path = day_dir.join(format!("rollout-{ts}-{uuid}.jsonl"));
|
||||
let mut file = File::create(&file_path)?;
|
||||
|
||||
let conversation_id = ConversationId::from_string(&uuid.to_string())?;
|
||||
let meta_line = RolloutLine {
|
||||
timestamp: ts.to_string(),
|
||||
item: RolloutItem::SessionMeta(SessionMetaLine {
|
||||
meta: SessionMeta {
|
||||
id: conversation_id,
|
||||
timestamp: ts.to_string(),
|
||||
instructions: None,
|
||||
cwd: ".".into(),
|
||||
originator: "test_originator".into(),
|
||||
cli_version: "test_version".into(),
|
||||
},
|
||||
git: None,
|
||||
}),
|
||||
};
|
||||
writeln!(file, "{}", serde_json::to_string(&meta_line)?)?;
|
||||
|
||||
let user_event_line = RolloutLine {
|
||||
timestamp: ts.to_string(),
|
||||
item: RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
|
||||
message: "hi".into(),
|
||||
kind: Some(InputMessageKind::Plain),
|
||||
images: None,
|
||||
})),
|
||||
};
|
||||
writeln!(file, "{}", serde_json::to_string(&user_event_line)?)?;
|
||||
|
||||
for idx in 0..3 {
|
||||
let response_line = RolloutLine {
|
||||
timestamp: format!("{ts}-{idx:02}"),
|
||||
item: RolloutItem::ResponseItem(ResponseItem::Message {
|
||||
id: None,
|
||||
role: "assistant".into(),
|
||||
content: vec![ContentItem::OutputText {
|
||||
text: format!("short-{idx}"),
|
||||
}],
|
||||
}),
|
||||
};
|
||||
writeln!(file, "{}", serde_json::to_string(&response_line)?)?;
|
||||
}
|
||||
drop(file);
|
||||
|
||||
let page = get_conversations(home, 1, None).await?;
|
||||
let tail = &page.items.first().expect("conversation item").tail;
|
||||
|
||||
assert_eq!(tail.len(), 3);
|
||||
|
||||
let expected: Vec<serde_json::Value> = (0..3)
|
||||
.map(|idx| {
|
||||
serde_json::to_value(ResponseItem::Message {
|
||||
id: None,
|
||||
role: "assistant".into(),
|
||||
content: vec![ContentItem::OutputText {
|
||||
text: format!("short-{idx}"),
|
||||
}],
|
||||
})
|
||||
.expect("serialize response item")
|
||||
})
|
||||
.collect();
|
||||
|
||||
assert_eq!(tail, &expected);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_tail_skips_trailing_non_responses() -> Result<()> {
|
||||
let temp = TempDir::new().unwrap();
|
||||
let home = temp.path();
|
||||
|
||||
let ts = "2025-06-03T10-00-00";
|
||||
let uuid = Uuid::from_u128(11);
|
||||
let day_dir = home.join("sessions").join("2025").join("06").join("03");
|
||||
fs::create_dir_all(&day_dir)?;
|
||||
let file_path = day_dir.join(format!("rollout-{ts}-{uuid}.jsonl"));
|
||||
let mut file = File::create(&file_path)?;
|
||||
|
||||
let conversation_id = ConversationId::from_string(&uuid.to_string())?;
|
||||
let meta_line = RolloutLine {
|
||||
timestamp: ts.to_string(),
|
||||
item: RolloutItem::SessionMeta(SessionMetaLine {
|
||||
meta: SessionMeta {
|
||||
id: conversation_id,
|
||||
timestamp: ts.to_string(),
|
||||
instructions: None,
|
||||
cwd: ".".into(),
|
||||
originator: "test_originator".into(),
|
||||
cli_version: "test_version".into(),
|
||||
},
|
||||
git: None,
|
||||
}),
|
||||
};
|
||||
writeln!(file, "{}", serde_json::to_string(&meta_line)?)?;
|
||||
|
||||
let user_event_line = RolloutLine {
|
||||
timestamp: ts.to_string(),
|
||||
item: RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
|
||||
message: "hello".into(),
|
||||
kind: Some(InputMessageKind::Plain),
|
||||
images: None,
|
||||
})),
|
||||
};
|
||||
writeln!(file, "{}", serde_json::to_string(&user_event_line)?)?;
|
||||
|
||||
for idx in 0..4 {
|
||||
let response_line = RolloutLine {
|
||||
timestamp: format!("{ts}-{idx:02}"),
|
||||
item: RolloutItem::ResponseItem(ResponseItem::Message {
|
||||
id: None,
|
||||
role: "assistant".into(),
|
||||
content: vec![ContentItem::OutputText {
|
||||
text: format!("response-{idx}"),
|
||||
}],
|
||||
}),
|
||||
};
|
||||
writeln!(file, "{}", serde_json::to_string(&response_line)?)?;
|
||||
}
|
||||
|
||||
let compacted_line = RolloutLine {
|
||||
timestamp: format!("{ts}-compacted"),
|
||||
item: RolloutItem::Compacted(CompactedItem {
|
||||
message: "compacted".into(),
|
||||
}),
|
||||
};
|
||||
writeln!(file, "{}", serde_json::to_string(&compacted_line)?)?;
|
||||
|
||||
let shutdown_event = RolloutLine {
|
||||
timestamp: format!("{ts}-shutdown"),
|
||||
item: RolloutItem::EventMsg(EventMsg::ShutdownComplete),
|
||||
};
|
||||
writeln!(file, "{}", serde_json::to_string(&shutdown_event)?)?;
|
||||
drop(file);
|
||||
|
||||
let page = get_conversations(home, 1, None).await?;
|
||||
let tail = &page.items.first().expect("conversation item").tail;
|
||||
|
||||
let expected: Vec<serde_json::Value> = (0..4)
|
||||
.map(|idx| {
|
||||
serde_json::to_value(ResponseItem::Message {
|
||||
id: None,
|
||||
role: "assistant".into(),
|
||||
content: vec![ContentItem::OutputText {
|
||||
text: format!("response-{idx}"),
|
||||
}],
|
||||
})
|
||||
.expect("serialize response item")
|
||||
})
|
||||
.collect();
|
||||
|
||||
assert_eq!(tail, &expected);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_stable_ordering_same_second_pagination() {
|
||||
let temp = TempDir::new().unwrap();
|
||||
@@ -410,10 +675,12 @@ async fn test_stable_ordering_same_second_pagination() {
|
||||
ConversationItem {
|
||||
path: p3,
|
||||
head: head(u3),
|
||||
tail: Vec::new(),
|
||||
},
|
||||
ConversationItem {
|
||||
path: p2,
|
||||
head: head(u2),
|
||||
tail: Vec::new(),
|
||||
},
|
||||
],
|
||||
next_cursor: Some(expected_cursor1.clone()),
|
||||
@@ -436,6 +703,7 @@ async fn test_stable_ordering_same_second_pagination() {
|
||||
items: vec![ConversationItem {
|
||||
path: p1,
|
||||
head: head(u1),
|
||||
tail: Vec::new(),
|
||||
}],
|
||||
next_cursor: Some(expected_cursor2),
|
||||
num_scanned_files: 3, // scanned u3, u2 (anchor), u1
|
||||
|
||||
@@ -804,6 +804,7 @@ mod tests {
|
||||
ConversationItem {
|
||||
path: PathBuf::from(path),
|
||||
head: head_with_ts_and_user_text(ts, &[preview]),
|
||||
tail: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -863,10 +864,12 @@ mod tests {
|
||||
let a = ConversationItem {
|
||||
path: PathBuf::from("/tmp/a.jsonl"),
|
||||
head: head_with_ts_and_user_text("2025-01-01T00:00:00Z", &["A"]),
|
||||
tail: Vec::new(),
|
||||
};
|
||||
let b = ConversationItem {
|
||||
path: PathBuf::from("/tmp/b.jsonl"),
|
||||
head: head_with_ts_and_user_text("2025-01-02T00:00:00Z", &["B"]),
|
||||
tail: Vec::new(),
|
||||
};
|
||||
let rows = rows_from_items(vec![a, b]);
|
||||
assert_eq!(rows.len(), 2);
|
||||
|
||||
Reference in New Issue
Block a user