diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index 71681aa5..768a7cc3 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -1076,6 +1076,7 @@ checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" name = "core_test_support" version = "0.0.0" dependencies = [ + "anyhow", "codex-core", "serde_json", "tempfile", diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 08b28bde..b89ebc73 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -11,6 +11,7 @@ use crate::AuthManager; use crate::client_common::REVIEW_PROMPT; use crate::event_mapping::map_response_item_to_event_messages; use crate::review_format::format_review_findings_block; +use crate::user_notification::UserNotifier; use async_channel::Receiver; use async_channel::Sender; use codex_apply_patch::ApplyPatchAction; @@ -186,7 +187,7 @@ impl Codex { base_instructions: config.base_instructions.clone(), approval_policy: config.approval_policy, sandbox_policy: config.sandbox_policy.clone(), - notify: config.notify.clone(), + notify: UserNotifier::new(config.notify.clone()), cwd: config.cwd.clone(), }; @@ -274,9 +275,7 @@ pub(crate) struct Session { session_manager: ExecSessionManager, unified_exec_manager: UnifiedExecSessionManager, - /// External notifier command (will be passed as args to exec()). When - /// `None` this feature is disabled. - notify: Option<Vec<String>>, + notifier: UserNotifier, /// Optional rollout recorder for persisting the conversation transcript so /// sessions can be replayed or inspected later. @@ -335,10 +334,7 @@ struct ConfigureSession { /// How to sandbox commands executed in the system sandbox_policy: SandboxPolicy, - /// Optional external notifier command tokens. Present only when the - /// client wants the agent to spawn a program after each completed - /// turn. - notify: Option<Vec<String>>, + notify: UserNotifier, /// Working directory that should be treated as the *root* of the /// session.
All relative paths supplied by the model as well as the @@ -480,7 +476,7 @@ impl Session { mcp_connection_manager, session_manager: ExecSessionManager::default(), unified_exec_manager: UnifiedExecSessionManager::default(), - notify, + notifier: notify, state: Mutex::new(state), rollout: Mutex::new(Some(rollout_recorder)), codex_linux_sandbox_exe: config.codex_linux_sandbox_exe.clone(), @@ -586,7 +582,7 @@ impl Session { command: Vec<String>, cwd: PathBuf, reason: Option<String>, - ) -> oneshot::Receiver<ReviewDecision> { + ) -> ReviewDecision { // Add the tx_approve callback to the map before sending the request. let (tx_approve, rx_approve) = oneshot::channel(); let event_id = sub_id.clone(); @@ -608,7 +604,7 @@ impl Session { }), }; self.send_event(event).await; - rx_approve + rx_approve.await.unwrap_or_default() } pub async fn request_patch_approval( @@ -1034,33 +1030,8 @@ impl Session { } } - /// Spawn the configured notifier (if any) with the given JSON payload as - /// the last argument. Failures are logged but otherwise ignored so that - /// notification issues do not interfere with the main workflow. - fn maybe_notify(&self, notification: UserNotification) { - let Some(notify_command) = &self.notify else { - return; - }; - - if notify_command.is_empty() { - return; - } - - let Ok(json) = serde_json::to_string(&notification) else { - error!("failed to serialise notification payload"); - return; - }; - - let mut command = std::process::Command::new(&notify_command[0]); - if notify_command.len() > 1 { - command.args(&notify_command[1..]); - } - command.arg(json); - - // Fire-and-forget – we do not wait for completion.
- if let Err(e) = command.spawn() { - warn!("failed to spawn notifier '{}': {e}", notify_command[0]); - } + pub(crate) fn notifier(&self) -> &UserNotifier { + &self.notifier } } @@ -1883,11 +1854,12 @@ async fn run_task( last_agent_message = get_last_assistant_message_from_turn( &items_to_record_in_conversation_history, ); - sess.maybe_notify(UserNotification::AgentTurnComplete { - turn_id: sub_id.clone(), - input_messages: turn_input_messages, - last_assistant_message: last_agent_message.clone(), - }); + sess.notifier() + .notify(&UserNotification::AgentTurnComplete { + turn_id: sub_id.clone(), + input_messages: turn_input_messages, + last_assistant_message: last_agent_message.clone(), + }); break; } continue; @@ -2842,7 +2814,7 @@ async fn handle_container_exec_with_params( let sandbox_type = match safety { SafetyCheck::AutoApprove { sandbox_type } => sandbox_type, SafetyCheck::AskUser => { - let rx_approve = sess + let decision = sess .request_command_approval( sub_id.clone(), call_id.clone(), @@ -2851,7 +2823,7 @@ async fn handle_container_exec_with_params( params.justification.clone(), ) .await; - match rx_approve.await.unwrap_or_default() { + match decision { ReviewDecision::Approved => (), ReviewDecision::ApprovedForSession => { sess.add_approved_command(params.command.clone()).await; @@ -3012,7 +2984,7 @@ async fn handle_sandbox_error( sess.notify_background_event(&sub_id, format!("Execution failed: {error}")) .await; - let rx_approve = sess + let decision = sess .request_command_approval( sub_id.clone(), call_id.clone(), @@ -3022,7 +2994,7 @@ async fn handle_sandbox_error( ) .await; - match rx_approve.await.unwrap_or_default() { + match decision { ReviewDecision::Approved | ReviewDecision::ApprovedForSession => { // Persist this command as pre‑approved for the // remainder of the session so future @@ -3642,7 +3614,7 @@ mod tests { mcp_connection_manager: McpConnectionManager::default(), session_manager: ExecSessionManager::default(), unified_exec_manager: 
UnifiedExecSessionManager::default(), - notify: None, + notifier: UserNotifier::default(), rollout: Mutex::new(None), state: Mutex::new(State { history: ConversationHistory::new(), diff --git a/codex-rs/core/src/user_notification.rs b/codex-rs/core/src/user_notification.rs index 0a3cb49e..b2a2f4fb 100644 --- a/codex-rs/core/src/user_notification.rs +++ b/codex-rs/core/src/user_notification.rs @@ -1,4 +1,45 @@ use serde::Serialize; +use tracing::error; +use tracing::warn; + +#[derive(Debug, Default)] +pub(crate) struct UserNotifier { + notify_command: Option<Vec<String>>, +} + +impl UserNotifier { + pub(crate) fn notify(&self, notification: &UserNotification) { + if let Some(notify_command) = &self.notify_command + && !notify_command.is_empty() + { + self.invoke_notify(notify_command, notification) + } + } + + fn invoke_notify(&self, notify_command: &[String], notification: &UserNotification) { + let Ok(json) = serde_json::to_string(&notification) else { + error!("failed to serialise notification payload"); + return; + }; + + let mut command = std::process::Command::new(&notify_command[0]); + if notify_command.len() > 1 { + command.args(&notify_command[1..]); + } + command.arg(json); + + // Fire-and-forget – we do not wait for completion. + if let Err(e) = command.spawn() { + warn!("failed to spawn notifier '{}': {e}", notify_command[0]); + } + } + + pub(crate) fn new(notify: Option<Vec<String>>) -> Self { + Self { + notify_command: notify, + } + } +} /// User can configure a program that will receive notifications.
Each /// notification is serialized as JSON and passed as an argument to the diff --git a/codex-rs/core/tests/common/Cargo.toml b/codex-rs/core/tests/common/Cargo.toml index 0a33be39..f85aaa9e 100644 --- a/codex-rs/core/tests/common/Cargo.toml +++ b/codex-rs/core/tests/common/Cargo.toml @@ -1,12 +1,13 @@ [package] +edition = "2024" name = "core_test_support" version = { workspace = true } -edition = "2024" [lib] path = "lib.rs" [dependencies] +anyhow = { workspace = true } codex-core = { workspace = true } serde_json = { workspace = true } tempfile = { workspace = true } diff --git a/codex-rs/core/tests/common/lib.rs b/codex-rs/core/tests/common/lib.rs index 9b86694e..0fdd6038 100644 --- a/codex-rs/core/tests/common/lib.rs +++ b/codex-rs/core/tests/common/lib.rs @@ -8,6 +8,7 @@ use codex_core::config::ConfigOverrides; use codex_core::config::ConfigToml; pub mod responses; +pub mod test_codex; /// Returns a default `Config` whose on-disk state is confined to the provided /// temporary directory. 
Using a per-test directory keeps tests hermetic and diff --git a/codex-rs/core/tests/common/test_codex.rs b/codex-rs/core/tests/common/test_codex.rs new file mode 100644 index 00000000..65b25b13 --- /dev/null +++ b/codex-rs/core/tests/common/test_codex.rs @@ -0,0 +1,75 @@ +use std::mem::swap; +use std::sync::Arc; + +use codex_core::CodexAuth; +use codex_core::CodexConversation; +use codex_core::ConversationManager; +use codex_core::ModelProviderInfo; +use codex_core::NewConversation; +use codex_core::built_in_model_providers; +use codex_core::config::Config; +use codex_core::protocol::SessionConfiguredEvent; +use tempfile::TempDir; + +use crate::load_default_config_for_test; + +type ConfigMutator = dyn FnOnce(&mut Config); + +pub struct TestCodexBuilder { + config_mutators: Vec<Box<ConfigMutator>>, +} + +impl TestCodexBuilder { + pub fn with_config<T>(mut self, mutator: T) -> Self + where + T: FnOnce(&mut Config) + 'static, + { + self.config_mutators.push(Box::new(mutator)); + self + } + + pub async fn build(&mut self, server: &wiremock::MockServer) -> anyhow::Result<TestCodex> { + // Build config pointing to the mock server and spawn Codex. + let model_provider = ModelProviderInfo { + base_url: Some(format!("{}/v1", server.uri())), + ..built_in_model_providers()["openai"].clone() + }; + let home = TempDir::new()?; + let cwd = TempDir::new()?; + let mut config = load_default_config_for_test(&home); + config.cwd = cwd.path().to_path_buf(); + config.model_provider = model_provider; + let mut mutators = vec![]; + swap(&mut self.config_mutators, &mut mutators); + + for mutator in mutators { + mutator(&mut config) + } + let conversation_manager = ConversationManager::with_auth(CodexAuth::from_api_key("dummy")); + let NewConversation { + conversation, + session_configured, + ..
+ } = conversation_manager.new_conversation(config).await?; + + Ok(TestCodex { + home, + cwd, + codex: conversation, + session_configured, + }) + } +} + +pub struct TestCodex { + pub home: TempDir, + pub cwd: TempDir, + pub codex: Arc<CodexConversation>, + pub session_configured: SessionConfiguredEvent, +} + +pub fn test_codex() -> TestCodexBuilder { + TestCodexBuilder { + config_mutators: vec![], + } +} diff --git a/codex-rs/core/tests/suite/mod.rs b/codex-rs/core/tests/suite/mod.rs index 4c217b63..d4fb4460 100644 --- a/codex-rs/core/tests/suite/mod.rs +++ b/codex-rs/core/tests/suite/mod.rs @@ -15,3 +15,4 @@ mod rollout_list_find; mod seatbelt; mod stream_error_allows_next_turn; mod stream_no_completed; +mod user_notification; diff --git a/codex-rs/core/tests/suite/user_notification.rs b/codex-rs/core/tests/suite/user_notification.rs new file mode 100644 index 00000000..c7acf35e --- /dev/null +++ b/codex-rs/core/tests/suite/user_notification.rs @@ -0,0 +1,73 @@ +#![cfg(not(target_os = "windows"))] + +use std::os::unix::fs::PermissionsExt; + +use codex_core::protocol::EventMsg; +use codex_core::protocol::InputItem; +use codex_core::protocol::Op; +use core_test_support::non_sandbox_test; +use core_test_support::responses; +use core_test_support::test_codex::TestCodex; +use core_test_support::test_codex::test_codex; +use core_test_support::wait_for_event; +use tempfile::TempDir; +use wiremock::matchers::any; + +use responses::ev_assistant_message; +use responses::ev_completed; +use responses::sse; +use responses::start_mock_server; +use tokio::time::Duration; +use tokio::time::sleep; + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn summarize_context_three_requests_and_instructions() -> anyhow::Result<()> { + non_sandbox_test!(result); + + let server = start_mock_server().await; + + let sse1 = sse(vec![ev_assistant_message("m1", "Done"), ev_completed("r1")]); + + responses::mount_sse_once(&server, any(), sse1).await; + + let notify_dir = TempDir::new()?; + //
write a script to the notify that touches a file next to it + let notify_script = notify_dir.path().join("notify.sh"); + std::fs::write( + &notify_script, + r#"#!/bin/bash +set -e +echo -n "${@: -1}" > $(dirname "${0}")/notify.txt"#, + )?; + std::fs::set_permissions(&notify_script, std::fs::Permissions::from_mode(0o755))?; + + let notify_file = notify_dir.path().join("notify.txt"); + let notify_script_str = notify_script.to_str().unwrap().to_string(); + + let TestCodex { codex, .. } = test_codex() + .with_config(move |cfg| cfg.notify = Some(vec![notify_script_str])) + .build(&server) + .await?; + + // 1) Normal user input – should hit server once. + codex + .submit(Op::UserInput { + items: vec![InputItem::Text { + text: "hello world".into(), + }], + }) + .await?; + wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; + + // We fork the notify script, so we need to wait for it to write to the file. + for _ in 0..100u32 { + if notify_file.exists() { + break; + } + sleep(Duration::from_millis(100)).await; + } + + assert!(notify_file.exists()); + + Ok(()) +}