From 42617f8726f1869eb808585c4e093d15dc316423 Mon Sep 17 00:00:00 2001 From: Michael Bolin Date: Wed, 7 May 2025 13:49:15 -0700 Subject: [PATCH] feat: save session transcripts when using Rust CLI (#845) This adds support for saving transcripts when using the Rust CLI. Like the TypeScript CLI, it saves the transcript to `~/.codex/sessions`, though it uses JSONL for the file format (and `.jsonl` for the file extension) so that even if Codex crashes, what was written to the `.jsonl` file should generally still be valid JSONL content. --- codex-rs/Cargo.lock | 11 +++ codex-rs/core/Cargo.toml | 2 + codex-rs/core/src/codex.rs | 41 ++++++++ codex-rs/core/src/lib.rs | 1 + codex-rs/core/src/rollout.rs | 184 +++++++++++++++++++++++++++++++++++ 5 files changed, 239 insertions(+) create mode 100644 codex-rs/core/src/rollout.rs diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index 62f826d9..9e5cd850 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -539,12 +539,14 @@ dependencies = [ "serde_json", "tempfile", "thiserror 2.0.12", + "time", "tokio", "tokio-util", "toml", "tracing", "tree-sitter", "tree-sitter-bash", + "uuid", "wiremock", ] @@ -4088,6 +4090,15 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" +[[package]] +name = "uuid" +version = "1.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "458f7a779bf54acc9f347480ac654f68407d3aab21269a6e3c9f922acd9e2da9" +dependencies = [ + "getrandom 0.3.2", +] + [[package]] name = "valuable" version = "0.1.1" diff --git a/codex-rs/core/Cargo.toml b/codex-rs/core/Cargo.toml index d989aeaf..3319ef10 100644 --- a/codex-rs/core/Cargo.toml +++ b/codex-rs/core/Cargo.toml @@ -29,6 +29,7 @@ reqwest = { version = "0.12", features = ["json", "stream"] } serde = { version = "1", features = ["derive"] } serde_json = "1" thiserror = "2.0.12" +time = { version = "0.3", features = 
["formatting", "macros"] } tokio = { version = "1", features = [ "io-std", "macros", @@ -41,6 +42,7 @@ toml = "0.8.20" tracing = { version = "0.1.41", features = ["log"] } tree-sitter = "0.25.3" tree-sitter-bash = "0.23.3" +uuid = { version = "1", features = ["v4"] } [target.'cfg(target_os = "linux")'.dependencies] libc = "0.2.172" diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index cb749fac..b80e33a4 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -58,6 +58,7 @@ use crate::protocol::Op; use crate::protocol::ReviewDecision; use crate::protocol::SandboxPolicy; use crate::protocol::Submission; +use crate::rollout::RolloutRecorder; use crate::safety::SafetyCheck; use crate::safety::assess_command_safety; use crate::safety::assess_patch_safety; @@ -214,6 +215,10 @@ pub(crate) struct Session { /// External notifier command (will be passed as args to exec()). When /// `None` this feature is disabled. notify: Option>, + + /// Optional rollout recorder for persisting the conversation transcript so + /// sessions can be replayed or inspected later. + rollout: Mutex>, state: Mutex, } @@ -322,6 +327,23 @@ impl Session { state.approved_commands.insert(cmd); } + /// Append the given items to the session's rollout transcript (if enabled) + /// and persist them to disk. + async fn record_rollout_items(&self, items: &[ResponseItem]) { + // Clone the recorder outside of the mutex so we don’t hold the lock + // across an await point (MutexGuard is not Send). 
+ let recorder = { + let guard = self.rollout.lock().unwrap(); + guard.as_ref().cloned() + }; + + if let Some(rec) = recorder { + if let Err(e) = rec.record_items(items).await { + error!("failed to record rollout items: {e:#}"); + } + } + } + async fn notify_exec_command_begin(&self, sub_id: &str, call_id: &str, params: &ExecParams) { let event = Event { id: sub_id.to_string(), @@ -603,6 +625,16 @@ async fn submission_loop( } }; + // Attempt to create a RolloutRecorder *before* moving the + // `instructions` value into the Session struct. + let rollout_recorder = match RolloutRecorder::new(instructions.clone()).await { + Ok(r) => Some(r), + Err(e) => { + tracing::warn!("failed to initialise rollout recorder: {e}"); + None + } + }; + sess = Some(Arc::new(Session { client, tx_event: tx_event.clone(), @@ -615,6 +647,7 @@ async fn submission_loop( mcp_connection_manager, notify, state: Mutex::new(state), + rollout: Mutex::new(rollout_recorder), })); // ack @@ -713,6 +746,10 @@ async fn run_task(sess: Arc, sub_id: String, input: Vec) { net_new_turn_input }; + // Persist the input part of the turn to the rollout (user messages / + // function_call_output from previous step). + sess.record_rollout_items(&turn_input).await; + let turn_input_messages: Vec = turn_input .iter() .filter_map(|item| match item { @@ -740,6 +777,10 @@ async fn run_task(sess: Arc, sub_id: String, input: Vec) { // Only attempt to take the lock if there is something to record. if !items.is_empty() { + // First persist model-generated output to the rollout file – this only borrows. + sess.record_rollout_items(&items).await; + + // For ZDR we also need to keep a transcript clone. 
if let Some(transcript) = sess.state.lock().unwrap().zdr_transcript.as_mut() { transcript.record_items(items); } diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs index 919d05f1..ef671a94 100644 --- a/codex-rs/core/src/lib.rs +++ b/codex-rs/core/src/lib.rs @@ -20,6 +20,7 @@ pub mod mcp_server_config; mod mcp_tool_call; mod models; pub mod protocol; +mod rollout; mod safety; mod user_notification; pub mod util; diff --git a/codex-rs/core/src/rollout.rs b/codex-rs/core/src/rollout.rs new file mode 100644 index 00000000..07d2cd91 --- /dev/null +++ b/codex-rs/core/src/rollout.rs @@ -0,0 +1,184 @@ +//! Functionality to persist a Codex conversation *rollout* – a linear list of +//! [`ResponseItem`] objects exchanged during a session – to disk so that +//! sessions can be replayed or inspected later (mirrors the behaviour of the +//! upstream TypeScript implementation). + +use std::fs::File; +use std::fs::{self}; +use std::io::Error as IoError; +use std::io::ErrorKind; + +use serde::Serialize; +use time::OffsetDateTime; +use time::format_description::FormatItem; +use time::macros::format_description; +use tokio::io::AsyncWriteExt; +use tokio::sync::mpsc::Sender; +use tokio::sync::mpsc::{self}; +use uuid::Uuid; + +use crate::config::codex_dir; +use crate::models::ResponseItem; + +/// Folder inside `~/.codex` that holds saved rollouts. +const SESSIONS_SUBDIR: &str = "sessions"; + +#[derive(Serialize)] +struct SessionMeta { + id: String, + timestamp: String, + #[serde(skip_serializing_if = "Option::is_none")] + instructions: Option<String>, +} + +/// Records all [`ResponseItem`]s for a session and flushes them to disk after +/// every update. +/// +/// Rollouts are recorded as JSONL and can be inspected with tools such as: +/// +/// ```ignore +/// $ jq -C .
~/.codex/sessions/rollout-2025-05-07-5973b6c0-94b8-487b-a530-2aeb6098ae0e.jsonl +/// $ fx ~/.codex/sessions/rollout-2025-05-07-5973b6c0-94b8-487b-a530-2aeb6098ae0e.jsonl +/// ``` +#[derive(Clone)] +pub(crate) struct RolloutRecorder { + tx: Sender<String>, +} + +impl RolloutRecorder { + /// Attempt to create a new [`RolloutRecorder`]. If the sessions directory + /// cannot be created or the rollout file cannot be opened we return the + /// error so the caller can decide whether to disable persistence. + pub async fn new(instructions: Option<String>) -> std::io::Result<Self> { + let LogFileInfo { + file, + session_id, + timestamp, + } = create_log_file()?; + + // Build the static session metadata JSON first. + let timestamp_format: &[FormatItem] = format_description!( + "[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond digits:3]Z" + ); + let timestamp = timestamp.format(timestamp_format).map_err(|e| { + IoError::new(ErrorKind::Other, format!("failed to format timestamp: {e}")) + })?; + + let meta = SessionMeta { + timestamp, + id: session_id.to_string(), + instructions, + }; + + // A reasonably-sized bounded channel. If the buffer fills up the send + // future will yield, which is fine – we only need to ensure we do not + // perform *blocking* I/O on the caller’s thread. + let (tx, mut rx) = mpsc::channel::<String>(256); + + // Spawn a Tokio task that owns the file handle and performs async + // writes. Using `tokio::fs::File` keeps everything on the async I/O + // driver instead of blocking the runtime. + tokio::task::spawn(async move { + let mut file = tokio::fs::File::from_std(file); + + while let Some(line) = rx.recv().await { + // Write line + newline, then flush to disk.
+ if let Err(e) = file.write_all(line.as_bytes()).await { + tracing::warn!("rollout writer: failed to write line: {e}"); + break; + } + if let Err(e) = file.write_all(b"\n").await { + tracing::warn!("rollout writer: failed to write newline: {e}"); + break; + } + if let Err(e) = file.flush().await { + tracing::warn!("rollout writer: failed to flush: {e}"); + break; + } + } + }); + + let recorder = Self { tx }; + // Ensure SessionMeta is the first item in the file. + recorder.record_item(&meta).await?; + Ok(recorder) + } + + /// Append `items` to the rollout file. + pub(crate) async fn record_items(&self, items: &[ResponseItem]) -> std::io::Result<()> { + for item in items { + match item { + // Note that function calls may look a bit strange if they are + // "fully qualified MCP tool calls," so we could consider + // reformatting them in that case. + ResponseItem::Message { .. } + | ResponseItem::FunctionCall { .. } + | ResponseItem::FunctionCallOutput { .. } => {} + ResponseItem::Other => { + // These should never be serialized. + continue; + } + } + self.record_item(item).await?; + } + Ok(()) + } + + async fn record_item(&self, item: &impl Serialize) -> std::io::Result<()> { + // Serialize the item to JSON first so that the writer thread only has + // to perform the actual write. + let json = serde_json::to_string(item).map_err(|e| { + IoError::new( + ErrorKind::Other, + format!("failed to serialize response items: {e}"), + ) + })?; + + self.tx.send(json).await.map_err(|e| { + IoError::new( + ErrorKind::Other, + format!("failed to queue rollout item: {e}"), + ) + }) + } +} + +struct LogFileInfo { + /// Opened file handle to the rollout file. + file: File, + + /// Session ID (also embedded in filename). + session_id: Uuid, + + /// Timestamp for the start of the session. + timestamp: OffsetDateTime, +} + +fn create_log_file() -> std::io::Result<LogFileInfo> { + // Resolve ~/.codex/sessions and create it if missing.
+ let mut dir = codex_dir()?; + dir.push(SESSIONS_SUBDIR); + fs::create_dir_all(&dir)?; + + // Generate a v4 UUID – matches the JS CLI implementation. + let session_id = Uuid::new_v4(); + let timestamp = OffsetDateTime::now_utc(); + + // Custom format for YYYY-MM-DD. + let format: &[FormatItem] = format_description!("[year]-[month]-[day]"); + let date_str = timestamp.format(format).unwrap(); + + let filename = format!("rollout-{date_str}-{session_id}.jsonl"); + + let path = dir.join(filename); + let file = std::fs::OpenOptions::new() + .append(true) + .create(true) + .open(&path)?; + + Ok(LogFileInfo { + file, + session_id, + timestamp, + }) +}