2025-07-18 17:04:04 -07:00
|
|
|
|
//! Persist Codex session rollouts (.jsonl) so sessions can be replayed or inspected later.
|
2025-05-07 13:49:15 -07:00
|
|
|
|
|
|
|
|
|
|
use std::fs::File;
|
|
|
|
|
|
use std::fs::{self};
|
|
|
|
|
|
use std::io::Error as IoError;
|
2025-07-18 17:04:04 -07:00
|
|
|
|
use std::path::Path;
|
2025-05-07 13:49:15 -07:00
|
|
|
|
|
2025-07-18 17:04:04 -07:00
|
|
|
|
use serde::Deserialize;
|
2025-05-07 13:49:15 -07:00
|
|
|
|
use serde::Serialize;
|
2025-07-18 17:04:04 -07:00
|
|
|
|
use serde_json::Value;
|
2025-05-07 13:49:15 -07:00
|
|
|
|
use time::OffsetDateTime;
|
|
|
|
|
|
use time::format_description::FormatItem;
|
|
|
|
|
|
use time::macros::format_description;
|
|
|
|
|
|
use tokio::io::AsyncWriteExt;
|
|
|
|
|
|
use tokio::sync::mpsc::Sender;
|
|
|
|
|
|
use tokio::sync::mpsc::{self};
|
2025-07-18 17:04:04 -07:00
|
|
|
|
use tracing::info;
|
2025-07-23 10:37:45 -07:00
|
|
|
|
use tracing::warn;
|
2025-05-07 13:49:15 -07:00
|
|
|
|
use uuid::Uuid;
|
|
|
|
|
|
|
2025-05-15 00:30:13 -07:00
|
|
|
|
use crate::config::Config;
|
2025-05-07 13:49:15 -07:00
|
|
|
|
use crate::models::ResponseItem;
|
|
|
|
|
|
|
|
|
|
|
|
/// Name of the directory under `config.codex_home` in which rollout files are
/// stored (further nested by `YYYY/MM/DD` in `create_log_file`).
const SESSIONS_SUBDIR: &str = "sessions";
|
|
|
|
|
|
|
2025-07-18 17:04:04 -07:00
|
|
|
|
#[derive(Serialize, Deserialize, Clone, Default)]
|
|
|
|
|
|
pub struct SessionMeta {
|
|
|
|
|
|
pub id: Uuid,
|
|
|
|
|
|
pub timestamp: String,
|
|
|
|
|
|
pub instructions: Option<String>,
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
#[derive(Serialize, Deserialize, Default, Clone)]
|
2025-07-23 10:37:45 -07:00
|
|
|
|
pub struct SessionStateSnapshot {}
|
2025-07-18 17:04:04 -07:00
|
|
|
|
|
|
|
|
|
|
#[derive(Serialize, Deserialize, Default, Clone)]
|
|
|
|
|
|
pub struct SavedSession {
|
|
|
|
|
|
pub session: SessionMeta,
|
|
|
|
|
|
#[serde(default)]
|
|
|
|
|
|
pub items: Vec<ResponseItem>,
|
|
|
|
|
|
#[serde(default)]
|
|
|
|
|
|
pub state: SessionStateSnapshot,
|
|
|
|
|
|
pub session_id: Uuid,
|
2025-05-07 13:49:15 -07:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/// Records all [`ResponseItem`]s for a session and flushes them to disk after
|
|
|
|
|
|
/// every update.
|
|
|
|
|
|
///
|
|
|
|
|
|
/// Rollouts are recorded as JSONL and can be inspected with tools such as:
|
|
|
|
|
|
///
|
|
|
|
|
|
/// ```ignore
|
2025-05-13 19:22:16 -07:00
|
|
|
|
/// $ jq -C . ~/.codex/sessions/rollout-2025-05-07T17-24-21-5973b6c0-94b8-487b-a530-2aeb6098ae0e.jsonl
|
|
|
|
|
|
/// $ fx ~/.codex/sessions/rollout-2025-05-07T17-24-21-5973b6c0-94b8-487b-a530-2aeb6098ae0e.jsonl
|
2025-05-07 13:49:15 -07:00
|
|
|
|
/// ```
|
|
|
|
|
|
#[derive(Clone)]
|
|
|
|
|
|
pub(crate) struct RolloutRecorder {
|
2025-07-18 17:04:04 -07:00
|
|
|
|
tx: Sender<RolloutCmd>,
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
#[derive(Clone)]
|
|
|
|
|
|
enum RolloutCmd {
|
|
|
|
|
|
AddItems(Vec<ResponseItem>),
|
|
|
|
|
|
UpdateState(SessionStateSnapshot),
|
2025-05-07 13:49:15 -07:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
impl RolloutRecorder {
|
|
|
|
|
|
/// Attempt to create a new [`RolloutRecorder`]. If the sessions directory
|
|
|
|
|
|
/// cannot be created or the rollout file cannot be opened we return the
|
|
|
|
|
|
/// error so the caller can decide whether to disable persistence.
|
2025-05-15 00:30:13 -07:00
|
|
|
|
pub async fn new(
|
|
|
|
|
|
config: &Config,
|
|
|
|
|
|
uuid: Uuid,
|
|
|
|
|
|
instructions: Option<String>,
|
|
|
|
|
|
) -> std::io::Result<Self> {
|
2025-05-07 13:49:15 -07:00
|
|
|
|
let LogFileInfo {
|
|
|
|
|
|
file,
|
|
|
|
|
|
session_id,
|
|
|
|
|
|
timestamp,
|
2025-05-15 00:30:13 -07:00
|
|
|
|
} = create_log_file(config, uuid)?;
|
2025-05-07 13:49:15 -07:00
|
|
|
|
|
|
|
|
|
|
let timestamp_format: &[FormatItem] = format_description!(
|
|
|
|
|
|
"[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond digits:3]Z"
|
|
|
|
|
|
);
|
2025-05-15 14:07:16 -07:00
|
|
|
|
let timestamp = timestamp
|
|
|
|
|
|
.format(timestamp_format)
|
|
|
|
|
|
.map_err(|e| IoError::other(format!("failed to format timestamp: {e}")))?;
|
2025-05-07 13:49:15 -07:00
|
|
|
|
|
|
|
|
|
|
let meta = SessionMeta {
|
|
|
|
|
|
timestamp,
|
2025-07-18 17:04:04 -07:00
|
|
|
|
id: session_id,
|
2025-05-07 13:49:15 -07:00
|
|
|
|
instructions,
|
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
// A reasonably-sized bounded channel. If the buffer fills up the send
|
|
|
|
|
|
// future will yield, which is fine – we only need to ensure we do not
|
|
|
|
|
|
// perform *blocking* I/O on the caller’s thread.
|
2025-07-18 17:04:04 -07:00
|
|
|
|
let (tx, rx) = mpsc::channel::<RolloutCmd>(256);
|
2025-05-07 13:49:15 -07:00
|
|
|
|
|
|
|
|
|
|
// Spawn a Tokio task that owns the file handle and performs async
|
|
|
|
|
|
// writes. Using `tokio::fs::File` keeps everything on the async I/O
|
|
|
|
|
|
// driver instead of blocking the runtime.
|
2025-07-18 17:04:04 -07:00
|
|
|
|
tokio::task::spawn(rollout_writer(
|
|
|
|
|
|
tokio::fs::File::from_std(file),
|
|
|
|
|
|
rx,
|
|
|
|
|
|
Some(meta),
|
|
|
|
|
|
));
|
2025-05-07 13:49:15 -07:00
|
|
|
|
|
2025-07-18 17:04:04 -07:00
|
|
|
|
Ok(Self { tx })
|
2025-05-07 13:49:15 -07:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
pub(crate) async fn record_items(&self, items: &[ResponseItem]) -> std::io::Result<()> {
|
2025-07-18 17:04:04 -07:00
|
|
|
|
let mut filtered = Vec::new();
|
2025-05-07 13:49:15 -07:00
|
|
|
|
for item in items {
|
|
|
|
|
|
match item {
|
|
|
|
|
|
// Note that function calls may look a bit strange if they are
|
|
|
|
|
|
// "fully qualified MCP tool calls," so we could consider
|
|
|
|
|
|
// reformatting them in that case.
|
|
|
|
|
|
ResponseItem::Message { .. }
|
2025-05-16 14:38:08 -07:00
|
|
|
|
| ResponseItem::LocalShellCall { .. }
|
2025-05-07 13:49:15 -07:00
|
|
|
|
| ResponseItem::FunctionCall { .. }
|
2025-07-23 10:37:45 -07:00
|
|
|
|
| ResponseItem::FunctionCallOutput { .. }
|
|
|
|
|
|
| ResponseItem::Reasoning { .. } => filtered.push(item.clone()),
|
|
|
|
|
|
ResponseItem::Other => {
|
2025-05-07 13:49:15 -07:00
|
|
|
|
// These should never be serialized.
|
|
|
|
|
|
continue;
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
2025-07-18 17:04:04 -07:00
|
|
|
|
if filtered.is_empty() {
|
|
|
|
|
|
return Ok(());
|
|
|
|
|
|
}
|
|
|
|
|
|
self.tx
|
|
|
|
|
|
.send(RolloutCmd::AddItems(filtered))
|
|
|
|
|
|
.await
|
|
|
|
|
|
.map_err(|e| IoError::other(format!("failed to queue rollout items: {e}")))
|
2025-05-07 13:49:15 -07:00
|
|
|
|
}
|
|
|
|
|
|
|
2025-07-18 17:04:04 -07:00
|
|
|
|
pub(crate) async fn record_state(&self, state: SessionStateSnapshot) -> std::io::Result<()> {
|
2025-05-15 14:07:16 -07:00
|
|
|
|
self.tx
|
2025-07-18 17:04:04 -07:00
|
|
|
|
.send(RolloutCmd::UpdateState(state))
|
2025-05-15 14:07:16 -07:00
|
|
|
|
.await
|
2025-07-18 17:04:04 -07:00
|
|
|
|
.map_err(|e| IoError::other(format!("failed to queue rollout state: {e}")))
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
pub async fn resume(path: &Path) -> std::io::Result<(Self, SavedSession)> {
|
|
|
|
|
|
info!("Resuming rollout from {path:?}");
|
|
|
|
|
|
let text = tokio::fs::read_to_string(path).await?;
|
|
|
|
|
|
let mut lines = text.lines();
|
|
|
|
|
|
let meta_line = lines
|
|
|
|
|
|
.next()
|
|
|
|
|
|
.ok_or_else(|| IoError::other("empty session file"))?;
|
|
|
|
|
|
let session: SessionMeta = serde_json::from_str(meta_line)
|
|
|
|
|
|
.map_err(|e| IoError::other(format!("failed to parse session meta: {e}")))?;
|
|
|
|
|
|
let mut items = Vec::new();
|
|
|
|
|
|
let mut state = SessionStateSnapshot::default();
|
|
|
|
|
|
|
|
|
|
|
|
for line in lines {
|
|
|
|
|
|
if line.trim().is_empty() {
|
|
|
|
|
|
continue;
|
|
|
|
|
|
}
|
|
|
|
|
|
let v: Value = match serde_json::from_str(line) {
|
|
|
|
|
|
Ok(v) => v,
|
|
|
|
|
|
Err(_) => continue,
|
|
|
|
|
|
};
|
|
|
|
|
|
if v.get("record_type")
|
|
|
|
|
|
.and_then(|rt| rt.as_str())
|
|
|
|
|
|
.map(|s| s == "state")
|
|
|
|
|
|
.unwrap_or(false)
|
|
|
|
|
|
{
|
|
|
|
|
|
if let Ok(s) = serde_json::from_value::<SessionStateSnapshot>(v.clone()) {
|
|
|
|
|
|
state = s
|
|
|
|
|
|
}
|
|
|
|
|
|
continue;
|
|
|
|
|
|
}
|
2025-07-23 10:37:45 -07:00
|
|
|
|
match serde_json::from_value::<ResponseItem>(v.clone()) {
|
|
|
|
|
|
Ok(item) => match item {
|
2025-07-18 17:04:04 -07:00
|
|
|
|
ResponseItem::Message { .. }
|
|
|
|
|
|
| ResponseItem::LocalShellCall { .. }
|
|
|
|
|
|
| ResponseItem::FunctionCall { .. }
|
2025-07-23 10:37:45 -07:00
|
|
|
|
| ResponseItem::FunctionCallOutput { .. }
|
|
|
|
|
|
| ResponseItem::Reasoning { .. } => items.push(item),
|
|
|
|
|
|
ResponseItem::Other => {}
|
|
|
|
|
|
},
|
|
|
|
|
|
Err(e) => {
|
|
|
|
|
|
warn!("failed to parse item: {v:?}, error: {e}");
|
2025-07-18 17:04:04 -07:00
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
let saved = SavedSession {
|
|
|
|
|
|
session: session.clone(),
|
|
|
|
|
|
items: items.clone(),
|
|
|
|
|
|
state: state.clone(),
|
|
|
|
|
|
session_id: session.id,
|
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
let file = std::fs::OpenOptions::new()
|
|
|
|
|
|
.append(true)
|
|
|
|
|
|
.read(true)
|
|
|
|
|
|
.open(path)?;
|
|
|
|
|
|
|
|
|
|
|
|
let (tx, rx) = mpsc::channel::<RolloutCmd>(256);
|
|
|
|
|
|
tokio::task::spawn(rollout_writer(tokio::fs::File::from_std(file), rx, None));
|
|
|
|
|
|
info!("Resumed rollout successfully from {path:?}");
|
|
|
|
|
|
Ok((Self { tx }, saved))
|
2025-05-07 13:49:15 -07:00
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
struct LogFileInfo {
|
|
|
|
|
|
/// Opened file handle to the rollout file.
|
|
|
|
|
|
file: File,
|
|
|
|
|
|
|
|
|
|
|
|
/// Session ID (also embedded in filename).
|
|
|
|
|
|
session_id: Uuid,
|
|
|
|
|
|
|
|
|
|
|
|
/// Timestamp for the start of the session.
|
|
|
|
|
|
timestamp: OffsetDateTime,
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-05-15 00:30:13 -07:00
|
|
|
|
fn create_log_file(config: &Config, session_id: Uuid) -> std::io::Result<LogFileInfo> {
|
2025-07-17 10:12:15 -07:00
|
|
|
|
// Resolve ~/.codex/sessions/YYYY/MM/DD and create it if missing.
|
|
|
|
|
|
let timestamp = OffsetDateTime::now_local()
|
|
|
|
|
|
.map_err(|e| IoError::other(format!("failed to get local time: {e}")))?;
|
2025-05-15 00:30:13 -07:00
|
|
|
|
let mut dir = config.codex_home.clone();
|
2025-05-07 13:49:15 -07:00
|
|
|
|
dir.push(SESSIONS_SUBDIR);
|
2025-07-17 10:12:15 -07:00
|
|
|
|
dir.push(timestamp.year().to_string());
|
|
|
|
|
|
dir.push(format!("{:02}", u8::from(timestamp.month())));
|
|
|
|
|
|
dir.push(format!("{:02}", timestamp.day()));
|
2025-05-07 13:49:15 -07:00
|
|
|
|
fs::create_dir_all(&dir)?;
|
|
|
|
|
|
|
2025-05-13 19:22:16 -07:00
|
|
|
|
// Custom format for YYYY-MM-DDThh-mm-ss. Use `-` instead of `:` for
|
|
|
|
|
|
// compatibility with filesystems that do not allow colons in filenames.
|
|
|
|
|
|
let format: &[FormatItem] =
|
|
|
|
|
|
format_description!("[year]-[month]-[day]T[hour]-[minute]-[second]");
|
2025-05-08 09:46:18 -07:00
|
|
|
|
let date_str = timestamp
|
|
|
|
|
|
.format(format)
|
2025-05-15 14:07:16 -07:00
|
|
|
|
.map_err(|e| IoError::other(format!("failed to format timestamp: {e}")))?;
|
2025-05-07 13:49:15 -07:00
|
|
|
|
|
|
|
|
|
|
let filename = format!("rollout-{date_str}-{session_id}.jsonl");
|
|
|
|
|
|
|
|
|
|
|
|
let path = dir.join(filename);
|
|
|
|
|
|
let file = std::fs::OpenOptions::new()
|
|
|
|
|
|
.append(true)
|
|
|
|
|
|
.create(true)
|
|
|
|
|
|
.open(&path)?;
|
|
|
|
|
|
|
|
|
|
|
|
Ok(LogFileInfo {
|
|
|
|
|
|
file,
|
|
|
|
|
|
session_id,
|
|
|
|
|
|
timestamp,
|
|
|
|
|
|
})
|
|
|
|
|
|
}
|
2025-07-18 17:04:04 -07:00
|
|
|
|
|
|
|
|
|
|
async fn rollout_writer(
|
|
|
|
|
|
mut file: tokio::fs::File,
|
|
|
|
|
|
mut rx: mpsc::Receiver<RolloutCmd>,
|
|
|
|
|
|
meta: Option<SessionMeta>,
|
|
|
|
|
|
) {
|
|
|
|
|
|
if let Some(meta) = meta {
|
|
|
|
|
|
if let Ok(json) = serde_json::to_string(&meta) {
|
|
|
|
|
|
let _ = file.write_all(json.as_bytes()).await;
|
|
|
|
|
|
let _ = file.write_all(b"\n").await;
|
|
|
|
|
|
let _ = file.flush().await;
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
while let Some(cmd) = rx.recv().await {
|
|
|
|
|
|
match cmd {
|
|
|
|
|
|
RolloutCmd::AddItems(items) => {
|
|
|
|
|
|
for item in items {
|
|
|
|
|
|
match item {
|
|
|
|
|
|
ResponseItem::Message { .. }
|
|
|
|
|
|
| ResponseItem::LocalShellCall { .. }
|
|
|
|
|
|
| ResponseItem::FunctionCall { .. }
|
2025-07-23 10:37:45 -07:00
|
|
|
|
| ResponseItem::FunctionCallOutput { .. }
|
|
|
|
|
|
| ResponseItem::Reasoning { .. } => {
|
2025-07-18 17:04:04 -07:00
|
|
|
|
if let Ok(json) = serde_json::to_string(&item) {
|
|
|
|
|
|
let _ = file.write_all(json.as_bytes()).await;
|
|
|
|
|
|
let _ = file.write_all(b"\n").await;
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
2025-07-23 10:37:45 -07:00
|
|
|
|
ResponseItem::Other => {}
|
2025-07-18 17:04:04 -07:00
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
let _ = file.flush().await;
|
|
|
|
|
|
}
|
|
|
|
|
|
RolloutCmd::UpdateState(state) => {
|
|
|
|
|
|
#[derive(Serialize)]
|
|
|
|
|
|
struct StateLine<'a> {
|
|
|
|
|
|
record_type: &'static str,
|
|
|
|
|
|
#[serde(flatten)]
|
|
|
|
|
|
state: &'a SessionStateSnapshot,
|
|
|
|
|
|
}
|
|
|
|
|
|
if let Ok(json) = serde_json::to_string(&StateLine {
|
|
|
|
|
|
record_type: "state",
|
|
|
|
|
|
state: &state,
|
|
|
|
|
|
}) {
|
|
|
|
|
|
let _ = file.write_all(json.as_bytes()).await;
|
|
|
|
|
|
let _ = file.write_all(b"\n").await;
|
|
|
|
|
|
let _ = file.flush().await;
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|