Files
llmx/codex-rs/core/src/rollout.rs

355 lines
12 KiB
Rust
Raw Normal View History

//! Persist Codex session rollouts (.jsonl) so sessions can be replayed or inspected later.
use std::fs::File;
use std::fs::{self};
use std::io::Error as IoError;
use std::path::Path;
use serde::Deserialize;
use serde::Serialize;
use serde_json::Value;
use time::OffsetDateTime;
use time::format_description::FormatItem;
use time::macros::format_description;
use tokio::io::AsyncWriteExt;
use tokio::sync::mpsc::Sender;
use tokio::sync::mpsc::{self};
use tokio::sync::oneshot;
use tracing::info;
use tracing::warn;
use uuid::Uuid;
use crate::config::Config;
use crate::git_info::GitInfo;
use crate::git_info::collect_git_info;
use crate::models::ResponseItem;
const SESSIONS_SUBDIR: &str = "sessions";
#[derive(Serialize, Deserialize, Clone, Default)]
pub struct SessionMeta {
pub id: Uuid,
pub timestamp: String,
pub instructions: Option<String>,
}
#[derive(Serialize)]
struct SessionMetaWithGit {
#[serde(flatten)]
meta: SessionMeta,
#[serde(skip_serializing_if = "Option::is_none")]
git: Option<GitInfo>,
}
#[derive(Serialize, Deserialize, Default, Clone)]
pub struct SessionStateSnapshot {}
#[derive(Serialize, Deserialize, Default, Clone)]
pub struct SavedSession {
pub session: SessionMeta,
#[serde(default)]
pub items: Vec<ResponseItem>,
#[serde(default)]
pub state: SessionStateSnapshot,
pub session_id: Uuid,
}
/// Records all [`ResponseItem`]s for a session and flushes them to disk after
/// every update.
///
/// Rollouts are recorded as JSONL and can be inspected with tools such as:
///
/// ```ignore
/// $ jq -C . ~/.codex/sessions/rollout-2025-05-07T17-24-21-5973b6c0-94b8-487b-a530-2aeb6098ae0e.jsonl
/// $ fx ~/.codex/sessions/rollout-2025-05-07T17-24-21-5973b6c0-94b8-487b-a530-2aeb6098ae0e.jsonl
/// ```
#[derive(Clone)]
pub(crate) struct RolloutRecorder {
tx: Sender<RolloutCmd>,
}
enum RolloutCmd {
AddItems(Vec<ResponseItem>),
UpdateState(SessionStateSnapshot),
Shutdown { ack: oneshot::Sender<()> },
}
impl RolloutRecorder {
/// Attempt to create a new [`RolloutRecorder`]. If the sessions directory
/// cannot be created or the rollout file cannot be opened we return the
/// error so the caller can decide whether to disable persistence.
pub async fn new(
config: &Config,
uuid: Uuid,
instructions: Option<String>,
) -> std::io::Result<Self> {
let LogFileInfo {
file,
session_id,
timestamp,
} = create_log_file(config, uuid)?;
let timestamp_format: &[FormatItem] = format_description!(
"[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond digits:3]Z"
);
let timestamp = timestamp
.format(timestamp_format)
.map_err(|e| IoError::other(format!("failed to format timestamp: {e}")))?;
// Clone the cwd for the spawned task to collect git info asynchronously
let cwd = config.cwd.clone();
// A reasonably-sized bounded channel. If the buffer fills up the send
// future will yield, which is fine we only need to ensure we do not
// perform *blocking* I/O on the caller's thread.
let (tx, rx) = mpsc::channel::<RolloutCmd>(256);
// Spawn a Tokio task that owns the file handle and performs async
// writes. Using `tokio::fs::File` keeps everything on the async I/O
// driver instead of blocking the runtime.
tokio::task::spawn(rollout_writer(
tokio::fs::File::from_std(file),
rx,
Some(SessionMeta {
timestamp,
id: session_id,
instructions,
}),
cwd,
));
Ok(Self { tx })
}
pub(crate) async fn record_items(&self, items: &[ResponseItem]) -> std::io::Result<()> {
let mut filtered = Vec::new();
for item in items {
match item {
// Note that function calls may look a bit strange if they are
// "fully qualified MCP tool calls," so we could consider
// reformatting them in that case.
ResponseItem::Message { .. }
| ResponseItem::LocalShellCall { .. }
| ResponseItem::FunctionCall { .. }
| ResponseItem::FunctionCallOutput { .. }
| ResponseItem::Reasoning { .. } => filtered.push(item.clone()),
ResponseItem::Other => {
// These should never be serialized.
continue;
}
}
}
if filtered.is_empty() {
return Ok(());
}
self.tx
.send(RolloutCmd::AddItems(filtered))
.await
.map_err(|e| IoError::other(format!("failed to queue rollout items: {e}")))
}
pub(crate) async fn record_state(&self, state: SessionStateSnapshot) -> std::io::Result<()> {
self.tx
.send(RolloutCmd::UpdateState(state))
.await
.map_err(|e| IoError::other(format!("failed to queue rollout state: {e}")))
}
pub async fn resume(
path: &Path,
cwd: std::path::PathBuf,
) -> std::io::Result<(Self, SavedSession)> {
info!("Resuming rollout from {path:?}");
let text = tokio::fs::read_to_string(path).await?;
let mut lines = text.lines();
let meta_line = lines
.next()
.ok_or_else(|| IoError::other("empty session file"))?;
let session: SessionMeta = serde_json::from_str(meta_line)
.map_err(|e| IoError::other(format!("failed to parse session meta: {e}")))?;
let mut items = Vec::new();
let mut state = SessionStateSnapshot::default();
for line in lines {
if line.trim().is_empty() {
continue;
}
let v: Value = match serde_json::from_str(line) {
Ok(v) => v,
Err(_) => continue,
};
if v.get("record_type")
.and_then(|rt| rt.as_str())
.map(|s| s == "state")
.unwrap_or(false)
{
if let Ok(s) = serde_json::from_value::<SessionStateSnapshot>(v.clone()) {
state = s
}
continue;
}
match serde_json::from_value::<ResponseItem>(v.clone()) {
Ok(item) => match item {
ResponseItem::Message { .. }
| ResponseItem::LocalShellCall { .. }
| ResponseItem::FunctionCall { .. }
| ResponseItem::FunctionCallOutput { .. }
| ResponseItem::Reasoning { .. } => items.push(item),
ResponseItem::Other => {}
},
Err(e) => {
warn!("failed to parse item: {v:?}, error: {e}");
}
}
}
let saved = SavedSession {
session: session.clone(),
items: items.clone(),
state: state.clone(),
session_id: session.id,
};
let file = std::fs::OpenOptions::new()
.append(true)
.read(true)
.open(path)?;
let (tx, rx) = mpsc::channel::<RolloutCmd>(256);
tokio::task::spawn(rollout_writer(
tokio::fs::File::from_std(file),
rx,
None,
cwd,
));
info!("Resumed rollout successfully from {path:?}");
Ok((Self { tx }, saved))
}
pub async fn shutdown(&self) -> std::io::Result<()> {
let (tx_done, rx_done) = oneshot::channel();
match self.tx.send(RolloutCmd::Shutdown { ack: tx_done }).await {
Ok(_) => rx_done
.await
.map_err(|e| IoError::other(format!("failed waiting for rollout shutdown: {e}"))),
Err(e) => {
warn!("failed to send rollout shutdown command: {e}");
Err(IoError::other(format!(
"failed to send rollout shutdown command: {e}"
)))
}
}
}
}
struct LogFileInfo {
/// Opened file handle to the rollout file.
file: File,
/// Session ID (also embedded in filename).
session_id: Uuid,
/// Timestamp for the start of the session.
timestamp: OffsetDateTime,
}
fn create_log_file(config: &Config, session_id: Uuid) -> std::io::Result<LogFileInfo> {
// Resolve ~/.codex/sessions/YYYY/MM/DD and create it if missing.
let timestamp = OffsetDateTime::now_local()
.map_err(|e| IoError::other(format!("failed to get local time: {e}")))?;
let mut dir = config.codex_home.clone();
dir.push(SESSIONS_SUBDIR);
dir.push(timestamp.year().to_string());
dir.push(format!("{:02}", u8::from(timestamp.month())));
dir.push(format!("{:02}", timestamp.day()));
fs::create_dir_all(&dir)?;
// Custom format for YYYY-MM-DDThh-mm-ss. Use `-` instead of `:` for
// compatibility with filesystems that do not allow colons in filenames.
let format: &[FormatItem] =
format_description!("[year]-[month]-[day]T[hour]-[minute]-[second]");
let date_str = timestamp
.format(format)
.map_err(|e| IoError::other(format!("failed to format timestamp: {e}")))?;
let filename = format!("rollout-{date_str}-{session_id}.jsonl");
let path = dir.join(filename);
let file = std::fs::OpenOptions::new()
.append(true)
.create(true)
.open(&path)?;
Ok(LogFileInfo {
file,
session_id,
timestamp,
})
}
async fn rollout_writer(
mut file: tokio::fs::File,
mut rx: mpsc::Receiver<RolloutCmd>,
mut meta: Option<SessionMeta>,
cwd: std::path::PathBuf,
) {
// If we have a meta, collect git info asynchronously and write meta first
if let Some(session_meta) = meta.take() {
let git_info = collect_git_info(&cwd).await;
let session_meta_with_git = SessionMetaWithGit {
meta: session_meta,
git: git_info,
};
// Write the SessionMeta as the first item in the file
if let Ok(json) = serde_json::to_string(&session_meta_with_git) {
let _ = file.write_all(json.as_bytes()).await;
let _ = file.write_all(b"\n").await;
let _ = file.flush().await;
}
}
// Process rollout commands
while let Some(cmd) = rx.recv().await {
match cmd {
RolloutCmd::AddItems(items) => {
for item in items {
match item {
ResponseItem::Message { .. }
| ResponseItem::LocalShellCall { .. }
| ResponseItem::FunctionCall { .. }
| ResponseItem::FunctionCallOutput { .. }
| ResponseItem::Reasoning { .. } => {
if let Ok(json) = serde_json::to_string(&item) {
let _ = file.write_all(json.as_bytes()).await;
let _ = file.write_all(b"\n").await;
}
}
ResponseItem::Other => {}
}
}
let _ = file.flush().await;
}
RolloutCmd::UpdateState(state) => {
#[derive(Serialize)]
struct StateLine<'a> {
record_type: &'static str,
#[serde(flatten)]
state: &'a SessionStateSnapshot,
}
if let Ok(json) = serde_json::to_string(&StateLine {
record_type: "state",
state: &state,
}) {
let _ = file.write_all(json.as_bytes()).await;
let _ = file.write_all(b"\n").await;
let _ = file.flush().await;
}
}
RolloutCmd::Shutdown { ack } => {
let _ = ack.send(());
}
}
}
}