From 6cd5309d91a60df3d90532fba85371e04e03739a Mon Sep 17 00:00:00 2001 From: jif-oai Date: Tue, 23 Sep 2025 17:27:20 +0100 Subject: [PATCH] feat: readiness tool (#4090) Readiness flag with token-based subscription and async wait function that waits for all the subscribers to be ready --- codex-rs/Cargo.lock | 14 ++ codex-rs/Cargo.toml | 1 + codex-rs/common/Cargo.toml | 3 + codex-rs/common/src/lib.rs | 2 + codex-rs/common/src/readiness.rs | 249 +++++++++++++++++++++++++++++++ 5 files changed, 269 insertions(+) create mode 100644 codex-rs/common/src/readiness.rs diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index 8fe05ac7..b26e893b 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -316,6 +316,17 @@ dependencies = [ "syn 2.0.104", ] +[[package]] +name = "async-trait" +version = "0.1.89" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.104", +] + [[package]] name = "atomic-waker" version = "1.1.2" @@ -653,10 +664,13 @@ dependencies = [ name = "codex-common" version = "0.0.0" dependencies = [ + "async-trait", "clap", "codex-core", "codex-protocol", "serde", + "thiserror 2.0.16", + "tokio", "toml", ] diff --git a/codex-rs/Cargo.toml b/codex-rs/Cargo.toml index de1c9e49..8303bda5 100644 --- a/codex-rs/Cargo.toml +++ b/codex-rs/Cargo.toml @@ -62,6 +62,7 @@ askama = "0.12" assert_cmd = "2" async-channel = "2.3.1" async-stream = "0.3.6" +async-trait = "0.1.89" base64 = "0.22.1" bytes = "1.10.1" chrono = "0.4.40" diff --git a/codex-rs/common/Cargo.toml b/codex-rs/common/Cargo.toml index 3ce84a6f..fe44be75 100644 --- a/codex-rs/common/Cargo.toml +++ b/codex-rs/common/Cargo.toml @@ -7,11 +7,14 @@ version = { workspace = true } workspace = true [dependencies] +async-trait = { workspace = true } clap = { workspace = true, features = ["derive", "wrap_help"], optional = true } codex-core = { workspace = true } codex-protocol = { workspace = true } serde = { workspace = true, optional = true } toml = { workspace = true, optional = true } +thiserror = { workspace = true } +tokio = { workspace = true } [features] # Separate feature so that `clap` is not a mandatory dependency. diff --git a/codex-rs/common/src/lib.rs b/codex-rs/common/src/lib.rs index 292503f7..2c5d91e1 100644 --- a/codex-rs/common/src/lib.rs +++ b/codex-rs/common/src/lib.rs @@ -34,3 +34,5 @@ pub mod model_presets; // Shared approval presets (AskForApproval + Sandbox) used by TUI and MCP server // Not to be confused with AskForApproval, which we should probably rename to EscalationPolicy. pub mod approval_presets; +// Readiness flag with token-based authorization and async waiting (Tokio). +pub mod readiness; diff --git a/codex-rs/common/src/readiness.rs b/codex-rs/common/src/readiness.rs new file mode 100644 index 00000000..9ff15e01 --- /dev/null +++ b/codex-rs/common/src/readiness.rs @@ -0,0 +1,249 @@ +//! Readiness flag with token-based authorization and async waiting (Tokio). + +use std::collections::HashSet; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::AtomicI32; +use std::sync::atomic::Ordering; +use std::time::Duration; + +use tokio::sync::Mutex; +use tokio::sync::watch; +use tokio::time; + +/// Opaque subscription token returned by `subscribe()`. +#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)] +pub struct Token(i32); + +const LOCK_TIMEOUT: Duration = Duration::from_millis(1000); + +#[async_trait::async_trait] +pub trait Readiness: Send + Sync + 'static { + /// Returns true if the flag is currently marked ready. At least one token needs to be marked + /// as ready before. + /// `true` is not reversible. + fn is_ready(&self) -> bool; + + /// Subscribe to readiness and receive an authorization token. + /// + /// If the flag is already ready, returns `FlagAlreadyReady`. + async fn subscribe(&self) -> Result; + + /// Attempt to mark the flag ready, validated by the provided token. + /// + /// Returns `true` iff: + /// - `token` is currently subscribed, and + /// - the flag was not already ready. + async fn mark_ready(&self, token: Token) -> Result; + + /// Asynchronously wait until the flag becomes ready. + async fn wait_ready(&self); +} + +pub struct ReadinessFlag { + /// Atomic for cheap reads. + ready: AtomicBool, + /// Used to generate the next i32 token. + next_id: AtomicI32, + /// Set of active subscriptions. + tokens: Mutex>, + /// Broadcasts readiness to async waiters. + tx: watch::Sender, +} + +impl ReadinessFlag { + /// Create a new, not-yet-ready flag. + pub fn new() -> Self { + let (tx, _rx) = watch::channel(false); + Self { + ready: AtomicBool::new(false), + next_id: AtomicI32::new(1), // Reserve 0. + tokens: Mutex::new(HashSet::new()), + tx, + } + } + + async fn with_tokens( + &self, + f: impl FnOnce(&mut HashSet) -> R, + ) -> Result { + let mut guard = time::timeout(LOCK_TIMEOUT, self.tokens.lock()) + .await + .map_err(|_| errors::ReadinessError::TokenLockFailed)?; + Ok(f(&mut guard)) + } +} + +impl Default for ReadinessFlag { + fn default() -> Self { + Self::new() + } +} + +#[async_trait::async_trait] +impl Readiness for ReadinessFlag { + fn is_ready(&self) -> bool { + self.ready.load(Ordering::Acquire) + } + + async fn subscribe(&self) -> Result { + if self.is_ready() { + return Err(errors::ReadinessError::FlagAlreadyReady); + } + + // Generate a token; ensure it's not 0. + let token = Token(self.next_id.fetch_add(1, Ordering::Relaxed)); + + // Recheck readiness while holding the lock so mark_ready can't flip the flag between the + // check above and inserting the token. + let inserted = self + .with_tokens(|tokens| { + if self.is_ready() { + return false; + } + tokens.insert(token); + true + }) + .await?; + + if !inserted { + return Err(errors::ReadinessError::FlagAlreadyReady); + } + + Ok(token) + } + + async fn mark_ready(&self, token: Token) -> Result { + if self.is_ready() { + return Ok(false); + } + if token.0 == 0 { + return Ok(false); // Never authorize. + } + + let marked = self + .with_tokens(|set| { + if !set.remove(&token) { + return false; // invalid or already used + } + self.ready.store(true, Ordering::Release); + set.clear(); // no further tokens needed once ready + true + }) + .await?; + if !marked { + return Ok(false); + } + // Best-effort broadcast; ignore error if there are no receivers. + let _ = self.tx.send(true); + Ok(true) + } + + async fn wait_ready(&self) { + if self.is_ready() { + return; + } + let mut rx = self.tx.subscribe(); + // Fast-path check before awaiting. + if *rx.borrow() { + return; + } + // Await changes until true is observed. + while rx.changed().await.is_ok() { + if *rx.borrow() { + break; + } + } + } +} + +mod errors { + use thiserror::Error; + + #[derive(Debug, Error)] + pub enum ReadinessError { + #[error("Failed to acquire readiness token lock")] + TokenLockFailed, + #[error("Flag is already ready. Impossible to subscribe")] + FlagAlreadyReady, + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use super::Readiness; + use super::ReadinessFlag; + use super::Token; + use super::errors::ReadinessError; + + #[tokio::test] + async fn subscribe_and_mark_ready_roundtrip() -> Result<(), ReadinessError> { + let flag = ReadinessFlag::new(); + let token = flag.subscribe().await?; + + assert!(flag.mark_ready(token).await?); + assert!(flag.is_ready()); + Ok(()) + } + + #[tokio::test] + async fn subscribe_after_ready_returns_none() -> Result<(), ReadinessError> { + let flag = ReadinessFlag::new(); + let token = flag.subscribe().await?; + assert!(flag.mark_ready(token).await?); + + assert!(flag.subscribe().await.is_err()); + Ok(()) + } + + #[tokio::test] + async fn mark_ready_rejects_unknown_token() -> Result<(), ReadinessError> { + let flag = ReadinessFlag::new(); + assert!(!flag.mark_ready(Token(42)).await?); + assert!(!flag.is_ready()); + Ok(()) + } + + #[tokio::test] + async fn wait_ready_unblocks_after_mark_ready() -> Result<(), ReadinessError> { + let flag = Arc::new(ReadinessFlag::new()); + let token = flag.subscribe().await?; + + let waiter = { + let flag = Arc::clone(&flag); + tokio::spawn(async move { + flag.wait_ready().await; + }) + }; + + assert!(flag.mark_ready(token).await?); + waiter.await.expect("waiting task should not panic"); + Ok(()) + } + + #[tokio::test] + async fn mark_ready_twice_uses_single_token() -> Result<(), ReadinessError> { + let flag = ReadinessFlag::new(); + let token = flag.subscribe().await?; + + assert!(flag.mark_ready(token).await?); + assert!(!flag.mark_ready(token).await?); + Ok(()) + } + + #[tokio::test] + async fn subscribe_returns_error_when_lock_is_held() { + let flag = ReadinessFlag::new(); + let _guard = flag + .tokens + .try_lock() + .expect("initial lock acquisition should succeed"); + + let err = flag + .subscribe() + .await + .expect_err("contended subscribe should report a lock failure"); + assert!(matches!(err, ReadinessError::TokenLockFailed)); + } +}