feat: save session transcripts when using Rust CLI (#845)
This adds support for saving transcripts when using the Rust CLI. Like the TypeScript CLI, it saves the transcript to `~/.codex/sessions`, though it uses JSONL for the file format (and `.jsonl` for the file extension) so that even if Codex crashes, what was written to the `.jsonl` file should generally still be valid JSONL content.
This commit is contained in:
11
codex-rs/Cargo.lock
generated
11
codex-rs/Cargo.lock
generated
@@ -539,12 +539,14 @@ dependencies = [
|
|||||||
"serde_json",
|
"serde_json",
|
||||||
"tempfile",
|
"tempfile",
|
||||||
"thiserror 2.0.12",
|
"thiserror 2.0.12",
|
||||||
|
"time",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tokio-util",
|
"tokio-util",
|
||||||
"toml",
|
"toml",
|
||||||
"tracing",
|
"tracing",
|
||||||
"tree-sitter",
|
"tree-sitter",
|
||||||
"tree-sitter-bash",
|
"tree-sitter-bash",
|
||||||
|
"uuid",
|
||||||
"wiremock",
|
"wiremock",
|
||||||
]
|
]
|
||||||
|
|
||||||
@@ -4088,6 +4090,15 @@ version = "0.2.2"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
|
checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "uuid"
|
||||||
|
version = "1.16.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "458f7a779bf54acc9f347480ac654f68407d3aab21269a6e3c9f922acd9e2da9"
|
||||||
|
dependencies = [
|
||||||
|
"getrandom 0.3.2",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "valuable"
|
name = "valuable"
|
||||||
version = "0.1.1"
|
version = "0.1.1"
|
||||||
|
|||||||
@@ -29,6 +29,7 @@ reqwest = { version = "0.12", features = ["json", "stream"] }
|
|||||||
serde = { version = "1", features = ["derive"] }
|
serde = { version = "1", features = ["derive"] }
|
||||||
serde_json = "1"
|
serde_json = "1"
|
||||||
thiserror = "2.0.12"
|
thiserror = "2.0.12"
|
||||||
|
time = { version = "0.3", features = ["formatting", "macros"] }
|
||||||
tokio = { version = "1", features = [
|
tokio = { version = "1", features = [
|
||||||
"io-std",
|
"io-std",
|
||||||
"macros",
|
"macros",
|
||||||
@@ -41,6 +42,7 @@ toml = "0.8.20"
|
|||||||
tracing = { version = "0.1.41", features = ["log"] }
|
tracing = { version = "0.1.41", features = ["log"] }
|
||||||
tree-sitter = "0.25.3"
|
tree-sitter = "0.25.3"
|
||||||
tree-sitter-bash = "0.23.3"
|
tree-sitter-bash = "0.23.3"
|
||||||
|
uuid = { version = "1", features = ["v4"] }
|
||||||
|
|
||||||
[target.'cfg(target_os = "linux")'.dependencies]
|
[target.'cfg(target_os = "linux")'.dependencies]
|
||||||
libc = "0.2.172"
|
libc = "0.2.172"
|
||||||
|
|||||||
@@ -58,6 +58,7 @@ use crate::protocol::Op;
|
|||||||
use crate::protocol::ReviewDecision;
|
use crate::protocol::ReviewDecision;
|
||||||
use crate::protocol::SandboxPolicy;
|
use crate::protocol::SandboxPolicy;
|
||||||
use crate::protocol::Submission;
|
use crate::protocol::Submission;
|
||||||
|
use crate::rollout::RolloutRecorder;
|
||||||
use crate::safety::SafetyCheck;
|
use crate::safety::SafetyCheck;
|
||||||
use crate::safety::assess_command_safety;
|
use crate::safety::assess_command_safety;
|
||||||
use crate::safety::assess_patch_safety;
|
use crate::safety::assess_patch_safety;
|
||||||
@@ -214,6 +215,10 @@ pub(crate) struct Session {
|
|||||||
/// External notifier command (will be passed as args to exec()). When
|
/// External notifier command (will be passed as args to exec()). When
|
||||||
/// `None` this feature is disabled.
|
/// `None` this feature is disabled.
|
||||||
notify: Option<Vec<String>>,
|
notify: Option<Vec<String>>,
|
||||||
|
|
||||||
|
/// Optional rollout recorder for persisting the conversation transcript so
|
||||||
|
/// sessions can be replayed or inspected later.
|
||||||
|
rollout: Mutex<Option<crate::rollout::RolloutRecorder>>,
|
||||||
state: Mutex<State>,
|
state: Mutex<State>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -322,6 +327,23 @@ impl Session {
|
|||||||
state.approved_commands.insert(cmd);
|
state.approved_commands.insert(cmd);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Append the given items to the session's rollout transcript (if enabled)
|
||||||
|
/// and persist them to disk.
|
||||||
|
async fn record_rollout_items(&self, items: &[ResponseItem]) {
|
||||||
|
// Clone the recorder outside of the mutex so we don’t hold the lock
|
||||||
|
// across an await point (MutexGuard is not Send).
|
||||||
|
let recorder = {
|
||||||
|
let guard = self.rollout.lock().unwrap();
|
||||||
|
guard.as_ref().cloned()
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Some(rec) = recorder {
|
||||||
|
if let Err(e) = rec.record_items(items).await {
|
||||||
|
error!("failed to record rollout items: {e:#}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
async fn notify_exec_command_begin(&self, sub_id: &str, call_id: &str, params: &ExecParams) {
|
async fn notify_exec_command_begin(&self, sub_id: &str, call_id: &str, params: &ExecParams) {
|
||||||
let event = Event {
|
let event = Event {
|
||||||
id: sub_id.to_string(),
|
id: sub_id.to_string(),
|
||||||
@@ -603,6 +625,16 @@ async fn submission_loop(
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Attempt to create a RolloutRecorder *before* moving the
|
||||||
|
// `instructions` value into the Session struct.
|
||||||
|
let rollout_recorder = match RolloutRecorder::new(instructions.clone()).await {
|
||||||
|
Ok(r) => Some(r),
|
||||||
|
Err(e) => {
|
||||||
|
tracing::warn!("failed to initialise rollout recorder: {e}");
|
||||||
|
None
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
sess = Some(Arc::new(Session {
|
sess = Some(Arc::new(Session {
|
||||||
client,
|
client,
|
||||||
tx_event: tx_event.clone(),
|
tx_event: tx_event.clone(),
|
||||||
@@ -615,6 +647,7 @@ async fn submission_loop(
|
|||||||
mcp_connection_manager,
|
mcp_connection_manager,
|
||||||
notify,
|
notify,
|
||||||
state: Mutex::new(state),
|
state: Mutex::new(state),
|
||||||
|
rollout: Mutex::new(rollout_recorder),
|
||||||
}));
|
}));
|
||||||
|
|
||||||
// ack
|
// ack
|
||||||
@@ -713,6 +746,10 @@ async fn run_task(sess: Arc<Session>, sub_id: String, input: Vec<InputItem>) {
|
|||||||
net_new_turn_input
|
net_new_turn_input
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Persist the input part of the turn to the rollout (user messages /
|
||||||
|
// function_call_output from previous step).
|
||||||
|
sess.record_rollout_items(&turn_input).await;
|
||||||
|
|
||||||
let turn_input_messages: Vec<String> = turn_input
|
let turn_input_messages: Vec<String> = turn_input
|
||||||
.iter()
|
.iter()
|
||||||
.filter_map(|item| match item {
|
.filter_map(|item| match item {
|
||||||
@@ -740,6 +777,10 @@ async fn run_task(sess: Arc<Session>, sub_id: String, input: Vec<InputItem>) {
|
|||||||
|
|
||||||
// Only attempt to take the lock if there is something to record.
|
// Only attempt to take the lock if there is something to record.
|
||||||
if !items.is_empty() {
|
if !items.is_empty() {
|
||||||
|
// First persist model-generated output to the rollout file – this only borrows.
|
||||||
|
sess.record_rollout_items(&items).await;
|
||||||
|
|
||||||
|
// For ZDR we also need to keep a transcript clone.
|
||||||
if let Some(transcript) = sess.state.lock().unwrap().zdr_transcript.as_mut() {
|
if let Some(transcript) = sess.state.lock().unwrap().zdr_transcript.as_mut() {
|
||||||
transcript.record_items(items);
|
transcript.record_items(items);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -20,6 +20,7 @@ pub mod mcp_server_config;
|
|||||||
mod mcp_tool_call;
|
mod mcp_tool_call;
|
||||||
mod models;
|
mod models;
|
||||||
pub mod protocol;
|
pub mod protocol;
|
||||||
|
mod rollout;
|
||||||
mod safety;
|
mod safety;
|
||||||
mod user_notification;
|
mod user_notification;
|
||||||
pub mod util;
|
pub mod util;
|
||||||
|
|||||||
184
codex-rs/core/src/rollout.rs
Normal file
184
codex-rs/core/src/rollout.rs
Normal file
@@ -0,0 +1,184 @@
|
|||||||
|
//! Functionality to persist a Codex conversation *rollout* – a linear list of
|
||||||
|
//! [`ResponseItem`] objects exchanged during a session – to disk so that
|
||||||
|
//! sessions can be replayed or inspected later (mirrors the behaviour of the
|
||||||
|
//! upstream TypeScript implementation).
|
||||||
|
|
||||||
|
use std::fs::File;
|
||||||
|
use std::fs::{self};
|
||||||
|
use std::io::Error as IoError;
|
||||||
|
use std::io::ErrorKind;
|
||||||
|
|
||||||
|
use serde::Serialize;
|
||||||
|
use time::OffsetDateTime;
|
||||||
|
use time::format_description::FormatItem;
|
||||||
|
use time::macros::format_description;
|
||||||
|
use tokio::io::AsyncWriteExt;
|
||||||
|
use tokio::sync::mpsc::Sender;
|
||||||
|
use tokio::sync::mpsc::{self};
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
use crate::config::codex_dir;
|
||||||
|
use crate::models::ResponseItem;
|
||||||
|
|
||||||
|
/// Folder inside `~/.codex` that holds saved rollouts.
|
||||||
|
const SESSIONS_SUBDIR: &str = "sessions";
|
||||||
|
|
||||||
|
#[derive(Serialize)]
|
||||||
|
struct SessionMeta {
|
||||||
|
id: String,
|
||||||
|
timestamp: String,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
instructions: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Records all [`ResponseItem`]s for a session and flushes them to disk after
|
||||||
|
/// every update.
|
||||||
|
///
|
||||||
|
/// Rollouts are recorded as JSONL and can be inspected with tools such as:
|
||||||
|
///
|
||||||
|
/// ```ignore
|
||||||
|
/// $ jq -C . ~/.codex/sessions/rollout-2025-05-07-5973b6c0-94b8-487b-a530-2aeb6098ae0e.jsonl
|
||||||
|
/// $ fx ~/.codex/sessions/rollout-2025-05-07-5973b6c0-94b8-487b-a530-2aeb6098ae0e.jsonl
|
||||||
|
/// ```
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub(crate) struct RolloutRecorder {
|
||||||
|
tx: Sender<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RolloutRecorder {
|
||||||
|
/// Attempt to create a new [`RolloutRecorder`]. If the sessions directory
|
||||||
|
/// cannot be created or the rollout file cannot be opened we return the
|
||||||
|
/// error so the caller can decide whether to disable persistence.
|
||||||
|
pub async fn new(instructions: Option<String>) -> std::io::Result<Self> {
|
||||||
|
let LogFileInfo {
|
||||||
|
file,
|
||||||
|
session_id,
|
||||||
|
timestamp,
|
||||||
|
} = create_log_file()?;
|
||||||
|
|
||||||
|
// Build the static session metadata JSON first.
|
||||||
|
let timestamp_format: &[FormatItem] = format_description!(
|
||||||
|
"[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond digits:3]Z"
|
||||||
|
);
|
||||||
|
let timestamp = timestamp.format(timestamp_format).map_err(|e| {
|
||||||
|
IoError::new(ErrorKind::Other, format!("failed to format timestamp: {e}"))
|
||||||
|
})?;
|
||||||
|
|
||||||
|
let meta = SessionMeta {
|
||||||
|
timestamp,
|
||||||
|
id: session_id.to_string(),
|
||||||
|
instructions,
|
||||||
|
};
|
||||||
|
|
||||||
|
// A reasonably-sized bounded channel. If the buffer fills up the send
|
||||||
|
// future will yield, which is fine – we only need to ensure we do not
|
||||||
|
// perform *blocking* I/O on the caller’s thread.
|
||||||
|
let (tx, mut rx) = mpsc::channel::<String>(256);
|
||||||
|
|
||||||
|
// Spawn a Tokio task that owns the file handle and performs async
|
||||||
|
// writes. Using `tokio::fs::File` keeps everything on the async I/O
|
||||||
|
// driver instead of blocking the runtime.
|
||||||
|
tokio::task::spawn(async move {
|
||||||
|
let mut file = tokio::fs::File::from_std(file);
|
||||||
|
|
||||||
|
while let Some(line) = rx.recv().await {
|
||||||
|
// Write line + newline, then flush to disk.
|
||||||
|
if let Err(e) = file.write_all(line.as_bytes()).await {
|
||||||
|
tracing::warn!("rollout writer: failed to write line: {e}");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if let Err(e) = file.write_all(b"\n").await {
|
||||||
|
tracing::warn!("rollout writer: failed to write newline: {e}");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if let Err(e) = file.flush().await {
|
||||||
|
tracing::warn!("rollout writer: failed to flush: {e}");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
let recorder = Self { tx };
|
||||||
|
// Ensure SessionMeta is the first item in the file.
|
||||||
|
recorder.record_item(&meta).await?;
|
||||||
|
Ok(recorder)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Append `items` to the rollout file.
|
||||||
|
pub(crate) async fn record_items(&self, items: &[ResponseItem]) -> std::io::Result<()> {
|
||||||
|
for item in items {
|
||||||
|
match item {
|
||||||
|
// Note that function calls may look a bit strange if they are
|
||||||
|
// "fully qualified MCP tool calls," so we could consider
|
||||||
|
// reformatting them in that case.
|
||||||
|
ResponseItem::Message { .. }
|
||||||
|
| ResponseItem::FunctionCall { .. }
|
||||||
|
| ResponseItem::FunctionCallOutput { .. } => {}
|
||||||
|
ResponseItem::Other => {
|
||||||
|
// These should never be serialized.
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
self.record_item(item).await?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn record_item(&self, item: &impl Serialize) -> std::io::Result<()> {
|
||||||
|
// Serialize the item to JSON first so that the writer thread only has
|
||||||
|
// to perform the actual write.
|
||||||
|
let json = serde_json::to_string(item).map_err(|e| {
|
||||||
|
IoError::new(
|
||||||
|
ErrorKind::Other,
|
||||||
|
format!("failed to serialize response items: {e}"),
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
|
||||||
|
self.tx.send(json).await.map_err(|e| {
|
||||||
|
IoError::new(
|
||||||
|
ErrorKind::Other,
|
||||||
|
format!("failed to queue rollout item: {e}"),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct LogFileInfo {
|
||||||
|
/// Opened file handle to the rollout file.
|
||||||
|
file: File,
|
||||||
|
|
||||||
|
/// Session ID (also embedded in filename).
|
||||||
|
session_id: Uuid,
|
||||||
|
|
||||||
|
/// Timestamp for the start of the session.
|
||||||
|
timestamp: OffsetDateTime,
|
||||||
|
}
|
||||||
|
|
||||||
|
fn create_log_file() -> std::io::Result<LogFileInfo> {
|
||||||
|
// Resolve ~/.codex/sessions and create it if missing.
|
||||||
|
let mut dir = codex_dir()?;
|
||||||
|
dir.push(SESSIONS_SUBDIR);
|
||||||
|
fs::create_dir_all(&dir)?;
|
||||||
|
|
||||||
|
// Generate a v4 UUID – matches the JS CLI implementation.
|
||||||
|
let session_id = Uuid::new_v4();
|
||||||
|
let timestamp = OffsetDateTime::now_utc();
|
||||||
|
|
||||||
|
// Custom format for YYYY-MM-DD.
|
||||||
|
let format: &[FormatItem] = format_description!("[year]-[month]-[day]");
|
||||||
|
let date_str = timestamp.format(format).unwrap();
|
||||||
|
|
||||||
|
let filename = format!("rollout-{date_str}-{session_id}.jsonl");
|
||||||
|
|
||||||
|
let path = dir.join(filename);
|
||||||
|
let file = std::fs::OpenOptions::new()
|
||||||
|
.append(true)
|
||||||
|
.create(true)
|
||||||
|
.open(&path)?;
|
||||||
|
|
||||||
|
Ok(LogFileInfo {
|
||||||
|
file,
|
||||||
|
session_id,
|
||||||
|
timestamp,
|
||||||
|
})
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user