Files
llmx/codex-rs/core/tests/stream_no_completed.rs
Michael Bolin a180ed44e8 feat: configurable notifications in the Rust CLI (#793)
With this change, you can specify a program that will be executed to get
notified about events generated by Codex. The notification info will be
packaged as a JSON object. The supported notification types are defined
by the `UserNotification` enum introduced in this PR. Initially, it
contains only one variant, `AgentTurnComplete`:

```rust
/// Notification types that can be reported to the user-configured program.
pub(crate) enum UserNotification {
    // serde serializes the fields of this variant with kebab-case names,
    // e.g. `turn_id` is emitted as `turn-id`.
    #[serde(rename_all = "kebab-case")]
    AgentTurnComplete {
        // NOTE(review): presumably identifies the turn that just ended — confirm.
        turn_id: String,

        /// Messages that the user sent to the agent to initiate the turn.
        input_messages: Vec<String>,

        /// The last message sent by the assistant in the turn.
        last_assistant_message: Option<String>,
    },
}
```

This is intended to support the common case when a "turn" ends, which
often means it is now your chance to give Codex further instructions.

For example, I have the following in my `~/.codex/config.toml`:

```toml
notify = ["python3", "/Users/mbolin/.codex/notify.py"]
```

I created my own custom notifier script that calls out to
[terminal-notifier](https://github.com/julienXX/terminal-notifier) to
show a desktop push notification on macOS. Contents of `notify.py`:

```python
#!/usr/bin/env python3

import json
import subprocess
import sys


def main() -> int:
    if len(sys.argv) != 2:
        print("Usage: notify.py <NOTIFICATION_JSON>")
        return 1

    try:
        notification = json.loads(sys.argv[1])
    except json.JSONDecodeError:
        return 1

    match notification_type := notification.get("type"):
        case "agent-turn-complete":
            assistant_message = notification.get("last-assistant-message")
            if assistant_message:
                title = f"Codex: {assistant_message}"
            else:
                title = "Codex: Turn Complete!"
            input_messages = notification.get("input_messages", [])
            message = " ".join(input_messages)
            title += message
        case _:
            print(f"not sending a push notification for: {notification_type}")
            return 0

    subprocess.check_output(
        [
            "terminal-notifier",
            "-title",
            title,
            "-message",
            message,
            "-group",
            "codex",
            "-ignoreDnD",
            "-activate",
            "com.googlecode.iterm2",
        ]
    )

    return 0


if __name__ == "__main__":
    sys.exit(main())
```

For reference, here are related PRs that tried to add this functionality
to the TypeScript version of the Codex CLI:

* https://github.com/openai/codex/pull/160
* https://github.com/openai/codex/pull/498
2025-05-02 19:48:13 -07:00

113 lines
3.5 KiB
Rust

//! Verifies that the agent retries when the SSE stream terminates before
//! delivering a `response.completed` event.
use std::time::Duration;
use codex_core::config::Config;
use codex_core::protocol::InputItem;
use codex_core::protocol::Op;
use codex_core::protocol::SandboxPolicy;
use codex_core::protocol::Submission;
use codex_core::Codex;
use tokio::time::timeout;
use wiremock::matchers::method;
use wiremock::matchers::path;
use wiremock::Mock;
use wiremock::MockServer;
use wiremock::Request;
use wiremock::Respond;
use wiremock::ResponseTemplate;
/// Builds an SSE body that carries a single `response.output_item.done`
/// event and then ends, i.e. without any `response.completed` event.
fn sse_incomplete() -> String {
    const TRUNCATED_STREAM: &str = "event: response.output_item.done\n\n";
    String::from(TRUNCATED_STREAM)
}
/// Builds an SSE body containing one `response.completed` event whose JSON
/// payload reports the response `id` and an empty `output` array.
///
/// `id` is inserted verbatim into the JSON text (no escaping is applied).
fn sse_completed(id: &str) -> String {
    let payload = format!(
        r#"{{"type":"response.completed","response":{{"id":"{id}","output":[]}}}}"#
    );
    // The body ends with three newlines after the data line.
    format!("event: response.completed\ndata: {payload}\n\n\n")
}
/// Drives a Codex session against a mock server whose first SSE response
/// lacks `response.completed` and whose later responses include it, then
/// waits for a `TaskComplete` event.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn retries_on_early_close() {
    use std::sync::atomic::AtomicUsize;
    use std::sync::atomic::Ordering;

    use codex_core::protocol::EventMsg;

    /// Responder that serves the incomplete stream on its first call and the
    /// completed stream on every call after that.
    struct SeqResponder {
        calls: AtomicUsize,
    }

    impl Respond for SeqResponder {
        fn respond(&self, _: &Request) -> ResponseTemplate {
            let attempt = self.calls.fetch_add(1, Ordering::SeqCst);
            let body = match attempt {
                0 => sse_incomplete(),
                _ => sse_completed("resp_ok"),
            };
            ResponseTemplate::new(200)
                .insert_header("content-type", "text/event-stream")
                .set_body_raw(body, "text/event-stream")
        }
    }

    let server = MockServer::start().await;

    // Expect exactly two POSTs: the truncated attempt and the one after it.
    let responder = SeqResponder {
        calls: AtomicUsize::new(0),
    };
    Mock::given(method("POST"))
        .and(path("/v1/responses"))
        .respond_with(responder)
        .expect(2)
        .mount(&server)
        .await;

    // Environment
    // NOTE(review): the retry/timeout variables are presumably read by
    // codex_core when issuing requests — confirm against the client code.
    let base_url = server.uri();
    let env_overrides = [
        ("OPENAI_API_KEY", "test-key"),
        ("OPENAI_API_BASE", base_url.as_str()),
        ("OPENAI_REQUEST_MAX_RETRIES", "0"),
        ("OPENAI_STREAM_MAX_RETRIES", "1"),
        ("OPENAI_STREAM_IDLE_TIMEOUT_MS", "2000"),
    ];
    for (key, value) in env_overrides {
        std::env::set_var(key, value);
    }

    let shared_notify = std::sync::Arc::new(tokio::sync::Notify::new());
    let codex = Codex::spawn(shared_notify).unwrap();
    let config = Config::load_default_config_for_test();

    // Configure the session, then discard the first event that follows.
    let configure = Submission {
        id: "init".into(),
        op: Op::ConfigureSession {
            model: config.model,
            instructions: None,
            approval_policy: config.approval_policy,
            sandbox_policy: SandboxPolicy::new_read_only_policy(),
            disable_response_storage: false,
            notify: None,
        },
    };
    codex.submit(configure).await.unwrap();
    let _ = codex.next_event().await.unwrap();

    let user_turn = Submission {
        id: "task".into(),
        op: Op::UserInput {
            items: vec![InputItem::Text {
                text: "hello".into(),
            }],
        },
    };
    codex.submit(user_turn).await.unwrap();

    // Wait until TaskComplete (should succeed after retry). Each wait is
    // capped at ten seconds; a timeout or an event error panics the test.
    let per_event_limit = Duration::from_secs(10);
    loop {
        let event = timeout(per_event_limit, codex.next_event())
            .await
            .unwrap()
            .unwrap();
        if let EventMsg::TaskComplete = event.msg {
            break;
        }
    }
}