//! Persist Codex session rollouts (.jsonl) so sessions can be replayed or inspected later. use std::fs::File; use std::fs::{self}; use std::io::Error as IoError; use std::path::Path; use serde::Deserialize; use serde::Serialize; use serde_json::Value; use time::OffsetDateTime; use time::format_description::FormatItem; use time::macros::format_description; use tokio::io::AsyncWriteExt; use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::{self}; use tokio::sync::oneshot; use tracing::info; use tracing::warn; use uuid::Uuid; use crate::config::Config; use crate::git_info::GitInfo; use crate::git_info::collect_git_info; use codex_protocol::models::ResponseItem; const SESSIONS_SUBDIR: &str = "sessions"; #[derive(Serialize, Deserialize, Clone, Default)] pub struct SessionMeta { pub id: Uuid, pub timestamp: String, pub instructions: Option, } #[derive(Serialize)] struct SessionMetaWithGit { #[serde(flatten)] meta: SessionMeta, #[serde(skip_serializing_if = "Option::is_none")] git: Option, } #[derive(Serialize, Deserialize, Default, Clone)] pub struct SessionStateSnapshot {} #[derive(Serialize, Deserialize, Default, Clone)] pub struct SavedSession { pub session: SessionMeta, #[serde(default)] pub items: Vec, #[serde(default)] pub state: SessionStateSnapshot, pub session_id: Uuid, } /// Records all [`ResponseItem`]s for a session and flushes them to disk after /// every update. /// /// Rollouts are recorded as JSONL and can be inspected with tools such as: /// /// ```ignore /// $ jq -C . ~/.codex/sessions/rollout-2025-05-07T17-24-21-5973b6c0-94b8-487b-a530-2aeb6098ae0e.jsonl /// $ fx ~/.codex/sessions/rollout-2025-05-07T17-24-21-5973b6c0-94b8-487b-a530-2aeb6098ae0e.jsonl /// ``` #[derive(Clone)] pub(crate) struct RolloutRecorder { tx: Sender, } enum RolloutCmd { AddItems(Vec), UpdateState(SessionStateSnapshot), Shutdown { ack: oneshot::Sender<()> }, } impl RolloutRecorder { /// Attempt to create a new [`RolloutRecorder`]. If the sessions directory /// cannot be created or the rollout file cannot be opened we return the /// error so the caller can decide whether to disable persistence. pub async fn new( config: &Config, uuid: Uuid, instructions: Option, ) -> std::io::Result { let LogFileInfo { file, session_id, timestamp, } = create_log_file(config, uuid)?; let timestamp_format: &[FormatItem] = format_description!( "[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond digits:3]Z" ); let timestamp = timestamp .format(timestamp_format) .map_err(|e| IoError::other(format!("failed to format timestamp: {e}")))?; // Clone the cwd for the spawned task to collect git info asynchronously let cwd = config.cwd.clone(); // A reasonably-sized bounded channel. If the buffer fills up the send // future will yield, which is fine – we only need to ensure we do not // perform *blocking* I/O on the caller's thread. let (tx, rx) = mpsc::channel::(256); // Spawn a Tokio task that owns the file handle and performs async // writes. Using `tokio::fs::File` keeps everything on the async I/O // driver instead of blocking the runtime. tokio::task::spawn(rollout_writer( tokio::fs::File::from_std(file), rx, Some(SessionMeta { timestamp, id: session_id, instructions, }), cwd, )); Ok(Self { tx }) } pub(crate) async fn record_items(&self, items: &[ResponseItem]) -> std::io::Result<()> { let mut filtered = Vec::new(); for item in items { match item { // Note that function calls may look a bit strange if they are // "fully qualified MCP tool calls," so we could consider // reformatting them in that case. ResponseItem::Message { .. } | ResponseItem::LocalShellCall { .. } | ResponseItem::FunctionCall { .. } | ResponseItem::FunctionCallOutput { .. } | ResponseItem::CustomToolCall { .. } | ResponseItem::CustomToolCallOutput { .. } | ResponseItem::Reasoning { .. } => filtered.push(item.clone()), ResponseItem::Other => { // These should never be serialized. continue; } } } if filtered.is_empty() { return Ok(()); } self.tx .send(RolloutCmd::AddItems(filtered)) .await .map_err(|e| IoError::other(format!("failed to queue rollout items: {e}"))) } pub(crate) async fn record_state(&self, state: SessionStateSnapshot) -> std::io::Result<()> { self.tx .send(RolloutCmd::UpdateState(state)) .await .map_err(|e| IoError::other(format!("failed to queue rollout state: {e}"))) } pub async fn resume( path: &Path, cwd: std::path::PathBuf, ) -> std::io::Result<(Self, SavedSession)> { info!("Resuming rollout from {path:?}"); let text = tokio::fs::read_to_string(path).await?; let mut lines = text.lines(); let meta_line = lines .next() .ok_or_else(|| IoError::other("empty session file"))?; let session: SessionMeta = serde_json::from_str(meta_line) .map_err(|e| IoError::other(format!("failed to parse session meta: {e}")))?; let mut items = Vec::new(); let mut state = SessionStateSnapshot::default(); for line in lines { if line.trim().is_empty() { continue; } let v: Value = match serde_json::from_str(line) { Ok(v) => v, Err(_) => continue, }; if v.get("record_type") .and_then(|rt| rt.as_str()) .map(|s| s == "state") .unwrap_or(false) { if let Ok(s) = serde_json::from_value::(v.clone()) { state = s } continue; } match serde_json::from_value::(v.clone()) { Ok(item) => match item { ResponseItem::Message { .. } | ResponseItem::LocalShellCall { .. } | ResponseItem::FunctionCall { .. } | ResponseItem::FunctionCallOutput { .. } | ResponseItem::CustomToolCall { .. } | ResponseItem::CustomToolCallOutput { .. } | ResponseItem::Reasoning { .. } => items.push(item), ResponseItem::Other => {} }, Err(e) => { warn!("failed to parse item: {v:?}, error: {e}"); } } } let saved = SavedSession { session: session.clone(), items: items.clone(), state: state.clone(), session_id: session.id, }; let file = std::fs::OpenOptions::new() .append(true) .read(true) .open(path)?; let (tx, rx) = mpsc::channel::(256); tokio::task::spawn(rollout_writer( tokio::fs::File::from_std(file), rx, None, cwd, )); info!("Resumed rollout successfully from {path:?}"); Ok((Self { tx }, saved)) } pub async fn shutdown(&self) -> std::io::Result<()> { let (tx_done, rx_done) = oneshot::channel(); match self.tx.send(RolloutCmd::Shutdown { ack: tx_done }).await { Ok(_) => rx_done .await .map_err(|e| IoError::other(format!("failed waiting for rollout shutdown: {e}"))), Err(e) => { warn!("failed to send rollout shutdown command: {e}"); Err(IoError::other(format!( "failed to send rollout shutdown command: {e}" ))) } } } } struct LogFileInfo { /// Opened file handle to the rollout file. file: File, /// Session ID (also embedded in filename). session_id: Uuid, /// Timestamp for the start of the session. timestamp: OffsetDateTime, } fn create_log_file(config: &Config, session_id: Uuid) -> std::io::Result { // Resolve ~/.codex/sessions/YYYY/MM/DD and create it if missing. let timestamp = OffsetDateTime::now_local() .map_err(|e| IoError::other(format!("failed to get local time: {e}")))?; let mut dir = config.codex_home.clone(); dir.push(SESSIONS_SUBDIR); dir.push(timestamp.year().to_string()); dir.push(format!("{:02}", u8::from(timestamp.month()))); dir.push(format!("{:02}", timestamp.day())); fs::create_dir_all(&dir)?; // Custom format for YYYY-MM-DDThh-mm-ss. Use `-` instead of `:` for // compatibility with filesystems that do not allow colons in filenames. let format: &[FormatItem] = format_description!("[year]-[month]-[day]T[hour]-[minute]-[second]"); let date_str = timestamp .format(format) .map_err(|e| IoError::other(format!("failed to format timestamp: {e}")))?; let filename = format!("rollout-{date_str}-{session_id}.jsonl"); let path = dir.join(filename); let file = std::fs::OpenOptions::new() .append(true) .create(true) .open(&path)?; Ok(LogFileInfo { file, session_id, timestamp, }) } async fn rollout_writer( file: tokio::fs::File, mut rx: mpsc::Receiver, mut meta: Option, cwd: std::path::PathBuf, ) -> std::io::Result<()> { let mut writer = JsonlWriter { file }; // If we have a meta, collect git info asynchronously and write meta first if let Some(session_meta) = meta.take() { let git_info = collect_git_info(&cwd).await; let session_meta_with_git = SessionMetaWithGit { meta: session_meta, git: git_info, }; // Write the SessionMeta as the first item in the file writer.write_line(&session_meta_with_git).await?; } // Process rollout commands while let Some(cmd) = rx.recv().await { match cmd { RolloutCmd::AddItems(items) => { for item in items { match item { ResponseItem::Message { .. } | ResponseItem::LocalShellCall { .. } | ResponseItem::FunctionCall { .. } | ResponseItem::FunctionCallOutput { .. } | ResponseItem::CustomToolCall { .. } | ResponseItem::CustomToolCallOutput { .. } | ResponseItem::Reasoning { .. } => { writer.write_line(&item).await?; } ResponseItem::Other => {} } } } RolloutCmd::UpdateState(state) => { #[derive(Serialize)] struct StateLine<'a> { record_type: &'static str, #[serde(flatten)] state: &'a SessionStateSnapshot, } writer .write_line(&StateLine { record_type: "state", state: &state, }) .await?; } RolloutCmd::Shutdown { ack } => { let _ = ack.send(()); } } } Ok(()) } struct JsonlWriter { file: tokio::fs::File, } impl JsonlWriter { async fn write_line(&mut self, item: &impl serde::Serialize) -> std::io::Result<()> { let mut json = serde_json::to_string(item)?; json.push('\n'); let _ = self.file.write_all(json.as_bytes()).await; self.file.flush().await?; Ok(()) } }