OpenTelemetry events (#2103)

### Title

## otel

Codex can emit [OpenTelemetry](https://opentelemetry.io/) **log events**
that
describe each run: outbound API requests, streamed responses, user
input,
tool-approval decisions, and the result of every tool invocation. Export
is
**disabled by default** so local runs remain self-contained. Opt in by
adding an
`[otel]` table and choosing an exporter.

```toml
[otel]
environment = "staging"   # defaults to "dev"
exporter = "none"          # defaults to "none"; set to otlp-http or otlp-grpc to send events
log_user_prompt = false    # defaults to false; redact prompt text unless explicitly enabled
```

Codex tags every exported event with `service.name = "codex-cli"`, the
CLI
version, and an `env` attribute so downstream collectors can distinguish
dev/staging/prod traffic. Only telemetry produced inside the
`codex_otel`
crate—the events listed below—is forwarded to the exporter.

### Event catalog

Every event shares a common set of metadata fields: `event.timestamp`,
`conversation.id`, `app.version`, `auth_mode` (when available),
`user.account_id` (when available), `terminal.type`, `model`, and
`slug`.

With OTEL enabled Codex emits the following event types (in addition to
the
metadata above):

- `codex.api_request`
  - `cf_ray` (optional)
  - `attempt`
  - `duration_ms`
  - `http.response.status_code` (optional)
  - `error.message` (failures)
- `codex.sse_event`
  - `event.kind`
  - `duration_ms`
  - `error.message` (failures)
  - `input_token_count` (completion only)
  - `output_token_count` (completion only)
  - `cached_token_count` (completion only, optional)
  - `reasoning_token_count` (completion only, optional)
  - `tool_token_count` (completion only)
- `codex.user_prompt`
  - `prompt_length`
  - `prompt` (redacted unless `log_user_prompt = true`)
- `codex.tool_decision`
  - `tool_name`
  - `call_id`
- `decision` (`approved`, `approved_for_session`, `denied`, or `abort`)
  - `source` (`config` or `user`)
- `codex.tool_result`
  - `tool_name`
  - `call_id`
  - `arguments`
  - `duration_ms` (execution time for the tool)
  - `success` (`"true"` or `"false"`)
  - `output`

### Choosing an exporter

Set `otel.exporter` to control where events go:

- `none` – leaves instrumentation active but skips exporting. This is
the
  default.
- `otlp-http` – posts OTLP log records to an OTLP/HTTP collector.
Specify the
  endpoint, protocol, and headers your collector expects:

  ```toml
  [otel]
  exporter = { otlp-http = {
    endpoint = "https://otel.example.com/v1/logs",
    protocol = "binary",
    headers = { "x-otlp-api-key" = "${OTLP_TOKEN}" }
  }}
  ```

- `otlp-grpc` – streams OTLP log records over gRPC. Provide the endpoint
and any
  metadata headers:

  ```toml
  [otel]
  exporter = { otlp-grpc = {
    endpoint = "https://otel.example.com:4317",
    headers = { "x-otlp-meta" = "abc123" }
  }}
  ```

If the exporter is `none` nothing is written anywhere; otherwise you
must run or point to your
own collector. All exporters run on a background batch worker that is
flushed on
shutdown.

If you build Codex from source the OTEL crate is still behind an `otel`
feature
flag; the official prebuilt binaries ship with the feature enabled. When
the
feature is disabled the telemetry hooks become no-ops so the CLI
continues to
function without the extra dependencies.

---------

Co-authored-by: Anton Panasenko <apanasenko@openai.com>
This commit is contained in:
vishnu-oai
2025-09-29 19:30:55 +01:00
committed by GitHub
parent d15253415a
commit 04c1782e52
38 changed files with 3069 additions and 142 deletions

View File

@@ -47,6 +47,7 @@ use crate::protocol::RateLimitWindow;
use crate::protocol::TokenUsage;
use crate::token_data::PlanType;
use crate::util::backoff;
use codex_otel::otel_event_manager::OtelEventManager;
use codex_protocol::config_types::ReasoningEffort as ReasoningEffortConfig;
use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig;
use codex_protocol::models::ResponseItem;
@@ -73,6 +74,7 @@ struct Error {
pub struct ModelClient {
config: Arc<Config>,
auth_manager: Option<Arc<AuthManager>>,
otel_event_manager: OtelEventManager,
client: reqwest::Client,
provider: ModelProviderInfo,
conversation_id: ConversationId,
@@ -84,6 +86,7 @@ impl ModelClient {
pub fn new(
config: Arc<Config>,
auth_manager: Option<Arc<AuthManager>>,
otel_event_manager: OtelEventManager,
provider: ModelProviderInfo,
effort: Option<ReasoningEffortConfig>,
summary: ReasoningSummaryConfig,
@@ -94,6 +97,7 @@ impl ModelClient {
Self {
config,
auth_manager,
otel_event_manager,
client,
provider,
conversation_id,
@@ -127,6 +131,7 @@ impl ModelClient {
&self.config.model_family,
&self.client,
&self.provider,
&self.otel_event_manager,
)
.await?;
@@ -163,7 +168,12 @@ impl ModelClient {
if let Some(path) = &*CODEX_RS_SSE_FIXTURE {
// short circuit for tests
warn!(path, "Streaming from fixture");
return stream_from_fixture(path, self.provider.clone()).await;
return stream_from_fixture(
path,
self.provider.clone(),
self.otel_event_manager.clone(),
)
.await;
}
let auth_manager = self.auth_manager.clone();
@@ -233,7 +243,7 @@ impl ModelClient {
let max_attempts = self.provider.request_max_retries();
for attempt in 0..=max_attempts {
match self
.attempt_stream_responses(&payload_json, &auth_manager)
.attempt_stream_responses(attempt, &payload_json, &auth_manager)
.await
{
Ok(stream) => {
@@ -258,6 +268,7 @@ impl ModelClient {
/// Single attempt to start a streaming Responses API call.
async fn attempt_stream_responses(
&self,
attempt: u64,
payload_json: &Value,
auth_manager: &Option<Arc<AuthManager>>,
) -> std::result::Result<ResponseStream, StreamAttemptError> {
@@ -291,7 +302,11 @@ impl ModelClient {
req_builder = req_builder.header("chatgpt-account-id", account_id);
}
let res = req_builder.send().await;
let res = self
.otel_event_manager
.log_request(attempt, || req_builder.send())
.await;
if let Ok(resp) = &res {
trace!(
"Response status: {}, cf-ray: {}",
@@ -322,6 +337,7 @@ impl ModelClient {
stream,
tx_event,
self.provider.stream_idle_timeout(),
self.otel_event_manager.clone(),
));
Ok(ResponseStream { rx_event })
@@ -399,6 +415,10 @@ impl ModelClient {
self.provider.clone()
}
pub fn get_otel_event_manager(&self) -> OtelEventManager {
self.otel_event_manager.clone()
}
/// Returns the currently configured model slug.
pub fn get_model(&self) -> String {
self.config.model.clone()
@@ -605,6 +625,7 @@ async fn process_sse<S>(
stream: S,
tx_event: mpsc::Sender<Result<ResponseEvent>>,
idle_timeout: Duration,
otel_event_manager: OtelEventManager,
) where
S: Stream<Item = Result<Bytes>> + Unpin,
{
@@ -616,7 +637,10 @@ async fn process_sse<S>(
let mut response_error: Option<CodexErr> = None;
loop {
let sse = match timeout(idle_timeout, stream.next()).await {
let sse = match otel_event_manager
.log_sse_event(|| timeout(idle_timeout, stream.next()))
.await
{
Ok(Some(Ok(sse))) => sse,
Ok(Some(Err(e))) => {
debug!("SSE Error: {e:#}");
@@ -630,6 +654,21 @@ async fn process_sse<S>(
id: response_id,
usage,
}) => {
if let Some(token_usage) = &usage {
otel_event_manager.sse_event_completed(
token_usage.input_tokens,
token_usage.output_tokens,
token_usage
.input_tokens_details
.as_ref()
.map(|d| d.cached_tokens),
token_usage
.output_tokens_details
.as_ref()
.map(|d| d.reasoning_tokens),
token_usage.total_tokens,
);
}
let event = ResponseEvent::Completed {
response_id,
token_usage: usage.map(Into::into),
@@ -637,12 +676,13 @@ async fn process_sse<S>(
let _ = tx_event.send(Ok(event)).await;
}
None => {
let _ = tx_event
.send(Err(response_error.unwrap_or(CodexErr::Stream(
"stream closed before response.completed".into(),
None,
))))
.await;
let error = response_error.unwrap_or(CodexErr::Stream(
"stream closed before response.completed".into(),
None,
));
otel_event_manager.see_event_completed_failed(&error);
let _ = tx_event.send(Err(error)).await;
}
}
return;
@@ -746,7 +786,9 @@ async fn process_sse<S>(
response_error = Some(CodexErr::Stream(message, delay));
}
Err(e) => {
debug!("failed to parse ErrorResponse: {e}");
let error = format!("failed to parse ErrorResponse: {e}");
debug!(error);
response_error = Some(CodexErr::Stream(error, None))
}
}
}
@@ -760,7 +802,9 @@ async fn process_sse<S>(
response_completed = Some(r);
}
Err(e) => {
debug!("failed to parse ResponseCompleted: {e}");
let error = format!("failed to parse ResponseCompleted: {e}");
debug!(error);
response_error = Some(CodexErr::Stream(error, None));
continue;
}
};
@@ -807,6 +851,7 @@ async fn process_sse<S>(
async fn stream_from_fixture(
path: impl AsRef<Path>,
provider: ModelProviderInfo,
otel_event_manager: OtelEventManager,
) -> Result<ResponseStream> {
let (tx_event, rx_event) = mpsc::channel::<Result<ResponseEvent>>(1600);
let f = std::fs::File::open(path.as_ref())?;
@@ -825,6 +870,7 @@ async fn stream_from_fixture(
stream,
tx_event,
provider.stream_idle_timeout(),
otel_event_manager,
));
Ok(ResponseStream { rx_event })
}
@@ -880,6 +926,7 @@ mod tests {
async fn collect_events(
chunks: &[&[u8]],
provider: ModelProviderInfo,
otel_event_manager: OtelEventManager,
) -> Vec<Result<ResponseEvent>> {
let mut builder = IoBuilder::new();
for chunk in chunks {
@@ -889,7 +936,12 @@ mod tests {
let reader = builder.build();
let stream = ReaderStream::new(reader).map_err(CodexErr::Io);
let (tx, mut rx) = mpsc::channel::<Result<ResponseEvent>>(16);
tokio::spawn(process_sse(stream, tx, provider.stream_idle_timeout()));
tokio::spawn(process_sse(
stream,
tx,
provider.stream_idle_timeout(),
otel_event_manager,
));
let mut events = Vec::new();
while let Some(ev) = rx.recv().await {
@@ -903,6 +955,7 @@ mod tests {
async fn run_sse(
events: Vec<serde_json::Value>,
provider: ModelProviderInfo,
otel_event_manager: OtelEventManager,
) -> Vec<ResponseEvent> {
let mut body = String::new();
for e in events {
@@ -919,7 +972,12 @@ mod tests {
let (tx, mut rx) = mpsc::channel::<Result<ResponseEvent>>(8);
let stream = ReaderStream::new(std::io::Cursor::new(body)).map_err(CodexErr::Io);
tokio::spawn(process_sse(stream, tx, provider.stream_idle_timeout()));
tokio::spawn(process_sse(
stream,
tx,
provider.stream_idle_timeout(),
otel_event_manager,
));
let mut out = Vec::new();
while let Some(ev) = rx.recv().await {
@@ -928,6 +986,18 @@ mod tests {
out
}
fn otel_event_manager() -> OtelEventManager {
OtelEventManager::new(
ConversationId::new(),
"test",
"test",
None,
Some(AuthMode::ChatGPT),
false,
"test".to_string(),
)
}
// ────────────────────────────
// Tests from `implement-test-for-responses-api-sse-parser`
// ────────────────────────────
@@ -979,9 +1049,12 @@ mod tests {
requires_openai_auth: false,
};
let otel_event_manager = otel_event_manager();
let events = collect_events(
&[sse1.as_bytes(), sse2.as_bytes(), sse3.as_bytes()],
provider,
otel_event_manager,
)
.await;
@@ -1039,7 +1112,9 @@ mod tests {
requires_openai_auth: false,
};
let events = collect_events(&[sse1.as_bytes()], provider).await;
let otel_event_manager = otel_event_manager();
let events = collect_events(&[sse1.as_bytes()], provider, otel_event_manager).await;
assert_eq!(events.len(), 2);
@@ -1073,7 +1148,9 @@ mod tests {
requires_openai_auth: false,
};
let events = collect_events(&[sse1.as_bytes()], provider).await;
let otel_event_manager = otel_event_manager();
let events = collect_events(&[sse1.as_bytes()], provider, otel_event_manager).await;
assert_eq!(events.len(), 1);
@@ -1178,7 +1255,9 @@ mod tests {
requires_openai_auth: false,
};
let out = run_sse(evs, provider).await;
let otel_event_manager = otel_event_manager();
let out = run_sse(evs, provider, otel_event_manager).await;
assert_eq!(out.len(), case.expected_len, "case {}", case.name);
assert!(
(case.expect_first)(&out[0]),