Add notifier tests (#4064)

Proposal:
1. Use anyhow for tests and avoid unwrap
2. Extract a helper for starting a test instance of codex
This commit is contained in:
pakrym-oai
2025-09-23 07:25:46 -07:00
committed by GitHub
parent c93e77b68b
commit 5c7d9e27b1
8 changed files with 214 additions and 49 deletions

1
codex-rs/Cargo.lock generated
View File

@@ -1076,6 +1076,7 @@ checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b"
name = "core_test_support"
version = "0.0.0"
dependencies = [
"anyhow",
"codex-core",
"serde_json",
"tempfile",

View File

@@ -11,6 +11,7 @@ use crate::AuthManager;
use crate::client_common::REVIEW_PROMPT;
use crate::event_mapping::map_response_item_to_event_messages;
use crate::review_format::format_review_findings_block;
use crate::user_notification::UserNotifier;
use async_channel::Receiver;
use async_channel::Sender;
use codex_apply_patch::ApplyPatchAction;
@@ -186,7 +187,7 @@ impl Codex {
base_instructions: config.base_instructions.clone(),
approval_policy: config.approval_policy,
sandbox_policy: config.sandbox_policy.clone(),
notify: config.notify.clone(),
notify: UserNotifier::new(config.notify.clone()),
cwd: config.cwd.clone(),
};
@@ -274,9 +275,7 @@ pub(crate) struct Session {
session_manager: ExecSessionManager,
unified_exec_manager: UnifiedExecSessionManager,
/// External notifier command (will be passed as args to exec()). When
/// `None` this feature is disabled.
notify: Option<Vec<String>>,
notifier: UserNotifier,
/// Optional rollout recorder for persisting the conversation transcript so
/// sessions can be replayed or inspected later.
@@ -335,10 +334,7 @@ struct ConfigureSession {
/// How to sandbox commands executed in the system
sandbox_policy: SandboxPolicy,
/// Optional external notifier command tokens. Present only when the
/// client wants the agent to spawn a program after each completed
/// turn.
notify: Option<Vec<String>>,
notify: UserNotifier,
/// Working directory that should be treated as the *root* of the
/// session. All relative paths supplied by the model as well as the
@@ -480,7 +476,7 @@ impl Session {
mcp_connection_manager,
session_manager: ExecSessionManager::default(),
unified_exec_manager: UnifiedExecSessionManager::default(),
notify,
notifier: notify,
state: Mutex::new(state),
rollout: Mutex::new(Some(rollout_recorder)),
codex_linux_sandbox_exe: config.codex_linux_sandbox_exe.clone(),
@@ -586,7 +582,7 @@ impl Session {
command: Vec<String>,
cwd: PathBuf,
reason: Option<String>,
) -> oneshot::Receiver<ReviewDecision> {
) -> ReviewDecision {
// Add the tx_approve callback to the map before sending the request.
let (tx_approve, rx_approve) = oneshot::channel();
let event_id = sub_id.clone();
@@ -608,7 +604,7 @@ impl Session {
}),
};
self.send_event(event).await;
rx_approve
rx_approve.await.unwrap_or_default()
}
pub async fn request_patch_approval(
@@ -1034,33 +1030,8 @@ impl Session {
}
}
/// Spawn the configured notifier (if any) with the given JSON payload as
/// the last argument. Failures are logged but otherwise ignored so that
/// notification issues do not interfere with the main workflow.
fn maybe_notify(&self, notification: UserNotification) {
let Some(notify_command) = &self.notify else {
return;
};
if notify_command.is_empty() {
return;
}
let Ok(json) = serde_json::to_string(&notification) else {
error!("failed to serialise notification payload");
return;
};
let mut command = std::process::Command::new(&notify_command[0]);
if notify_command.len() > 1 {
command.args(&notify_command[1..]);
}
command.arg(json);
// Fire-and-forget we do not wait for completion.
if let Err(e) = command.spawn() {
warn!("failed to spawn notifier '{}': {e}", notify_command[0]);
}
pub(crate) fn notifier(&self) -> &UserNotifier {
&self.notifier
}
}
@@ -1883,11 +1854,12 @@ async fn run_task(
last_agent_message = get_last_assistant_message_from_turn(
&items_to_record_in_conversation_history,
);
sess.maybe_notify(UserNotification::AgentTurnComplete {
turn_id: sub_id.clone(),
input_messages: turn_input_messages,
last_assistant_message: last_agent_message.clone(),
});
sess.notifier()
.notify(&UserNotification::AgentTurnComplete {
turn_id: sub_id.clone(),
input_messages: turn_input_messages,
last_assistant_message: last_agent_message.clone(),
});
break;
}
continue;
@@ -2842,7 +2814,7 @@ async fn handle_container_exec_with_params(
let sandbox_type = match safety {
SafetyCheck::AutoApprove { sandbox_type } => sandbox_type,
SafetyCheck::AskUser => {
let rx_approve = sess
let decision = sess
.request_command_approval(
sub_id.clone(),
call_id.clone(),
@@ -2851,7 +2823,7 @@ async fn handle_container_exec_with_params(
params.justification.clone(),
)
.await;
match rx_approve.await.unwrap_or_default() {
match decision {
ReviewDecision::Approved => (),
ReviewDecision::ApprovedForSession => {
sess.add_approved_command(params.command.clone()).await;
@@ -3012,7 +2984,7 @@ async fn handle_sandbox_error(
sess.notify_background_event(&sub_id, format!("Execution failed: {error}"))
.await;
let rx_approve = sess
let decision = sess
.request_command_approval(
sub_id.clone(),
call_id.clone(),
@@ -3022,7 +2994,7 @@ async fn handle_sandbox_error(
)
.await;
match rx_approve.await.unwrap_or_default() {
match decision {
ReviewDecision::Approved | ReviewDecision::ApprovedForSession => {
// Persist this command as preapproved for the
// remainder of the session so future
@@ -3642,7 +3614,7 @@ mod tests {
mcp_connection_manager: McpConnectionManager::default(),
session_manager: ExecSessionManager::default(),
unified_exec_manager: UnifiedExecSessionManager::default(),
notify: None,
notifier: UserNotifier::default(),
rollout: Mutex::new(None),
state: Mutex::new(State {
history: ConversationHistory::new(),

View File

@@ -1,4 +1,45 @@
use serde::Serialize;
use tracing::error;
use tracing::warn;
#[derive(Debug, Default)]
pub(crate) struct UserNotifier {
notify_command: Option<Vec<String>>,
}
impl UserNotifier {
pub(crate) fn notify(&self, notification: &UserNotification) {
if let Some(notify_command) = &self.notify_command
&& !notify_command.is_empty()
{
self.invoke_notify(notify_command, notification)
}
}
fn invoke_notify(&self, notify_command: &[String], notification: &UserNotification) {
let Ok(json) = serde_json::to_string(&notification) else {
error!("failed to serialise notification payload");
return;
};
let mut command = std::process::Command::new(&notify_command[0]);
if notify_command.len() > 1 {
command.args(&notify_command[1..]);
}
command.arg(json);
// Fire-and-forget we do not wait for completion.
if let Err(e) = command.spawn() {
warn!("failed to spawn notifier '{}': {e}", notify_command[0]);
}
}
pub(crate) fn new(notify: Option<Vec<String>>) -> Self {
Self {
notify_command: notify,
}
}
}
/// User can configure a program that will receive notifications. Each
/// notification is serialized as JSON and passed as an argument to the

View File

@@ -1,12 +1,13 @@
[package]
edition = "2024"
name = "core_test_support"
version = { workspace = true }
edition = "2024"
[lib]
path = "lib.rs"
[dependencies]
anyhow = { workspace = true }
codex-core = { workspace = true }
serde_json = { workspace = true }
tempfile = { workspace = true }

View File

@@ -8,6 +8,7 @@ use codex_core::config::ConfigOverrides;
use codex_core::config::ConfigToml;
pub mod responses;
pub mod test_codex;
/// Returns a default `Config` whose on-disk state is confined to the provided
/// temporary directory. Using a per-test directory keeps tests hermetic and

View File

@@ -0,0 +1,75 @@
use std::mem::swap;
use std::sync::Arc;
use codex_core::CodexAuth;
use codex_core::CodexConversation;
use codex_core::ConversationManager;
use codex_core::ModelProviderInfo;
use codex_core::NewConversation;
use codex_core::built_in_model_providers;
use codex_core::config::Config;
use codex_core::protocol::SessionConfiguredEvent;
use tempfile::TempDir;
use crate::load_default_config_for_test;
type ConfigMutator = dyn FnOnce(&mut Config);
pub struct TestCodexBuilder {
config_mutators: Vec<Box<ConfigMutator>>,
}
impl TestCodexBuilder {
pub fn with_config<T>(mut self, mutator: T) -> Self
where
T: FnOnce(&mut Config) + 'static,
{
self.config_mutators.push(Box::new(mutator));
self
}
pub async fn build(&mut self, server: &wiremock::MockServer) -> anyhow::Result<TestCodex> {
// Build config pointing to the mock server and spawn Codex.
let model_provider = ModelProviderInfo {
base_url: Some(format!("{}/v1", server.uri())),
..built_in_model_providers()["openai"].clone()
};
let home = TempDir::new()?;
let cwd = TempDir::new()?;
let mut config = load_default_config_for_test(&home);
config.cwd = cwd.path().to_path_buf();
config.model_provider = model_provider;
let mut mutators = vec![];
swap(&mut self.config_mutators, &mut mutators);
for mutator in mutators {
mutator(&mut config)
}
let conversation_manager = ConversationManager::with_auth(CodexAuth::from_api_key("dummy"));
let NewConversation {
conversation,
session_configured,
..
} = conversation_manager.new_conversation(config).await?;
Ok(TestCodex {
home,
cwd,
codex: conversation,
session_configured,
})
}
}
pub struct TestCodex {
pub home: TempDir,
pub cwd: TempDir,
pub codex: Arc<CodexConversation>,
pub session_configured: SessionConfiguredEvent,
}
pub fn test_codex() -> TestCodexBuilder {
TestCodexBuilder {
config_mutators: vec![],
}
}

View File

@@ -15,3 +15,4 @@ mod rollout_list_find;
mod seatbelt;
mod stream_error_allows_next_turn;
mod stream_no_completed;
mod user_notification;

View File

@@ -0,0 +1,73 @@
#![cfg(not(target_os = "windows"))]
use std::os::unix::fs::PermissionsExt;
use codex_core::protocol::EventMsg;
use codex_core::protocol::InputItem;
use codex_core::protocol::Op;
use core_test_support::non_sandbox_test;
use core_test_support::responses;
use core_test_support::test_codex::TestCodex;
use core_test_support::test_codex::test_codex;
use core_test_support::wait_for_event;
use tempfile::TempDir;
use wiremock::matchers::any;
use responses::ev_assistant_message;
use responses::ev_completed;
use responses::sse;
use responses::start_mock_server;
use tokio::time::Duration;
use tokio::time::sleep;
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn summarize_context_three_requests_and_instructions() -> anyhow::Result<()> {
non_sandbox_test!(result);
let server = start_mock_server().await;
let sse1 = sse(vec![ev_assistant_message("m1", "Done"), ev_completed("r1")]);
responses::mount_sse_once(&server, any(), sse1).await;
let notify_dir = TempDir::new()?;
// write a script to the notify that touches a file next to it
let notify_script = notify_dir.path().join("notify.sh");
std::fs::write(
&notify_script,
r#"#!/bin/bash
set -e
echo -n "${@: -1}" > $(dirname "${0}")/notify.txt"#,
)?;
std::fs::set_permissions(&notify_script, std::fs::Permissions::from_mode(0o755))?;
let notify_file = notify_dir.path().join("notify.txt");
let notify_script_str = notify_script.to_str().unwrap().to_string();
let TestCodex { codex, .. } = test_codex()
.with_config(move |cfg| cfg.notify = Some(vec![notify_script_str]))
.build(&server)
.await?;
// 1) Normal user input should hit server once.
codex
.submit(Op::UserInput {
items: vec![InputItem::Text {
text: "hello world".into(),
}],
})
.await?;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
// We fork the notify script, so we need to wait for it to write to the file.
for _ in 0..100u32 {
if notify_file.exists() {
break;
}
sleep(Duration::from_millis(100)).await;
}
assert!(notify_file.exists());
Ok(())
}