Files
llmx/codex-rs/core/tests/chat_completions_sse.rs
vishnu-oai 04c1782e52 OpenTelemetry events (#2103)
### Title

## otel

Codex can emit [OpenTelemetry](https://opentelemetry.io/) **log events**
that
describe each run: outbound API requests, streamed responses, user
input,
tool-approval decisions, and the result of every tool invocation. Export
is
**disabled by default** so local runs remain self-contained. Opt in by
adding an
`[otel]` table and choosing an exporter.

```toml
[otel]
environment = "staging"   # defaults to "dev"
exporter = "none"          # defaults to "none"; set to otlp-http or otlp-grpc to send events
log_user_prompt = false    # defaults to false; redact prompt text unless explicitly enabled
```

Codex tags every exported event with `service.name = "codex-cli"`, the
CLI
version, and an `env` attribute so downstream collectors can distinguish
dev/staging/prod traffic. Only telemetry produced inside the
`codex_otel`
crate—the events listed below—is forwarded to the exporter.

### Event catalog

Every event shares a common set of metadata fields: `event.timestamp`,
`conversation.id`, `app.version`, `auth_mode` (when available),
`user.account_id` (when available), `terminal.type`, `model`, and
`slug`.

With OTEL enabled Codex emits the following event types (in addition to
the
metadata above):

- `codex.api_request`
  - `cf_ray` (optional)
  - `attempt`
  - `duration_ms`
  - `http.response.status_code` (optional)
  - `error.message` (failures)
- `codex.sse_event`
  - `event.kind`
  - `duration_ms`
  - `error.message` (failures)
  - `input_token_count` (completion only)
  - `output_token_count` (completion only)
  - `cached_token_count` (completion only, optional)
  - `reasoning_token_count` (completion only, optional)
  - `tool_token_count` (completion only)
- `codex.user_prompt`
  - `prompt_length`
  - `prompt` (redacted unless `log_user_prompt = true`)
- `codex.tool_decision`
  - `tool_name`
  - `call_id`
- `decision` (`approved`, `approved_for_session`, `denied`, or `abort`)
  - `source` (`config` or `user`)
- `codex.tool_result`
  - `tool_name`
  - `call_id`
  - `arguments`
  - `duration_ms` (execution time for the tool)
  - `success` (`"true"` or `"false"`)
  - `output`

### Choosing an exporter

Set `otel.exporter` to control where events go:

- `none` – leaves instrumentation active but skips exporting. This is
the
  default.
- `otlp-http` – posts OTLP log records to an OTLP/HTTP collector.
Specify the
  endpoint, protocol, and headers your collector expects:

  ```toml
  [otel]
  exporter = { otlp-http = {
    endpoint = "https://otel.example.com/v1/logs",
    protocol = "binary",
    headers = { "x-otlp-api-key" = "${OTLP_TOKEN}" }
  }}
  ```

- `otlp-grpc` – streams OTLP log records over gRPC. Provide the endpoint
and any
  metadata headers:

  ```toml
  [otel]
  exporter = { otlp-grpc = {
    endpoint = "https://otel.example.com:4317",
    headers = { "x-otlp-meta" = "abc123" }
  }}
  ```

If the exporter is `none` nothing is written anywhere; otherwise you
must run or point to your
own collector. All exporters run on a background batch worker that is
flushed on
shutdown.

If you build Codex from source the OTEL crate is still behind an `otel`
feature
flag; the official prebuilt binaries ship with the feature enabled. When
the
feature is disabled the telemetry hooks become no-ops so the CLI
continues to
function without the extra dependencies.

---------

Co-authored-by: Anton Panasenko <apanasenko@openai.com>
2025-09-29 11:30:55 -07:00

427 lines
13 KiB
Rust

use std::sync::Arc;
use tracing_test::traced_test;
use codex_core::ContentItem;
use codex_core::ModelClient;
use codex_core::ModelProviderInfo;
use codex_core::Prompt;
use codex_core::ResponseEvent;
use codex_core::ResponseItem;
use codex_core::WireApi;
use codex_core::spawn::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR;
use codex_otel::otel_event_manager::OtelEventManager;
use codex_protocol::mcp_protocol::AuthMode;
use codex_protocol::mcp_protocol::ConversationId;
use core_test_support::load_default_config_for_test;
use futures::StreamExt;
use tempfile::TempDir;
use wiremock::Mock;
use wiremock::MockServer;
use wiremock::ResponseTemplate;
use wiremock::matchers::method;
use wiremock::matchers::path;
fn network_disabled() -> bool {
std::env::var(CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR).is_ok()
}
async fn run_stream(sse_body: &str) -> Vec<ResponseEvent> {
run_stream_with_bytes(sse_body.as_bytes()).await
}
async fn run_stream_with_bytes(sse_body: &[u8]) -> Vec<ResponseEvent> {
let server = MockServer::start().await;
let template = ResponseTemplate::new(200)
.insert_header("content-type", "text/event-stream")
.set_body_bytes(sse_body.to_vec());
Mock::given(method("POST"))
.and(path("/v1/chat/completions"))
.respond_with(template)
.expect(1)
.mount(&server)
.await;
let provider = ModelProviderInfo {
name: "mock".into(),
base_url: Some(format!("{}/v1", server.uri())),
env_key: None,
env_key_instructions: None,
wire_api: WireApi::Chat,
query_params: None,
http_headers: None,
env_http_headers: None,
request_max_retries: Some(0),
stream_max_retries: Some(0),
stream_idle_timeout_ms: Some(5_000),
requires_openai_auth: false,
};
let codex_home = match TempDir::new() {
Ok(dir) => dir,
Err(e) => panic!("failed to create TempDir: {e}"),
};
let mut config = load_default_config_for_test(&codex_home);
config.model_provider_id = provider.name.clone();
config.model_provider = provider.clone();
config.show_raw_agent_reasoning = true;
let effort = config.model_reasoning_effort;
let summary = config.model_reasoning_summary;
let config = Arc::new(config);
let conversation_id = ConversationId::new();
let otel_event_manager = OtelEventManager::new(
conversation_id,
config.model.as_str(),
config.model_family.slug.as_str(),
None,
Some(AuthMode::ChatGPT),
false,
"test".to_string(),
);
let client = ModelClient::new(
Arc::clone(&config),
None,
otel_event_manager,
provider,
effort,
summary,
conversation_id,
);
let mut prompt = Prompt::default();
prompt.input = vec![ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "hello".to_string(),
}],
}];
let mut stream = match client.stream(&prompt).await {
Ok(s) => s,
Err(e) => panic!("stream chat failed: {e}"),
};
let mut events = Vec::new();
while let Some(event) = stream.next().await {
match event {
Ok(ev) => events.push(ev),
// We still collect the error to exercise telemetry and complete the task.
Err(_e) => break,
}
}
events
}
fn assert_message(item: &ResponseItem, expected: &str) {
if let ResponseItem::Message { content, .. } = item {
let text = content.iter().find_map(|part| match part {
ContentItem::OutputText { text } | ContentItem::InputText { text } => Some(text),
_ => None,
});
let Some(text) = text else {
panic!("message missing text: {item:?}");
};
assert_eq!(text, expected);
} else {
panic!("expected message item, got: {item:?}");
}
}
fn assert_reasoning(item: &ResponseItem, expected: &str) {
if let ResponseItem::Reasoning {
content: Some(parts),
..
} = item
{
let mut combined = String::new();
for part in parts {
match part {
codex_core::ReasoningItemContent::ReasoningText { text }
| codex_core::ReasoningItemContent::Text { text } => combined.push_str(text),
}
}
assert_eq!(combined, expected);
} else {
panic!("expected reasoning item, got: {item:?}");
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn streams_text_without_reasoning() {
if network_disabled() {
println!(
"Skipping test because it cannot execute when network is disabled in a Codex sandbox."
);
return;
}
let sse = concat!(
"data: {\"choices\":[{\"delta\":{\"content\":\"hi\"}}]}\n\n",
"data: {\"choices\":[{\"delta\":{}}]}\n\n",
"data: [DONE]\n\n",
);
let events = run_stream(sse).await;
assert_eq!(events.len(), 3, "unexpected events: {events:?}");
match &events[0] {
ResponseEvent::OutputTextDelta(text) => assert_eq!(text, "hi"),
other => panic!("expected text delta, got {other:?}"),
}
match &events[1] {
ResponseEvent::OutputItemDone(item) => assert_message(item, "hi"),
other => panic!("expected terminal message, got {other:?}"),
}
assert!(matches!(events[2], ResponseEvent::Completed { .. }));
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn streams_reasoning_from_string_delta() {
if network_disabled() {
println!(
"Skipping test because it cannot execute when network is disabled in a Codex sandbox."
);
return;
}
let sse = concat!(
"data: {\"choices\":[{\"delta\":{\"reasoning\":\"think1\"}}]}\n\n",
"data: {\"choices\":[{\"delta\":{\"content\":\"ok\"}}]}\n\n",
"data: {\"choices\":[{\"delta\":{} ,\"finish_reason\":\"stop\"}]}\n\n",
);
let events = run_stream(sse).await;
assert_eq!(events.len(), 5, "unexpected events: {events:?}");
match &events[0] {
ResponseEvent::ReasoningContentDelta(text) => assert_eq!(text, "think1"),
other => panic!("expected reasoning delta, got {other:?}"),
}
match &events[1] {
ResponseEvent::OutputTextDelta(text) => assert_eq!(text, "ok"),
other => panic!("expected text delta, got {other:?}"),
}
match &events[2] {
ResponseEvent::OutputItemDone(item) => assert_reasoning(item, "think1"),
other => panic!("expected reasoning item, got {other:?}"),
}
match &events[3] {
ResponseEvent::OutputItemDone(item) => assert_message(item, "ok"),
other => panic!("expected message item, got {other:?}"),
}
assert!(matches!(events[4], ResponseEvent::Completed { .. }));
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn streams_reasoning_from_object_delta() {
if network_disabled() {
println!(
"Skipping test because it cannot execute when network is disabled in a Codex sandbox."
);
return;
}
let sse = concat!(
"data: {\"choices\":[{\"delta\":{\"reasoning\":{\"text\":\"partA\"}}}]}\n\n",
"data: {\"choices\":[{\"delta\":{\"reasoning\":{\"content\":\"partB\"}}}]}\n\n",
"data: {\"choices\":[{\"delta\":{\"content\":\"answer\"}}]}\n\n",
"data: {\"choices\":[{\"delta\":{} ,\"finish_reason\":\"stop\"}]}\n\n",
);
let events = run_stream(sse).await;
assert_eq!(events.len(), 6, "unexpected events: {events:?}");
match &events[0] {
ResponseEvent::ReasoningContentDelta(text) => assert_eq!(text, "partA"),
other => panic!("expected reasoning delta, got {other:?}"),
}
match &events[1] {
ResponseEvent::ReasoningContentDelta(text) => assert_eq!(text, "partB"),
other => panic!("expected reasoning delta, got {other:?}"),
}
match &events[2] {
ResponseEvent::OutputTextDelta(text) => assert_eq!(text, "answer"),
other => panic!("expected text delta, got {other:?}"),
}
match &events[3] {
ResponseEvent::OutputItemDone(item) => assert_reasoning(item, "partApartB"),
other => panic!("expected reasoning item, got {other:?}"),
}
match &events[4] {
ResponseEvent::OutputItemDone(item) => assert_message(item, "answer"),
other => panic!("expected message item, got {other:?}"),
}
assert!(matches!(events[5], ResponseEvent::Completed { .. }));
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn streams_reasoning_from_final_message() {
if network_disabled() {
println!(
"Skipping test because it cannot execute when network is disabled in a Codex sandbox."
);
return;
}
let sse = "data: {\"choices\":[{\"message\":{\"reasoning\":\"final-cot\"},\"finish_reason\":\"stop\"}]}\n\n";
let events = run_stream(sse).await;
assert_eq!(events.len(), 3, "unexpected events: {events:?}");
match &events[0] {
ResponseEvent::ReasoningContentDelta(text) => assert_eq!(text, "final-cot"),
other => panic!("expected reasoning delta, got {other:?}"),
}
match &events[1] {
ResponseEvent::OutputItemDone(item) => assert_reasoning(item, "final-cot"),
other => panic!("expected reasoning item, got {other:?}"),
}
assert!(matches!(events[2], ResponseEvent::Completed { .. }));
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn streams_reasoning_before_tool_call() {
if network_disabled() {
println!(
"Skipping test because it cannot execute when network is disabled in a Codex sandbox."
);
return;
}
let sse = concat!(
"data: {\"choices\":[{\"delta\":{\"reasoning\":\"pre-tool\"}}]}\n\n",
"data: {\"choices\":[{\"delta\":{\"tool_calls\":[{\"id\":\"call_1\",\"type\":\"function\",\"function\":{\"name\":\"run\",\"arguments\":\"{}\"}}]},\"finish_reason\":\"tool_calls\"}]}\n\n",
);
let events = run_stream(sse).await;
assert_eq!(events.len(), 4, "unexpected events: {events:?}");
match &events[0] {
ResponseEvent::ReasoningContentDelta(text) => assert_eq!(text, "pre-tool"),
other => panic!("expected reasoning delta, got {other:?}"),
}
match &events[1] {
ResponseEvent::OutputItemDone(item) => assert_reasoning(item, "pre-tool"),
other => panic!("expected reasoning item, got {other:?}"),
}
match &events[2] {
ResponseEvent::OutputItemDone(ResponseItem::FunctionCall {
name,
arguments,
call_id,
..
}) => {
assert_eq!(name, "run");
assert_eq!(arguments, "{}");
assert_eq!(call_id, "call_1");
}
other => panic!("expected function call, got {other:?}"),
}
assert!(matches!(events[3], ResponseEvent::Completed { .. }));
}
#[tokio::test]
#[traced_test]
async fn chat_sse_emits_failed_on_parse_error() {
if network_disabled() {
println!(
"Skipping test because it cannot execute when network is disabled in a Codex sandbox."
);
return;
}
let sse_body = concat!("data: not-json\n\n", "data: [DONE]\n\n");
let _ = run_stream(sse_body).await;
logs_assert(|lines: &[&str]| {
lines
.iter()
.find(|line| {
line.contains("codex.api_request") && line.contains("http.response.status_code=200")
})
.map(|_| Ok(()))
.unwrap_or(Err("cannot find codex.api_request event".to_string()))
});
logs_assert(|lines: &[&str]| {
lines
.iter()
.find(|line| {
line.contains("codex.sse_event")
&& line.contains("error.message")
&& line.contains("expected ident at line 1 column 2")
})
.map(|_| Ok(()))
.unwrap_or(Err("cannot find SSE event".to_string()))
});
}
#[tokio::test]
#[traced_test]
async fn chat_sse_done_chunk_emits_event() {
if network_disabled() {
println!(
"Skipping test because it cannot execute when network is disabled in a Codex sandbox."
);
return;
}
let sse_body = "data: [DONE]\n\n";
let _ = run_stream(sse_body).await;
logs_assert(|lines: &[&str]| {
lines
.iter()
.find(|line| line.contains("codex.sse_event") && line.contains("event.kind=message"))
.map(|_| Ok(()))
.unwrap_or(Err("cannot find SSE event".to_string()))
});
}
#[tokio::test]
#[traced_test]
async fn chat_sse_emits_error_on_invalid_utf8() {
if network_disabled() {
println!(
"Skipping test because it cannot execute when network is disabled in a Codex sandbox."
);
return;
}
let _ = run_stream_with_bytes(b"data: \x80\x80\n\n").await;
logs_assert(|lines: &[&str]| {
lines
.iter()
.find(|line| {
line.contains("codex.sse_event")
&& line.contains("error.message")
&& line.contains("UTF8 error: invalid utf-8 sequence of 1 bytes from index 0")
})
.map(|_| Ok(()))
.unwrap_or(Err("cannot find SSE event".to_string()))
});
}