feat: configurable notifications in the Rust CLI (#793)

With this change, you can specify a program that will be executed to get
notified about events generated by Codex. The notification info will be
packaged as a JSON object. The supported notification types are defined
by the `UserNotification` enum introduced in this PR. Initially, it
contains only one variant, `AgentTurnComplete`:

```rust
pub(crate) enum UserNotification {
    #[serde(rename_all = "kebab-case")]
    AgentTurnComplete {
        turn_id: String,

        /// Messages that the user sent to the agent to initiate the turn.
        input_messages: Vec<String>,

        /// The last message sent by the assistant in the turn.
        last_assistant_message: Option<String>,
    },
}
```

This is intended to support the common case when a "turn" ends, which
often means it is now your chance to give Codex further instructions.

For example, I have the following in my `~/.codex/config.toml`:

```toml
notify = ["python3", "/Users/mbolin/.codex/notify.py"]
```

I created my own custom notifier script that calls out to
[terminal-notifier](https://github.com/julienXX/terminal-notifier) to
show a desktop push notification on macOS. Contents of `notify.py`:

```python
#!/usr/bin/env python3

import json
import subprocess
import sys


def main() -> int:
    if len(sys.argv) != 2:
        print("Usage: notify.py <NOTIFICATION_JSON>")
        return 1

    try:
        notification = json.loads(sys.argv[1])
    except json.JSONDecodeError:
        return 1

    match notification_type := notification.get("type"):
        case "agent-turn-complete":
            assistant_message = notification.get("last-assistant-message")
            if assistant_message:
                title = f"Codex: {assistant_message}"
            else:
                title = "Codex: Turn Complete!"
            input_messages = notification.get("input_messages", [])
            message = " ".join(input_messages)
            title += message
        case _:
            print(f"not sending a push notification for: {notification_type}")
            return 0

    subprocess.check_output(
        [
            "terminal-notifier",
            "-title",
            title,
            "-message",
            message,
            "-group",
            "codex",
            "-ignoreDnD",
            "-activate",
            "com.googlecode.iterm2",
        ]
    )

    return 0


if __name__ == "__main__":
    sys.exit(main())
```

For reference, here are related PRs that tried to add this functionality
to the TypeScript version of the Codex CLI:

* https://github.com/openai/codex/pull/160
* https://github.com/openai/codex/pull/498
This commit is contained in:
Michael Bolin
2025-05-02 19:48:13 -07:00
committed by GitHub
parent 21cd953dbd
commit a180ed44e8
9 changed files with 155 additions and 0 deletions

View File

@@ -17,6 +17,7 @@ use codex_apply_patch::MaybeApplyPatchVerified;
use fs_err as fs;
use futures::prelude::*;
use serde::Serialize;
use serde_json;
use tokio::sync::oneshot;
use tokio::sync::Notify;
use tokio::task::AbortHandle;
@@ -51,6 +52,7 @@ use crate::protocol::Submission;
use crate::safety::assess_command_safety;
use crate::safety::assess_patch_safety;
use crate::safety::SafetyCheck;
use crate::user_notification::UserNotification;
use crate::util::backoff;
use crate::zdr_transcript::ZdrTranscript;
@@ -193,6 +195,10 @@ struct Session {
sandbox_policy: SandboxPolicy,
writable_roots: Mutex<Vec<PathBuf>>,
/// External notifier command (will be passed as args to exec()). When
/// `None` this feature is disabled.
notify: Option<Vec<String>>,
state: Mutex<State>,
}
@@ -377,6 +383,35 @@ impl Session {
task.abort();
}
}
/// Spawn the configured notifier (if any) with the given JSON payload as
/// the last argument. Failures are logged but otherwise ignored so that
/// notification issues do not interfere with the main workflow.
fn maybe_notify(&self, notification: UserNotification) {
let Some(notify_command) = &self.notify else {
return;
};
if notify_command.is_empty() {
return;
}
let Ok(json) = serde_json::to_string(&notification) else {
tracing::error!("failed to serialise notification payload");
return;
};
let mut command = std::process::Command::new(&notify_command[0]);
if notify_command.len() > 1 {
command.args(&notify_command[1..]);
}
command.arg(json);
// Fire-and-forget we do not wait for completion.
if let Err(e) = command.spawn() {
tracing::warn!("failed to spawn notifier '{}': {e}", notify_command[0]);
}
}
}
impl Drop for Session {
@@ -482,6 +517,7 @@ async fn submission_loop(
approval_policy,
sandbox_policy,
disable_response_storage,
notify,
} => {
info!(model, "Configuring session");
let client = ModelClient::new(model.clone());
@@ -511,6 +547,7 @@ async fn submission_loop(
approval_policy,
sandbox_policy,
writable_roots: Mutex::new(get_writable_roots()),
notify,
state: Mutex::new(state),
}));
@@ -610,6 +647,19 @@ async fn run_task(sess: Arc<Session>, sub_id: String, input: Vec<InputItem>) {
net_new_turn_input
};
let turn_input_messages: Vec<String> = turn_input
.iter()
.filter_map(|item| match item {
ResponseItem::Message { content, .. } => Some(content),
_ => None,
})
.flat_map(|content| {
content.iter().filter_map(|item| match item {
ContentItem::OutputText { text } => Some(text.clone()),
_ => None,
})
})
.collect();
match run_turn(&sess, sub_id.clone(), turn_input).await {
Ok(turn_output) => {
let (items, responses): (Vec<_>, Vec<_>) = turn_output
@@ -620,6 +670,7 @@ async fn run_task(sess: Arc<Session>, sub_id: String, input: Vec<InputItem>) {
.into_iter()
.flatten()
.collect::<Vec<ResponseInputItem>>();
let last_assistant_message = get_last_assistant_message_from_turn(&items);
// Only attempt to take the lock if there is something to record.
if !items.is_empty() {
@@ -630,6 +681,11 @@ async fn run_task(sess: Arc<Session>, sub_id: String, input: Vec<InputItem>) {
if responses.is_empty() {
debug!("Turn completed");
sess.maybe_notify(UserNotification::AgentTurnComplete {
turn_id: sub_id.clone(),
input_messages: turn_input_messages,
last_assistant_message,
});
break;
}
@@ -1485,3 +1541,23 @@ fn format_exec_output(output: &str, exit_code: i32, duration: std::time::Duratio
serde_json::to_string(&payload).expect("serialize ExecOutput")
}
fn get_last_assistant_message_from_turn(responses: &[ResponseItem]) -> Option<String> {
responses.iter().rev().find_map(|item| {
if let ResponseItem::Message { role, content } = item {
if role == "assistant" {
content.iter().rev().find_map(|ci| {
if let ContentItem::OutputText { text } = ci {
Some(text.clone())
} else {
None
}
})
} else {
None
}
} else {
None
}
})
}

View File

@@ -25,6 +25,7 @@ pub async fn init_codex(config: Config) -> anyhow::Result<(CodexWrapper, Event,
approval_policy: config.approval_policy,
sandbox_policy: config.sandbox_policy,
disable_response_storage: config.disable_response_storage,
notify: config.notify.clone(),
})
.await?;

View File

@@ -30,6 +30,28 @@ pub struct Config {
/// System instructions.
pub instructions: Option<String>,
/// Optional external notifier command. When set, Codex will spawn this
/// program after each completed *turn* (i.e. when the agent finishes
/// processing a user submission). The value must be the full command
/// broken into argv tokens **without** the trailing JSON argument - Codex
/// appends one extra argument containing a JSON payload describing the
/// event.
///
/// Example `~/.codex/config.toml` snippet:
///
/// ```toml
/// notify = ["notify-send", "Codex"]
/// ```
///
/// which will be invoked as:
///
/// ```shell
/// notify-send Codex '{"type":"agent-turn-complete","turn-id":"12345"}'
/// ```
///
/// If unset the feature is disabled.
pub notify: Option<Vec<String>>,
}
/// Base config deserialized from ~/.codex/config.toml.
@@ -52,6 +74,10 @@ pub struct ConfigToml {
/// who have opted into Zero Data Retention (ZDR).
pub disable_response_storage: Option<bool>,
/// Optional external command to spawn for end-user notifications.
#[serde(default)]
pub notify: Option<Vec<String>>,
/// System instructions.
pub instructions: Option<String>,
}
@@ -161,6 +187,7 @@ impl Config {
disable_response_storage: disable_response_storage
.or(cfg.disable_response_storage)
.unwrap_or(false),
notify: cfg.notify,
instructions,
}
}

View File

@@ -18,6 +18,7 @@ pub mod linux;
mod models;
pub mod protocol;
mod safety;
mod user_notification;
pub mod util;
mod zdr_transcript;

View File

@@ -36,6 +36,13 @@ pub enum Op {
/// Disable server-side response storage (send full context each request)
#[serde(default)]
disable_response_storage: bool,
/// Optional external notifier command tokens. Present only when the
/// client wants the agent to spawn a program after each completed
/// turn.
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
notify: Option<Vec<String>>,
},
/// Abort current task.

View File

@@ -0,0 +1,40 @@
use serde::Serialize;
/// User can configure a program that will receive notifications. Each
/// notification is serialized as JSON and passed as an argument to the
/// program.
#[derive(Debug, Clone, PartialEq, Serialize)]
#[serde(tag = "type", rename_all = "kebab-case")]
pub(crate) enum UserNotification {
#[serde(rename_all = "kebab-case")]
AgentTurnComplete {
turn_id: String,
/// Messages that the user sent to the agent to initiate the turn.
input_messages: Vec<String>,
/// The last message sent by the assistant in the turn.
last_assistant_message: Option<String>,
},
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_user_notification() {
let notification = UserNotification::AgentTurnComplete {
turn_id: "12345".to_string(),
input_messages: vec!["Rename `foo` to `bar` and update the callsites.".to_string()],
last_assistant_message: Some(
"Rename complete and verified `cargo build` succeeds.".to_string(),
),
};
let serialized = serde_json::to_string(&notification).unwrap();
assert_eq!(
serialized,
r#"{"type":"agent-turn-complete","turn-id":"12345","input-messages":["Rename `foo` to `bar` and update the callsites."],"last-assistant-message":"Rename complete and verified `cargo build` succeeds."}"#
);
}
}

View File

@@ -57,6 +57,7 @@ async fn spawn_codex() -> Codex {
approval_policy: config.approval_policy,
sandbox_policy: SandboxPolicy::new_read_only_policy(),
disable_response_storage: false,
notify: None,
},
})
.await

View File

@@ -97,6 +97,7 @@ async fn keeps_previous_response_id_between_tasks() {
approval_policy: config.approval_policy,
sandbox_policy: SandboxPolicy::new_read_only_policy(),
disable_response_storage: false,
notify: None,
},
})
.await

View File

@@ -80,6 +80,7 @@ async fn retries_on_early_close() {
approval_policy: config.approval_policy,
sandbox_policy: SandboxPolicy::new_read_only_policy(),
disable_response_storage: false,
notify: None,
},
})
.await