From a180ed44e8cfe9674ad9980d3b3509269bdaab77 Mon Sep 17 00:00:00 2001 From: Michael Bolin Date: Fri, 2 May 2025 19:48:13 -0700 Subject: [PATCH] feat: configurable notifications in the Rust CLI (#793) With this change, you can specify a program that will be executed to get notified about events generated by Codex. The notification info will be packaged as a JSON object. The supported notification types are defined by the `UserNotification` enum introduced in this PR. Initially, it contains only one variant, `AgentTurnComplete`: ```rust pub(crate) enum UserNotification { #[serde(rename_all = "kebab-case")] AgentTurnComplete { turn_id: String, /// Messages that the user sent to the agent to initiate the turn. input_messages: Vec, /// The last message sent by the assistant in the turn. last_assistant_message: Option, }, } ``` This is intended to support the common case when a "turn" ends, which often means it is now your chance to give Codex further instructions. For example, I have the following in my `~/.codex/config.toml`: ```toml notify = ["python3", "/Users/mbolin/.codex/notify.py"] ``` I created my own custom notifier script that calls out to [terminal-notifier](https://github.com/julienXX/terminal-notifier) to show a desktop push notification on macOS. Contents of `notify.py`: ```python #!/usr/bin/env python3 import json import subprocess import sys def main() -> int: if len(sys.argv) != 2: print("Usage: notify.py ") return 1 try: notification = json.loads(sys.argv[1]) except json.JSONDecodeError: return 1 match notification_type := notification.get("type"): case "agent-turn-complete": assistant_message = notification.get("last-assistant-message") if assistant_message: title = f"Codex: {assistant_message}" else: title = "Codex: Turn Complete!" input_messages = notification.get("input_messages", []) message = " ".join(input_messages) title += message case _: print(f"not sending a push notification for: {notification_type}") return 0 subprocess.check_output( [ "terminal-notifier", "-title", title, "-message", message, "-group", "codex", "-ignoreDnD", "-activate", "com.googlecode.iterm2", ] ) return 0 if __name__ == "__main__": sys.exit(main()) ``` For reference, here are related PRs that tried to add this functionality to the TypeScript version of the Codex CLI: * https://github.com/openai/codex/pull/160 * https://github.com/openai/codex/pull/498 --- codex-rs/core/src/codex.rs | 76 +++++++++++++++++++++ codex-rs/core/src/codex_wrapper.rs | 1 + codex-rs/core/src/config.rs | 27 ++++++++ codex-rs/core/src/lib.rs | 1 + codex-rs/core/src/protocol.rs | 7 ++ codex-rs/core/src/user_notification.rs | 40 +++++++++++ codex-rs/core/tests/live_agent.rs | 1 + codex-rs/core/tests/previous_response_id.rs | 1 + codex-rs/core/tests/stream_no_completed.rs | 1 + 9 files changed, 155 insertions(+) create mode 100644 codex-rs/core/src/user_notification.rs diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 384011e3..da2c6288 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -17,6 +17,7 @@ use codex_apply_patch::MaybeApplyPatchVerified; use fs_err as fs; use futures::prelude::*; use serde::Serialize; +use serde_json; use tokio::sync::oneshot; use tokio::sync::Notify; use tokio::task::AbortHandle; @@ -51,6 +52,7 @@ use crate::protocol::Submission; use crate::safety::assess_command_safety; use crate::safety::assess_patch_safety; use crate::safety::SafetyCheck; +use crate::user_notification::UserNotification; use crate::util::backoff; use crate::zdr_transcript::ZdrTranscript; @@ -193,6 +195,10 @@ struct Session { sandbox_policy: SandboxPolicy, writable_roots: Mutex>, + /// External notifier command (will be passed as args to exec()). When + /// `None` this feature is disabled. + notify: Option>, + state: Mutex, } @@ -377,6 +383,35 @@ impl Session { task.abort(); } } + + /// Spawn the configured notifier (if any) with the given JSON payload as + /// the last argument. Failures are logged but otherwise ignored so that + /// notification issues do not interfere with the main workflow. + fn maybe_notify(&self, notification: UserNotification) { + let Some(notify_command) = &self.notify else { + return; + }; + + if notify_command.is_empty() { + return; + } + + let Ok(json) = serde_json::to_string(¬ification) else { + tracing::error!("failed to serialise notification payload"); + return; + }; + + let mut command = std::process::Command::new(¬ify_command[0]); + if notify_command.len() > 1 { + command.args(¬ify_command[1..]); + } + command.arg(json); + + // Fire-and-forget – we do not wait for completion. + if let Err(e) = command.spawn() { + tracing::warn!("failed to spawn notifier '{}': {e}", notify_command[0]); + } + } } impl Drop for Session { @@ -482,6 +517,7 @@ async fn submission_loop( approval_policy, sandbox_policy, disable_response_storage, + notify, } => { info!(model, "Configuring session"); let client = ModelClient::new(model.clone()); @@ -511,6 +547,7 @@ async fn submission_loop( approval_policy, sandbox_policy, writable_roots: Mutex::new(get_writable_roots()), + notify, state: Mutex::new(state), })); @@ -610,6 +647,19 @@ async fn run_task(sess: Arc, sub_id: String, input: Vec) { net_new_turn_input }; + let turn_input_messages: Vec = turn_input + .iter() + .filter_map(|item| match item { + ResponseItem::Message { content, .. } => Some(content), + _ => None, + }) + .flat_map(|content| { + content.iter().filter_map(|item| match item { + ContentItem::OutputText { text } => Some(text.clone()), + _ => None, + }) + }) + .collect(); match run_turn(&sess, sub_id.clone(), turn_input).await { Ok(turn_output) => { let (items, responses): (Vec<_>, Vec<_>) = turn_output @@ -620,6 +670,7 @@ async fn run_task(sess: Arc, sub_id: String, input: Vec) { .into_iter() .flatten() .collect::>(); + let last_assistant_message = get_last_assistant_message_from_turn(&items); // Only attempt to take the lock if there is something to record. if !items.is_empty() { @@ -630,6 +681,11 @@ async fn run_task(sess: Arc, sub_id: String, input: Vec) { if responses.is_empty() { debug!("Turn completed"); + sess.maybe_notify(UserNotification::AgentTurnComplete { + turn_id: sub_id.clone(), + input_messages: turn_input_messages, + last_assistant_message, + }); break; } @@ -1485,3 +1541,23 @@ fn format_exec_output(output: &str, exit_code: i32, duration: std::time::Duratio serde_json::to_string(&payload).expect("serialize ExecOutput") } + +fn get_last_assistant_message_from_turn(responses: &[ResponseItem]) -> Option { + responses.iter().rev().find_map(|item| { + if let ResponseItem::Message { role, content } = item { + if role == "assistant" { + content.iter().rev().find_map(|ci| { + if let ContentItem::OutputText { text } = ci { + Some(text.clone()) + } else { + None + } + }) + } else { + None + } + } else { + None + } + }) +} diff --git a/codex-rs/core/src/codex_wrapper.rs b/codex-rs/core/src/codex_wrapper.rs index 146a812e..223b051d 100644 --- a/codex-rs/core/src/codex_wrapper.rs +++ b/codex-rs/core/src/codex_wrapper.rs @@ -25,6 +25,7 @@ pub async fn init_codex(config: Config) -> anyhow::Result<(CodexWrapper, Event, approval_policy: config.approval_policy, sandbox_policy: config.sandbox_policy, disable_response_storage: config.disable_response_storage, + notify: config.notify.clone(), }) .await?; diff --git a/codex-rs/core/src/config.rs b/codex-rs/core/src/config.rs index c9bfa138..0ab77ada 100644 --- a/codex-rs/core/src/config.rs +++ b/codex-rs/core/src/config.rs @@ -30,6 +30,28 @@ pub struct Config { /// System instructions. pub instructions: Option, + + /// Optional external notifier command. When set, Codex will spawn this + /// program after each completed *turn* (i.e. when the agent finishes + /// processing a user submission). The value must be the full command + /// broken into argv tokens **without** the trailing JSON argument - Codex + /// appends one extra argument containing a JSON payload describing the + /// event. + /// + /// Example `~/.codex/config.toml` snippet: + /// + /// ```toml + /// notify = ["notify-send", "Codex"] + /// ``` + /// + /// which will be invoked as: + /// + /// ```shell + /// notify-send Codex '{"type":"agent-turn-complete","turn-id":"12345"}' + /// ``` + /// + /// If unset the feature is disabled. + pub notify: Option>, } /// Base config deserialized from ~/.codex/config.toml. @@ -52,6 +74,10 @@ pub struct ConfigToml { /// who have opted into Zero Data Retention (ZDR). pub disable_response_storage: Option, + /// Optional external command to spawn for end-user notifications. + #[serde(default)] + pub notify: Option>, + /// System instructions. pub instructions: Option, } @@ -161,6 +187,7 @@ impl Config { disable_response_storage: disable_response_storage .or(cfg.disable_response_storage) .unwrap_or(false), + notify: cfg.notify, instructions, } } diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs index b1c746be..a5909ed6 100644 --- a/codex-rs/core/src/lib.rs +++ b/codex-rs/core/src/lib.rs @@ -18,6 +18,7 @@ pub mod linux; mod models; pub mod protocol; mod safety; +mod user_notification; pub mod util; mod zdr_transcript; diff --git a/codex-rs/core/src/protocol.rs b/codex-rs/core/src/protocol.rs index 5c2d35c1..d19a5386 100644 --- a/codex-rs/core/src/protocol.rs +++ b/codex-rs/core/src/protocol.rs @@ -36,6 +36,13 @@ pub enum Op { /// Disable server-side response storage (send full context each request) #[serde(default)] disable_response_storage: bool, + + /// Optional external notifier command tokens. Present only when the + /// client wants the agent to spawn a program after each completed + /// turn. + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(default)] + notify: Option>, }, /// Abort current task. diff --git a/codex-rs/core/src/user_notification.rs b/codex-rs/core/src/user_notification.rs new file mode 100644 index 00000000..0a3cb49e --- /dev/null +++ b/codex-rs/core/src/user_notification.rs @@ -0,0 +1,40 @@ +use serde::Serialize; + +/// User can configure a program that will receive notifications. Each +/// notification is serialized as JSON and passed as an argument to the +/// program. +#[derive(Debug, Clone, PartialEq, Serialize)] +#[serde(tag = "type", rename_all = "kebab-case")] +pub(crate) enum UserNotification { + #[serde(rename_all = "kebab-case")] + AgentTurnComplete { + turn_id: String, + + /// Messages that the user sent to the agent to initiate the turn. + input_messages: Vec, + + /// The last message sent by the assistant in the turn. + last_assistant_message: Option, + }, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_user_notification() { + let notification = UserNotification::AgentTurnComplete { + turn_id: "12345".to_string(), + input_messages: vec!["Rename `foo` to `bar` and update the callsites.".to_string()], + last_assistant_message: Some( + "Rename complete and verified `cargo build` succeeds.".to_string(), + ), + }; + let serialized = serde_json::to_string(¬ification).unwrap(); + assert_eq!( + serialized, + r#"{"type":"agent-turn-complete","turn-id":"12345","input-messages":["Rename `foo` to `bar` and update the callsites."],"last-assistant-message":"Rename complete and verified `cargo build` succeeds."}"# + ); + } +} diff --git a/codex-rs/core/tests/live_agent.rs b/codex-rs/core/tests/live_agent.rs index 7d2be33d..b780a287 100644 --- a/codex-rs/core/tests/live_agent.rs +++ b/codex-rs/core/tests/live_agent.rs @@ -57,6 +57,7 @@ async fn spawn_codex() -> Codex { approval_policy: config.approval_policy, sandbox_policy: SandboxPolicy::new_read_only_policy(), disable_response_storage: false, + notify: None, }, }) .await diff --git a/codex-rs/core/tests/previous_response_id.rs b/codex-rs/core/tests/previous_response_id.rs index c83d49ee..9410f7b5 100644 --- a/codex-rs/core/tests/previous_response_id.rs +++ b/codex-rs/core/tests/previous_response_id.rs @@ -97,6 +97,7 @@ async fn keeps_previous_response_id_between_tasks() { approval_policy: config.approval_policy, sandbox_policy: SandboxPolicy::new_read_only_policy(), disable_response_storage: false, + notify: None, }, }) .await diff --git a/codex-rs/core/tests/stream_no_completed.rs b/codex-rs/core/tests/stream_no_completed.rs index e64281e3..858850f9 100644 --- a/codex-rs/core/tests/stream_no_completed.rs +++ b/codex-rs/core/tests/stream_no_completed.rs @@ -80,6 +80,7 @@ async fn retries_on_early_close() { approval_policy: config.approval_policy, sandbox_policy: SandboxPolicy::new_read_only_policy(), disable_response_storage: false, + notify: None, }, }) .await