Add Updated at time in resume picker (#4468)
<img width="639" height="281" alt="image" src="https://github.com/user-attachments/assets/92b2ad2b-9e18-4485-9b8d-d7056eb98651" />
@@ -40,10 +40,24 @@ pub struct ConversationItem {
     pub head: Vec<serde_json::Value>,
     /// Last up to `TAIL_RECORD_LIMIT` JSONL response records parsed as JSON.
     pub tail: Vec<serde_json::Value>,
+    /// RFC3339 timestamp string for when the session was created, if available.
+    pub created_at: Option<String>,
+    /// RFC3339 timestamp string for the most recent response in the tail, if available.
+    pub updated_at: Option<String>,
 }
 
+#[derive(Default)]
+struct HeadTailSummary {
+    head: Vec<serde_json::Value>,
+    tail: Vec<serde_json::Value>,
+    saw_session_meta: bool,
+    saw_user_event: bool,
+    created_at: Option<String>,
+    updated_at: Option<String>,
+}
+
 /// Hard cap to bound worst‑case work per request.
-const MAX_SCAN_FILES: usize = 100;
+const MAX_SCAN_FILES: usize = 10000;
 const HEAD_RECORD_LIMIT: usize = 10;
 const TAIL_RECORD_LIMIT: usize = 10;
 
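The two new fields are plain RFC3339 strings rather than parsed timestamps, so a consumer can format or compare them as it likes. A minimal sketch of how a picker row might render `updated_at`, assuming a `chrono` dependency; `updated_at_label` is a hypothetical helper, not part of this change:

```rust
use chrono::DateTime;

/// Hypothetical helper: turn an RFC3339 `updated_at` string into a short label.
fn updated_at_label(updated_at: Option<&str>) -> String {
    updated_at
        .and_then(|ts| DateTime::parse_from_rfc3339(ts).ok())
        .map(|dt| dt.format("%Y-%m-%d %H:%M").to_string())
        .unwrap_or_else(|| "unknown".to_string())
}

fn main() {
    assert_eq!(
        updated_at_label(Some("2025-09-14T10:32:11Z")),
        "2025-09-14 10:32"
    );
    assert_eq!(updated_at_label(None), "unknown");
}
```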
@@ -179,13 +193,26 @@ async fn traverse_directories_for_paths(
             }
             // Read head and simultaneously detect message events within the same
             // first N JSONL records to avoid a second file read.
-            let (head, tail, saw_session_meta, saw_user_event) =
-                read_head_and_tail(&path, HEAD_RECORD_LIMIT, TAIL_RECORD_LIMIT)
-                    .await
-                    .unwrap_or((Vec::new(), Vec::new(), false, false));
+            let summary = read_head_and_tail(&path, HEAD_RECORD_LIMIT, TAIL_RECORD_LIMIT)
+                .await
+                .unwrap_or_default();
             // Apply filters: must have session meta and at least one user message event
-            if saw_session_meta && saw_user_event {
-                items.push(ConversationItem { path, head, tail });
+            if summary.saw_session_meta && summary.saw_user_event {
+                let HeadTailSummary {
+                    head,
+                    tail,
+                    created_at,
+                    mut updated_at,
+                    ..
+                } = summary;
+                updated_at = updated_at.or_else(|| created_at.clone());
+                items.push(ConversationItem {
+                    path,
+                    head,
+                    tail,
+                    created_at,
+                    updated_at,
+                });
             }
         }
     }
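The `or_else` fallback above guarantees that any item surviving the filter carries an `updated_at` whenever a `created_at` is known, even if the tail yielded no timestamp. A tiny standalone illustration of that fallback (values are illustrative):

```rust
fn main() {
    // No response timestamp was recovered from the tail...
    let created_at = Some("2025-09-14T10:00:00Z".to_string());
    let updated_at: Option<String> = None;
    // ...so the item falls back to its creation time.
    let updated_at = updated_at.or_else(|| created_at.clone());
    assert_eq!(updated_at.as_deref(), Some("2025-09-14T10:00:00Z"));
}
```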
@@ -293,17 +320,15 @@ async fn read_head_and_tail(
     path: &Path,
     head_limit: usize,
     tail_limit: usize,
-) -> io::Result<(Vec<serde_json::Value>, Vec<serde_json::Value>, bool, bool)> {
+) -> io::Result<HeadTailSummary> {
     use tokio::io::AsyncBufReadExt;
 
     let file = tokio::fs::File::open(path).await?;
     let reader = tokio::io::BufReader::new(file);
     let mut lines = reader.lines();
-    let mut head: Vec<serde_json::Value> = Vec::new();
-    let mut saw_session_meta = false;
-    let mut saw_user_event = false;
+    let mut summary = HeadTailSummary::default();
 
-    while head.len() < head_limit {
+    while summary.head.len() < head_limit {
         let line_opt = lines.next_line().await?;
         let Some(line) = line_opt else { break };
         let trimmed = line.trim();
@@ -316,14 +341,22 @@ async fn read_head_and_tail(
 
         match rollout_line.item {
             RolloutItem::SessionMeta(session_meta_line) => {
+                summary.created_at = summary
+                    .created_at
+                    .clone()
+                    .or_else(|| Some(rollout_line.timestamp.clone()));
                 if let Ok(val) = serde_json::to_value(session_meta_line) {
-                    head.push(val);
-                    saw_session_meta = true;
+                    summary.head.push(val);
+                    summary.saw_session_meta = true;
                 }
             }
             RolloutItem::ResponseItem(item) => {
+                summary.created_at = summary
+                    .created_at
+                    .clone()
+                    .or_else(|| Some(rollout_line.timestamp.clone()));
                 if let Ok(val) = serde_json::to_value(item) {
-                    head.push(val);
+                    summary.head.push(val);
                 }
             }
             RolloutItem::TurnContext(_) => {
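The repeated `summary.created_at = summary.created_at.clone().or_else(...)` chain only ever assigns on the first timestamped record, so `created_at` ends up as the earliest timestamp in the head. A sketch of the same "set once" rule using `Option::get_or_insert_with`; `remember_first` is a hypothetical helper, not the crate's code:

```rust
fn remember_first(created_at: &mut Option<String>, timestamp: &str) {
    // get_or_insert_with leaves an existing value untouched, matching the
    // clone().or_else(..) pattern in the diff.
    created_at.get_or_insert_with(|| timestamp.to_string());
}

fn main() {
    let mut created_at = None;
    remember_first(&mut created_at, "2025-01-01T00:00:00Z");
    remember_first(&mut created_at, "2025-01-02T00:00:00Z");
    // The first record seen wins.
    assert_eq!(created_at.as_deref(), Some("2025-01-01T00:00:00Z"));
}
```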
@@ -334,28 +367,30 @@ async fn read_head_and_tail(
             }
             RolloutItem::EventMsg(ev) => {
                 if matches!(ev, EventMsg::UserMessage(_)) {
-                    saw_user_event = true;
+                    summary.saw_user_event = true;
                 }
             }
         }
     }
 
-    let tail = if tail_limit == 0 {
-        Vec::new()
-    } else {
-        read_tail_records(path, tail_limit).await?
-    };
-
-    Ok((head, tail, saw_session_meta, saw_user_event))
+    if tail_limit != 0 {
+        let (tail, updated_at) = read_tail_records(path, tail_limit).await?;
+        summary.tail = tail;
+        summary.updated_at = updated_at;
+    }
+    Ok(summary)
 }
 
-async fn read_tail_records(path: &Path, max_records: usize) -> io::Result<Vec<serde_json::Value>> {
+async fn read_tail_records(
+    path: &Path,
+    max_records: usize,
+) -> io::Result<(Vec<serde_json::Value>, Option<String>)> {
     use std::io::SeekFrom;
     use tokio::io::AsyncReadExt;
     use tokio::io::AsyncSeekExt;
 
     if max_records == 0 {
-        return Ok(Vec::new());
+        return Ok((Vec::new(), None));
     }
 
     const CHUNK_SIZE: usize = 8192;
@@ -363,24 +398,28 @@ async fn read_tail_records(path: &Path, max_records: usize) -> io::Result<Vec<serde_json::Value>> {
     let mut file = tokio::fs::File::open(path).await?;
     let mut pos = file.seek(SeekFrom::End(0)).await?;
     if pos == 0 {
-        return Ok(Vec::new());
+        return Ok((Vec::new(), None));
     }
 
     let mut buffer: Vec<u8> = Vec::new();
+    let mut latest_timestamp: Option<String> = None;
 
     loop {
         let slice_start = match (pos > 0, buffer.iter().position(|&b| b == b'\n')) {
             (true, Some(idx)) => idx + 1,
             _ => 0,
         };
-        let tail = collect_last_response_values(&buffer[slice_start..], max_records);
+        let (tail, newest_ts) = collect_last_response_values(&buffer[slice_start..], max_records);
+        if latest_timestamp.is_none() {
+            latest_timestamp = newest_ts.clone();
+        }
         if tail.len() >= max_records || pos == 0 {
-            return Ok(tail);
+            return Ok((tail, latest_timestamp.or(newest_ts)));
        }
 
         let read_size = CHUNK_SIZE.min(pos as usize);
         if read_size == 0 {
-            return Ok(tail);
+            return Ok((tail, latest_timestamp.or(newest_ts)));
         }
         pos -= read_size as u64;
         file.seek(SeekFrom::Start(pos)).await?;
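`read_tail_records` seeks to the end of the file and grows a buffer backwards in `CHUNK_SIZE` steps until enough records are found, keeping the first (newest) timestamp it encounters. A simplified synchronous sketch of that backward-chunk scan, using `std::io` instead of tokio and returning raw lines rather than parsed rollout records; `last_lines` is a hypothetical name, not part of this diff:

```rust
use std::io::{Read, Seek, SeekFrom};
use std::path::Path;

const CHUNK_SIZE: usize = 8192;

/// Return up to `max_lines` of the last non-empty lines of `path`, oldest first.
fn last_lines(path: &Path, max_lines: usize) -> std::io::Result<Vec<String>> {
    let mut file = std::fs::File::open(path)?;
    let mut pos = file.seek(SeekFrom::End(0))?;
    let mut buffer: Vec<u8> = Vec::new();

    loop {
        // Skip a possibly partial first line unless the start of the file was reached.
        let slice_start = match (pos > 0, buffer.iter().position(|&b| b == b'\n')) {
            (true, Some(idx)) => idx + 1,
            _ => 0,
        };
        let mut lines: Vec<String> = String::from_utf8_lossy(&buffer[slice_start..])
            .lines()
            .rev()
            .filter(|line| !line.trim().is_empty())
            .take(max_lines)
            .map(str::to_string)
            .collect();
        if lines.len() >= max_lines || pos == 0 {
            lines.reverse(); // restore chronological order
            return Ok(lines);
        }

        // Not enough lines yet: prepend the previous chunk and scan again.
        let read_size = CHUNK_SIZE.min(pos as usize);
        pos -= read_size as u64;
        file.seek(SeekFrom::Start(pos))?;
        let mut chunk = vec![0u8; read_size];
        file.read_exact(&mut chunk)?;
        chunk.extend_from_slice(&buffer);
        buffer = chunk;
    }
}
```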
@@ -391,15 +430,19 @@ async fn read_tail_records(path: &Path, max_records: usize) -> io::Result<Vec<serde_json::Value>> {
     }
 }
 
-fn collect_last_response_values(buffer: &[u8], max_records: usize) -> Vec<serde_json::Value> {
+fn collect_last_response_values(
+    buffer: &[u8],
+    max_records: usize,
+) -> (Vec<serde_json::Value>, Option<String>) {
     use std::borrow::Cow;
 
     if buffer.is_empty() || max_records == 0 {
-        return Vec::new();
+        return (Vec::new(), None);
     }
 
     let text: Cow<'_, str> = String::from_utf8_lossy(buffer);
     let mut collected_rev: Vec<serde_json::Value> = Vec::new();
+    let mut latest_timestamp: Option<String> = None;
     for line in text.lines().rev() {
         let trimmed = line.trim();
         if trimmed.is_empty() {
@@ -407,9 +450,13 @@ fn collect_last_response_values(buffer: &[u8], max_records: usize) -> Vec<serde_json::Value> {
         }
         let parsed: serde_json::Result<RolloutLine> = serde_json::from_str(trimmed);
         let Ok(rollout_line) = parsed else { continue };
-        if let RolloutItem::ResponseItem(item) = rollout_line.item
-            && let Ok(val) = serde_json::to_value(item)
+        let RolloutLine { timestamp, item } = rollout_line;
+        if let RolloutItem::ResponseItem(item) = item
+            && let Ok(val) = serde_json::to_value(&item)
         {
+            if latest_timestamp.is_none() {
+                latest_timestamp = Some(timestamp.clone());
+            }
             collected_rev.push(val);
             if collected_rev.len() == max_records {
                 break;
@@ -417,7 +464,7 @@ fn collect_last_response_values(buffer: &[u8], max_records: usize) -> Vec<serde_json::Value> {
             }
         }
     }
     collected_rev.reverse();
-    collected_rev
+    (collected_rev, latest_timestamp)
 }
 
 /// Locate a recorded conversation rollout file by its UUID string using the existing
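`collect_last_response_values` walks the buffered text newest-line-first, so the first matching record it meets supplies `latest_timestamp`. A self-contained sketch of that reverse scan over plain `serde_json::Value`s; the `"response_item"` tag is illustrative and not necessarily the real rollout schema:

```rust
/// Keep the last `max_records` JSONL objects tagged as responses, plus the
/// newest timestamp among them, scanning from the end of the text.
fn last_responses(jsonl: &str, max_records: usize) -> (Vec<serde_json::Value>, Option<String>) {
    let mut collected_rev = Vec::new();
    let mut latest_timestamp: Option<String> = None;
    for line in jsonl.lines().rev() {
        let trimmed = line.trim();
        if trimmed.is_empty() {
            continue;
        }
        let Ok(value) = serde_json::from_str::<serde_json::Value>(trimmed) else {
            continue;
        };
        if value.get("type").and_then(|t| t.as_str()) != Some("response_item") {
            continue;
        }
        if latest_timestamp.is_none() {
            // Lines are visited newest-first, so the first match is the latest.
            latest_timestamp = value
                .get("timestamp")
                .and_then(|t| t.as_str())
                .map(str::to_string);
        }
        collected_rev.push(value);
        if collected_rev.len() == max_records {
            break;
        }
    }
    collected_rev.reverse();
    (collected_rev, latest_timestamp)
}
```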
@@ -159,16 +159,22 @@ async fn test_list_conversations_latest_first() {
                 path: p1,
                 head: head_3,
                 tail: Vec::new(),
+                created_at: Some("2025-01-03T12-00-00".into()),
+                updated_at: Some("2025-01-03T12-00-00".into()),
             },
             ConversationItem {
                 path: p2,
                 head: head_2,
                 tail: Vec::new(),
+                created_at: Some("2025-01-02T12-00-00".into()),
+                updated_at: Some("2025-01-02T12-00-00".into()),
             },
             ConversationItem {
                 path: p3,
                 head: head_1,
                 tail: Vec::new(),
+                created_at: Some("2025-01-01T12-00-00".into()),
+                updated_at: Some("2025-01-01T12-00-00".into()),
             },
         ],
         next_cursor: Some(expected_cursor),
@@ -235,11 +241,15 @@ async fn test_pagination_cursor() {
                 path: p5,
                 head: head_5,
                 tail: Vec::new(),
+                created_at: Some("2025-03-05T09-00-00".into()),
+                updated_at: Some("2025-03-05T09-00-00".into()),
             },
             ConversationItem {
                 path: p4,
                 head: head_4,
                 tail: Vec::new(),
+                created_at: Some("2025-03-04T09-00-00".into()),
+                updated_at: Some("2025-03-04T09-00-00".into()),
             },
         ],
         next_cursor: Some(expected_cursor1.clone()),
@@ -287,11 +297,15 @@ async fn test_pagination_cursor() {
                 path: p3,
                 head: head_3,
                 tail: Vec::new(),
+                created_at: Some("2025-03-03T09-00-00".into()),
+                updated_at: Some("2025-03-03T09-00-00".into()),
             },
             ConversationItem {
                 path: p2,
                 head: head_2,
                 tail: Vec::new(),
+                created_at: Some("2025-03-02T09-00-00".into()),
+                updated_at: Some("2025-03-02T09-00-00".into()),
             },
         ],
         next_cursor: Some(expected_cursor2.clone()),
@@ -324,6 +338,8 @@ async fn test_pagination_cursor() {
             path: p1,
             head: head_1,
             tail: Vec::new(),
+            created_at: Some("2025-03-01T09-00-00".into()),
+            updated_at: Some("2025-03-01T09-00-00".into()),
         }],
         next_cursor: Some(expected_cursor3),
         num_scanned_files: 5, // scanned 05, 04 (anchor), 03, 02 (anchor), 01
@@ -367,6 +383,8 @@ async fn test_get_conversation_contents() {
             path: expected_path,
             head: expected_head,
             tail: Vec::new(),
+            created_at: Some(ts.into()),
+            updated_at: Some(ts.into()),
         }],
         next_cursor: Some(expected_cursor),
         num_scanned_files: 1,
@@ -449,18 +467,23 @@ async fn test_tail_includes_last_response_items() -> Result<()> {
 
     let expected: Vec<serde_json::Value> = (total_messages - tail_len..total_messages)
         .map(|idx| {
-            serde_json::to_value(ResponseItem::Message {
-                id: None,
-                role: "assistant".into(),
-                content: vec![ContentItem::OutputText {
-                    text: format!("reply-{idx}"),
-                }],
+            serde_json::json!({
+                "type": "message",
+                "role": "assistant",
+                "content": [
+                    {
+                        "type": "output_text",
+                        "text": format!("reply-{idx}"),
+                    }
+                ],
             })
-            .expect("serialize response item")
         })
         .collect();
 
     assert_eq!(item.tail, expected);
+    assert_eq!(item.created_at.as_deref(), Some(ts));
+    let expected_updated = format!("{ts}-{last:02}", last = total_messages - 1);
+    assert_eq!(item.updated_at.as_deref(), Some(expected_updated.as_str()));
 
     Ok(())
 }
@@ -526,18 +549,25 @@ async fn test_tail_handles_short_sessions() -> Result<()> {
 
     let expected: Vec<serde_json::Value> = (0..3)
         .map(|idx| {
-            serde_json::to_value(ResponseItem::Message {
-                id: None,
-                role: "assistant".into(),
-                content: vec![ContentItem::OutputText {
-                    text: format!("short-{idx}"),
-                }],
+            serde_json::json!({
+                "type": "message",
+                "role": "assistant",
+                "content": [
+                    {
+                        "type": "output_text",
+                        "text": format!("short-{idx}"),
+                    }
+                ],
             })
-            .expect("serialize response item")
         })
         .collect();
 
     assert_eq!(tail, &expected);
+    let expected_updated = format!("{ts}-{last:02}", last = 2);
+    assert_eq!(
+        page.items[0].updated_at.as_deref(),
+        Some(expected_updated.as_str())
+    );
 
     Ok(())
 }
@@ -615,18 +645,25 @@ async fn test_tail_skips_trailing_non_responses() -> Result<()> {
 
     let expected: Vec<serde_json::Value> = (0..4)
         .map(|idx| {
-            serde_json::to_value(ResponseItem::Message {
-                id: None,
-                role: "assistant".into(),
-                content: vec![ContentItem::OutputText {
-                    text: format!("response-{idx}"),
-                }],
+            serde_json::json!({
+                "type": "message",
+                "role": "assistant",
+                "content": [
+                    {
+                        "type": "output_text",
+                        "text": format!("response-{idx}"),
+                    }
+                ],
             })
-            .expect("serialize response item")
         })
         .collect();
 
     assert_eq!(tail, &expected);
+    let expected_updated = format!("{ts}-{last:02}", last = 3);
+    assert_eq!(
+        page.items[0].updated_at.as_deref(),
+        Some(expected_updated.as_str())
+    );
 
     Ok(())
 }
@@ -676,11 +713,15 @@ async fn test_stable_ordering_same_second_pagination() {
                 path: p3,
                 head: head(u3),
                 tail: Vec::new(),
+                created_at: Some(ts.to_string()),
+                updated_at: Some(ts.to_string()),
             },
             ConversationItem {
                 path: p2,
                 head: head(u2),
                 tail: Vec::new(),
+                created_at: Some(ts.to_string()),
+                updated_at: Some(ts.to_string()),
             },
         ],
         next_cursor: Some(expected_cursor1.clone()),
@@ -704,6 +745,8 @@ async fn test_stable_ordering_same_second_pagination() {
             path: p1,
             head: head(u1),
             tail: Vec::new(),
+            created_at: Some(ts.to_string()),
+            updated_at: Some(ts.to_string()),
         }],
         next_cursor: Some(expected_cursor2),
         num_scanned_files: 3, // scanned u3, u2 (anchor), u1