diff --git a/codex-rs/core/src/message_history.rs b/codex-rs/core/src/message_history.rs index 9f13abab..75bc5aa7 100644 --- a/codex-rs/core/src/message_history.rs +++ b/codex-rs/core/src/message_history.rs @@ -105,47 +105,34 @@ pub(crate) async fn append_entry(text: &str, session_id: &Uuid, config: &Config) // Ensure permissions. ensure_owner_only_permissions(&history_file).await?; - // Lock file. - acquire_exclusive_lock_with_retry(&history_file).await?; - - // We use sync I/O with spawn_blocking() because we are using a - // [`std::fs::File`] instead of a [`tokio::fs::File`] to leverage an - // advisory file locking API that is not available in the async API. + // Perform a blocking write under an advisory write lock using std::fs. tokio::task::spawn_blocking(move || -> Result<()> { - history_file.write_all(line.as_bytes())?; - history_file.flush()?; - Ok(()) + // Retry a few times to avoid indefinite blocking when contended. + for _ in 0..MAX_RETRIES { + match history_file.try_lock() { + Ok(()) => { + // While holding the exclusive lock, write the full line. + history_file.write_all(line.as_bytes())?; + history_file.flush()?; + return Ok(()); + } + Err(std::fs::TryLockError::WouldBlock) => { + std::thread::sleep(RETRY_SLEEP); + } + Err(e) => return Err(e.into()), + } + } + + Err(std::io::Error::new( + std::io::ErrorKind::WouldBlock, + "could not acquire exclusive lock on history file after multiple attempts", + )) }) .await??; Ok(()) } -/// Attempt to acquire an exclusive advisory lock on `file`, retrying up to 10 -/// times if the lock is currently held by another process. This prevents a -/// potential indefinite wait while still giving other writers some time to -/// finish their operation. -async fn acquire_exclusive_lock_with_retry(file: &File) -> Result<()> { - use tokio::time::sleep; - - for _ in 0..MAX_RETRIES { - match file.try_lock() { - Ok(()) => return Ok(()), - Err(e) => match e { - std::fs::TryLockError::WouldBlock => { - sleep(RETRY_SLEEP).await; - } - other => return Err(other.into()), - }, - } - } - - Err(std::io::Error::new( - std::io::ErrorKind::WouldBlock, - "could not acquire exclusive lock on history file after multiple attempts", - )) -} - /// Asynchronously fetch the history file's *identifier* (inode on Unix) and /// the current number of entries by counting newline characters. pub(crate) async fn history_metadata(config: &Config) -> (u64, usize) { @@ -221,29 +208,42 @@ pub(crate) fn lookup(log_id: u64, offset: usize, config: &Config) -> Option l, - Err(e) => { - tracing::warn!(error = %e, "failed to read line from history file"); + match lock_result { + Ok(()) => { + let reader = BufReader::new(&file); + for (idx, line_res) in reader.lines().enumerate() { + let line = match line_res { + Ok(l) => l, + Err(e) => { + tracing::warn!(error = %e, "failed to read line from history file"); + return None; + } + }; + + if idx == offset { + match serde_json::from_str::(&line) { + Ok(entry) => return Some(entry), + Err(e) => { + tracing::warn!(error = %e, "failed to parse history entry"); + return None; + } + } + } + } + // Not found at requested offset. return None; } - }; - - if idx == offset { - match serde_json::from_str::(&line) { - Ok(entry) => return Some(entry), - Err(e) => { - tracing::warn!(error = %e, "failed to parse history entry"); - return None; - } + Err(std::fs::TryLockError::WouldBlock) => { + std::thread::sleep(RETRY_SLEEP); + } + Err(e) => { + tracing::warn!(error = %e, "failed to acquire shared lock on history file"); + return None; } } } @@ -258,26 +258,6 @@ pub(crate) fn lookup(log_id: u64, offset: usize, config: &Config) -> Option Result<()> { - for _ in 0..MAX_RETRIES { - match file.try_lock_shared() { - Ok(()) => return Ok(()), - Err(e) => match e { - std::fs::TryLockError::WouldBlock => { - std::thread::sleep(RETRY_SLEEP); - } - other => return Err(other.into()), - }, - } - } - - Err(std::io::Error::new( - std::io::ErrorKind::WouldBlock, - "could not acquire shared lock on history file after multiple attempts", - )) -} - /// On Unix systems ensure the file permissions are `0o600` (rw-------). If the /// permissions cannot be changed the error is propagated to the caller. #[cfg(unix)]