Adding the `rollout_path` to the `NewConversationResponse` lets a client perform subsequent operations on a `(ConversationId, PathBuf)` pair. #3353 will introduce support for `ArchiveConversation`.

---
[//]: # (BEGIN SAPLING FOOTER)
Stack created with [Sapling](https://sapling-scm.com). Best reviewed with [ReviewStack](https://reviewstack.dev/openai/codex/pull/3352).
* #3353
* __->__ #3352
410 lines
13 KiB
Rust
410 lines
13 KiB
Rust
//! Persist Codex session rollouts (.jsonl) so sessions can be replayed or inspected later.
|
||
|
||
use std::fs::File;
|
||
use std::fs::{self};
|
||
use std::io::Error as IoError;
|
||
use std::path::Path;
|
||
use std::path::PathBuf;
|
||
|
||
use codex_protocol::mcp_protocol::ConversationId;
|
||
use serde::Deserialize;
|
||
use serde::Serialize;
|
||
use serde_json::Value;
|
||
use time::OffsetDateTime;
|
||
use time::format_description::FormatItem;
|
||
use time::macros::format_description;
|
||
use tokio::io::AsyncWriteExt;
|
||
use tokio::sync::mpsc::Sender;
|
||
use tokio::sync::mpsc::{self};
|
||
use tokio::sync::oneshot;
|
||
use tracing::info;
|
||
use tracing::warn;
|
||
|
||
use super::SESSIONS_SUBDIR;
|
||
use super::list::ConversationsPage;
|
||
use super::list::Cursor;
|
||
use super::list::get_conversations;
|
||
use super::policy::is_persisted_response_item;
|
||
use crate::config::Config;
|
||
use crate::conversation_manager::InitialHistory;
|
||
use crate::conversation_manager::ResumedHistory;
|
||
use crate::git_info::GitInfo;
|
||
use crate::git_info::collect_git_info;
|
||
use codex_protocol::models::ResponseItem;
|
||
|
||
#[derive(Serialize, Deserialize, Clone, Default)]
|
||
pub struct SessionMeta {
|
||
pub id: ConversationId,
|
||
pub timestamp: String,
|
||
pub instructions: Option<String>,
|
||
}
|
||
|
||
#[derive(Serialize)]
|
||
struct SessionMetaWithGit {
|
||
#[serde(flatten)]
|
||
meta: SessionMeta,
|
||
#[serde(skip_serializing_if = "Option::is_none")]
|
||
git: Option<GitInfo>,
|
||
}
|
||
|
||
#[derive(Serialize, Deserialize, Default, Clone)]
|
||
pub struct SessionStateSnapshot {}
|
||
|
||
#[derive(Serialize, Deserialize, Default, Clone)]
|
||
pub struct SavedSession {
|
||
pub session: SessionMeta,
|
||
#[serde(default)]
|
||
pub items: Vec<ResponseItem>,
|
||
#[serde(default)]
|
||
pub state: SessionStateSnapshot,
|
||
pub session_id: ConversationId,
|
||
}
|
||
|
||
/// Records all [`ResponseItem`]s for a session and flushes them to disk after
|
||
/// every update.
|
||
///
|
||
/// Rollouts are recorded as JSONL and can be inspected with tools such as:
|
||
///
|
||
/// ```ignore
|
||
/// $ jq -C . ~/.codex/sessions/rollout-2025-05-07T17-24-21-5973b6c0-94b8-487b-a530-2aeb6098ae0e.jsonl
|
||
/// $ fx ~/.codex/sessions/rollout-2025-05-07T17-24-21-5973b6c0-94b8-487b-a530-2aeb6098ae0e.jsonl
|
||
/// ```
|
||
#[derive(Clone)]
|
||
pub struct RolloutRecorder {
|
||
tx: Sender<RolloutCmd>,
|
||
pub(crate) rollout_path: PathBuf,
|
||
}
|
||
|
||
#[derive(Clone)]
|
||
pub enum RolloutRecorderParams {
|
||
Create {
|
||
conversation_id: ConversationId,
|
||
instructions: Option<String>,
|
||
},
|
||
Resume {
|
||
path: PathBuf,
|
||
},
|
||
}
|
||
|
||
enum RolloutCmd {
|
||
AddItems(Vec<ResponseItem>),
|
||
UpdateState(SessionStateSnapshot),
|
||
Shutdown { ack: oneshot::Sender<()> },
|
||
}
|
||
|
||
impl RolloutRecorderParams {
|
||
pub fn new(conversation_id: ConversationId, instructions: Option<String>) -> Self {
|
||
Self::Create {
|
||
conversation_id,
|
||
instructions,
|
||
}
|
||
}
|
||
|
||
pub fn resume(path: PathBuf) -> Self {
|
||
Self::Resume { path }
|
||
}
|
||
}
|
||
|
||
impl RolloutRecorder {
|
||
#[allow(dead_code)]
|
||
/// List conversations (rollout files) under the provided Codex home directory.
|
||
pub async fn list_conversations(
|
||
codex_home: &Path,
|
||
page_size: usize,
|
||
cursor: Option<&Cursor>,
|
||
) -> std::io::Result<ConversationsPage> {
|
||
get_conversations(codex_home, page_size, cursor).await
|
||
}
|
||
|
||
/// Attempt to create a new [`RolloutRecorder`]. If the sessions directory
|
||
/// cannot be created or the rollout file cannot be opened we return the
|
||
/// error so the caller can decide whether to disable persistence.
|
||
pub async fn new(config: &Config, params: RolloutRecorderParams) -> std::io::Result<Self> {
|
||
let (file, rollout_path, meta) = match params {
|
||
RolloutRecorderParams::Create {
|
||
conversation_id,
|
||
instructions,
|
||
} => {
|
||
let LogFileInfo {
|
||
file,
|
||
path,
|
||
conversation_id: session_id,
|
||
timestamp,
|
||
} = create_log_file(config, conversation_id)?;
|
||
|
||
let timestamp_format: &[FormatItem] = format_description!(
|
||
"[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond digits:3]Z"
|
||
);
|
||
let timestamp = timestamp
|
||
.to_offset(time::UtcOffset::UTC)
|
||
.format(timestamp_format)
|
||
.map_err(|e| IoError::other(format!("failed to format timestamp: {e}")))?;
|
||
|
||
(
|
||
tokio::fs::File::from_std(file),
|
||
path,
|
||
Some(SessionMeta {
|
||
timestamp,
|
||
id: session_id,
|
||
instructions,
|
||
}),
|
||
)
|
||
}
|
||
RolloutRecorderParams::Resume { path } => (
|
||
tokio::fs::OpenOptions::new()
|
||
.append(true)
|
||
.open(&path)
|
||
.await?,
|
||
path,
|
||
None,
|
||
),
|
||
};
|
||
|
||
// Clone the cwd for the spawned task to collect git info asynchronously
|
||
let cwd = config.cwd.clone();
|
||
|
||
// A reasonably-sized bounded channel. If the buffer fills up the send
|
||
// future will yield, which is fine – we only need to ensure we do not
|
||
// perform *blocking* I/O on the caller's thread.
|
||
let (tx, rx) = mpsc::channel::<RolloutCmd>(256);
|
||
|
||
// Spawn a Tokio task that owns the file handle and performs async
|
||
// writes. Using `tokio::fs::File` keeps everything on the async I/O
|
||
// driver instead of blocking the runtime.
|
||
tokio::task::spawn(rollout_writer(file, rx, meta, cwd));
|
||
|
||
Ok(Self { tx, rollout_path })
|
||
}
|
||
|
||
pub(crate) async fn record_items(&self, items: &[ResponseItem]) -> std::io::Result<()> {
|
||
let mut filtered = Vec::new();
|
||
for item in items {
|
||
// Note that function calls may look a bit strange if they are
|
||
// "fully qualified MCP tool calls," so we could consider
|
||
// reformatting them in that case.
|
||
if is_persisted_response_item(item) {
|
||
filtered.push(item.clone());
|
||
}
|
||
}
|
||
if filtered.is_empty() {
|
||
return Ok(());
|
||
}
|
||
self.tx
|
||
.send(RolloutCmd::AddItems(filtered))
|
||
.await
|
||
.map_err(|e| IoError::other(format!("failed to queue rollout items: {e}")))
|
||
}
|
||
|
||
pub(crate) async fn record_state(&self, state: SessionStateSnapshot) -> std::io::Result<()> {
|
||
self.tx
|
||
.send(RolloutCmd::UpdateState(state))
|
||
.await
|
||
.map_err(|e| IoError::other(format!("failed to queue rollout state: {e}")))
|
||
}
|
||
|
||
pub async fn get_rollout_history(path: &Path) -> std::io::Result<InitialHistory> {
|
||
info!("Resuming rollout from {path:?}");
|
||
tracing::error!("Resuming rollout from {path:?}");
|
||
let text = tokio::fs::read_to_string(path).await?;
|
||
let mut lines = text.lines();
|
||
let first_line = lines
|
||
.next()
|
||
.ok_or_else(|| IoError::other("empty session file"))?;
|
||
let conversation_id = match serde_json::from_str::<SessionMeta>(first_line) {
|
||
Ok(rollout_session_meta) => {
|
||
tracing::error!(
|
||
"Parsed conversation ID from rollout file: {:?}",
|
||
rollout_session_meta.id
|
||
);
|
||
Some(rollout_session_meta.id)
|
||
}
|
||
Err(e) => {
|
||
return Err(IoError::other(format!(
|
||
"failed to parse first line of rollout file as SessionMeta: {e}"
|
||
)));
|
||
}
|
||
};
|
||
|
||
let mut items = Vec::new();
|
||
for line in lines {
|
||
if line.trim().is_empty() {
|
||
continue;
|
||
}
|
||
let v: Value = match serde_json::from_str(line) {
|
||
Ok(v) => v,
|
||
Err(_) => continue,
|
||
};
|
||
if v.get("record_type")
|
||
.and_then(|rt| rt.as_str())
|
||
.map(|s| s == "state")
|
||
.unwrap_or(false)
|
||
{
|
||
continue;
|
||
}
|
||
match serde_json::from_value::<ResponseItem>(v.clone()) {
|
||
Ok(item) => {
|
||
if is_persisted_response_item(&item) {
|
||
items.push(item);
|
||
}
|
||
}
|
||
Err(e) => {
|
||
warn!("failed to parse item: {v:?}, error: {e}");
|
||
}
|
||
}
|
||
}
|
||
|
||
tracing::error!(
|
||
"Resumed rollout with {} items, conversation ID: {:?}",
|
||
items.len(),
|
||
conversation_id
|
||
);
|
||
let conversation_id = conversation_id
|
||
.ok_or_else(|| IoError::other("failed to parse conversation ID from rollout file"))?;
|
||
|
||
if items.is_empty() {
|
||
return Ok(InitialHistory::New);
|
||
}
|
||
|
||
info!("Resumed rollout successfully from {path:?}");
|
||
Ok(InitialHistory::Resumed(ResumedHistory {
|
||
conversation_id,
|
||
history: items,
|
||
rollout_path: path.to_path_buf(),
|
||
}))
|
||
}
|
||
|
||
pub async fn shutdown(&self) -> std::io::Result<()> {
|
||
let (tx_done, rx_done) = oneshot::channel();
|
||
match self.tx.send(RolloutCmd::Shutdown { ack: tx_done }).await {
|
||
Ok(_) => rx_done
|
||
.await
|
||
.map_err(|e| IoError::other(format!("failed waiting for rollout shutdown: {e}"))),
|
||
Err(e) => {
|
||
warn!("failed to send rollout shutdown command: {e}");
|
||
Err(IoError::other(format!(
|
||
"failed to send rollout shutdown command: {e}"
|
||
)))
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
struct LogFileInfo {
|
||
/// Opened file handle to the rollout file.
|
||
file: File,
|
||
|
||
/// Full path to the rollout file.
|
||
path: PathBuf,
|
||
|
||
/// Session ID (also embedded in filename).
|
||
conversation_id: ConversationId,
|
||
|
||
/// Timestamp for the start of the session.
|
||
timestamp: OffsetDateTime,
|
||
}
|
||
|
||
fn create_log_file(
|
||
config: &Config,
|
||
conversation_id: ConversationId,
|
||
) -> std::io::Result<LogFileInfo> {
|
||
// Resolve ~/.codex/sessions/YYYY/MM/DD and create it if missing.
|
||
let timestamp = OffsetDateTime::now_local()
|
||
.map_err(|e| IoError::other(format!("failed to get local time: {e}")))?;
|
||
let mut dir = config.codex_home.clone();
|
||
dir.push(SESSIONS_SUBDIR);
|
||
dir.push(timestamp.year().to_string());
|
||
dir.push(format!("{:02}", u8::from(timestamp.month())));
|
||
dir.push(format!("{:02}", timestamp.day()));
|
||
fs::create_dir_all(&dir)?;
|
||
|
||
// Custom format for YYYY-MM-DDThh-mm-ss. Use `-` instead of `:` for
|
||
// compatibility with filesystems that do not allow colons in filenames.
|
||
let format: &[FormatItem] =
|
||
format_description!("[year]-[month]-[day]T[hour]-[minute]-[second]");
|
||
let date_str = timestamp
|
||
.format(format)
|
||
.map_err(|e| IoError::other(format!("failed to format timestamp: {e}")))?;
|
||
|
||
let filename = format!("rollout-{date_str}-{conversation_id}.jsonl");
|
||
|
||
let path = dir.join(filename);
|
||
let file = std::fs::OpenOptions::new()
|
||
.append(true)
|
||
.create(true)
|
||
.open(&path)?;
|
||
|
||
Ok(LogFileInfo {
|
||
file,
|
||
path,
|
||
conversation_id,
|
||
timestamp,
|
||
})
|
||
}
|
||
|
||
async fn rollout_writer(
|
||
file: tokio::fs::File,
|
||
mut rx: mpsc::Receiver<RolloutCmd>,
|
||
mut meta: Option<SessionMeta>,
|
||
cwd: std::path::PathBuf,
|
||
) -> std::io::Result<()> {
|
||
let mut writer = JsonlWriter { file };
|
||
|
||
// If we have a meta, collect git info asynchronously and write meta first
|
||
if let Some(session_meta) = meta.take() {
|
||
let git_info = collect_git_info(&cwd).await;
|
||
let session_meta_with_git = SessionMetaWithGit {
|
||
meta: session_meta,
|
||
git: git_info,
|
||
};
|
||
|
||
// Write the SessionMeta as the first item in the file
|
||
writer.write_line(&session_meta_with_git).await?;
|
||
}
|
||
|
||
// Process rollout commands
|
||
while let Some(cmd) = rx.recv().await {
|
||
match cmd {
|
||
RolloutCmd::AddItems(items) => {
|
||
for item in items {
|
||
if is_persisted_response_item(&item) {
|
||
writer.write_line(&item).await?;
|
||
}
|
||
}
|
||
}
|
||
RolloutCmd::UpdateState(state) => {
|
||
#[derive(Serialize)]
|
||
struct StateLine<'a> {
|
||
record_type: &'static str,
|
||
#[serde(flatten)]
|
||
state: &'a SessionStateSnapshot,
|
||
}
|
||
writer
|
||
.write_line(&StateLine {
|
||
record_type: "state",
|
||
state: &state,
|
||
})
|
||
.await?;
|
||
}
|
||
RolloutCmd::Shutdown { ack } => {
|
||
let _ = ack.send(());
|
||
}
|
||
}
|
||
}
|
||
|
||
Ok(())
|
||
}
|
||
|
||
struct JsonlWriter {
|
||
file: tokio::fs::File,
|
||
}
|
||
|
||
impl JsonlWriter {
|
||
async fn write_line(&mut self, item: &impl serde::Serialize) -> std::io::Result<()> {
|
||
let mut json = serde_json::to_string(item)?;
|
||
json.push('\n');
|
||
self.file.write_all(json.as_bytes()).await?;
|
||
self.file.flush().await?;
|
||
Ok(())
|
||
}
|
||
}
|