feat: readiness tool (#4090)
Readiness flag with token-based subscription and async wait function that waits for all the subscribers to be ready
This commit is contained in:
14
codex-rs/Cargo.lock
generated
14
codex-rs/Cargo.lock
generated
@@ -316,6 +316,17 @@ dependencies = [
|
||||
"syn 2.0.104",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-trait"
|
||||
version = "0.1.89"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.104",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "atomic-waker"
|
||||
version = "1.1.2"
|
||||
@@ -653,10 +664,13 @@ dependencies = [
|
||||
name = "codex-common"
|
||||
version = "0.0.0"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"clap",
|
||||
"codex-core",
|
||||
"codex-protocol",
|
||||
"serde",
|
||||
"thiserror 2.0.16",
|
||||
"tokio",
|
||||
"toml",
|
||||
]
|
||||
|
||||
|
||||
@@ -62,6 +62,7 @@ askama = "0.12"
|
||||
assert_cmd = "2"
|
||||
async-channel = "2.3.1"
|
||||
async-stream = "0.3.6"
|
||||
async-trait = "0.1.89"
|
||||
base64 = "0.22.1"
|
||||
bytes = "1.10.1"
|
||||
chrono = "0.4.40"
|
||||
|
||||
@@ -7,11 +7,14 @@ version = { workspace = true }
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
async-trait = { workspace = true }
|
||||
clap = { workspace = true, features = ["derive", "wrap_help"], optional = true }
|
||||
codex-core = { workspace = true }
|
||||
codex-protocol = { workspace = true }
|
||||
serde = { workspace = true, optional = true }
|
||||
toml = { workspace = true, optional = true }
|
||||
thiserror = { workspace = true }
|
||||
tokio = { workspace = true }
|
||||
|
||||
[features]
|
||||
# Separate feature so that `clap` is not a mandatory dependency.
|
||||
|
||||
@@ -34,3 +34,5 @@ pub mod model_presets;
|
||||
// Shared approval presets (AskForApproval + Sandbox) used by TUI and MCP server
|
||||
// Not to be confused with AskForApproval, which we should probably rename to EscalationPolicy.
|
||||
pub mod approval_presets;
|
||||
// Readiness flag with token-based authorization and async waiting (Tokio).
|
||||
pub mod readiness;
|
||||
|
||||
249
codex-rs/common/src/readiness.rs
Normal file
249
codex-rs/common/src/readiness.rs
Normal file
@@ -0,0 +1,249 @@
|
||||
//! Readiness flag with token-based authorization and async waiting (Tokio).
|
||||
|
||||
use std::collections::HashSet;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::atomic::AtomicI32;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::time::Duration;
|
||||
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::sync::watch;
|
||||
use tokio::time;
|
||||
|
||||
/// Opaque subscription token returned by `subscribe()`.
|
||||
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
|
||||
pub struct Token(i32);
|
||||
|
||||
const LOCK_TIMEOUT: Duration = Duration::from_millis(1000);
|
||||
|
||||
#[async_trait::async_trait]
|
||||
pub trait Readiness: Send + Sync + 'static {
|
||||
/// Returns true if the flag is currently marked ready. At least one token needs to be marked
|
||||
/// as ready before.
|
||||
/// `true` is not reversible.
|
||||
fn is_ready(&self) -> bool;
|
||||
|
||||
/// Subscribe to readiness and receive an authorization token.
|
||||
///
|
||||
/// If the flag is already ready, returns `FlagAlreadyReady`.
|
||||
async fn subscribe(&self) -> Result<Token, errors::ReadinessError>;
|
||||
|
||||
/// Attempt to mark the flag ready, validated by the provided token.
|
||||
///
|
||||
/// Returns `true` iff:
|
||||
/// - `token` is currently subscribed, and
|
||||
/// - the flag was not already ready.
|
||||
async fn mark_ready(&self, token: Token) -> Result<bool, errors::ReadinessError>;
|
||||
|
||||
/// Asynchronously wait until the flag becomes ready.
|
||||
async fn wait_ready(&self);
|
||||
}
|
||||
|
||||
pub struct ReadinessFlag {
|
||||
/// Atomic for cheap reads.
|
||||
ready: AtomicBool,
|
||||
/// Used to generate the next i32 token.
|
||||
next_id: AtomicI32,
|
||||
/// Set of active subscriptions.
|
||||
tokens: Mutex<HashSet<Token>>,
|
||||
/// Broadcasts readiness to async waiters.
|
||||
tx: watch::Sender<bool>,
|
||||
}
|
||||
|
||||
impl ReadinessFlag {
|
||||
/// Create a new, not-yet-ready flag.
|
||||
pub fn new() -> Self {
|
||||
let (tx, _rx) = watch::channel(false);
|
||||
Self {
|
||||
ready: AtomicBool::new(false),
|
||||
next_id: AtomicI32::new(1), // Reserve 0.
|
||||
tokens: Mutex::new(HashSet::new()),
|
||||
tx,
|
||||
}
|
||||
}
|
||||
|
||||
async fn with_tokens<R>(
|
||||
&self,
|
||||
f: impl FnOnce(&mut HashSet<Token>) -> R,
|
||||
) -> Result<R, errors::ReadinessError> {
|
||||
let mut guard = time::timeout(LOCK_TIMEOUT, self.tokens.lock())
|
||||
.await
|
||||
.map_err(|_| errors::ReadinessError::TokenLockFailed)?;
|
||||
Ok(f(&mut guard))
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for ReadinessFlag {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Readiness for ReadinessFlag {
|
||||
fn is_ready(&self) -> bool {
|
||||
self.ready.load(Ordering::Acquire)
|
||||
}
|
||||
|
||||
async fn subscribe(&self) -> Result<Token, errors::ReadinessError> {
|
||||
if self.is_ready() {
|
||||
return Err(errors::ReadinessError::FlagAlreadyReady);
|
||||
}
|
||||
|
||||
// Generate a token; ensure it's not 0.
|
||||
let token = Token(self.next_id.fetch_add(1, Ordering::Relaxed));
|
||||
|
||||
// Recheck readiness while holding the lock so mark_ready can't flip the flag between the
|
||||
// check above and inserting the token.
|
||||
let inserted = self
|
||||
.with_tokens(|tokens| {
|
||||
if self.is_ready() {
|
||||
return false;
|
||||
}
|
||||
tokens.insert(token);
|
||||
true
|
||||
})
|
||||
.await?;
|
||||
|
||||
if !inserted {
|
||||
return Err(errors::ReadinessError::FlagAlreadyReady);
|
||||
}
|
||||
|
||||
Ok(token)
|
||||
}
|
||||
|
||||
async fn mark_ready(&self, token: Token) -> Result<bool, errors::ReadinessError> {
|
||||
if self.is_ready() {
|
||||
return Ok(false);
|
||||
}
|
||||
if token.0 == 0 {
|
||||
return Ok(false); // Never authorize.
|
||||
}
|
||||
|
||||
let marked = self
|
||||
.with_tokens(|set| {
|
||||
if !set.remove(&token) {
|
||||
return false; // invalid or already used
|
||||
}
|
||||
self.ready.store(true, Ordering::Release);
|
||||
set.clear(); // no further tokens needed once ready
|
||||
true
|
||||
})
|
||||
.await?;
|
||||
if !marked {
|
||||
return Ok(false);
|
||||
}
|
||||
// Best-effort broadcast; ignore error if there are no receivers.
|
||||
let _ = self.tx.send(true);
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
async fn wait_ready(&self) {
|
||||
if self.is_ready() {
|
||||
return;
|
||||
}
|
||||
let mut rx = self.tx.subscribe();
|
||||
// Fast-path check before awaiting.
|
||||
if *rx.borrow() {
|
||||
return;
|
||||
}
|
||||
// Await changes until true is observed.
|
||||
while rx.changed().await.is_ok() {
|
||||
if *rx.borrow() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
mod errors {
|
||||
use thiserror::Error;
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum ReadinessError {
|
||||
#[error("Failed to acquire readiness token lock")]
|
||||
TokenLockFailed,
|
||||
#[error("Flag is already ready. Impossible to subscribe")]
|
||||
FlagAlreadyReady,
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use super::Readiness;
|
||||
use super::ReadinessFlag;
|
||||
use super::Token;
|
||||
use super::errors::ReadinessError;
|
||||
|
||||
#[tokio::test]
|
||||
async fn subscribe_and_mark_ready_roundtrip() -> Result<(), ReadinessError> {
|
||||
let flag = ReadinessFlag::new();
|
||||
let token = flag.subscribe().await?;
|
||||
|
||||
assert!(flag.mark_ready(token).await?);
|
||||
assert!(flag.is_ready());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn subscribe_after_ready_returns_none() -> Result<(), ReadinessError> {
|
||||
let flag = ReadinessFlag::new();
|
||||
let token = flag.subscribe().await?;
|
||||
assert!(flag.mark_ready(token).await?);
|
||||
|
||||
assert!(flag.subscribe().await.is_err());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn mark_ready_rejects_unknown_token() -> Result<(), ReadinessError> {
|
||||
let flag = ReadinessFlag::new();
|
||||
assert!(!flag.mark_ready(Token(42)).await?);
|
||||
assert!(!flag.is_ready());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn wait_ready_unblocks_after_mark_ready() -> Result<(), ReadinessError> {
|
||||
let flag = Arc::new(ReadinessFlag::new());
|
||||
let token = flag.subscribe().await?;
|
||||
|
||||
let waiter = {
|
||||
let flag = Arc::clone(&flag);
|
||||
tokio::spawn(async move {
|
||||
flag.wait_ready().await;
|
||||
})
|
||||
};
|
||||
|
||||
assert!(flag.mark_ready(token).await?);
|
||||
waiter.await.expect("waiting task should not panic");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn mark_ready_twice_uses_single_token() -> Result<(), ReadinessError> {
|
||||
let flag = ReadinessFlag::new();
|
||||
let token = flag.subscribe().await?;
|
||||
|
||||
assert!(flag.mark_ready(token).await?);
|
||||
assert!(!flag.mark_ready(token).await?);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn subscribe_returns_error_when_lock_is_held() {
|
||||
let flag = ReadinessFlag::new();
|
||||
let _guard = flag
|
||||
.tokens
|
||||
.try_lock()
|
||||
.expect("initial lock acquisition should succeed");
|
||||
|
||||
let err = flag
|
||||
.subscribe()
|
||||
.await
|
||||
.expect_err("contended subscribe should report a lock failure");
|
||||
assert!(matches!(err, ReadinessError::TokenLockFailed));
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user