This is a large change to support a "history" feature like the one you
would expect in a shell such as Bash.
History events are recorded in `$CODEX_HOME/history.jsonl`. Because it
is a JSONL file, it is straightforward to append new entries (as opposed
to the TypeScript CLI, which uses `$CODEX_HOME/history.json`: to keep
that file valid JSON, each new entry entails rewriting the entire file).
Because multiple instances of Codex CLI may write to `history.jsonl` at
once, we use advisory file locking when working with `history.jsonl` in
`codex-rs/core/src/message_history.rs`.
Because we believe history is a sufficiently useful feature, we enable
it by default. To provide some safety, though, we set the file
permissions of `history.jsonl` to `0o600` so that other users on the
system cannot read the user's history. We do not yet support a default
list of `SENSITIVE_PATTERNS` as the TypeScript CLI does:
3fdf9df133/codex-cli/src/utils/storage/command-history.ts (L10-L17)
We are going to take a more conservative approach to this list in the
Rust CLI. For example, while `/\b[A-Za-z0-9-_]{20,}\b/` might exclude
sensitive information like API tokens, it would also exclude valuable
information such as references to Git commits.
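To make the trade-off concrete, here is a std-only sketch that approximates the TypeScript pattern `/\b[A-Za-z0-9-_]{20,}\b/` by flagging any run of 20 or more characters drawn from `[A-Za-z0-9-_]`. It is an approximation (it ignores the exact `\b` semantics, under which `-` is not a word character), and `looks_sensitive` is a hypothetical helper, not part of either CLI.

```rust
/// Hypothetical helper (not part of Codex CLI) approximating
/// `/\b[A-Za-z0-9-_]{20,}\b/`: returns true if `text` contains a run of at
/// least 20 ASCII letters, digits, `-`, or `_`. Exact regex `\b` semantics are
/// deliberately ignored; this is only an illustration.
fn looks_sensitive(text: &str) -> bool {
    let mut run = 0usize;
    for c in text.chars() {
        if c.is_ascii_alphanumeric() || c == '-' || c == '_' {
            run += 1;
            if run >= 20 {
                return true;
            }
        } else {
            // Any other character ends the current run.
            run = 0;
        }
    }
    false
}
```

For example, `looks_sensitive("git checkout 0123456789abcdef0123456789abcdef01234567")` returns `true`: a full 40-character commit hash is indistinguishable from an API token under this rule, which is exactly the false positive described above.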
As noted in the updated documentation, users can opt out of history by
adding the following to `config.toml`:
```toml
[history]
persistence = "none"
```
Because `history.jsonl` could, in theory, be quite large, we take a[n
arguably overly pedantic] approach to reading history entries into
memory. Specifically, we start by telling the client the current number
of entries in the history file (`history_entry_count`) as well as the
inode (`history_log_id`) of `history.jsonl` (see the new fields on
`SessionConfiguredEvent`).
The client is responsible for keeping new entries in memory to create a
"local history," but if the user presses Up enough times to go "past" the
end of local history, then the client should use the new
`GetHistoryEntryRequest` in the protocol to fetch older entries.
Specifically, it should pass the `history_log_id` it was given
originally and work backwards from `history_entry_count`. (It should
really fetch history in batches rather than one at a time, but that is
something we can improve upon in subsequent PRs.)
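The client-side walk can be sketched as follows. Everything here is hypothetical: `HistoryNavigator`, its methods, and the `fetch` closure (which stands in for a `GetHistoryEntryRequest` round trip carrying the original `history_log_id`) are illustrations, not the actual client API. Up-arrow presses first walk back through entries submitted during this session, then request persisted entries at offsets `history_entry_count - 1`, `history_entry_count - 2`, and so on down to `0`.

```rust
/// Hypothetical client-side history navigator (not the real Codex CLI API).
struct HistoryNavigator {
    /// Entries submitted during this session, oldest first ("local history").
    local: Vec<String>,
    /// `history_log_id` as reported in `SessionConfiguredEvent`.
    log_id: u64,
    /// `history_entry_count` as reported in `SessionConfiguredEvent`.
    entry_count: usize,
    /// How many steps "up" the user has taken (0 = not navigating).
    steps_back: usize,
}

impl HistoryNavigator {
    fn new(log_id: u64, entry_count: usize) -> Self {
        Self { local: Vec::new(), log_id, entry_count, steps_back: 0 }
    }

    /// Record a newly submitted message and reset navigation.
    fn push_local(&mut self, text: &str) {
        self.local.push(text.to_string());
        self.steps_back = 0;
    }

    /// Handle one press of the Up arrow. `fetch(log_id, offset)` stands in for
    /// a `GetHistoryEntryRequest` round trip and returns `None` when the entry
    /// is unavailable (e.g. the log id no longer matches the file's inode).
    fn up<F>(&mut self, mut fetch: F) -> Option<String>
    where
        F: FnMut(u64, usize) -> Option<String>,
    {
        let total = self.local.len() + self.entry_count;
        if self.steps_back >= total {
            return None; // Already at the oldest known entry.
        }
        self.steps_back += 1;
        if self.steps_back <= self.local.len() {
            // Still inside local history: newest local entry first.
            return Some(self.local[self.local.len() - self.steps_back].clone());
        }
        // Past local history: work backwards from `entry_count`.
        let k = self.steps_back - self.local.len(); // 1-based step into the file
        let result = fetch(self.log_id, self.entry_count - k);
        if result.is_none() {
            self.steps_back -= 1; // Stay put if the entry could not be fetched.
        }
        result
    }
}
```

Because entries appended after the session started (by this or any other session) land at offsets `>= history_entry_count`, the offsets this sketch requests keep pointing at the same lines for as long as the file is only appended to.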
The motivation behind this crazy scheme is that it is designed to defend
against:
* The `history.jsonl` being truncated during the session such that the
index into the history is no longer consistent with what had been read
up to that point. We do not yet have logic to enforce a `max_bytes` for
`history.jsonl`, but once we do, we will aspire to implement it in a way
that should result in a new inode for the file on most systems.
* New items from concurrent Codex CLI sessions being appended to the history.
Because, in the absence of truncation, `history.jsonl` is an append-only
log, the client should always get a consistent view of history so long
as it reads backwards from `history_entry_count`. (That said, it will
not be able to read _new_ commands from concurrent sessions, but perhaps
we will introduce a `/` command to reload the latest history or something
down the road.)
Admittedly, my testing of this feature thus far has been fairly light. I
expect we will find bugs and introduce enhancements/fixes going forward.
```rust
//! Persistence layer for the global, append-only *message history* file.
//!
//! The history is stored at `~/.codex/history.jsonl` with **one JSON object per
//! line** so that it can be efficiently appended to and parsed with standard
//! JSON-Lines tooling. Each record has the following schema:
//!
//! ````text
//! {"session_id":"<uuid>","ts":<unix_seconds>,"text":"<message>"}
//! ````
//!
//! To minimise the chance of interleaved writes when multiple processes are
//! appending concurrently, callers should *prepare the full line* (record +
//! trailing `\n`) and write it with a **single `write(2)` system call** while
//! the file descriptor is opened with the `O_APPEND` flag. With `O_APPEND`,
//! POSIX moves the file offset to the end of the file atomically with each
//! write, so concurrent appends do not overwrite one another. (The `PIPE_BUF`
//! atomicity guarantee applies only to pipes and FIFOs, not regular files,
//! which is why `append_entry` also holds an exclusive advisory lock while
//! writing.)

use std::fs::File;
use std::fs::OpenOptions;
use std::io::Result;
use std::io::Write;
use std::path::PathBuf;

use serde::Deserialize;
use serde::Serialize;
use std::time::Duration;
use tokio::fs;
use tokio::io::AsyncReadExt;
use uuid::Uuid;

use crate::config::Config;
use crate::config::HistoryPersistence;

#[cfg(unix)]
use std::os::unix::fs::OpenOptionsExt;
#[cfg(unix)]
use std::os::unix::fs::PermissionsExt;

/// Filename that stores the message history inside `~/.codex`.
const HISTORY_FILENAME: &str = "history.jsonl";

const MAX_RETRIES: usize = 10;
const RETRY_SLEEP: Duration = Duration::from_millis(100);

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct HistoryEntry {
    pub session_id: String,
    pub ts: u64,
    pub text: String,
}

fn history_filepath(config: &Config) -> PathBuf {
    let mut path = config.codex_home.clone();
    path.push(HISTORY_FILENAME);
    path
}

/// Append a `text` entry associated with `session_id` to the history file. Uses
/// advisory file locking to ensure that concurrent writes do not interleave,
/// which entails a small amount of blocking I/O internally.
pub(crate) async fn append_entry(text: &str, session_id: &Uuid, config: &Config) -> Result<()> {
    match config.history.persistence {
        HistoryPersistence::SaveAll => {
            // Save everything: proceed.
        }
        HistoryPersistence::None => {
            // No history persistence requested.
            return Ok(());
        }
    }

    // TODO: check `text` for sensitive patterns

    // Resolve `~/.codex/history.jsonl` and ensure the parent directory exists.
    let path = history_filepath(config);
    if let Some(parent) = path.parent() {
        tokio::fs::create_dir_all(parent).await?;
    }

    // Compute timestamp (seconds since the Unix epoch).
    let ts = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_err(|e| std::io::Error::other(format!("system clock before Unix epoch: {e}")))?
        .as_secs();

    // Construct the JSON line first so we can write it in a single syscall.
    let entry = HistoryEntry {
        session_id: session_id.to_string(),
        ts,
        text: text.to_string(),
    };
    let mut line = serde_json::to_string(&entry)
        .map_err(|e| std::io::Error::other(format!("failed to serialise history entry: {e}")))?;
    line.push('\n');

    // Open in append-only mode (on Unix, a newly created file gets mode 0o600).
    let mut options = OpenOptions::new();
    options.append(true).read(true).create(true);
    #[cfg(unix)]
    {
        options.mode(0o600);
    }

    let mut history_file = options.open(&path)?;

    // Ensure permissions (covers files that already existed).
    ensure_owner_only_permissions(&history_file).await?;

    // Lock file.
    acquire_exclusive_lock_with_retry(&history_file).await?;

    // We use sync I/O with spawn_blocking() because we are using a
    // [`std::fs::File`] instead of a [`tokio::fs::File`] to leverage an
    // advisory file locking API that is not available in the async API.
    // The lock is released when `history_file` is dropped at the end of the
    // closure.
    tokio::task::spawn_blocking(move || -> Result<()> {
        history_file.write_all(line.as_bytes())?;
        history_file.flush()?;
        Ok(())
    })
    .await??;

    Ok(())
}

/// Attempt to acquire an exclusive advisory lock on `file`, retrying up to
/// `MAX_RETRIES` times (sleeping `RETRY_SLEEP` between attempts) if the lock is
/// currently held by another process. This prevents a potential indefinite
/// wait while still giving other writers some time to finish their operation.
async fn acquire_exclusive_lock_with_retry(file: &std::fs::File) -> Result<()> {
    use tokio::time::sleep;

    for _ in 0..MAX_RETRIES {
        match fs2::FileExt::try_lock_exclusive(file) {
            Ok(()) => return Ok(()),
            Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                sleep(RETRY_SLEEP).await;
            }
            Err(e) => return Err(e),
        }
    }

    Err(std::io::Error::new(
        std::io::ErrorKind::WouldBlock,
        "could not acquire exclusive lock on history file after multiple attempts",
    ))
}

/// Asynchronously fetch the history file's *identifier* (inode on Unix) and
/// the current number of entries by counting newline characters.
pub(crate) async fn history_metadata(config: &Config) -> (u64, usize) {
    let path = history_filepath(config);

    // Open the file first and read the identifier from the open handle, so the
    // identifier and the entry count always describe the same file even if
    // `history.jsonl` is replaced concurrently. A missing or unreadable file
    // is reported as an empty history.
    let mut file = match fs::File::open(&path).await {
        Ok(f) => f,
        Err(_) => return (0, 0),
    };

    #[cfg(unix)]
    let log_id = {
        use std::os::unix::fs::MetadataExt;
        match file.metadata().await {
            Ok(m) => m.ino(),
            Err(_) => return (0, 0),
        }
    };
    #[cfg(not(unix))]
    let log_id = 0u64;

    // Count newline bytes.
    let mut buf = [0u8; 8192];
    let mut count = 0usize;
    loop {
        match file.read(&mut buf).await {
            Ok(0) => break,
            Ok(n) => {
                count += buf[..n].iter().filter(|&&b| b == b'\n').count();
            }
            Err(_) => return (log_id, 0),
        }
    }

    (log_id, count)
}

/// Given a `log_id` (on Unix this is the file's inode number) and a zero-based
/// `offset`, return the corresponding `HistoryEntry` if the identifier matches
/// the current history file **and** the requested offset exists. Any I/O or
/// parsing errors are logged and result in `None`.
///
/// Note this function is not async because it uses a sync advisory file
/// locking API.
#[cfg(unix)]
pub(crate) fn lookup(log_id: u64, offset: usize, config: &Config) -> Option<HistoryEntry> {
    use std::io::BufRead;
    use std::io::BufReader;
    use std::os::unix::fs::MetadataExt;

    let path = history_filepath(config);
    let file: File = match OpenOptions::new().read(true).open(&path) {
        Ok(f) => f,
        Err(e) => {
            tracing::warn!(error = %e, "failed to open history file");
            return None;
        }
    };

    let metadata = match file.metadata() {
        Ok(m) => m,
        Err(e) => {
            tracing::warn!(error = %e, "failed to stat history file");
            return None;
        }
    };

    if metadata.ino() != log_id {
        return None;
    }

    // Lock the (already open) file for reading.
    if let Err(e) = acquire_shared_lock_with_retry(&file) {
        tracing::warn!(error = %e, "failed to acquire shared lock on history file");
        return None;
    }

    let reader = BufReader::new(&file);
    for (idx, line_res) in reader.lines().enumerate() {
        let line = match line_res {
            Ok(l) => l,
            Err(e) => {
                tracing::warn!(error = %e, "failed to read line from history file");
                return None;
            }
        };

        if idx == offset {
            match serde_json::from_str::<HistoryEntry>(&line) {
                Ok(entry) => return Some(entry),
                Err(e) => {
                    tracing::warn!(error = %e, "failed to parse history entry");
                    return None;
                }
            }
        }
    }

    None
}

/// Fallback stub for non-Unix systems: currently always returns `None`.
#[cfg(not(unix))]
pub(crate) fn lookup(log_id: u64, offset: usize, config: &Config) -> Option<HistoryEntry> {
    let _ = (log_id, offset, config);
    None
}

#[cfg(unix)]
fn acquire_shared_lock_with_retry(file: &File) -> Result<()> {
    for _ in 0..MAX_RETRIES {
        match fs2::FileExt::try_lock_shared(file) {
            Ok(()) => return Ok(()),
            Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                std::thread::sleep(RETRY_SLEEP);
            }
            Err(e) => return Err(e),
        }
    }

    Err(std::io::Error::new(
        std::io::ErrorKind::WouldBlock,
        "could not acquire shared lock on history file after multiple attempts",
    ))
}

/// On Unix systems, ensure the file permissions are `0o600` (rw-------). If the
/// permissions cannot be changed, the error is propagated to the caller.
#[cfg(unix)]
async fn ensure_owner_only_permissions(file: &File) -> Result<()> {
    let metadata = file.metadata()?;
    let current_mode = metadata.permissions().mode() & 0o777;
    if current_mode != 0o600 {
        let mut perms = metadata.permissions();
        perms.set_mode(0o600);
        let file_clone = file.try_clone()?;
        tokio::task::spawn_blocking(move || file_clone.set_permissions(perms)).await??;
    }
    Ok(())
}

#[cfg(not(unix))]
async fn ensure_owner_only_permissions(_file: &File) -> Result<()> {
    // For now, on non-Unix, simply succeed.
    Ok(())
}
```
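The description notes that clients should eventually fetch history in batches rather than one entry at a time. Assuming a batch is a contiguous range of offsets, the reading side could be a single pass that collects `count` lines starting at `start`. `lookup_batch` is hypothetical and not part of `message_history.rs`: it works on any `BufRead` and returns raw lines, leaving the inode check, shared lock, and `HistoryEntry` parsing to a caller shaped like `lookup`.

```rust
use std::io::{self, BufRead};

/// Hypothetical batched read (not part of Codex CLI): return up to `count` raw
/// JSONL lines starting at zero-based offset `start`, reading the input once
/// and stopping as soon as the batch is complete. Any read error aborts the
/// whole batch.
fn lookup_batch<R: BufRead>(reader: R, start: usize, count: usize) -> io::Result<Vec<String>> {
    reader
        .lines()
        .skip(start)
        .take(count)
        .collect()
}
```

A client walking backwards from `history_entry_count` might request `start = history_entry_count.saturating_sub(batch)` and then reverse the returned lines. Each request still scans from the beginning of the file, but one scan now serves a whole batch instead of a single entry.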