2025-05-07 13:49:15 -07:00
|
|
|
|
//! Functionality to persist a Codex conversation *rollout* – a linear list of
|
|
|
|
|
|
//! [`ResponseItem`] objects exchanged during a session – to disk so that
|
|
|
|
|
|
//! sessions can be replayed or inspected later (mirrors the behaviour of the
|
|
|
|
|
|
//! upstream TypeScript implementation).
|
|
|
|
|
|
|
|
|
|
|
|
use std::fs::File;
|
|
|
|
|
|
use std::fs::{self};
|
|
|
|
|
|
use std::io::Error as IoError;
|
|
|
|
|
|
|
|
|
|
|
|
use serde::Serialize;
|
|
|
|
|
|
use time::OffsetDateTime;
|
|
|
|
|
|
use time::format_description::FormatItem;
|
|
|
|
|
|
use time::macros::format_description;
|
|
|
|
|
|
use tokio::io::AsyncWriteExt;
|
|
|
|
|
|
use tokio::sync::mpsc::Sender;
|
|
|
|
|
|
use tokio::sync::mpsc::{self};
|
|
|
|
|
|
use uuid::Uuid;
|
|
|
|
|
|
|
2025-05-15 00:30:13 -07:00
|
|
|
|
use crate::config::Config;
|
2025-05-07 13:49:15 -07:00
|
|
|
|
use crate::models::ResponseItem;
|
|
|
|
|
|
|
|
|
|
|
|
/// Name of the subdirectory, joined onto the configured Codex home directory,
/// in which rollout files are stored.
const SESSIONS_SUBDIR: &str = "sessions";
|
|
|
|
|
|
|
|
|
|
|
|
#[derive(Serialize)]
|
|
|
|
|
|
struct SessionMeta {
|
|
|
|
|
|
id: String,
|
|
|
|
|
|
timestamp: String,
|
|
|
|
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
|
|
|
|
instructions: Option<String>,
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/// Records all [`ResponseItem`]s for a session and flushes them to disk after
|
|
|
|
|
|
/// every update.
|
|
|
|
|
|
///
|
|
|
|
|
|
/// Rollouts are recorded as JSONL and can be inspected with tools such as:
|
|
|
|
|
|
///
|
|
|
|
|
|
/// ```ignore
|
2025-05-13 19:22:16 -07:00
|
|
|
|
/// $ jq -C . ~/.codex/sessions/rollout-2025-05-07T17-24-21-5973b6c0-94b8-487b-a530-2aeb6098ae0e.jsonl
|
|
|
|
|
|
/// $ fx ~/.codex/sessions/rollout-2025-05-07T17-24-21-5973b6c0-94b8-487b-a530-2aeb6098ae0e.jsonl
|
2025-05-07 13:49:15 -07:00
|
|
|
|
/// ```
|
|
|
|
|
|
#[derive(Clone)]
|
|
|
|
|
|
pub(crate) struct RolloutRecorder {
|
|
|
|
|
|
tx: Sender<String>,
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
impl RolloutRecorder {
|
|
|
|
|
|
/// Attempt to create a new [`RolloutRecorder`]. If the sessions directory
|
|
|
|
|
|
/// cannot be created or the rollout file cannot be opened we return the
|
|
|
|
|
|
/// error so the caller can decide whether to disable persistence.
|
2025-05-15 00:30:13 -07:00
|
|
|
|
pub async fn new(
|
|
|
|
|
|
config: &Config,
|
|
|
|
|
|
uuid: Uuid,
|
|
|
|
|
|
instructions: Option<String>,
|
|
|
|
|
|
) -> std::io::Result<Self> {
|
2025-05-07 13:49:15 -07:00
|
|
|
|
let LogFileInfo {
|
|
|
|
|
|
file,
|
|
|
|
|
|
session_id,
|
|
|
|
|
|
timestamp,
|
2025-05-15 00:30:13 -07:00
|
|
|
|
} = create_log_file(config, uuid)?;
|
2025-05-07 13:49:15 -07:00
|
|
|
|
|
|
|
|
|
|
// Build the static session metadata JSON first.
|
|
|
|
|
|
let timestamp_format: &[FormatItem] = format_description!(
|
|
|
|
|
|
"[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond digits:3]Z"
|
|
|
|
|
|
);
|
2025-05-15 14:07:16 -07:00
|
|
|
|
let timestamp = timestamp
|
|
|
|
|
|
.format(timestamp_format)
|
|
|
|
|
|
.map_err(|e| IoError::other(format!("failed to format timestamp: {e}")))?;
|
2025-05-07 13:49:15 -07:00
|
|
|
|
|
|
|
|
|
|
let meta = SessionMeta {
|
|
|
|
|
|
timestamp,
|
|
|
|
|
|
id: session_id.to_string(),
|
|
|
|
|
|
instructions,
|
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
// A reasonably-sized bounded channel. If the buffer fills up the send
|
|
|
|
|
|
// future will yield, which is fine – we only need to ensure we do not
|
|
|
|
|
|
// perform *blocking* I/O on the caller’s thread.
|
|
|
|
|
|
let (tx, mut rx) = mpsc::channel::<String>(256);
|
|
|
|
|
|
|
|
|
|
|
|
// Spawn a Tokio task that owns the file handle and performs async
|
|
|
|
|
|
// writes. Using `tokio::fs::File` keeps everything on the async I/O
|
|
|
|
|
|
// driver instead of blocking the runtime.
|
|
|
|
|
|
tokio::task::spawn(async move {
|
|
|
|
|
|
let mut file = tokio::fs::File::from_std(file);
|
|
|
|
|
|
|
|
|
|
|
|
while let Some(line) = rx.recv().await {
|
|
|
|
|
|
// Write line + newline, then flush to disk.
|
|
|
|
|
|
if let Err(e) = file.write_all(line.as_bytes()).await {
|
|
|
|
|
|
tracing::warn!("rollout writer: failed to write line: {e}");
|
|
|
|
|
|
break;
|
|
|
|
|
|
}
|
|
|
|
|
|
if let Err(e) = file.write_all(b"\n").await {
|
|
|
|
|
|
tracing::warn!("rollout writer: failed to write newline: {e}");
|
|
|
|
|
|
break;
|
|
|
|
|
|
}
|
|
|
|
|
|
if let Err(e) = file.flush().await {
|
|
|
|
|
|
tracing::warn!("rollout writer: failed to flush: {e}");
|
|
|
|
|
|
break;
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
|
|
let recorder = Self { tx };
|
|
|
|
|
|
// Ensure SessionMeta is the first item in the file.
|
|
|
|
|
|
recorder.record_item(&meta).await?;
|
|
|
|
|
|
Ok(recorder)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/// Append `items` to the rollout file.
|
|
|
|
|
|
pub(crate) async fn record_items(&self, items: &[ResponseItem]) -> std::io::Result<()> {
|
|
|
|
|
|
for item in items {
|
|
|
|
|
|
match item {
|
|
|
|
|
|
// Note that function calls may look a bit strange if they are
|
|
|
|
|
|
// "fully qualified MCP tool calls," so we could consider
|
|
|
|
|
|
// reformatting them in that case.
|
|
|
|
|
|
ResponseItem::Message { .. }
|
2025-05-16 14:38:08 -07:00
|
|
|
|
| ResponseItem::LocalShellCall { .. }
|
2025-05-07 13:49:15 -07:00
|
|
|
|
| ResponseItem::FunctionCall { .. }
|
|
|
|
|
|
| ResponseItem::FunctionCallOutput { .. } => {}
|
2025-05-10 21:43:27 -07:00
|
|
|
|
ResponseItem::Reasoning { .. } | ResponseItem::Other => {
|
2025-05-07 13:49:15 -07:00
|
|
|
|
// These should never be serialized.
|
|
|
|
|
|
continue;
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
self.record_item(item).await?;
|
|
|
|
|
|
}
|
|
|
|
|
|
Ok(())
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
async fn record_item(&self, item: &impl Serialize) -> std::io::Result<()> {
|
|
|
|
|
|
// Serialize the item to JSON first so that the writer thread only has
|
|
|
|
|
|
// to perform the actual write.
|
2025-05-15 14:07:16 -07:00
|
|
|
|
let json = serde_json::to_string(item)
|
|
|
|
|
|
.map_err(|e| IoError::other(format!("failed to serialize response items: {e}")))?;
|
|
|
|
|
|
|
|
|
|
|
|
self.tx
|
|
|
|
|
|
.send(json)
|
|
|
|
|
|
.await
|
|
|
|
|
|
.map_err(|e| IoError::other(format!("failed to queue rollout item: {e}")))
|
2025-05-07 13:49:15 -07:00
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
struct LogFileInfo {
|
|
|
|
|
|
/// Opened file handle to the rollout file.
|
|
|
|
|
|
file: File,
|
|
|
|
|
|
|
|
|
|
|
|
/// Session ID (also embedded in filename).
|
|
|
|
|
|
session_id: Uuid,
|
|
|
|
|
|
|
|
|
|
|
|
/// Timestamp for the start of the session.
|
|
|
|
|
|
timestamp: OffsetDateTime,
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-05-15 00:30:13 -07:00
|
|
|
|
fn create_log_file(config: &Config, session_id: Uuid) -> std::io::Result<LogFileInfo> {
|
2025-05-07 13:49:15 -07:00
|
|
|
|
// Resolve ~/.codex/sessions and create it if missing.
|
2025-05-15 00:30:13 -07:00
|
|
|
|
let mut dir = config.codex_home.clone();
|
2025-05-07 13:49:15 -07:00
|
|
|
|
dir.push(SESSIONS_SUBDIR);
|
|
|
|
|
|
fs::create_dir_all(&dir)?;
|
|
|
|
|
|
|
2025-05-13 19:22:16 -07:00
|
|
|
|
let timestamp = OffsetDateTime::now_local()
|
2025-05-15 14:07:16 -07:00
|
|
|
|
.map_err(|e| IoError::other(format!("failed to get local time: {e}")))?;
|
2025-05-07 13:49:15 -07:00
|
|
|
|
|
2025-05-13 19:22:16 -07:00
|
|
|
|
// Custom format for YYYY-MM-DDThh-mm-ss. Use `-` instead of `:` for
|
|
|
|
|
|
// compatibility with filesystems that do not allow colons in filenames.
|
|
|
|
|
|
let format: &[FormatItem] =
|
|
|
|
|
|
format_description!("[year]-[month]-[day]T[hour]-[minute]-[second]");
|
2025-05-08 09:46:18 -07:00
|
|
|
|
let date_str = timestamp
|
|
|
|
|
|
.format(format)
|
2025-05-15 14:07:16 -07:00
|
|
|
|
.map_err(|e| IoError::other(format!("failed to format timestamp: {e}")))?;
|
2025-05-07 13:49:15 -07:00
|
|
|
|
|
|
|
|
|
|
let filename = format!("rollout-{date_str}-{session_id}.jsonl");
|
|
|
|
|
|
|
|
|
|
|
|
let path = dir.join(filename);
|
|
|
|
|
|
let file = std::fs::OpenOptions::new()
|
|
|
|
|
|
.append(true)
|
|
|
|
|
|
.create(true)
|
|
|
|
|
|
.open(&path)?;
|
|
|
|
|
|
|
|
|
|
|
|
Ok(LogFileInfo {
|
|
|
|
|
|
file,
|
|
|
|
|
|
session_id,
|
|
|
|
|
|
timestamp,
|
|
|
|
|
|
})
|
|
|
|
|
|
}
|