diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index fc0eb20a..96bb30f7 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -734,6 +734,7 @@ dependencies = [ "codex-apply-patch", "codex-file-search", "codex-mcp-client", + "codex-otel", "codex-protocol", "codex-rmcp-client", "core_test_support", @@ -771,6 +772,7 @@ dependencies = [ "toml", "toml_edit", "tracing", + "tracing-test", "tree-sitter", "tree-sitter-bash", "uuid", @@ -795,6 +797,7 @@ dependencies = [ "codex-protocol", "core_test_support", "libc", + "opentelemetry-appender-tracing", "owo-colors", "predicates", "pretty_assertions", @@ -948,6 +951,26 @@ dependencies = [ "wiremock", ] +[[package]] +name = "codex-otel" +version = "0.0.0" +dependencies = [ + "chrono", + "codex-protocol", + "eventsource-stream", + "opentelemetry", + "opentelemetry-otlp", + "opentelemetry-semantic-conventions", + "opentelemetry_sdk", + "reqwest", + "serde", + "serde_json", + "strum_macros 0.27.2", + "tokio", + "tonic", + "tracing", +] + [[package]] name = "codex-process-hardening" version = "0.0.0" @@ -1051,6 +1074,7 @@ dependencies = [ "lazy_static", "libc", "mcp-types", + "opentelemetry-appender-tracing", "path-clean", "pathdiff", "pretty_assertions", @@ -2301,6 +2325,7 @@ dependencies = [ "hyper", "hyper-util", "rustls", + "rustls-native-certs", "rustls-pki-types", "tokio", "tokio-rustls", @@ -2308,6 +2333,19 @@ dependencies = [ "webpki-roots", ] +[[package]] +name = "hyper-timeout" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0" +dependencies = [ + "hyper", + "hyper-util", + "pin-project-lite", + "tokio", + "tower-service", +] + [[package]] name = "hyper-tls" version = "0.6.0" @@ -2342,7 +2380,7 @@ dependencies = [ "libc", "percent-encoding", "pin-project-lite", - "socket2", + "socket2 0.6.0", "system-configuration", "tokio", "tower-service", @@ -3063,7 +3101,7 @@ dependencies = [ "openssl-probe", "openssl-sys", "schannel", - "security-framework", + "security-framework 2.11.1", "security-framework-sys", "tempfile", ] @@ -3349,6 +3387,104 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "opentelemetry" +version = "0.30.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aaf416e4cb72756655126f7dd7bb0af49c674f4c1b9903e80c009e0c37e552e6" +dependencies = [ + "futures-core", + "futures-sink", + "js-sys", + "pin-project-lite", + "thiserror 2.0.16", + "tracing", +] + +[[package]] +name = "opentelemetry-appender-tracing" +version = "0.30.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e68f63eca5fad47e570e00e893094fc17be959c80c79a7d6ec1abdd5ae6ffc16" +dependencies = [ + "opentelemetry", + "tracing", + "tracing-core", + "tracing-subscriber", +] + +[[package]] +name = "opentelemetry-http" +version = "0.30.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50f6639e842a97dbea8886e3439710ae463120091e2e064518ba8e716e6ac36d" +dependencies = [ + "async-trait", + "bytes", + "http", + "opentelemetry", + "reqwest", +] + +[[package]] +name = "opentelemetry-otlp" +version = "0.30.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dbee664a43e07615731afc539ca60c6d9f1a9425e25ca09c57bc36c87c55852b" +dependencies = [ + "http", + "opentelemetry", + "opentelemetry-http", + "opentelemetry-proto", + "opentelemetry_sdk", + "prost", + "reqwest", + "serde_json", + "thiserror 2.0.16", + "tokio", + "tonic", + "tracing", +] + 
+[[package]] +name = "opentelemetry-proto" +version = "0.30.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e046fd7660710fe5a05e8748e70d9058dc15c94ba914e7c4faa7c728f0e8ddc" +dependencies = [ + "base64", + "hex", + "opentelemetry", + "opentelemetry_sdk", + "prost", + "serde", + "tonic", +] + +[[package]] +name = "opentelemetry-semantic-conventions" +version = "0.30.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83d059a296a47436748557a353c5e6c5705b9470ef6c95cfc52c21a8814ddac2" + +[[package]] +name = "opentelemetry_sdk" +version = "0.30.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11f644aa9e5e31d11896e024305d7e3c98a88884d9f8919dbf37a9991bc47a4b" +dependencies = [ + "futures-channel", + "futures-executor", + "futures-util", + "opentelemetry", + "percent-encoding", + "rand", + "serde_json", + "thiserror 2.0.16", + "tokio", + "tokio-stream", +] + [[package]] name = "option-ext" version = "0.2.0" @@ -3463,6 +3599,26 @@ dependencies = [ "siphasher", ] +[[package]] +name = "pin-project" +version = "1.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677f1add503faace112b9f1373e43e9e054bfdd22ff1a63c1bc485eaec6a6a8a" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.104", +] + [[package]] name = "pin-project-lite" version = "0.2.16" @@ -3637,6 +3793,29 @@ dependencies = [ "windows", ] +[[package]] +name = "prost" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-derive" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" +dependencies = [ + "anyhow", + "itertools 0.14.0", + "proc-macro2", + "quote", + "syn 2.0.104", +] + [[package]] name = "pulldown-cmark" version = "0.10.3" @@ -3682,9 +3861,9 @@ dependencies = [ [[package]] name = "quinn" -version = "0.11.9" +version = "0.11.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9e20a958963c291dc322d98411f541009df2ced7b5a4f2bd52337638cfccf20" +checksum = "626214629cda6781b6dc1d316ba307189c85ba657213ce642d9c77670f8202c8" dependencies = [ "bytes", "cfg_aliases 0.2.1", @@ -3693,7 +3872,7 @@ dependencies = [ "quinn-udp", "rustc-hash", "rustls", - "socket2", + "socket2 0.5.10", "thiserror 2.0.16", "tokio", "tracing", @@ -3702,9 +3881,9 @@ dependencies = [ [[package]] name = "quinn-proto" -version = "0.11.13" +version = "0.11.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1906b49b0c3bc04b5fe5d86a77925ae6524a19b816ae38ce1e426255f1d8a31" +checksum = "49df843a9161c85bb8aae55f101bc0bac8bcafd637a620d9122fd7e0b2f7422e" dependencies = [ "bytes", "getrandom 0.3.3", @@ -3723,16 +3902,16 @@ dependencies = [ [[package]] name = "quinn-udp" -version = "0.5.14" +version = "0.5.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "addec6a0dcad8a8d96a771f815f0eaf55f9d1805756410b39f5fa81332574cbd" +checksum = "fcebb1209ee276352ef14ff8732e24cc2b02bbac986cd74a4c81bcb2f9881970" 
dependencies = [ "cfg_aliases 0.2.1", "libc", "once_cell", - "socket2", + "socket2 0.5.10", "tracing", - "windows-sys 0.60.2", + "windows-sys 0.52.0", ] [[package]] @@ -3929,6 +4108,7 @@ dependencies = [ "pin-project-lite", "quinn", "rustls", + "rustls-native-certs", "rustls-pki-types", "serde", "serde_json", @@ -4060,6 +4240,18 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rustls-native-certs" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7fcff2dd52b58a8d98a70243663a0d234c4e2b79235637849d15913394a247d3" +dependencies = [ + "openssl-probe", + "rustls-pki-types", + "schannel", + "security-framework 3.3.0", +] + [[package]] name = "rustls-pki-types" version = "1.12.0" @@ -4265,6 +4457,19 @@ dependencies = [ "security-framework-sys", ] +[[package]] +name = "security-framework" +version = "3.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "80fb1d92c5028aa318b4b8bd7302a5bfcf48be96a37fc6fc790f806b0004ee0c" +dependencies = [ + "bitflags 2.9.1", + "core-foundation 0.10.1", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + [[package]] name = "security-framework-sys" version = "2.14.0" @@ -4530,6 +4735,16 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b7c388c1b5e93756d0c740965c41e8822f866621d41acbdf6336a6a168f8840c" +[[package]] +name = "socket2" +version = "0.5.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e22376abed350d73dd1cd119b57ffccad95b4e585a7cda43e286245ce23c0678" +dependencies = [ + "libc", + "windows-sys 0.52.0", +] + [[package]] name = "socket2" version = "0.6.0" @@ -5038,7 +5253,7 @@ dependencies = [ "pin-project-lite", "signal-hook-registry", "slab", - "socket2", + "socket2 0.6.0", "tokio-macros", "windows-sys 0.59.0", ] @@ -5163,6 +5378,35 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fcc842091f2def52017664b53082ecbbeb5c7731092bad69d2c63050401dfd64" +[[package]] +name = "tonic" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e581ba15a835f4d9ea06c55ab1bd4dce26fc53752c69a04aac00703bfb49ba9" +dependencies = [ + "async-trait", + "axum", + "base64", + "bytes", + "h2", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-timeout", + "hyper-util", + "percent-encoding", + "pin-project", + "prost", + "socket2 0.5.10", + "tokio", + "tokio-stream", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tower" version = "0.5.2" @@ -5171,11 +5415,15 @@ checksum = "d039ad9159c98b70ecfd540b2573b97f7f52c3e8d9f8ad57a24b916a536975f9" dependencies = [ "futures-core", "futures-util", + "indexmap 2.10.0", "pin-project-lite", + "slab", "sync_wrapper", "tokio", + "tokio-util", "tower-layer", "tower-service", + "tracing", ] [[package]] @@ -5292,6 +5540,27 @@ dependencies = [ "tracing-log", ] +[[package]] +name = "tracing-test" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "557b891436fe0d5e0e363427fc7f217abf9ccd510d5136549847bdcbcd011d68" +dependencies = [ + "tracing-core", + "tracing-subscriber", + "tracing-test-macro", +] + +[[package]] +name = "tracing-test-macro" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04659ddb06c87d233c566112c1c9c5b9e98256d9af50ec3bc9c8327f873a7568" +dependencies = [ + "quote", + "syn 2.0.104", +] + [[package]] name = "tree-sitter" version 
= "0.25.9" diff --git a/codex-rs/Cargo.toml b/codex-rs/Cargo.toml index 862f06aa..1c591897 100644 --- a/codex-rs/Cargo.toml +++ b/codex-rs/Cargo.toml @@ -21,6 +21,7 @@ members = [ "protocol-ts", "rmcp-client", "responses-api-proxy", + "otel", "tui", "utils/readiness", ] @@ -55,6 +56,7 @@ codex-protocol = { path = "protocol" } codex-protocol-ts = { path = "protocol-ts" } codex-rmcp-client = { path = "rmcp-client" } codex-tui = { path = "tui" } +codex-otel = { path = "otel" } codex-utils-readiness = { path = "utils/readiness" } core_test_support = { path = "core/tests/common" } mcp-types = { path = "mcp-types" } @@ -103,6 +105,11 @@ mime_guess = "2.0.5" multimap = "0.10.0" nucleo-matcher = "0.3.1" openssl-sys = "*" +opentelemetry = "0.30.0" +opentelemetry_sdk = "0.30.0" +opentelemetry-otlp = "0.30.0" +opentelemetry-appender-tracing = "0.30.0" +opentelemetry-semantic-conventions = "0.30.0" os_info = "3.12.0" owo-colors = "4.2.0" path-absolutize = "3.1.1" @@ -144,9 +151,11 @@ toml_edit = "0.23.4" tracing = "0.1.41" tracing-appender = "0.2.3" tracing-subscriber = "0.3.20" +tracing-test = "0.2.5" tree-sitter = "0.25.9" tree-sitter-bash = "0.25.0" ts-rs = "11" +tonic = "0.13.1" unicode-segmentation = "1.12.0" unicode-width = "0.2" url = "2" diff --git a/codex-rs/core/Cargo.toml b/codex-rs/core/Cargo.toml index 9b1c4888..ff1103c3 100644 --- a/codex-rs/core/Cargo.toml +++ b/codex-rs/core/Cargo.toml @@ -24,6 +24,7 @@ codex-file-search = { workspace = true } codex-mcp-client = { workspace = true } codex-rmcp-client = { workspace = true } codex-protocol = { workspace = true } +codex-otel = { workspace = true, features = ["otel"] } dirs = { workspace = true } env-flags = { workspace = true } eventsource-stream = { workspace = true } @@ -91,6 +92,7 @@ tempfile = { workspace = true } tokio-test = { workspace = true } walkdir = { workspace = true } wiremock = { workspace = true } +tracing-test = { workspace = true, features = ["no-env-filter"] } [package.metadata.cargo-shear] ignored = ["openssl-sys"] diff --git a/codex-rs/core/src/apply_patch.rs b/codex-rs/core/src/apply_patch.rs index 1ebbe5d7..836b8596 100644 --- a/codex-rs/core/src/apply_patch.rs +++ b/codex-rs/core/src/apply_patch.rs @@ -45,12 +45,13 @@ pub(crate) async fn apply_patch( &turn_context.sandbox_policy, &turn_context.cwd, ) { - SafetyCheck::AutoApprove { .. } => { - InternalApplyPatchInvocation::DelegateToExec(ApplyPatchExec { - action, - user_explicitly_approved_this_action: false, - }) - } + SafetyCheck::AutoApprove { + user_explicitly_approved, + .. + } => InternalApplyPatchInvocation::DelegateToExec(ApplyPatchExec { + action, + user_explicitly_approved_this_action: user_explicitly_approved, + }), SafetyCheck::AskUser => { // Compute a readable summary of path changes to include in the // approval request so the user can make an informed decision. 
diff --git a/codex-rs/core/src/chat_completions.rs b/codex-rs/core/src/chat_completions.rs index eddc7864..f7194a08 100644 --- a/codex-rs/core/src/chat_completions.rs +++ b/codex-rs/core/src/chat_completions.rs @@ -1,6 +1,19 @@ use std::time::Duration; +use crate::ModelProviderInfo; +use crate::client_common::Prompt; +use crate::client_common::ResponseEvent; +use crate::client_common::ResponseStream; +use crate::error::CodexErr; +use crate::error::Result; +use crate::model_family::ModelFamily; +use crate::openai_tools::create_tools_json_for_chat_completions_api; +use crate::util::backoff; use bytes::Bytes; +use codex_otel::otel_event_manager::OtelEventManager; +use codex_protocol::models::ContentItem; +use codex_protocol::models::ReasoningItemContent; +use codex_protocol::models::ResponseItem; use eventsource_stream::Eventsource; use futures::Stream; use futures::StreamExt; @@ -15,25 +28,13 @@ use tokio::time::timeout; use tracing::debug; use tracing::trace; -use crate::ModelProviderInfo; -use crate::client_common::Prompt; -use crate::client_common::ResponseEvent; -use crate::client_common::ResponseStream; -use crate::error::CodexErr; -use crate::error::Result; -use crate::model_family::ModelFamily; -use crate::openai_tools::create_tools_json_for_chat_completions_api; -use crate::util::backoff; -use codex_protocol::models::ContentItem; -use codex_protocol::models::ReasoningItemContent; -use codex_protocol::models::ResponseItem; - /// Implementation for the classic Chat Completions API. pub(crate) async fn stream_chat_completions( prompt: &Prompt, model_family: &ModelFamily, client: &reqwest::Client, provider: &ModelProviderInfo, + otel_event_manager: &OtelEventManager, ) -> Result { if prompt.output_schema.is_some() { return Err(CodexErr::UnsupportedOperation( @@ -294,10 +295,13 @@ pub(crate) async fn stream_chat_completions( let req_builder = provider.create_request_builder(client, &None).await?; - let res = req_builder - .header(reqwest::header::ACCEPT, "text/event-stream") - .json(&payload) - .send() + let res = otel_event_manager + .log_request(attempt, || { + req_builder + .header(reqwest::header::ACCEPT, "text/event-stream") + .json(&payload) + .send() + }) .await; match res { @@ -308,6 +312,7 @@ pub(crate) async fn stream_chat_completions( stream, tx_event, provider.stream_idle_timeout(), + otel_event_manager.clone(), )); return Ok(ResponseStream { rx_event }); } @@ -351,6 +356,7 @@ async fn process_chat_sse( stream: S, tx_event: mpsc::Sender>, idle_timeout: Duration, + otel_event_manager: OtelEventManager, ) where S: Stream> + Unpin, { @@ -374,7 +380,10 @@ async fn process_chat_sse( let mut reasoning_text = String::new(); loop { - let sse = match timeout(idle_timeout, stream.next()).await { + let sse = match otel_event_manager + .log_sse_event(|| timeout(idle_timeout, stream.next())) + .await + { Ok(Some(Ok(ev))) => ev, Ok(Some(Err(e))) => { let _ = tx_event diff --git a/codex-rs/core/src/client.rs b/codex-rs/core/src/client.rs index e8aca68f..685fbf45 100644 --- a/codex-rs/core/src/client.rs +++ b/codex-rs/core/src/client.rs @@ -47,6 +47,7 @@ use crate::protocol::RateLimitWindow; use crate::protocol::TokenUsage; use crate::token_data::PlanType; use crate::util::backoff; +use codex_otel::otel_event_manager::OtelEventManager; use codex_protocol::config_types::ReasoningEffort as ReasoningEffortConfig; use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig; use codex_protocol::models::ResponseItem; @@ -73,6 +74,7 @@ struct Error { pub struct ModelClient { config: 
Arc, auth_manager: Option>, + otel_event_manager: OtelEventManager, client: reqwest::Client, provider: ModelProviderInfo, conversation_id: ConversationId, @@ -84,6 +86,7 @@ impl ModelClient { pub fn new( config: Arc, auth_manager: Option>, + otel_event_manager: OtelEventManager, provider: ModelProviderInfo, effort: Option, summary: ReasoningSummaryConfig, @@ -94,6 +97,7 @@ impl ModelClient { Self { config, auth_manager, + otel_event_manager, client, provider, conversation_id, @@ -127,6 +131,7 @@ impl ModelClient { &self.config.model_family, &self.client, &self.provider, + &self.otel_event_manager, ) .await?; @@ -163,7 +168,12 @@ impl ModelClient { if let Some(path) = &*CODEX_RS_SSE_FIXTURE { // short circuit for tests warn!(path, "Streaming from fixture"); - return stream_from_fixture(path, self.provider.clone()).await; + return stream_from_fixture( + path, + self.provider.clone(), + self.otel_event_manager.clone(), + ) + .await; } let auth_manager = self.auth_manager.clone(); @@ -233,7 +243,7 @@ impl ModelClient { let max_attempts = self.provider.request_max_retries(); for attempt in 0..=max_attempts { match self - .attempt_stream_responses(&payload_json, &auth_manager) + .attempt_stream_responses(attempt, &payload_json, &auth_manager) .await { Ok(stream) => { @@ -258,6 +268,7 @@ impl ModelClient { /// Single attempt to start a streaming Responses API call. async fn attempt_stream_responses( &self, + attempt: u64, payload_json: &Value, auth_manager: &Option>, ) -> std::result::Result { @@ -291,7 +302,11 @@ impl ModelClient { req_builder = req_builder.header("chatgpt-account-id", account_id); } - let res = req_builder.send().await; + let res = self + .otel_event_manager + .log_request(attempt, || req_builder.send()) + .await; + if let Ok(resp) = &res { trace!( "Response status: {}, cf-ray: {}", @@ -322,6 +337,7 @@ impl ModelClient { stream, tx_event, self.provider.stream_idle_timeout(), + self.otel_event_manager.clone(), )); Ok(ResponseStream { rx_event }) @@ -399,6 +415,10 @@ impl ModelClient { self.provider.clone() } + pub fn get_otel_event_manager(&self) -> OtelEventManager { + self.otel_event_manager.clone() + } + /// Returns the currently configured model slug. 
pub fn get_model(&self) -> String { self.config.model.clone() @@ -605,6 +625,7 @@ async fn process_sse( stream: S, tx_event: mpsc::Sender>, idle_timeout: Duration, + otel_event_manager: OtelEventManager, ) where S: Stream> + Unpin, { @@ -616,7 +637,10 @@ async fn process_sse( let mut response_error: Option = None; loop { - let sse = match timeout(idle_timeout, stream.next()).await { + let sse = match otel_event_manager + .log_sse_event(|| timeout(idle_timeout, stream.next())) + .await + { Ok(Some(Ok(sse))) => sse, Ok(Some(Err(e))) => { debug!("SSE Error: {e:#}"); @@ -630,6 +654,21 @@ async fn process_sse( id: response_id, usage, }) => { + if let Some(token_usage) = &usage { + otel_event_manager.sse_event_completed( + token_usage.input_tokens, + token_usage.output_tokens, + token_usage + .input_tokens_details + .as_ref() + .map(|d| d.cached_tokens), + token_usage + .output_tokens_details + .as_ref() + .map(|d| d.reasoning_tokens), + token_usage.total_tokens, + ); + } let event = ResponseEvent::Completed { response_id, token_usage: usage.map(Into::into), @@ -637,12 +676,13 @@ async fn process_sse( let _ = tx_event.send(Ok(event)).await; } None => { - let _ = tx_event - .send(Err(response_error.unwrap_or(CodexErr::Stream( - "stream closed before response.completed".into(), - None, - )))) - .await; + let error = response_error.unwrap_or(CodexErr::Stream( + "stream closed before response.completed".into(), + None, + )); + otel_event_manager.see_event_completed_failed(&error); + + let _ = tx_event.send(Err(error)).await; } } return; @@ -746,7 +786,9 @@ async fn process_sse( response_error = Some(CodexErr::Stream(message, delay)); } Err(e) => { - debug!("failed to parse ErrorResponse: {e}"); + let error = format!("failed to parse ErrorResponse: {e}"); + debug!(error); + response_error = Some(CodexErr::Stream(error, None)) } } } @@ -760,7 +802,9 @@ async fn process_sse( response_completed = Some(r); } Err(e) => { - debug!("failed to parse ResponseCompleted: {e}"); + let error = format!("failed to parse ResponseCompleted: {e}"); + debug!(error); + response_error = Some(CodexErr::Stream(error, None)); continue; } }; @@ -807,6 +851,7 @@ async fn process_sse( async fn stream_from_fixture( path: impl AsRef, provider: ModelProviderInfo, + otel_event_manager: OtelEventManager, ) -> Result { let (tx_event, rx_event) = mpsc::channel::>(1600); let f = std::fs::File::open(path.as_ref())?; @@ -825,6 +870,7 @@ async fn stream_from_fixture( stream, tx_event, provider.stream_idle_timeout(), + otel_event_manager, )); Ok(ResponseStream { rx_event }) } @@ -880,6 +926,7 @@ mod tests { async fn collect_events( chunks: &[&[u8]], provider: ModelProviderInfo, + otel_event_manager: OtelEventManager, ) -> Vec> { let mut builder = IoBuilder::new(); for chunk in chunks { @@ -889,7 +936,12 @@ mod tests { let reader = builder.build(); let stream = ReaderStream::new(reader).map_err(CodexErr::Io); let (tx, mut rx) = mpsc::channel::>(16); - tokio::spawn(process_sse(stream, tx, provider.stream_idle_timeout())); + tokio::spawn(process_sse( + stream, + tx, + provider.stream_idle_timeout(), + otel_event_manager, + )); let mut events = Vec::new(); while let Some(ev) = rx.recv().await { @@ -903,6 +955,7 @@ mod tests { async fn run_sse( events: Vec, provider: ModelProviderInfo, + otel_event_manager: OtelEventManager, ) -> Vec { let mut body = String::new(); for e in events { @@ -919,7 +972,12 @@ mod tests { let (tx, mut rx) = mpsc::channel::>(8); let stream = ReaderStream::new(std::io::Cursor::new(body)).map_err(CodexErr::Io); - 
tokio::spawn(process_sse(stream, tx, provider.stream_idle_timeout())); + tokio::spawn(process_sse( + stream, + tx, + provider.stream_idle_timeout(), + otel_event_manager, + )); let mut out = Vec::new(); while let Some(ev) = rx.recv().await { @@ -928,6 +986,18 @@ mod tests { out } + fn otel_event_manager() -> OtelEventManager { + OtelEventManager::new( + ConversationId::new(), + "test", + "test", + None, + Some(AuthMode::ChatGPT), + false, + "test".to_string(), + ) + } + // ──────────────────────────── // Tests from `implement-test-for-responses-api-sse-parser` // ──────────────────────────── @@ -979,9 +1049,12 @@ mod tests { requires_openai_auth: false, }; + let otel_event_manager = otel_event_manager(); + let events = collect_events( &[sse1.as_bytes(), sse2.as_bytes(), sse3.as_bytes()], provider, + otel_event_manager, ) .await; @@ -1039,7 +1112,9 @@ mod tests { requires_openai_auth: false, }; - let events = collect_events(&[sse1.as_bytes()], provider).await; + let otel_event_manager = otel_event_manager(); + + let events = collect_events(&[sse1.as_bytes()], provider, otel_event_manager).await; assert_eq!(events.len(), 2); @@ -1073,7 +1148,9 @@ mod tests { requires_openai_auth: false, }; - let events = collect_events(&[sse1.as_bytes()], provider).await; + let otel_event_manager = otel_event_manager(); + + let events = collect_events(&[sse1.as_bytes()], provider, otel_event_manager).await; assert_eq!(events.len(), 1); @@ -1178,7 +1255,9 @@ mod tests { requires_openai_auth: false, }; - let out = run_sse(evs, provider).await; + let otel_event_manager = otel_event_manager(); + + let out = run_sse(evs, provider, otel_event_manager).await; assert_eq!(out.len(), case.expected_len, "case {}", case.name); assert!( (case.expect_first)(&out[0]), diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 8b792887..7a2b2c8d 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -1,5 +1,6 @@ use std::borrow::Cow; use std::collections::HashMap; +use std::fmt::Debug; use std::path::Path; use std::path::PathBuf; use std::sync::Arc; @@ -11,6 +12,7 @@ use crate::client_common::REVIEW_PROMPT; use crate::event_mapping::map_response_item_to_event_messages; use crate::function_tool::FunctionCallError; use crate::review_format::format_review_findings_block; +use crate::terminal; use crate::user_notification::UserNotifier; use async_channel::Receiver; use async_channel::Sender; @@ -125,6 +127,8 @@ use crate::unified_exec::UnifiedExecSessionManager; use crate::user_instructions::UserInstructions; use crate::user_notification::UserNotification; use crate::util::backoff; +use codex_otel::otel_event_manager::OtelEventManager; +use codex_otel::otel_event_manager::ToolDecisionSource; use codex_protocol::config_types::ReasoningEffort as ReasoningEffortConfig; use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig; use codex_protocol::custom_prompts::CustomPrompt; @@ -422,11 +426,35 @@ impl Session { } } + let otel_event_manager = OtelEventManager::new( + conversation_id, + config.model.as_str(), + config.model_family.slug.as_str(), + auth_manager.auth().and_then(|a| a.get_account_id()), + auth_manager.auth().map(|a| a.mode), + config.otel.log_user_prompt, + terminal::user_agent(), + ); + + otel_event_manager.conversation_starts( + config.model_provider.name.as_str(), + config.model_reasoning_effort, + config.model_reasoning_summary, + config.model_context_window, + config.model_max_output_tokens, + config.model_auto_compact_token_limit, + config.approval_policy, 
+ config.sandbox_policy.clone(), + config.mcp_servers.keys().map(String::as_str).collect(), + config.active_profile.clone(), + ); + // Now that the conversation id is final (may have been updated by resume), // construct the model client. let client = ModelClient::new( config.clone(), Some(auth_manager.clone()), + otel_event_manager, provider.clone(), model_reasoning_effort, model_reasoning_summary, @@ -1122,9 +1150,15 @@ async fn submission_loop( updated_config.model_context_window = Some(model_info.context_window); } + let otel_event_manager = prev.client.get_otel_event_manager().with_model( + updated_config.model.as_str(), + updated_config.model_family.slug.as_str(), + ); + let client = ModelClient::new( Arc::new(updated_config), auth_manager, + otel_event_manager, provider, effective_effort, effective_summary, @@ -1176,6 +1210,10 @@ async fn submission_loop( } } Op::UserInput { items } => { + turn_context + .client + .get_otel_event_manager() + .user_prompt(&items); // attempt to inject input into current task if let Err(items) = sess.inject_input(items).await { // no current task, spawn a new one @@ -1193,6 +1231,10 @@ async fn submission_loop( summary, final_output_json_schema, } => { + turn_context + .client + .get_otel_event_manager() + .user_prompt(&items); // attempt to inject input into current task if let Err(items) = sess.inject_input(items).await { // Derive a fresh TurnContext for this turn using the provided overrides. @@ -1211,11 +1253,18 @@ async fn submission_loop( per_turn_config.model_context_window = Some(model_info.context_window); } + let otel_event_manager = + turn_context.client.get_otel_event_manager().with_model( + per_turn_config.model.as_str(), + per_turn_config.model_family.slug.as_str(), + ); + // Build a new client with per‑turn reasoning settings. // Reuse the same provider and session id; auth defaults to env/API key. 
let client = ModelClient::new( Arc::new(per_turn_config), auth_manager, + otel_event_manager, provider, effort, summary, @@ -1472,10 +1521,19 @@ async fn spawn_review_thread( per_turn_config.model_context_window = Some(model_info.context_window); } + let otel_event_manager = parent_turn_context + .client + .get_otel_event_manager() + .with_model( + per_turn_config.model.as_str(), + per_turn_config.model_family.slug.as_str(), + ); + let per_turn_config = Arc::new(per_turn_config); let client = ModelClient::new( per_turn_config.clone(), auth_manager, + otel_event_manager, provider, per_turn_config.model_reasoning_effort, per_turn_config.model_reasoning_summary, @@ -2140,16 +2198,21 @@ async fn handle_response_item( .await; Some(resp) } else { - let result = handle_function_call( - sess, - turn_context, - turn_diff_tracker, - sub_id.to_string(), - name, - arguments, - call_id.clone(), - ) - .await; + let result = turn_context + .client + .get_otel_event_manager() + .log_tool_result(name.as_str(), call_id.as_str(), arguments.as_str(), || { + handle_function_call( + sess, + turn_context, + turn_diff_tracker, + sub_id.to_string(), + name.to_owned(), + arguments.to_owned(), + call_id.clone(), + ) + }) + .await; let output = match result { Ok(content) => FunctionCallOutputPayload { @@ -2170,6 +2233,7 @@ async fn handle_response_item( status: _, action, } => { + let name = "local_shell"; let LocalShellAction::Exec(action) = action; tracing::info!("LocalShellCall: {action:?}"); let params = ShellToolCallParams { @@ -2183,11 +2247,18 @@ async fn handle_response_item( (Some(call_id), _) => call_id, (None, Some(id)) => id, (None, None) => { - error!("LocalShellCall without call_id or id"); + let error_message = "LocalShellCall without call_id or id"; + + turn_context + .client + .get_otel_event_manager() + .log_tool_failed(name, error_message); + + error!(error_message); return Ok(Some(ResponseInputItem::FunctionCallOutput { call_id: "".to_string(), output: FunctionCallOutputPayload { - content: "LocalShellCall without call_id or id".to_string(), + content: error_message.to_string(), success: None, }, })); @@ -2196,15 +2267,26 @@ async fn handle_response_item( let exec_params = to_exec_params(params, turn_context); { - let result = handle_container_exec_with_params( - exec_params, - sess, - turn_context, - turn_diff_tracker, - sub_id.to_string(), - effective_call_id.clone(), - ) - .await; + let result = turn_context + .client + .get_otel_event_manager() + .log_tool_result( + name, + effective_call_id.as_str(), + exec_params.command.join(" ").as_str(), + || { + handle_container_exec_with_params( + name, + exec_params, + sess, + turn_context, + turn_diff_tracker, + sub_id.to_string(), + effective_call_id.clone(), + ) + }, + ) + .await; let output = match result { Ok(content) => FunctionCallOutputPayload { @@ -2229,16 +2311,21 @@ async fn handle_response_item( input, status: _, } => { - let result = handle_custom_tool_call( - sess, - turn_context, - turn_diff_tracker, - sub_id.to_string(), - name, - input, - call_id.clone(), - ) - .await; + let result = turn_context + .client + .get_otel_event_manager() + .log_tool_result(name.as_str(), call_id.as_str(), input.as_str(), || { + handle_custom_tool_call( + sess, + turn_context, + turn_diff_tracker, + sub_id.to_string(), + name.to_owned(), + input.to_owned(), + call_id.clone(), + ) + }) + .await; let output = match result { Ok(content) => content, @@ -2344,6 +2431,7 @@ async fn handle_function_call( "container.exec" | "shell" => { let params = 
parse_container_exec_arguments(arguments, turn_context, &call_id)?; handle_container_exec_with_params( + name.as_str(), params, sess, turn_context, @@ -2407,6 +2495,7 @@ async fn handle_function_call( justification: None, }; handle_container_exec_with_params( + name.as_str(), exec_params, sess, turn_context, @@ -2479,6 +2568,7 @@ async fn handle_custom_tool_call( }; handle_container_exec_with_params( + name.as_str(), exec_params, sess, turn_context, @@ -2548,6 +2638,7 @@ fn maybe_translate_shell_command( } async fn handle_container_exec_with_params( + tool_name: &str, params: ExecParams, sess: &Session, turn_context: &TurnContext, @@ -2555,6 +2646,8 @@ async fn handle_container_exec_with_params( sub_id: String, call_id: String, ) -> Result { + let otel_event_manager = turn_context.client.get_otel_event_manager(); + if params.with_escalated_permissions.unwrap_or(false) && !matches!(turn_context.approval_policy, AskForApproval::OnRequest) { @@ -2618,6 +2711,7 @@ async fn handle_container_exec_with_params( let safety = if *user_explicitly_approved_this_action { SafetyCheck::AutoApprove { sandbox_type: SandboxType::None, + user_explicitly_approved: true, } } else { assess_safety_for_untrusted_command( @@ -2649,7 +2743,23 @@ async fn handle_container_exec_with_params( }; let sandbox_type = match safety { - SafetyCheck::AutoApprove { sandbox_type } => sandbox_type, + SafetyCheck::AutoApprove { + sandbox_type, + user_explicitly_approved, + } => { + otel_event_manager.tool_decision( + tool_name, + call_id.as_str(), + ReviewDecision::Approved, + if user_explicitly_approved { + ToolDecisionSource::User + } else { + ToolDecisionSource::Config + }, + ); + + sandbox_type + } SafetyCheck::AskUser => { let decision = sess .request_command_approval( @@ -2661,15 +2771,45 @@ async fn handle_container_exec_with_params( ) .await; match decision { - ReviewDecision::Approved => (), + ReviewDecision::Approved => { + otel_event_manager.tool_decision( + tool_name, + call_id.as_str(), + ReviewDecision::Approved, + ToolDecisionSource::User, + ); + } ReviewDecision::ApprovedForSession => { + otel_event_manager.tool_decision( + tool_name, + call_id.as_str(), + ReviewDecision::ApprovedForSession, + ToolDecisionSource::User, + ); sess.add_approved_command(params.command.clone()).await; } - ReviewDecision::Denied | ReviewDecision::Abort => { + ReviewDecision::Denied => { + otel_event_manager.tool_decision( + tool_name, + call_id.as_str(), + ReviewDecision::Denied, + ToolDecisionSource::User, + ); return Err(FunctionCallError::RespondToModel( "exec command rejected by user".to_string(), )); } + ReviewDecision::Abort => { + otel_event_manager.tool_decision( + tool_name, + call_id.as_str(), + ReviewDecision::Abort, + ToolDecisionSource::User, + ); + return Err(FunctionCallError::RespondToModel( + "exec command aborted by user".to_string(), + )); + } } // No sandboxing is applied because the user has given // explicit approval. 
Often, we end up in this case because @@ -2678,6 +2818,12 @@ async fn handle_container_exec_with_params( SandboxType::None } SafetyCheck::Reject { reason } => { + otel_event_manager.tool_decision( + tool_name, + call_id.as_str(), + ReviewDecision::Denied, + ToolDecisionSource::Config, + ); return Err(FunctionCallError::RespondToModel(format!( "exec command rejected: {reason:?}" ))); @@ -2736,6 +2882,7 @@ async fn handle_container_exec_with_params( } Err(CodexErr::Sandbox(error)) => { handle_sandbox_error( + tool_name, turn_diff_tracker, params, exec_command_context, @@ -2743,6 +2890,7 @@ async fn handle_container_exec_with_params( sandbox_type, sess, turn_context, + &otel_event_manager, ) .await } @@ -2752,7 +2900,9 @@ async fn handle_container_exec_with_params( } } +#[allow(clippy::too_many_arguments)] async fn handle_sandbox_error( + tool_name: &str, turn_diff_tracker: &mut TurnDiffTracker, params: ExecParams, exec_command_context: ExecCommandContext, @@ -2760,6 +2910,7 @@ async fn handle_sandbox_error( sandbox_type: SandboxType, sess: &Session, turn_context: &TurnContext, + otel_event_manager: &OtelEventManager, ) -> Result { let call_id = exec_command_context.call_id.clone(); let sub_id = exec_command_context.sub_id.clone(); @@ -2814,6 +2965,13 @@ async fn handle_sandbox_error( sess.notify_background_event(&sub_id, "retrying command without sandbox") .await; + otel_event_manager.tool_decision( + tool_name, + call_id.as_str(), + decision, + ToolDecisionSource::User, + ); + // This is an escalated retry; the policy will not be // examined and the sandbox has been set to `None`. let retry_output_result = sess @@ -2854,7 +3012,14 @@ async fn handle_sandbox_error( ))), } } - ReviewDecision::Denied | ReviewDecision::Abort => { + decision @ (ReviewDecision::Denied | ReviewDecision::Abort) => { + otel_event_manager.tool_decision( + tool_name, + call_id.as_str(), + decision, + ToolDecisionSource::User, + ); + // Fall through to original failure handling. 
Err(FunctionCallError::RespondToModel( "exec command rejected by user".to_string(), @@ -3129,13 +3294,17 @@ mod tests { use super::*; use crate::config::ConfigOverrides; use crate::config::ConfigToml; + use crate::protocol::CompactedItem; use crate::protocol::InitialHistory; use crate::protocol::ResumedHistory; use crate::state::TaskKind; use crate::tasks::SessionTask; use crate::tasks::SessionTaskContext; + use codex_protocol::mcp_protocol::AuthMode; use codex_protocol::models::ContentItem; + use codex_protocol::models::ResponseItem; + use mcp_types::ContentBlock; use mcp_types::TextContent; use pretty_assertions::assert_eq; @@ -3370,6 +3539,18 @@ mod tests { }) } + fn otel_event_manager(conversation_id: ConversationId, config: &Config) -> OtelEventManager { + OtelEventManager::new( + conversation_id, + config.model.as_str(), + config.model_family.slug.as_str(), + None, + Some(AuthMode::ChatGPT), + false, + "test".to_string(), + ) + } + pub(crate) fn make_session_and_context() -> (Session, TurnContext) { let (tx_event, _rx_event) = async_channel::unbounded(); let codex_home = tempfile::tempdir().expect("create temp dir"); @@ -3381,9 +3562,11 @@ mod tests { .expect("load default test config"); let config = Arc::new(config); let conversation_id = ConversationId::default(); + let otel_event_manager = otel_event_manager(conversation_id, config.as_ref()); let client = ModelClient::new( config.clone(), None, + otel_event_manager, config.model_provider.clone(), config.model_reasoning_effort, config.model_reasoning_summary, @@ -3448,9 +3631,11 @@ mod tests { .expect("load default test config"); let config = Arc::new(config); let conversation_id = ConversationId::default(); + let otel_event_manager = otel_event_manager(conversation_id, config.as_ref()); let client = ModelClient::new( config.clone(), None, + otel_event_manager, config.model_provider.clone(), config.model_reasoning_effort, config.model_reasoning_summary, @@ -3741,10 +3926,12 @@ mod tests { let mut turn_diff_tracker = TurnDiffTracker::new(); + let tool_name = "shell"; let sub_id = "test-sub".to_string(); let call_id = "test-call".to_string(); let resp = handle_container_exec_with_params( + tool_name, params, &session, &turn_context, @@ -3770,6 +3957,7 @@ mod tests { turn_context.sandbox_policy = SandboxPolicy::DangerFullAccess; let resp2 = handle_container_exec_with_params( + tool_name, params2, &session, &turn_context, diff --git a/codex-rs/core/src/config.rs b/codex-rs/core/src/config.rs index 292b9f7b..7d616204 100644 --- a/codex-rs/core/src/config.rs +++ b/codex-rs/core/src/config.rs @@ -1,8 +1,12 @@ use crate::config_profile::ConfigProfile; +use crate::config_types::DEFAULT_OTEL_ENVIRONMENT; use crate::config_types::History; use crate::config_types::McpServerConfig; use crate::config_types::McpServerTransportConfig; use crate::config_types::Notifications; +use crate::config_types::OtelConfig; +use crate::config_types::OtelConfigToml; +use crate::config_types::OtelExporterKind; use crate::config_types::ReasoningSummaryFormat; use crate::config_types::SandboxWorkspaceWrite; use crate::config_types::ShellEnvironmentPolicy; @@ -199,6 +203,9 @@ pub struct Config { /// All characters are inserted as they are received, and no buffering /// or placeholder replacement will occur for fast keypress bursts. pub disable_paste_burst: bool, + + /// OTEL configuration (exporter type, endpoint, headers, etc.). 
+    pub otel: crate::config_types::OtelConfig,
 }
 
 impl Config {
@@ -719,6 +726,9 @@ pub struct ConfigToml {
     /// All characters are inserted as they are received, and no buffering
     /// or placeholder replacement will occur for fast keypress bursts.
     pub disable_paste_burst: Option<bool>,
+
+    /// OTEL configuration.
+    pub otel: Option<OtelConfigToml>,
 }
 
 impl From<ConfigToml> for UserSavedConfig {
@@ -1068,6 +1078,19 @@ impl Config {
                 .as_ref()
                 .map(|t| t.notifications.clone())
                 .unwrap_or_default(),
+            otel: {
+                let t: OtelConfigToml = cfg.otel.unwrap_or_default();
+                let log_user_prompt = t.log_user_prompt.unwrap_or(false);
+                let environment = t
+                    .environment
+                    .unwrap_or(DEFAULT_OTEL_ENVIRONMENT.to_string());
+                let exporter = t.exporter.unwrap_or(OtelExporterKind::None);
+                OtelConfig {
+                    log_user_prompt,
+                    environment,
+                    exporter,
+                }
+            },
         };
         Ok(config)
     }
@@ -1809,6 +1832,7 @@ model_verbosity = "high"
                 active_profile: Some("o3".to_string()),
                 disable_paste_burst: false,
                 tui_notifications: Default::default(),
+                otel: OtelConfig::default(),
             },
             o3_profile_config
         );
@@ -1868,6 +1892,7 @@ model_verbosity = "high"
             active_profile: Some("gpt3".to_string()),
             disable_paste_burst: false,
             tui_notifications: Default::default(),
+            otel: OtelConfig::default(),
         };
 
         assert_eq!(expected_gpt3_profile_config, gpt3_profile_config);
@@ -1942,6 +1967,7 @@ model_verbosity = "high"
             active_profile: Some("zdr".to_string()),
             disable_paste_burst: false,
             tui_notifications: Default::default(),
+            otel: OtelConfig::default(),
         };
 
         assert_eq!(expected_zdr_profile_config, zdr_profile_config);
@@ -2002,6 +2028,7 @@ model_verbosity = "high"
             active_profile: Some("gpt5".to_string()),
             disable_paste_burst: false,
             tui_notifications: Default::default(),
+            otel: OtelConfig::default(),
         };
 
         assert_eq!(expected_gpt5_profile_config, gpt5_profile_config);
diff --git a/codex-rs/core/src/config_types.rs b/codex-rs/core/src/config_types.rs
index 283ae3a4..4728e76e 100644
--- a/codex-rs/core/src/config_types.rs
+++ b/codex-rs/core/src/config_types.rs
@@ -13,6 +13,8 @@ use serde::Deserialize;
 use serde::Serialize;
 use serde::de::Error as SerdeError;
 
+pub const DEFAULT_OTEL_ENVIRONMENT: &str = "dev";
+
 #[derive(Serialize, Debug, Clone, PartialEq)]
 pub struct McpServerConfig {
     #[serde(flatten)]
@@ -219,6 +221,64 @@ pub enum HistoryPersistence {
     None,
 }
 
+// ===== OTEL configuration =====
+
+#[derive(Deserialize, Debug, Clone, PartialEq)]
+#[serde(rename_all = "kebab-case")]
+pub enum OtelHttpProtocol {
+    /// Binary payload
+    Binary,
+    /// JSON payload
+    Json,
+}
+
+/// Which OTEL exporter to use.
+#[derive(Deserialize, Debug, Clone, PartialEq)]
+#[serde(rename_all = "kebab-case")]
+pub enum OtelExporterKind {
+    None,
+    OtlpHttp {
+        endpoint: String,
+        headers: HashMap<String, String>,
+        protocol: OtelHttpProtocol,
+    },
+    OtlpGrpc {
+        endpoint: String,
+        headers: HashMap<String, String>,
+    },
+}
+
+/// OTEL settings loaded from config.toml. Fields are optional so we can apply defaults.
+#[derive(Deserialize, Debug, Clone, PartialEq, Default)]
+pub struct OtelConfigToml {
+    /// Log user prompt in traces.
+    pub log_user_prompt: Option<bool>,
+
+    /// Mark traces with environment (dev, staging, prod, test). Defaults to dev.
+    pub environment: Option<String>,
+
+    /// Exporter to use. Defaults to `none`.
+    pub exporter: Option<OtelExporterKind>,
+}
+
+/// Effective OTEL settings after defaults are applied.
+#[derive(Debug, Clone, PartialEq)]
+pub struct OtelConfig {
+    pub log_user_prompt: bool,
+    pub environment: String,
+    pub exporter: OtelExporterKind,
+}
+
+impl Default for OtelConfig {
+    fn default() -> Self {
+        OtelConfig {
+            log_user_prompt: false,
+            environment: DEFAULT_OTEL_ENVIRONMENT.to_owned(),
+            exporter: OtelExporterKind::None,
+        }
+    }
+}
+
 #[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
 #[serde(untagged)]
 pub enum Notifications {
diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs
index f3f61066..7d087a0d 100644
--- a/codex-rs/core/src/lib.rs
+++ b/codex-rs/core/src/lib.rs
@@ -103,3 +103,5 @@ pub use codex_protocol::models::LocalShellExecAction;
 pub use codex_protocol::models::LocalShellStatus;
 pub use codex_protocol::models::ReasoningItemContent;
 pub use codex_protocol::models::ResponseItem;
+
+pub mod otel_init;
diff --git a/codex-rs/core/src/otel_init.rs b/codex-rs/core/src/otel_init.rs
new file mode 100644
index 00000000..0d6002c2
--- /dev/null
+++ b/codex-rs/core/src/otel_init.rs
@@ -0,0 +1,61 @@
+use crate::config::Config;
+use crate::config_types::OtelExporterKind as Kind;
+use crate::config_types::OtelHttpProtocol as Protocol;
+use crate::default_client::ORIGINATOR;
+use codex_otel::config::OtelExporter;
+use codex_otel::config::OtelHttpProtocol;
+use codex_otel::config::OtelSettings;
+use codex_otel::otel_provider::OtelProvider;
+use std::error::Error;
+
+/// Build an OpenTelemetry provider from the app Config.
+///
+/// Returns `None` when OTEL export is disabled.
+pub fn build_provider(
+    config: &Config,
+    service_version: &str,
+) -> Result<Option<OtelProvider>, Box<dyn Error>> {
+    let exporter = match &config.otel.exporter {
+        Kind::None => OtelExporter::None,
+        Kind::OtlpHttp {
+            endpoint,
+            headers,
+            protocol,
+        } => {
+            let protocol = match protocol {
+                Protocol::Json => OtelHttpProtocol::Json,
+                Protocol::Binary => OtelHttpProtocol::Binary,
+            };
+
+            OtelExporter::OtlpHttp {
+                endpoint: endpoint.clone(),
+                headers: headers
+                    .iter()
+                    .map(|(k, v)| (k.clone(), v.clone()))
+                    .collect(),
+                protocol,
+            }
+        }
+        Kind::OtlpGrpc { endpoint, headers } => OtelExporter::OtlpGrpc {
+            endpoint: endpoint.clone(),
+            headers: headers
+                .iter()
+                .map(|(k, v)| (k.clone(), v.clone()))
+                .collect(),
+        },
+    };
+
+    OtelProvider::from(&OtelSettings {
+        service_name: ORIGINATOR.value.to_owned(),
+        service_version: service_version.to_string(),
+        codex_home: config.codex_home.clone(),
+        environment: config.otel.environment.to_string(),
+        exporter,
+    })
+}
+
+/// Filter predicate for exporting only Codex-owned events via OTEL.
+/// Keeps events that originate from the `codex_otel` module.
+pub fn codex_export_filter(meta: &tracing::Metadata<'_>) -> bool {
+    meta.target().starts_with("codex_otel")
+}
diff --git a/codex-rs/core/src/safety.rs b/codex-rs/core/src/safety.rs
index 0c8dfd9a..b976ae4a 100644
--- a/codex-rs/core/src/safety.rs
+++ b/codex-rs/core/src/safety.rs
@@ -15,9 +15,14 @@ use crate::protocol::SandboxPolicy;
 
 #[derive(Debug, PartialEq)]
 pub enum SafetyCheck {
-    AutoApprove { sandbox_type: SandboxType },
+    AutoApprove {
+        sandbox_type: SandboxType,
+        user_explicitly_approved: bool,
+    },
     AskUser,
-    Reject { reason: String },
+    Reject {
+        reason: String,
+    },
 }
 
 pub fn assess_patch_safety(
@@ -54,12 +59,16 @@ pub fn assess_patch_safety(
     // fall back to asking the user because the patch may touch arbitrary
     // paths outside the project.
match get_platform_sandbox() { - Some(sandbox_type) => SafetyCheck::AutoApprove { sandbox_type }, + Some(sandbox_type) => SafetyCheck::AutoApprove { + sandbox_type, + user_explicitly_approved: false, + }, None if sandbox_policy == &SandboxPolicy::DangerFullAccess => { // If the user has explicitly requested DangerFullAccess, then // we can auto-approve even without a sandbox. SafetyCheck::AutoApprove { sandbox_type: SandboxType::None, + user_explicitly_approved: false, } } None => SafetyCheck::AskUser, @@ -118,6 +127,7 @@ pub fn assess_command_safety( if is_known_safe_command(command) || approved.contains(command) { return SafetyCheck::AutoApprove { sandbox_type: SandboxType::None, + user_explicitly_approved: false, }; } @@ -143,13 +153,17 @@ pub(crate) fn assess_safety_for_untrusted_command( | (Never, DangerFullAccess) | (OnRequest, DangerFullAccess) => SafetyCheck::AutoApprove { sandbox_type: SandboxType::None, + user_explicitly_approved: false, }, (OnRequest, ReadOnly) | (OnRequest, WorkspaceWrite { .. }) => { if with_escalated_permissions { SafetyCheck::AskUser } else { match get_platform_sandbox() { - Some(sandbox_type) => SafetyCheck::AutoApprove { sandbox_type }, + Some(sandbox_type) => SafetyCheck::AutoApprove { + sandbox_type, + user_explicitly_approved: false, + }, // Fall back to asking since the command is untrusted and // we do not have a sandbox available None => SafetyCheck::AskUser, @@ -161,7 +175,10 @@ pub(crate) fn assess_safety_for_untrusted_command( | (OnFailure, ReadOnly) | (OnFailure, WorkspaceWrite { .. }) => { match get_platform_sandbox() { - Some(sandbox_type) => SafetyCheck::AutoApprove { sandbox_type }, + Some(sandbox_type) => SafetyCheck::AutoApprove { + sandbox_type, + user_explicitly_approved: false, + }, None => { if matches!(approval_policy, OnFailure) { // Since the command is not trusted, even though the @@ -362,7 +379,8 @@ mod tests { assert_eq!( safety_check, SafetyCheck::AutoApprove { - sandbox_type: SandboxType::None + sandbox_type: SandboxType::None, + user_explicitly_approved: false, } ); } @@ -409,7 +427,10 @@ mod tests { ); let expected = match get_platform_sandbox() { - Some(sandbox_type) => SafetyCheck::AutoApprove { sandbox_type }, + Some(sandbox_type) => SafetyCheck::AutoApprove { + sandbox_type, + user_explicitly_approved: false, + }, None => SafetyCheck::AskUser, }; assert_eq!(safety_check, expected); diff --git a/codex-rs/core/tests/chat_completions_payload.rs b/codex-rs/core/tests/chat_completions_payload.rs index ba3fe9de..4551c5ca 100644 --- a/codex-rs/core/tests/chat_completions_payload.rs +++ b/codex-rs/core/tests/chat_completions_payload.rs @@ -11,6 +11,8 @@ use codex_core::ReasoningItemContent; use codex_core::ResponseItem; use codex_core::WireApi; use codex_core::spawn::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR; +use codex_otel::otel_event_manager::OtelEventManager; +use codex_protocol::mcp_protocol::AuthMode; use codex_protocol::mcp_protocol::ConversationId; use core_test_support::load_default_config_for_test; use futures::StreamExt; @@ -70,13 +72,26 @@ async fn run_request(input: Vec) -> Value { let summary = config.model_reasoning_summary; let config = Arc::new(config); + let conversation_id = ConversationId::new(); + + let otel_event_manager = OtelEventManager::new( + conversation_id, + config.model.as_str(), + config.model_family.slug.as_str(), + None, + Some(AuthMode::ChatGPT), + false, + "test".to_string(), + ); + let client = ModelClient::new( Arc::clone(&config), None, + otel_event_manager, provider, effort, summary, - 
ConversationId::new(), + conversation_id, ); let mut prompt = Prompt::default(); diff --git a/codex-rs/core/tests/chat_completions_sse.rs b/codex-rs/core/tests/chat_completions_sse.rs index 6155d15e..9338a1af 100644 --- a/codex-rs/core/tests/chat_completions_sse.rs +++ b/codex-rs/core/tests/chat_completions_sse.rs @@ -1,4 +1,5 @@ use std::sync::Arc; +use tracing_test::traced_test; use codex_core::ContentItem; use codex_core::ModelClient; @@ -8,6 +9,8 @@ use codex_core::ResponseEvent; use codex_core::ResponseItem; use codex_core::WireApi; use codex_core::spawn::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR; +use codex_otel::otel_event_manager::OtelEventManager; +use codex_protocol::mcp_protocol::AuthMode; use codex_protocol::mcp_protocol::ConversationId; use core_test_support::load_default_config_for_test; use futures::StreamExt; @@ -23,11 +26,15 @@ fn network_disabled() -> bool { } async fn run_stream(sse_body: &str) -> Vec { + run_stream_with_bytes(sse_body.as_bytes()).await +} + +async fn run_stream_with_bytes(sse_body: &[u8]) -> Vec { let server = MockServer::start().await; let template = ResponseTemplate::new(200) .insert_header("content-type", "text/event-stream") - .set_body_raw(sse_body.to_string(), "text/event-stream"); + .set_body_bytes(sse_body.to_vec()); Mock::given(method("POST")) .and(path("/v1/chat/completions")) @@ -63,13 +70,26 @@ async fn run_stream(sse_body: &str) -> Vec { let summary = config.model_reasoning_summary; let config = Arc::new(config); + let conversation_id = ConversationId::new(); + + let otel_event_manager = OtelEventManager::new( + conversation_id, + config.model.as_str(), + config.model_family.slug.as_str(), + None, + Some(AuthMode::ChatGPT), + false, + "test".to_string(), + ); + let client = ModelClient::new( Arc::clone(&config), None, + otel_event_manager, provider, effort, summary, - ConversationId::new(), + conversation_id, ); let mut prompt = Prompt::default(); @@ -89,7 +109,8 @@ async fn run_stream(sse_body: &str) -> Vec { while let Some(event) = stream.next().await { match event { Ok(ev) => events.push(ev), - Err(e) => panic!("stream event error: {e}"), + // We still collect the error to exercise telemetry and complete the task. + Err(_e) => break, } } events @@ -318,3 +339,88 @@ async fn streams_reasoning_before_tool_call() { assert!(matches!(events[3], ResponseEvent::Completed { .. })); } + +#[tokio::test] +#[traced_test] +async fn chat_sse_emits_failed_on_parse_error() { + if network_disabled() { + println!( + "Skipping test because it cannot execute when network is disabled in a Codex sandbox." + ); + return; + } + + let sse_body = concat!("data: not-json\n\n", "data: [DONE]\n\n"); + + let _ = run_stream(sse_body).await; + + logs_assert(|lines: &[&str]| { + lines + .iter() + .find(|line| { + line.contains("codex.api_request") && line.contains("http.response.status_code=200") + }) + .map(|_| Ok(())) + .unwrap_or(Err("cannot find codex.api_request event".to_string())) + }); + + logs_assert(|lines: &[&str]| { + lines + .iter() + .find(|line| { + line.contains("codex.sse_event") + && line.contains("error.message") + && line.contains("expected ident at line 1 column 2") + }) + .map(|_| Ok(())) + .unwrap_or(Err("cannot find SSE event".to_string())) + }); +} + +#[tokio::test] +#[traced_test] +async fn chat_sse_done_chunk_emits_event() { + if network_disabled() { + println!( + "Skipping test because it cannot execute when network is disabled in a Codex sandbox." 
+ ); + return; + } + + let sse_body = "data: [DONE]\n\n"; + + let _ = run_stream(sse_body).await; + + logs_assert(|lines: &[&str]| { + lines + .iter() + .find(|line| line.contains("codex.sse_event") && line.contains("event.kind=message")) + .map(|_| Ok(())) + .unwrap_or(Err("cannot find SSE event".to_string())) + }); +} + +#[tokio::test] +#[traced_test] +async fn chat_sse_emits_error_on_invalid_utf8() { + if network_disabled() { + println!( + "Skipping test because it cannot execute when network is disabled in a Codex sandbox." + ); + return; + } + + let _ = run_stream_with_bytes(b"data: \x80\x80\n\n").await; + + logs_assert(|lines: &[&str]| { + lines + .iter() + .find(|line| { + line.contains("codex.sse_event") + && line.contains("error.message") + && line.contains("UTF8 error: invalid utf-8 sequence of 1 bytes from index 0") + }) + .map(|_| Ok(())) + .unwrap_or(Err("cannot find SSE event".to_string())) + }); +} diff --git a/codex-rs/core/tests/common/responses.rs b/codex-rs/core/tests/common/responses.rs index 6ba10a70..b13e7599 100644 --- a/codex-rs/core/tests/common/responses.rs +++ b/codex-rs/core/tests/common/responses.rs @@ -75,6 +75,33 @@ pub fn ev_function_call(call_id: &str, name: &str, arguments: &str) -> Value { }) } +pub fn ev_custom_tool_call(call_id: &str, name: &str, input: &str) -> Value { + serde_json::json!({ + "type": "response.output_item.done", + "item": { + "type": "custom_tool_call", + "call_id": call_id, + "name": name, + "input": input + } + }) +} + +pub fn ev_local_shell_call(call_id: &str, status: &str, command: Vec<&str>) -> Value { + serde_json::json!({ + "type": "response.output_item.done", + "item": { + "type": "local_shell_call", + "call_id": call_id, + "status": status, + "action": { + "type": "exec", + "command": command, + } + } + }) +} + /// Convenience: SSE event for an `apply_patch` custom tool call with raw patch /// text. This mirrors the payload produced by the Responses API when the model /// invokes `apply_patch` directly (before we convert it to a function call). 
@@ -114,7 +141,7 @@ pub fn sse_response(body: String) -> ResponseTemplate { .set_body_raw(body, "text/event-stream") } -pub async fn mount_sse_once(server: &MockServer, matcher: M, body: String) +pub async fn mount_sse_once_match(server: &MockServer, matcher: M, body: String) where M: wiremock::Match + Send + Sync + 'static, { @@ -127,6 +154,23 @@ where .await; } +pub async fn mount_sse_once(server: &MockServer, body: String) { + Mock::given(method("POST")) + .and(path("/v1/responses")) + .respond_with(sse_response(body)) + .expect(1) + .mount(server) + .await; +} + +pub async fn mount_sse(server: &MockServer, body: String) { + Mock::given(method("POST")) + .and(path("/v1/responses")) + .respond_with(sse_response(body)) + .mount(server) + .await; +} + pub async fn start_mock_server() -> MockServer { MockServer::builder() .body_print_limit(BodyPrintLimit::Limited(80_000)) diff --git a/codex-rs/core/tests/suite/abort_tasks.rs b/codex-rs/core/tests/suite/abort_tasks.rs index 2fa3d4df..368f7f74 100644 --- a/codex-rs/core/tests/suite/abort_tasks.rs +++ b/codex-rs/core/tests/suite/abort_tasks.rs @@ -4,7 +4,7 @@ use codex_core::protocol::EventMsg; use codex_core::protocol::InputItem; use codex_core::protocol::Op; use core_test_support::responses::ev_function_call; -use core_test_support::responses::mount_sse_once; +use core_test_support::responses::mount_sse_once_match; use core_test_support::responses::sse; use core_test_support::responses::start_mock_server; use core_test_support::test_codex::test_codex; @@ -30,7 +30,7 @@ async fn interrupt_long_running_tool_emits_turn_aborted() { let body = sse(vec![ev_function_call("call_sleep", "shell", &args)]); let server = start_mock_server().await; - mount_sse_once(&server, body_string_contains("start sleep"), body).await; + mount_sse_once_match(&server, body_string_contains("start sleep"), body).await; let codex = test_codex().build(&server).await.unwrap().codex; diff --git a/codex-rs/core/tests/suite/client.rs b/codex-rs/core/tests/suite/client.rs index cf15e02d..7b8eb429 100644 --- a/codex-rs/core/tests/suite/client.rs +++ b/codex-rs/core/tests/suite/client.rs @@ -16,6 +16,8 @@ use codex_core::built_in_model_providers; use codex_core::protocol::EventMsg; use codex_core::protocol::InputItem; use codex_core::protocol::Op; +use codex_otel::otel_event_manager::OtelEventManager; +use codex_protocol::mcp_protocol::AuthMode; use codex_protocol::mcp_protocol::ConversationId; use codex_protocol::models::ReasoningItemReasoningSummary; use codex_protocol::models::WebSearchAction; @@ -664,13 +666,26 @@ async fn azure_responses_request_includes_store_and_reasoning_ids() { let summary = config.model_reasoning_summary; let config = Arc::new(config); + let conversation_id = ConversationId::new(); + + let otel_event_manager = OtelEventManager::new( + conversation_id, + config.model.as_str(), + config.model_family.slug.as_str(), + None, + Some(AuthMode::ChatGPT), + false, + "test".to_string(), + ); + let client = ModelClient::new( Arc::clone(&config), None, + otel_event_manager, provider, effort, summary, - ConversationId::new(), + conversation_id, ); let mut prompt = Prompt::default(); diff --git a/codex-rs/core/tests/suite/compact.rs b/codex-rs/core/tests/suite/compact.rs index ede25588..a8515296 100644 --- a/codex-rs/core/tests/suite/compact.rs +++ b/codex-rs/core/tests/suite/compact.rs @@ -25,7 +25,7 @@ use core_test_support::responses::ev_assistant_message; use core_test_support::responses::ev_completed; use core_test_support::responses::ev_completed_with_tokens; 
use core_test_support::responses::ev_function_call; -use core_test_support::responses::mount_sse_once; +use core_test_support::responses::mount_sse_once_match; use core_test_support::responses::sse; use core_test_support::responses::sse_response; use core_test_support::responses::start_mock_server; @@ -79,19 +79,19 @@ async fn summarize_context_three_requests_and_instructions() { body.contains("\"text\":\"hello world\"") && !body.contains("You have exceeded the maximum number of tokens") }; - mount_sse_once(&server, first_matcher, sse1).await; + mount_sse_once_match(&server, first_matcher, sse1).await; let second_matcher = |req: &wiremock::Request| { let body = std::str::from_utf8(&req.body).unwrap_or(""); body.contains("You have exceeded the maximum number of tokens") }; - mount_sse_once(&server, second_matcher, sse2).await; + mount_sse_once_match(&server, second_matcher, sse2).await; let third_matcher = |req: &wiremock::Request| { let body = std::str::from_utf8(&req.body).unwrap_or(""); body.contains(&format!("\"text\":\"{THIRD_USER_MSG}\"")) }; - mount_sse_once(&server, third_matcher, sse3).await; + mount_sse_once_match(&server, third_matcher, sse3).await; // Build config pointing to the mock server and spawn Codex. let model_provider = ModelProviderInfo { diff --git a/codex-rs/core/tests/suite/compact_resume_fork.rs b/codex-rs/core/tests/suite/compact_resume_fork.rs index 73ad091f..690e1aab 100644 --- a/codex-rs/core/tests/suite/compact_resume_fork.rs +++ b/codex-rs/core/tests/suite/compact_resume_fork.rs @@ -25,7 +25,7 @@ use codex_core::spawn::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR; use core_test_support::load_default_config_for_test; use core_test_support::responses::ev_assistant_message; use core_test_support::responses::ev_completed; -use core_test_support::responses::mount_sse_once; +use core_test_support::responses::mount_sse_once_match; use core_test_support::responses::sse; use core_test_support::wait_for_event; use pretty_assertions::assert_eq; @@ -702,13 +702,13 @@ async fn mount_initial_flow(server: &MockServer) { && !body.contains("\"text\":\"AFTER_RESUME\"") && !body.contains("\"text\":\"AFTER_FORK\"") }; - mount_sse_once(server, match_first, sse1).await; + mount_sse_once_match(server, match_first, sse1).await; let match_compact = |req: &wiremock::Request| { let body = std::str::from_utf8(&req.body).unwrap_or(""); body.contains("You have exceeded the maximum number of tokens") }; - mount_sse_once(server, match_compact, sse2).await; + mount_sse_once_match(server, match_compact, sse2).await; let match_after_compact = |req: &wiremock::Request| { let body = std::str::from_utf8(&req.body).unwrap_or(""); @@ -716,19 +716,19 @@ async fn mount_initial_flow(server: &MockServer) { && !body.contains("\"text\":\"AFTER_RESUME\"") && !body.contains("\"text\":\"AFTER_FORK\"") }; - mount_sse_once(server, match_after_compact, sse3).await; + mount_sse_once_match(server, match_after_compact, sse3).await; let match_after_resume = |req: &wiremock::Request| { let body = std::str::from_utf8(&req.body).unwrap_or(""); body.contains("\"text\":\"AFTER_RESUME\"") }; - mount_sse_once(server, match_after_resume, sse4).await; + mount_sse_once_match(server, match_after_resume, sse4).await; let match_after_fork = |req: &wiremock::Request| { let body = std::str::from_utf8(&req.body).unwrap_or(""); body.contains("\"text\":\"AFTER_FORK\"") }; - mount_sse_once(server, match_after_fork, sse5).await; + mount_sse_once_match(server, match_after_fork, sse5).await; } async fn mount_second_compact_flow(server: 
&MockServer) { @@ -743,13 +743,13 @@ async fn mount_second_compact_flow(server: &MockServer) { body.contains("You have exceeded the maximum number of tokens") && body.contains("AFTER_FORK") }; - mount_sse_once(server, match_second_compact, sse6).await; + mount_sse_once_match(server, match_second_compact, sse6).await; let match_after_second_resume = |req: &wiremock::Request| { let body = std::str::from_utf8(&req.body).unwrap_or(""); body.contains(&format!("\"text\":\"{AFTER_SECOND_RESUME}\"")) }; - mount_sse_once(server, match_after_second_resume, sse7).await; + mount_sse_once_match(server, match_after_second_resume, sse7).await; } async fn start_test_conversation( diff --git a/codex-rs/core/tests/suite/json_result.rs b/codex-rs/core/tests/suite/json_result.rs index 90d92232..e8d91fae 100644 --- a/codex-rs/core/tests/suite/json_result.rs +++ b/codex-rs/core/tests/suite/json_result.rs @@ -67,7 +67,7 @@ async fn codex_returns_json_result(model: String) -> anyhow::Result<()> { && format.get("strict") == Some(&serde_json::Value::Bool(true)) && format.get("schema") == Some(&expected_schema) }; - responses::mount_sse_once(&server, match_json_text_param, sse1).await; + responses::mount_sse_once_match(&server, match_json_text_param, sse1).await; let TestCodex { codex, cwd, .. } = test_codex().build(&server).await?; diff --git a/codex-rs/core/tests/suite/mod.rs b/codex-rs/core/tests/suite/mod.rs index 5dd2cb67..b3a90ff3 100644 --- a/codex-rs/core/tests/suite/mod.rs +++ b/codex-rs/core/tests/suite/mod.rs @@ -12,6 +12,7 @@ mod fork_conversation; mod json_result; mod live_cli; mod model_overrides; +mod otel; mod prompt_caching; mod review; mod rmcp_client; diff --git a/codex-rs/core/tests/suite/otel.rs b/codex-rs/core/tests/suite/otel.rs new file mode 100644 index 00000000..2e7fe317 --- /dev/null +++ b/codex-rs/core/tests/suite/otel.rs @@ -0,0 +1,1196 @@ +use codex_protocol::protocol::AskForApproval; +use codex_protocol::protocol::EventMsg; +use codex_protocol::protocol::InputItem; +use codex_protocol::protocol::Op; +use codex_protocol::protocol::ReviewDecision; +use codex_protocol::protocol::SandboxPolicy; +use core_test_support::responses::ev_assistant_message; +use core_test_support::responses::ev_completed; +use core_test_support::responses::ev_custom_tool_call; +use core_test_support::responses::ev_function_call; +use core_test_support::responses::mount_sse; +use core_test_support::responses::mount_sse_once; +use core_test_support::responses::sse; +use core_test_support::responses::start_mock_server; +use core_test_support::test_codex::TestCodex; +use core_test_support::test_codex::test_codex; +use core_test_support::wait_for_event_with_timeout; +use std::time::Duration; +use tracing_test::traced_test; + +use core_test_support::responses::ev_local_shell_call; + +#[tokio::test] +#[traced_test] +async fn responses_api_emits_api_request_event() { + let server = start_mock_server().await; + + mount_sse_once(&server, sse(vec![ev_completed("done")])).await; + + let TestCodex { codex, .. 
} = test_codex().build(&server).await.unwrap(); + + codex + .submit(Op::UserInput { + items: vec![InputItem::Text { + text: "hello".into(), + }], + }) + .await + .unwrap(); + + wait_for_event_with_timeout( + &codex, + |ev| matches!(ev, EventMsg::TaskComplete(_)), + Duration::from_secs(5), + ) + .await; + + logs_assert(|lines: &[&str]| { + lines + .iter() + .find(|line| line.contains("codex.api_request")) + .map(|_| Ok(())) + .unwrap_or_else(|| Err("expected codex.api_request event".to_string())) + }); + + logs_assert(|lines: &[&str]| { + lines + .iter() + .find(|line| line.contains("codex.conversation_starts")) + .map(|_| Ok(())) + .unwrap_or_else(|| Err("expected codex.conversation_starts event".to_string())) + }); +} + +#[tokio::test] +#[traced_test] +async fn process_sse_emits_tracing_for_output_item() { + let server = start_mock_server().await; + + mount_sse_once( + &server, + sse(vec![ev_assistant_message("id1", "hi"), ev_completed("id2")]), + ) + .await; + + let TestCodex { codex, .. } = test_codex().build(&server).await.unwrap(); + + codex + .submit(Op::UserInput { + items: vec![InputItem::Text { + text: "hello".into(), + }], + }) + .await + .unwrap(); + + wait_for_event_with_timeout( + &codex, + |ev| matches!(ev, EventMsg::TaskComplete(_)), + Duration::from_secs(5), + ) + .await; + + logs_assert(|lines: &[&str]| { + lines + .iter() + .find(|line| { + line.contains("codex.sse_event") + && line.contains("event.kind=response.output_item.done") + }) + .map(|_| Ok(())) + .unwrap_or(Err("missing response.output_item.done event".to_string())) + }); +} + +#[tokio::test] +#[traced_test] +async fn process_sse_emits_failed_event_on_parse_error() { + let server = start_mock_server().await; + + mount_sse_once(&server, "data: not-json\n\n".to_string()).await; + + let TestCodex { codex, .. } = test_codex() + .with_config(move |config| { + config.model_provider.request_max_retries = Some(0); + config.model_provider.stream_max_retries = Some(0); + }) + .build(&server) + .await + .unwrap(); + + codex + .submit(Op::UserInput { + items: vec![InputItem::Text { + text: "hello".into(), + }], + }) + .await + .unwrap(); + + wait_for_event_with_timeout( + &codex, + |ev| matches!(ev, EventMsg::TaskComplete(_)), + Duration::from_secs(5), + ) + .await; + + logs_assert(|lines: &[&str]| { + lines + .iter() + .find(|line| { + line.contains("codex.sse_event") + && line.contains("error.message") + && line.contains("expected ident at line 1 column 2") + }) + .map(|_| Ok(())) + .unwrap_or(Err("missing codex.sse_event".to_string())) + }); +} + +#[tokio::test] +#[traced_test] +async fn process_sse_records_failed_event_when_stream_closes_without_completed() { + let server = start_mock_server().await; + + mount_sse_once(&server, sse(vec![ev_assistant_message("id", "hi")])).await; + + let TestCodex { codex, .. 
} = test_codex() + .with_config(move |config| { + config.model_provider.request_max_retries = Some(0); + config.model_provider.stream_max_retries = Some(0); + }) + .build(&server) + .await + .unwrap(); + + codex + .submit(Op::UserInput { + items: vec![InputItem::Text { + text: "hello".into(), + }], + }) + .await + .unwrap(); + + wait_for_event_with_timeout( + &codex, + |ev| matches!(ev, EventMsg::TaskComplete(_)), + Duration::from_secs(5), + ) + .await; + + logs_assert(|lines: &[&str]| { + lines + .iter() + .find(|line| { + line.contains("codex.sse_event") + && line.contains("error.message") + && line.contains("stream closed before response.completed") + }) + .map(|_| Ok(())) + .unwrap_or(Err("missing codex.sse_event".to_string())) + }); +} + +#[tokio::test] +#[traced_test] +async fn process_sse_failed_event_records_response_error_message() { + let server = start_mock_server().await; + + mount_sse_once( + &server, + sse(vec![serde_json::json!({ + "type": "response.failed", + "response": { + "error": { + "message": "boom", + "code": "bad" + } + } + })]), + ) + .await; + + let TestCodex { codex, .. } = test_codex() + .with_config(move |config| { + config.model_provider.request_max_retries = Some(0); + config.model_provider.stream_max_retries = Some(0); + }) + .build(&server) + .await + .unwrap(); + + codex + .submit(Op::UserInput { + items: vec![InputItem::Text { + text: "hello".into(), + }], + }) + .await + .unwrap(); + + wait_for_event_with_timeout( + &codex, + |ev| matches!(ev, EventMsg::TaskComplete(_)), + Duration::from_secs(5), + ) + .await; + + logs_assert(|lines: &[&str]| { + lines + .iter() + .find(|line| { + line.contains("codex.sse_event") + && line.contains("event.kind=response.failed") + && line.contains("error.message") + && line.contains("boom") + }) + .map(|_| Ok(())) + .unwrap_or(Err("missing codex.sse_event".to_string())) + }); +} + +#[tokio::test] +#[traced_test] +async fn process_sse_failed_event_logs_parse_error() { + let server = start_mock_server().await; + + mount_sse_once( + &server, + sse(vec![serde_json::json!({ + "type": "response.failed", + "response": { + "error": "not-an-object" + } + })]), + ) + .await; + + let TestCodex { codex, .. } = test_codex() + .with_config(move |config| { + config.model_provider.request_max_retries = Some(0); + config.model_provider.stream_max_retries = Some(0); + }) + .build(&server) + .await + .unwrap(); + + codex + .submit(Op::UserInput { + items: vec![InputItem::Text { + text: "hello".into(), + }], + }) + .await + .unwrap(); + + wait_for_event_with_timeout( + &codex, + |ev| matches!(ev, EventMsg::TaskComplete(_)), + Duration::from_secs(5), + ) + .await; + + logs_assert(|lines: &[&str]| { + lines + .iter() + .find(|line| { + line.contains("codex.sse_event") && line.contains("event.kind=response.failed") + }) + .map(|_| Ok(())) + .unwrap_or(Err("missing codex.sse_event".to_string())) + }); +} + +#[tokio::test] +#[traced_test] +async fn process_sse_failed_event_logs_missing_error() { + let server = start_mock_server().await; + + mount_sse_once( + &server, + sse(vec![serde_json::json!({ + "type": "response.failed", + "response": {} + })]), + ) + .await; + + let TestCodex { codex, .. 
} = test_codex() + .with_config(move |config| { + config.model_provider.request_max_retries = Some(0); + config.model_provider.stream_max_retries = Some(0); + }) + .build(&server) + .await + .unwrap(); + + codex + .submit(Op::UserInput { + items: vec![InputItem::Text { + text: "hello".into(), + }], + }) + .await + .unwrap(); + + wait_for_event_with_timeout( + &codex, + |ev| matches!(ev, EventMsg::TaskComplete(_)), + Duration::from_secs(5), + ) + .await; + + logs_assert(|lines: &[&str]| { + lines + .iter() + .find(|line| { + line.contains("codex.sse_event") && line.contains("event.kind=response.failed") + }) + .map(|_| Ok(())) + .unwrap_or(Err("missing codex.sse_event".to_string())) + }); +} + +#[tokio::test] +#[traced_test] +async fn process_sse_failed_event_logs_response_completed_parse_error() { + let server = start_mock_server().await; + + mount_sse_once( + &server, + sse(vec![serde_json::json!({ + "type": "response.completed", + "response": {} + })]), + ) + .await; + + let TestCodex { codex, .. } = test_codex() + .with_config(move |config| { + config.model_provider.request_max_retries = Some(0); + config.model_provider.stream_max_retries = Some(0); + }) + .build(&server) + .await + .unwrap(); + + codex + .submit(Op::UserInput { + items: vec![InputItem::Text { + text: "hello".into(), + }], + }) + .await + .unwrap(); + + wait_for_event_with_timeout( + &codex, + |ev| matches!(ev, EventMsg::TaskComplete(_)), + Duration::from_secs(5), + ) + .await; + + logs_assert(|lines: &[&str]| { + lines + .iter() + .find(|line| { + line.contains("codex.sse_event") + && line.contains("event.kind=response.completed") + && line.contains("error.message") + && line.contains("failed to parse ResponseCompleted") + }) + .map(|_| Ok(())) + .unwrap_or(Err("missing codex.sse_event".to_string())) + }); +} + +#[tokio::test] +#[traced_test] +async fn process_sse_emits_completed_telemetry() { + let server = start_mock_server().await; + + mount_sse_once( + &server, + sse(vec![serde_json::json!({ + "type": "response.completed", + "response": { + "id": "resp1", + "usage": { + "input_tokens": 3, + "input_tokens_details": { "cached_tokens": 1 }, + "output_tokens": 5, + "output_tokens_details": { "reasoning_tokens": 2 }, + "total_tokens": 9 + } + } + })]), + ) + .await; + + let TestCodex { codex, .. } = test_codex().build(&server).await.unwrap(); + + codex + .submit(Op::UserInput { + items: vec![InputItem::Text { + text: "hello".into(), + }], + }) + .await + .unwrap(); + + wait_for_event_with_timeout( + &codex, + |ev| matches!(ev, EventMsg::TaskComplete(_)), + Duration::from_secs(5), + ) + .await; + + logs_assert(|lines: &[&str]| { + lines + .iter() + .find(|line| { + line.contains("codex.sse_event") + && line.contains("event.kind=response.completed") + && line.contains("input_token_count=3") + && line.contains("output_token_count=5") + && line.contains("cached_token_count=1") + && line.contains("reasoning_token_count=2") + && line.contains("tool_token_count=9") + }) + .map(|_| Ok(())) + .unwrap_or(Err("missing response.completed telemetry".to_string())) + }); +} + +#[tokio::test] +#[traced_test] +async fn handle_response_item_records_tool_result_for_custom_tool_call() { + let server = start_mock_server().await; + + mount_sse( + &server, + sse(vec![ + ev_custom_tool_call( + "custom-tool-call", + "unsupported_tool", + "{\"key\":\"value\"}", + ), + ev_completed("done"), + ]), + ) + .await; + + let TestCodex { codex, .. 
} = test_codex() + .with_config(move |config| { + config.model_provider.request_max_retries = Some(0); + config.model_provider.stream_max_retries = Some(0); + }) + .build(&server) + .await + .unwrap(); + + codex + .submit(Op::UserInput { + items: vec![InputItem::Text { + text: "hello".into(), + }], + }) + .await + .unwrap(); + + wait_for_event_with_timeout( + &codex, + |ev| matches!(ev, EventMsg::TokenCount(_)), + Duration::from_secs(5), + ) + .await; + + logs_assert(|lines: &[&str]| { + let line = lines + .iter() + .find(|line| { + line.contains("codex.tool_result") && line.contains("call_id=custom-tool-call") + }) + .ok_or_else(|| "missing codex.tool_result event".to_string())?; + + if !line.contains("tool_name=unsupported_tool") { + return Err("missing tool_name field".to_string()); + } + if !line.contains("arguments={\"key\":\"value\"}") { + return Err("missing arguments field".to_string()); + } + if !line.contains("output=unsupported custom tool call: unsupported_tool") { + return Err("missing output field".to_string()); + } + if !line.contains("success=false") { + return Err("missing success field".to_string()); + } + + Ok(()) + }); +} + +#[tokio::test] +#[traced_test] +async fn handle_response_item_records_tool_result_for_function_call() { + let server = start_mock_server().await; + + mount_sse( + &server, + sse(vec![ + ev_function_call("function-call", "nonexistent", "{\"value\":1}"), + ev_completed("done"), + ]), + ) + .await; + + let TestCodex { codex, .. } = test_codex() + .with_config(move |config| { + config.model_provider.request_max_retries = Some(0); + config.model_provider.stream_max_retries = Some(0); + }) + .build(&server) + .await + .unwrap(); + + codex + .submit(Op::UserInput { + items: vec![InputItem::Text { + text: "hello".into(), + }], + }) + .await + .unwrap(); + + wait_for_event_with_timeout( + &codex, + |ev| matches!(ev, EventMsg::TokenCount(_)), + Duration::from_secs(5), + ) + .await; + + logs_assert(|lines: &[&str]| { + let line = lines + .iter() + .find(|line| { + line.contains("codex.tool_result") && line.contains("call_id=function-call") + }) + .ok_or_else(|| "missing codex.tool_result event".to_string())?; + + if !line.contains("tool_name=nonexistent") { + return Err("missing tool_name field".to_string()); + } + if !line.contains("arguments={\"value\":1}") { + return Err("missing arguments field".to_string()); + } + if !line.contains("output=unsupported call: nonexistent") { + return Err("missing output field".to_string()); + } + if !line.contains("success=false") { + return Err("missing success field".to_string()); + } + + Ok(()) + }); +} + +#[tokio::test] +#[traced_test] +async fn handle_response_item_records_tool_result_for_local_shell_missing_ids() { + let server = start_mock_server().await; + + mount_sse( + &server, + sse(vec![ + serde_json::json!({ + "type": "response.output_item.done", + "item": { + "type": "local_shell_call", + "status": "completed", + "action": { + "type": "exec", + "command": vec!["/bin/echo", "hello"], + } + } + }), + ev_completed("done"), + ]), + ) + .await; + + let TestCodex { codex, .. 
} = test_codex() + .with_config(move |config| { + config.model_provider.request_max_retries = Some(0); + config.model_provider.stream_max_retries = Some(0); + }) + .build(&server) + .await + .unwrap(); + + codex + .submit(Op::UserInput { + items: vec![InputItem::Text { + text: "hello".into(), + }], + }) + .await + .unwrap(); + + wait_for_event_with_timeout( + &codex, + |ev| matches!(ev, EventMsg::TokenCount(_)), + Duration::from_secs(5), + ) + .await; + + logs_assert(|lines: &[&str]| { + let line = lines + .iter() + .find(|line| { + line.contains("codex.tool_result") + && line.contains(&"tool_name=local_shell".to_string()) + && line.contains("output=LocalShellCall without call_id or id") + }) + .ok_or_else(|| "missing codex.tool_result event".to_string())?; + + if !line.contains("success=false") { + return Err("missing success field".to_string()); + } + + Ok(()) + }); +} + +#[cfg(target_os = "macos")] +#[tokio::test] +#[traced_test] +async fn handle_response_item_records_tool_result_for_local_shell_call() { + let server = start_mock_server().await; + + mount_sse( + &server, + sse(vec![ + ev_local_shell_call("shell-call", "completed", vec!["/bin/echo", "shell"]), + ev_completed("done"), + ]), + ) + .await; + + let TestCodex { codex, .. } = test_codex() + .with_config(move |config| { + config.model_provider.request_max_retries = Some(0); + config.model_provider.stream_max_retries = Some(0); + }) + .build(&server) + .await + .unwrap(); + + codex + .submit(Op::UserInput { + items: vec![InputItem::Text { + text: "hello".into(), + }], + }) + .await + .unwrap(); + + wait_for_event_with_timeout( + &codex, + |ev| matches!(ev, EventMsg::TokenCount(_)), + Duration::from_secs(5), + ) + .await; + + logs_assert(|lines: &[&str]| { + let line = lines + .iter() + .find(|line| line.contains("codex.tool_result") && line.contains("call_id=shell-call")) + .ok_or_else(|| "missing codex.tool_result event".to_string())?; + + if !line.contains("tool_name=local_shell") { + return Err("missing tool_name field".to_string()); + } + if !line.contains("arguments=/bin/echo shell") { + return Err("missing arguments field".to_string()); + } + let output_idx = line + .find("output=") + .ok_or_else(|| "missing output field".to_string())?; + if line[output_idx + "output=".len()..].is_empty() { + return Err("empty output field".to_string()); + } + if !line.contains("success=false") { + return Err("missing success field".to_string()); + } + + Ok(()) + }); +} + +fn tool_decision_assertion<'a>( + call_id: &'a str, + expected_decision: &'a str, + expected_source: &'a str, +) -> impl Fn(&[&str]) -> Result<(), String> + 'a { + let call_id = call_id.to_string(); + let expected_decision = expected_decision.to_string(); + let expected_source = expected_source.to_string(); + + move |lines: &[&str]| { + let line = lines + .iter() + .find(|line| { + line.contains("codex.tool_decision") && line.contains(&format!("call_id={call_id}")) + }) + .ok_or_else(|| format!("missing codex.tool_decision event for {call_id}"))?; + + let lower = line.to_lowercase(); + if !lower.contains("tool_name=local_shell") { + return Err("missing tool_name for local_shell".to_string()); + } + if !lower.contains(&format!("decision={expected_decision}")) { + return Err(format!("unexpected decision for {call_id}")); + } + if !lower.contains(&format!("source={expected_source}")) { + return Err(format!("unexpected source for {expected_source}")); + } + + Ok(()) + } +} + +#[tokio::test] +#[traced_test] +async fn 
handle_container_exec_autoapprove_from_config_records_tool_decision() { + let server = start_mock_server().await; + mount_sse( + &server, + sse(vec![ + ev_local_shell_call("auto_config_call", "completed", vec!["/bin/echo", "hello"]), + ev_completed("done"), + ]), + ) + .await; + + let TestCodex { codex, .. } = test_codex() + .with_config(|config| { + config.approval_policy = AskForApproval::OnRequest; + config.sandbox_policy = SandboxPolicy::DangerFullAccess; + config.model_provider.request_max_retries = Some(0); + config.model_provider.stream_max_retries = Some(0); + }) + .build(&server) + .await + .unwrap(); + + codex + .submit(Op::UserInput { + items: vec![InputItem::Text { + text: "hello".into(), + }], + }) + .await + .unwrap(); + + wait_for_event_with_timeout( + &codex, + |ev| matches!(ev, EventMsg::TokenCount(_)), + Duration::from_secs(5), + ) + .await; + + logs_assert(tool_decision_assertion( + "auto_config_call", + "approved", + "config", + )); +} + +#[tokio::test] +#[traced_test] +async fn handle_container_exec_user_approved_records_tool_decision() { + let server = start_mock_server().await; + mount_sse( + &server, + sse(vec![ + ev_local_shell_call( + "user_approved_call", + "completed", + vec!["/bin/echo", "approved"], + ), + ev_completed("done"), + ]), + ) + .await; + + let TestCodex { codex, .. } = test_codex() + .with_config(|config| { + config.approval_policy = AskForApproval::UnlessTrusted; + config.model_provider.request_max_retries = Some(0); + config.model_provider.stream_max_retries = Some(0); + }) + .build(&server) + .await + .unwrap(); + + codex + .submit(Op::UserInput { + items: vec![InputItem::Text { + text: "approved".into(), + }], + }) + .await + .unwrap(); + + wait_for_event_with_timeout( + &codex, + |ev| matches!(ev, EventMsg::ExecApprovalRequest(_)), + Duration::from_secs(5), + ) + .await; + + codex + .submit(Op::ExecApproval { + id: "0".into(), + decision: ReviewDecision::Approved, + }) + .await + .unwrap(); + + wait_for_event_with_timeout( + &codex, + |ev| matches!(ev, EventMsg::TokenCount(_)), + Duration::from_secs(5), + ) + .await; + + logs_assert(tool_decision_assertion( + "user_approved_call", + "approved", + "user", + )); +} + +#[tokio::test] +#[traced_test] +async fn handle_container_exec_user_approved_for_session_records_tool_decision() { + let server = start_mock_server().await; + + mount_sse( + &server, + sse(vec![ + ev_local_shell_call( + "user_approved_session_call", + "completed", + vec!["/bin/echo", "persist"], + ), + ev_completed("done"), + ]), + ) + .await; + + let TestCodex { codex, .. 
} = test_codex() + .with_config(|config| { + config.approval_policy = AskForApproval::UnlessTrusted; + config.model_provider.request_max_retries = Some(0); + config.model_provider.stream_max_retries = Some(0); + }) + .build(&server) + .await + .unwrap(); + + codex + .submit(Op::UserInput { + items: vec![InputItem::Text { + text: "persist".into(), + }], + }) + .await + .unwrap(); + + wait_for_event_with_timeout( + &codex, + |ev| matches!(ev, EventMsg::ExecApprovalRequest(_)), + Duration::from_secs(5), + ) + .await; + + codex + .submit(Op::ExecApproval { + id: "0".into(), + decision: ReviewDecision::ApprovedForSession, + }) + .await + .unwrap(); + + wait_for_event_with_timeout( + &codex, + |ev| matches!(ev, EventMsg::TokenCount(_)), + Duration::from_secs(5), + ) + .await; + + logs_assert(tool_decision_assertion( + "user_approved_session_call", + "approvedforsession", + "user", + )); +} + +#[tokio::test] +#[traced_test] +async fn handle_sandbox_error_user_approves_retry_records_tool_decision() { + let server = start_mock_server().await; + + mount_sse( + &server, + sse(vec![ + ev_local_shell_call( + "sandbox_retry_call", + "completed", + vec!["/bin/echo", "retry"], + ), + ev_completed("done"), + ]), + ) + .await; + + let TestCodex { codex, .. } = test_codex() + .with_config(|config| { + config.approval_policy = AskForApproval::UnlessTrusted; + config.model_provider.request_max_retries = Some(0); + config.model_provider.stream_max_retries = Some(0); + }) + .build(&server) + .await + .unwrap(); + + codex + .submit(Op::UserInput { + items: vec![InputItem::Text { + text: "retry".into(), + }], + }) + .await + .unwrap(); + + wait_for_event_with_timeout( + &codex, + |ev| matches!(ev, EventMsg::ExecApprovalRequest(_)), + Duration::from_secs(5), + ) + .await; + + codex + .submit(Op::ExecApproval { + id: "0".into(), + decision: ReviewDecision::Approved, + }) + .await + .unwrap(); + + wait_for_event_with_timeout( + &codex, + |ev| matches!(ev, EventMsg::TokenCount(_)), + Duration::from_secs(5), + ) + .await; + + logs_assert(tool_decision_assertion( + "sandbox_retry_call", + "approved", + "user", + )); +} + +#[tokio::test] +#[traced_test] +async fn handle_container_exec_user_denies_records_tool_decision() { + let server = start_mock_server().await; + + mount_sse( + &server, + sse(vec![ + ev_local_shell_call("user_denied_call", "completed", vec!["/bin/echo", "deny"]), + ev_completed("done"), + ]), + ) + .await; + + let TestCodex { codex, .. 
} = test_codex() + .with_config(|config| { + config.approval_policy = AskForApproval::UnlessTrusted; + config.model_provider.request_max_retries = Some(0); + config.model_provider.stream_max_retries = Some(0); + }) + .build(&server) + .await + .unwrap(); + + codex + .submit(Op::UserInput { + items: vec![InputItem::Text { + text: "deny".into(), + }], + }) + .await + .unwrap(); + + wait_for_event_with_timeout( + &codex, + |ev| matches!(ev, EventMsg::ExecApprovalRequest(_)), + Duration::from_secs(5), + ) + .await; + + codex + .submit(Op::ExecApproval { + id: "0".into(), + decision: ReviewDecision::Denied, + }) + .await + .unwrap(); + + wait_for_event_with_timeout( + &codex, + |ev| matches!(ev, EventMsg::TokenCount(_)), + Duration::from_secs(5), + ) + .await; + + logs_assert(tool_decision_assertion( + "user_denied_call", + "denied", + "user", + )); +} + +#[tokio::test] +#[traced_test] +async fn handle_sandbox_error_user_approves_for_session_records_tool_decision() { + let server = start_mock_server().await; + + mount_sse( + &server, + sse(vec![ + ev_local_shell_call( + "sandbox_session_call", + "completed", + vec!["/bin/echo", "persist"], + ), + ev_completed("done"), + ]), + ) + .await; + + let TestCodex { codex, .. } = test_codex() + .with_config(|config| { + config.approval_policy = AskForApproval::UnlessTrusted; + config.model_provider.request_max_retries = Some(0); + config.model_provider.stream_max_retries = Some(0); + }) + .build(&server) + .await + .unwrap(); + + codex + .submit(Op::UserInput { + items: vec![InputItem::Text { + text: "persist".into(), + }], + }) + .await + .unwrap(); + + wait_for_event_with_timeout( + &codex, + |ev| matches!(ev, EventMsg::ExecApprovalRequest(_)), + Duration::from_secs(5), + ) + .await; + + codex + .submit(Op::ExecApproval { + id: "0".into(), + decision: ReviewDecision::ApprovedForSession, + }) + .await + .unwrap(); + + wait_for_event_with_timeout( + &codex, + |ev| matches!(ev, EventMsg::TokenCount(_)), + Duration::from_secs(5), + ) + .await; + + logs_assert(tool_decision_assertion( + "sandbox_session_call", + "approvedforsession", + "user", + )); +} + +#[tokio::test] +#[traced_test] +async fn handle_sandbox_error_user_denies_records_tool_decision() { + let server = start_mock_server().await; + + mount_sse( + &server, + sse(vec![ + ev_local_shell_call("sandbox_deny_call", "completed", vec!["/bin/echo", "deny"]), + ev_completed("done"), + ]), + ) + .await; + + let TestCodex { codex, .. 
} = test_codex() + .with_config(|config| { + config.approval_policy = AskForApproval::UnlessTrusted; + config.model_provider.request_max_retries = Some(0); + config.model_provider.stream_max_retries = Some(0); + }) + .build(&server) + .await + .unwrap(); + + codex + .submit(Op::UserInput { + items: vec![InputItem::Text { + text: "deny".into(), + }], + }) + .await + .unwrap(); + + wait_for_event_with_timeout( + &codex, + |ev| matches!(ev, EventMsg::ExecApprovalRequest(_)), + Duration::from_secs(5), + ) + .await; + + codex + .submit(Op::ExecApproval { + id: "0".into(), + decision: ReviewDecision::Denied, + }) + .await + .unwrap(); + + wait_for_event_with_timeout( + &codex, + |ev| matches!(ev, EventMsg::TokenCount(_)), + Duration::from_secs(5), + ) + .await; + + logs_assert(tool_decision_assertion( + "sandbox_deny_call", + "denied", + "user", + )); +} diff --git a/codex-rs/core/tests/suite/rmcp_client.rs b/codex-rs/core/tests/suite/rmcp_client.rs index aed84ec6..bd168240 100644 --- a/codex-rs/core/tests/suite/rmcp_client.rs +++ b/codex-rs/core/tests/suite/rmcp_client.rs @@ -12,7 +12,7 @@ use codex_core::protocol::Op; use codex_core::protocol::SandboxPolicy; use codex_protocol::config_types::ReasoningSummary; use core_test_support::responses; -use core_test_support::responses::mount_sse_once; +use core_test_support::responses::mount_sse_once_match; use core_test_support::skip_if_no_network; use core_test_support::test_codex::test_codex; use core_test_support::wait_for_event; @@ -36,7 +36,7 @@ async fn stdio_server_round_trip() -> anyhow::Result<()> { let server_name = "rmcp"; let tool_name = format!("{server_name}__echo"); - mount_sse_once( + mount_sse_once_match( &server, any(), responses::sse(vec![ @@ -49,7 +49,7 @@ async fn stdio_server_round_trip() -> anyhow::Result<()> { ]), ) .await; - mount_sse_once( + mount_sse_once_match( &server, any(), responses::sse(vec![ @@ -173,7 +173,7 @@ async fn streamable_http_tool_call_round_trip() -> anyhow::Result<()> { let server_name = "rmcp_http"; let tool_name = format!("{server_name}__echo"); - mount_sse_once( + mount_sse_once_match( &server, any(), responses::sse(vec![ @@ -186,7 +186,7 @@ async fn streamable_http_tool_call_round_trip() -> anyhow::Result<()> { ]), ) .await; - mount_sse_once( + mount_sse_once_match( &server, any(), responses::sse(vec![ diff --git a/codex-rs/core/tests/suite/user_notification.rs b/codex-rs/core/tests/suite/user_notification.rs index c1622394..2ad87c7b 100644 --- a/codex-rs/core/tests/suite/user_notification.rs +++ b/codex-rs/core/tests/suite/user_notification.rs @@ -28,7 +28,7 @@ async fn summarize_context_three_requests_and_instructions() -> anyhow::Result<( let sse1 = sse(vec![ev_assistant_message("m1", "Done"), ev_completed("r1")]); - responses::mount_sse_once(&server, any(), sse1).await; + responses::mount_sse_once_match(&server, any(), sse1).await; let notify_dir = TempDir::new()?; // write a script to the notify that touches a file next to it diff --git a/codex-rs/exec/Cargo.toml b/codex-rs/exec/Cargo.toml index 8603e8fd..e37865b2 100644 --- a/codex-rs/exec/Cargo.toml +++ b/codex-rs/exec/Cargo.toml @@ -40,12 +40,14 @@ tokio = { workspace = true, features = [ ] } tracing = { workspace = true, features = ["log"] } tracing-subscriber = { workspace = true, features = ["env-filter"] } +opentelemetry-appender-tracing = { workspace = true } ts-rs = { workspace = true, features = [ "uuid-impl", "serde-json-impl", "no-serde-warnings", ] } + [dev-dependencies] assert_cmd = { workspace = true } core_test_support = { 
workspace = true } diff --git a/codex-rs/exec/src/lib.rs b/codex-rs/exec/src/lib.rs index da23fb1b..de468493 100644 --- a/codex-rs/exec/src/lib.rs +++ b/codex-rs/exec/src/lib.rs @@ -5,10 +5,6 @@ pub mod event_processor_with_json_output; pub mod exec_events; pub mod experimental_event_processor_with_json_output; -use std::io::IsTerminal; -use std::io::Read; -use std::path::PathBuf; - pub use cli::Cli; use codex_core::AuthManager; use codex_core::BUILT_IN_OSS_MODEL_PROVIDER_ID; @@ -26,17 +22,22 @@ use codex_core::protocol::TaskCompleteEvent; use codex_ollama::DEFAULT_OSS_MODEL; use codex_protocol::config_types::SandboxMode; use event_processor_with_human_output::EventProcessorWithHumanOutput; +use event_processor_with_json_output::EventProcessorWithJsonOutput; use experimental_event_processor_with_json_output::ExperimentalEventProcessorWithJsonOutput; +use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge; use serde_json::Value; +use std::io::IsTerminal; +use std::io::Read; +use std::path::PathBuf; use tracing::debug; use tracing::error; use tracing::info; use tracing_subscriber::EnvFilter; +use tracing_subscriber::prelude::*; use crate::cli::Command as ExecCommand; use crate::event_processor::CodexStatus; use crate::event_processor::EventProcessor; -use crate::event_processor_with_json_output::EventProcessorWithJsonOutput; use codex_core::find_conversation_path_by_id_str; pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option) -> anyhow::Result<()> { @@ -114,19 +115,18 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option) -> any ), }; - // TODO(mbolin): Take a more thoughtful approach to logging. + // Build fmt layer (existing logging) to compose with OTEL layer. let default_level = "error"; - let _ = tracing_subscriber::fmt() - // Fallback to the `default_level` log filter if the environment - // variable is not set _or_ contains an invalid value - .with_env_filter( - EnvFilter::try_from_default_env() - .or_else(|_| EnvFilter::try_new(default_level)) - .unwrap_or_else(|_| EnvFilter::new(default_level)), - ) + + // Build env_filter separately and attach via with_filter. 
+ let env_filter = EnvFilter::try_from_default_env() + .or_else(|_| EnvFilter::try_new(default_level)) + .unwrap_or_else(|_| EnvFilter::new(default_level)); + + let fmt_layer = tracing_subscriber::fmt::layer() .with_ansi(stderr_with_ansi) .with_writer(std::io::stderr) - .try_init(); + .with_filter(env_filter); let sandbox_mode = if full_auto { Some(SandboxMode::WorkspaceWrite) @@ -182,6 +182,31 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option) -> any }; let config = Config::load_with_cli_overrides(cli_kv_overrides, overrides)?; + + let otel = codex_core::otel_init::build_provider(&config, env!("CARGO_PKG_VERSION")); + + #[allow(clippy::print_stderr)] + let otel = match otel { + Ok(otel) => otel, + Err(e) => { + eprintln!("Could not create otel exporter: {e}"); + std::process::exit(1); + } + }; + + if let Some(provider) = otel.as_ref() { + let otel_layer = OpenTelemetryTracingBridge::new(&provider.logger).with_filter( + tracing_subscriber::filter::filter_fn(codex_core::otel_init::codex_export_filter), + ); + + let _ = tracing_subscriber::registry() + .with(fmt_layer) + .with(otel_layer) + .try_init(); + } else { + let _ = tracing_subscriber::registry().with(fmt_layer).try_init(); + } + let mut event_processor: Box = match (json_mode, experimental_json) { (_, true) => Box::new(ExperimentalEventProcessorWithJsonOutput::new( last_message_file.clone(), diff --git a/codex-rs/exec/tests/suite/output_schema.rs b/codex-rs/exec/tests/suite/output_schema.rs index 03a04d05..d9fcb763 100644 --- a/codex-rs/exec/tests/suite/output_schema.rs +++ b/codex-rs/exec/tests/suite/output_schema.rs @@ -31,7 +31,7 @@ async fn exec_includes_output_schema_in_request() -> anyhow::Result<()> { responses::ev_assistant_message("m1", "fixture hello"), responses::ev_completed("resp1"), ]); - responses::mount_sse_once(&server, any(), body).await; + responses::mount_sse_once_match(&server, any(), body).await; test.cmd_with_server(&server) .arg("--skip-git-repo-check") diff --git a/codex-rs/exec/tests/suite/server_error_exit.rs b/codex-rs/exec/tests/suite/server_error_exit.rs index 6b6358f0..051306ce 100644 --- a/codex-rs/exec/tests/suite/server_error_exit.rs +++ b/codex-rs/exec/tests/suite/server_error_exit.rs @@ -21,7 +21,7 @@ async fn exits_non_zero_when_server_reports_error() -> anyhow::Result<()> { "error": {"code": "rate_limit_exceeded", "message": "synthetic server error"} } })]); - responses::mount_sse_once(&server, any(), body).await; + responses::mount_sse_once_match(&server, any(), body).await; test.cmd_with_server(&server) .arg("--skip-git-repo-check") diff --git a/codex-rs/otel/Cargo.toml b/codex-rs/otel/Cargo.toml new file mode 100644 index 00000000..610423f9 --- /dev/null +++ b/codex-rs/otel/Cargo.toml @@ -0,0 +1,39 @@ +[package] +edition = "2024" +name = "codex-otel" +version = { workspace = true } + +[lib] +name = "codex_otel" +path = "src/lib.rs" +doctest = false + +[lints] +workspace = true + +[features] +# Compile-time gate for OTLP support; disabled by default. +# Downstream crates can enable via `features = ["otel"]`. 
+default = [] +otel = [ + "opentelemetry", + "opentelemetry_sdk", + "opentelemetry-otlp", + "tonic", +] + +[dependencies] +codex-protocol = { path = "../protocol" } +chrono = { workspace = true } +tracing = { workspace = true } +opentelemetry = { workspace = true, features = ["logs"], optional = true } +opentelemetry_sdk = { workspace = true, features = ["logs", "rt-tokio"], optional = true } +opentelemetry-otlp = { workspace = true, features = ["grpc-tonic", "http-proto", "http-json", "reqwest", "reqwest-rustls"], optional = true } +opentelemetry-semantic-conventions = { workspace = true } +tonic = { workspace = true, optional = true } +serde = { workspace = true, features = ["derive"] } +strum_macros = { workspace = true } +reqwest = { workspace = true } +eventsource-stream = { workspace = true } +tokio = { workspace = true } +serde_json = { workspace = true } \ No newline at end of file diff --git a/codex-rs/otel/src/config.rs b/codex-rs/otel/src/config.rs new file mode 100644 index 00000000..77063ed0 --- /dev/null +++ b/codex-rs/otel/src/config.rs @@ -0,0 +1,33 @@ +use std::collections::HashMap; +use std::path::PathBuf; + +#[derive(Clone, Debug)] +pub struct OtelSettings { + pub environment: String, + pub service_name: String, + pub service_version: String, + pub codex_home: PathBuf, + pub exporter: OtelExporter, +} + +#[derive(Clone, Debug)] +pub enum OtelHttpProtocol { + /// HTTP protocol with binary protobuf + Binary, + /// HTTP protocol with JSON payload + Json, +} + +#[derive(Clone, Debug)] +pub enum OtelExporter { + None, + OtlpGrpc { + endpoint: String, + headers: HashMap, + }, + OtlpHttp { + endpoint: String, + headers: HashMap, + protocol: OtelHttpProtocol, + }, +} diff --git a/codex-rs/otel/src/lib.rs b/codex-rs/otel/src/lib.rs new file mode 100644 index 00000000..3fdb6f74 --- /dev/null +++ b/codex-rs/otel/src/lib.rs @@ -0,0 +1,26 @@ +pub mod config; + +pub mod otel_event_manager; +#[cfg(feature = "otel")] +pub mod otel_provider; + +#[cfg(not(feature = "otel"))] +mod imp { + use reqwest::header::HeaderMap; + use tracing::Span; + + pub struct OtelProvider; + + impl OtelProvider { + pub fn from(_settings: &crate::config::OtelSettings) -> Option { + None + } + + pub fn headers(_span: &Span) -> HeaderMap { + HeaderMap::new() + } + } +} + +#[cfg(not(feature = "otel"))] +pub use imp::OtelProvider; diff --git a/codex-rs/otel/src/otel_event_manager.rs b/codex-rs/otel/src/otel_event_manager.rs new file mode 100644 index 00000000..87ed6bb5 --- /dev/null +++ b/codex-rs/otel/src/otel_event_manager.rs @@ -0,0 +1,459 @@ +use chrono::SecondsFormat; +use chrono::Utc; +use codex_protocol::config_types::ReasoningEffort; +use codex_protocol::config_types::ReasoningSummary; +use codex_protocol::mcp_protocol::AuthMode; +use codex_protocol::mcp_protocol::ConversationId; +use codex_protocol::models::ResponseItem; +use codex_protocol::protocol::AskForApproval; +use codex_protocol::protocol::InputItem; +use codex_protocol::protocol::ReviewDecision; +use codex_protocol::protocol::SandboxPolicy; +use eventsource_stream::Event as StreamEvent; +use eventsource_stream::EventStreamError as StreamError; +use reqwest::Error; +use reqwest::Response; +use serde::Serialize; +use std::fmt::Display; +use std::time::Duration; +use std::time::Instant; +use strum_macros::Display; +use tokio::time::error::Elapsed; + +#[derive(Debug, Clone, Serialize, Display)] +#[serde(rename_all = "snake_case")] +pub enum ToolDecisionSource { + Config, + User, +} + +#[derive(Debug, Clone)] +pub struct OtelEventMetadata { + 
conversation_id: ConversationId, + auth_mode: Option, + account_id: Option, + model: String, + slug: String, + log_user_prompts: bool, + app_version: &'static str, + terminal_type: String, +} + +#[derive(Debug, Clone)] +pub struct OtelEventManager { + metadata: OtelEventMetadata, +} + +impl OtelEventManager { + pub fn new( + conversation_id: ConversationId, + model: &str, + slug: &str, + account_id: Option, + auth_mode: Option, + log_user_prompts: bool, + terminal_type: String, + ) -> OtelEventManager { + Self { + metadata: OtelEventMetadata { + conversation_id, + auth_mode: auth_mode.map(|m| m.to_string()), + account_id, + model: model.to_owned(), + slug: slug.to_owned(), + log_user_prompts, + app_version: env!("CARGO_PKG_VERSION"), + terminal_type, + }, + } + } + + pub fn with_model(&self, model: &str, slug: &str) -> Self { + let mut manager = self.clone(); + manager.metadata.model = model.to_owned(); + manager.metadata.slug = slug.to_owned(); + manager + } + + #[allow(clippy::too_many_arguments)] + pub fn conversation_starts( + &self, + provider_name: &str, + reasoning_effort: Option, + reasoning_summary: ReasoningSummary, + context_window: Option, + max_output_tokens: Option, + auto_compact_token_limit: Option, + approval_policy: AskForApproval, + sandbox_policy: SandboxPolicy, + mcp_servers: Vec<&str>, + active_profile: Option, + ) { + tracing::event!( + tracing::Level::INFO, + event.name = "codex.conversation_starts", + event.timestamp = %timestamp(), + conversation.id = %self.metadata.conversation_id, + app.version = %self.metadata.app_version, + auth_mode = self.metadata.auth_mode, + user.account_id = self.metadata.account_id, + terminal.type = %self.metadata.terminal_type, + model = %self.metadata.model, + slug = %self.metadata.slug, + provider_name = %provider_name, + reasoning_effort = reasoning_effort.map(|e| e.to_string()), + reasoning_summary = %reasoning_summary, + context_window = context_window, + max_output_tokens = max_output_tokens, + auto_compact_token_limit = auto_compact_token_limit, + approval_policy = %approval_policy, + sandbox_policy = %sandbox_policy, + mcp_servers = mcp_servers.join(", "), + active_profile = active_profile, + ) + } + + pub async fn log_request(&self, attempt: u64, f: F) -> Result + where + F: FnOnce() -> Fut, + Fut: Future>, + { + let start = std::time::Instant::now(); + let response = f().await; + let duration = start.elapsed(); + + let (status, error) = match &response { + Ok(response) => (Some(response.status().as_u16()), None), + Err(error) => (error.status().map(|s| s.as_u16()), Some(error.to_string())), + }; + + tracing::event!( + tracing::Level::INFO, + event.name = "codex.api_request", + event.timestamp = %timestamp(), + conversation.id = %self.metadata.conversation_id, + app.version = %self.metadata.app_version, + auth_mode = self.metadata.auth_mode, + user.account_id = self.metadata.account_id, + terminal.type = %self.metadata.terminal_type, + model = %self.metadata.model, + slug = %self.metadata.slug, + duration_ms = %duration.as_millis(), + http.response.status_code = status, + error.message = error, + attempt = attempt, + ); + + response + } + + pub async fn log_sse_event( + &self, + next: Next, + ) -> Result>>, Elapsed> + where + Next: FnOnce() -> Fut, + Fut: Future>>, Elapsed>>, + E: Display, + { + let start = std::time::Instant::now(); + let response = next().await; + let duration = start.elapsed(); + + match response { + Ok(Some(Ok(ref sse))) => { + if sse.data.trim() == "[DONE]" { + self.sse_event(&sse.event, duration); + } 
else { + match serde_json::from_str::(&sse.data) { + Ok(error) if sse.event == "response.failed" => { + self.sse_event_failed(Some(&sse.event), duration, &error); + } + Ok(content) if sse.event == "response.output_item.done" => { + match serde_json::from_value::(content) { + Ok(_) => self.sse_event(&sse.event, duration), + Err(_) => { + self.sse_event_failed( + Some(&sse.event), + duration, + &"failed to parse response.output_item.done", + ); + } + }; + } + Ok(_) => { + self.sse_event(&sse.event, duration); + } + Err(error) => { + self.sse_event_failed(Some(&sse.event), duration, &error); + } + } + } + } + Ok(Some(Err(ref error))) => { + self.sse_event_failed(None, duration, error); + } + Ok(None) => {} + Err(_) => { + self.sse_event_failed(None, duration, &"idle timeout waiting for SSE"); + } + } + + response + } + + fn sse_event(&self, kind: &str, duration: Duration) { + tracing::event!( + tracing::Level::INFO, + event.name = "codex.sse_event", + event.timestamp = %timestamp(), + event.kind = %kind, + conversation.id = %self.metadata.conversation_id, + app.version = %self.metadata.app_version, + auth_mode = self.metadata.auth_mode, + user.account_id = self.metadata.account_id, + terminal.type = %self.metadata.terminal_type, + model = %self.metadata.model, + slug = %self.metadata.slug, + duration_ms = %duration.as_millis(), + ); + } + + pub fn sse_event_failed(&self, kind: Option<&String>, duration: Duration, error: &T) + where + T: Display, + { + match kind { + Some(kind) => tracing::event!( + tracing::Level::INFO, + event.name = "codex.sse_event", + event.timestamp = %timestamp(), + event.kind = %kind, + conversation.id = %self.metadata.conversation_id, + app.version = %self.metadata.app_version, + auth_mode = self.metadata.auth_mode, + user.account_id = self.metadata.account_id, + terminal.type = %self.metadata.terminal_type, + model = %self.metadata.model, + slug = %self.metadata.slug, + duration_ms = %duration.as_millis(), + error.message = %error, + ), + None => tracing::event!( + tracing::Level::INFO, + event.name = "codex.sse_event", + event.timestamp = %timestamp(), + conversation.id = %self.metadata.conversation_id, + app.version = %self.metadata.app_version, + auth_mode = self.metadata.auth_mode, + user.account_id = self.metadata.account_id, + terminal.type = %self.metadata.terminal_type, + model = %self.metadata.model, + slug = %self.metadata.slug, + duration_ms = %duration.as_millis(), + error.message = %error, + ), + } + } + + pub fn see_event_completed_failed(&self, error: &T) + where + T: Display, + { + tracing::event!( + tracing::Level::INFO, + event.name = "codex.sse_event", + event.kind = %"response.completed", + event.timestamp = %timestamp(), + conversation.id = %self.metadata.conversation_id, + app.version = %self.metadata.app_version, + auth_mode = self.metadata.auth_mode, + user.account_id = self.metadata.account_id, + terminal.type = %self.metadata.terminal_type, + model = %self.metadata.model, + slug = %self.metadata.slug, + error.message = %error, + ) + } + + pub fn sse_event_completed( + &self, + input_token_count: u64, + output_token_count: u64, + cached_token_count: Option, + reasoning_token_count: Option, + tool_token_count: u64, + ) { + tracing::event!( + tracing::Level::INFO, + event.name = "codex.sse_event", + event.timestamp = %timestamp(), + event.kind = %"response.completed", + conversation.id = %self.metadata.conversation_id, + app.version = %self.metadata.app_version, + auth_mode = self.metadata.auth_mode, + user.account_id = 
self.metadata.account_id, + terminal.type = %self.metadata.terminal_type, + model = %self.metadata.model, + slug = %self.metadata.slug, + input_token_count = %input_token_count, + output_token_count = %output_token_count, + cached_token_count = cached_token_count, + reasoning_token_count = reasoning_token_count, + tool_token_count = %tool_token_count, + ); + } + + pub fn user_prompt(&self, items: &[InputItem]) { + let prompt = items + .iter() + .flat_map(|item| match item { + InputItem::Text { text } => Some(text.as_str()), + _ => None, + }) + .collect::(); + + let prompt_to_log = if self.metadata.log_user_prompts { + prompt.as_str() + } else { + "[REDACTED]" + }; + + tracing::event!( + tracing::Level::INFO, + event.name = "codex.user_prompt", + event.timestamp = %timestamp(), + conversation.id = %self.metadata.conversation_id, + app.version = %self.metadata.app_version, + auth_mode = self.metadata.auth_mode, + user.account_id = self.metadata.account_id, + terminal.type = %self.metadata.terminal_type, + model = %self.metadata.model, + slug = %self.metadata.slug, + prompt_length = %prompt.chars().count(), + prompt = %prompt_to_log, + ); + } + + pub fn tool_decision( + &self, + tool_name: &str, + call_id: &str, + decision: ReviewDecision, + source: ToolDecisionSource, + ) { + tracing::event!( + tracing::Level::INFO, + event.name = "codex.tool_decision", + event.timestamp = %timestamp(), + conversation.id = %self.metadata.conversation_id, + app.version = %self.metadata.app_version, + auth_mode = self.metadata.auth_mode, + user.account_id = self.metadata.account_id, + terminal.type = %self.metadata.terminal_type, + model = %self.metadata.model, + slug = %self.metadata.slug, + tool_name = %tool_name, + call_id = %call_id, + decision = %decision.to_string().to_lowercase(), + source = %source.to_string(), + ); + } + + pub async fn log_tool_result( + &self, + tool_name: &str, + call_id: &str, + arguments: &str, + f: F, + ) -> Result + where + F: FnOnce() -> Fut, + Fut: Future>, + E: Display, + { + let start = Instant::now(); + let result = f().await; + let duration = start.elapsed(); + + let (output, success) = match &result { + Ok(content) => (content, true), + Err(error) => (&error.to_string(), false), + }; + + tracing::event!( + tracing::Level::INFO, + event.name = "codex.tool_result", + event.timestamp = %timestamp(), + conversation.id = %self.metadata.conversation_id, + app.version = %self.metadata.app_version, + auth_mode = self.metadata.auth_mode, + user.account_id = self.metadata.account_id, + terminal.type = %self.metadata.terminal_type, + model = %self.metadata.model, + slug = %self.metadata.slug, + tool_name = %tool_name, + call_id = %call_id, + arguments = %arguments, + duration_ms = %duration.as_millis(), + success = %success, + output = %output, + ); + + result + } + + pub fn log_tool_failed(&self, tool_name: &str, error: &str) { + tracing::event!( + tracing::Level::INFO, + event.name = "codex.tool_result", + event.timestamp = %timestamp(), + conversation.id = %self.metadata.conversation_id, + app.version = %self.metadata.app_version, + auth_mode = self.metadata.auth_mode, + user.account_id = self.metadata.account_id, + terminal.type = %self.metadata.terminal_type, + model = %self.metadata.model, + slug = %self.metadata.slug, + tool_name = %tool_name, + duration_ms = %Duration::ZERO.as_millis(), + success = %false, + output = %error, + ); + } + + pub fn tool_result( + &self, + tool_name: &str, + call_id: &str, + arguments: &str, + duration: Duration, + success: bool, + output: &str, 
+ ) { + let success_str = if success { "true" } else { "false" }; + + tracing::event!( + tracing::Level::INFO, + event.name = "codex.tool_result", + event.timestamp = %timestamp(), + conversation.id = %self.metadata.conversation_id, + app.version = %self.metadata.app_version, + auth_mode = self.metadata.auth_mode, + user.account_id = self.metadata.account_id, + terminal.type = %self.metadata.terminal_type, + model = %self.metadata.model, + slug = %self.metadata.slug, + tool_name = %tool_name, + call_id = %call_id, + arguments = %arguments, + duration_ms = %duration.as_millis(), + success = %success_str, + output = %output, + ); + } +} + +fn timestamp() -> String { + Utc::now().to_rfc3339_opts(SecondsFormat::Millis, true) +} diff --git a/codex-rs/otel/src/otel_provider.rs b/codex-rs/otel/src/otel_provider.rs new file mode 100644 index 00000000..222322a2 --- /dev/null +++ b/codex-rs/otel/src/otel_provider.rs @@ -0,0 +1,103 @@ +use crate::config::OtelExporter; +use crate::config::OtelHttpProtocol; +use crate::config::OtelSettings; +use opentelemetry::KeyValue; +use opentelemetry_otlp::LogExporter; +use opentelemetry_otlp::Protocol; +use opentelemetry_otlp::WithExportConfig; +use opentelemetry_otlp::WithHttpConfig; +use opentelemetry_otlp::WithTonicConfig; +use opentelemetry_sdk::Resource; +use opentelemetry_sdk::logs::SdkLoggerProvider; +use opentelemetry_semantic_conventions as semconv; +use reqwest::header::HeaderMap; +use reqwest::header::HeaderName; +use reqwest::header::HeaderValue; +use std::error::Error; +use tonic::metadata::MetadataMap; +use tracing::debug; + +const ENV_ATTRIBUTE: &str = "env"; + +pub struct OtelProvider { + pub logger: SdkLoggerProvider, +} + +impl OtelProvider { + pub fn shutdown(&self) { + let _ = self.logger.shutdown(); + } + + pub fn from(settings: &OtelSettings) -> Result, Box> { + let resource = Resource::builder() + .with_service_name(settings.service_name.clone()) + .with_attributes(vec![ + KeyValue::new( + semconv::attribute::SERVICE_VERSION, + settings.service_version.clone(), + ), + KeyValue::new(ENV_ATTRIBUTE, settings.environment.clone()), + ]) + .build(); + + let mut builder = SdkLoggerProvider::builder().with_resource(resource); + + match &settings.exporter { + OtelExporter::None => { + debug!("No exporter enabled in OTLP settings."); + return Ok(None); + } + OtelExporter::OtlpGrpc { endpoint, headers } => { + debug!("Using OTLP Grpc exporter: {}", endpoint); + + let mut header_map = HeaderMap::new(); + for (key, value) in headers { + if let Ok(name) = HeaderName::from_bytes(key.as_bytes()) + && let Ok(val) = HeaderValue::from_str(value) + { + header_map.insert(name, val); + } + } + + let exporter = LogExporter::builder() + .with_tonic() + .with_endpoint(endpoint) + .with_metadata(MetadataMap::from_headers(header_map)) + .build()?; + + builder = builder.with_batch_exporter(exporter); + } + OtelExporter::OtlpHttp { + endpoint, + headers, + protocol, + } => { + debug!("Using OTLP Http exporter: {}", endpoint); + + let protocol = match protocol { + OtelHttpProtocol::Binary => Protocol::HttpBinary, + OtelHttpProtocol::Json => Protocol::HttpJson, + }; + + let exporter = LogExporter::builder() + .with_http() + .with_endpoint(endpoint) + .with_protocol(protocol) + .with_headers(headers.clone()) + .build()?; + + builder = builder.with_batch_exporter(exporter); + } + } + + Ok(Some(Self { + logger: builder.build(), + })) + } +} + +impl Drop for OtelProvider { + fn drop(&mut self) { + let _ = self.logger.shutdown(); + } +} diff --git 
a/codex-rs/protocol/src/mcp_protocol.rs b/codex-rs/protocol/src/mcp_protocol.rs index c63bf2ae..a9f808b1 100644 --- a/codex-rs/protocol/src/mcp_protocol.rs +++ b/codex-rs/protocol/src/mcp_protocol.rs @@ -81,7 +81,7 @@ impl GitSha { } } -#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq, TS)] +#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq, Display, TS)] #[serde(rename_all = "lowercase")] pub enum AuthMode { ApiKey, diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index bf0b1925..01b4eb3a 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -1206,7 +1206,7 @@ pub struct SessionConfiguredEvent { } /// User's decision in response to an ExecApprovalRequest. -#[derive(Debug, Default, Clone, Copy, Deserialize, Serialize, PartialEq, Eq, TS)] +#[derive(Debug, Default, Clone, Copy, Deserialize, Serialize, PartialEq, Eq, Display, TS)] #[serde(rename_all = "snake_case")] pub enum ReviewDecision { /// User has approved this command and the agent should execute it. diff --git a/codex-rs/tui/Cargo.toml b/codex-rs/tui/Cargo.toml index 51f5c235..2e89288b 100644 --- a/codex-rs/tui/Cargo.toml +++ b/codex-rs/tui/Cargo.toml @@ -77,6 +77,7 @@ tokio-stream = { workspace = true } tracing = { workspace = true, features = ["log"] } tracing-appender = { workspace = true } tracing-subscriber = { workspace = true, features = ["env-filter"] } +opentelemetry-appender-tracing = { workspace = true } unicode-segmentation = { workspace = true } unicode-width = { workspace = true } url = { workspace = true } diff --git a/codex-rs/tui/src/lib.rs b/codex-rs/tui/src/lib.rs index e959973b..dd3dcb9e 100644 --- a/codex-rs/tui/src/lib.rs +++ b/codex-rs/tui/src/lib.rs @@ -20,6 +20,7 @@ use codex_core::protocol::SandboxPolicy; use codex_ollama::DEFAULT_OSS_MODEL; use codex_protocol::config_types::SandboxMode; use codex_protocol::mcp_protocol::AuthMode; +use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge; use std::fs::OpenOptions; use std::path::PathBuf; use tracing::error; @@ -239,7 +240,29 @@ pub async fn run_main( .map_err(|e| std::io::Error::other(format!("OSS setup failed: {e}")))?; } - let _ = tracing_subscriber::registry().with(file_layer).try_init(); + let otel = codex_core::otel_init::build_provider(&config, env!("CARGO_PKG_VERSION")); + + #[allow(clippy::print_stderr)] + let otel = match otel { + Ok(otel) => otel, + Err(e) => { + eprintln!("Could not create otel exporter: {e}"); + std::process::exit(1); + } + }; + + if let Some(provider) = otel.as_ref() { + let otel_layer = OpenTelemetryTracingBridge::new(&provider.logger).with_filter( + tracing_subscriber::filter::filter_fn(codex_core::otel_init::codex_export_filter), + ); + + let _ = tracing_subscriber::registry() + .with(file_layer) + .with(otel_layer) + .try_init(); + } else { + let _ = tracing_subscriber::registry().with(file_layer).try_init(); + }; run_ratatui_app(cli, config, active_profile, should_show_trust_screen) .await diff --git a/docs/config.md b/docs/config.md index ba204ee0..e0d4e66c 100644 --- a/docs/config.md +++ b/docs/config.md @@ -435,6 +435,117 @@ set = { PATH = "/usr/bin", MY_FLAG = "1" } Currently, `CODEX_SANDBOX_NETWORK_DISABLED=1` is also added to the environment, assuming network is disabled. This is not configurable. 
diff --git a/docs/config.md b/docs/config.md
index ba204ee0..e0d4e66c 100644
--- a/docs/config.md
+++ b/docs/config.md
@@ -435,6 +435,117 @@ set = { PATH = "/usr/bin", MY_FLAG = "1" }
 
 Currently, `CODEX_SANDBOX_NETWORK_DISABLED=1` is also added to the environment, assuming network is disabled. This is not configurable.
 
+## otel
+
+Codex can emit [OpenTelemetry](https://opentelemetry.io/) **log events** that
+describe each run: outbound API requests, streamed responses, user input,
+tool-approval decisions, and the result of every tool invocation. Export is
+**disabled by default** so local runs remain self-contained. Opt in by adding an
+`[otel]` table and choosing an exporter.
+
+```toml
+[otel]
+environment = "staging" # defaults to "dev"
+exporter = "none" # defaults to "none"; set to otlp-http or otlp-grpc to send events
+log_user_prompt = false # defaults to false; prompt text is redacted unless explicitly enabled
+```
+
+Codex tags every exported event with `service.name = $ORIGINATOR` (the same
+value sent in the `originator` header, `codex_cli_rs` by default), the CLI
+version, and an `env` attribute so downstream collectors can distinguish
+dev/staging/prod traffic. Only telemetry produced inside the `codex_otel`
+crate—the events listed below—is forwarded to the exporter.
+
+### Event catalog
+
+Every event shares a common set of metadata fields: `event.timestamp`,
+`conversation.id`, `app.version`, `auth_mode` (when available),
+`user.account_id` (when available), `terminal.type`, `model`, and `slug`.
+
+With OTEL enabled, Codex emits the following event types (in addition to the
+metadata above):
+
+- `codex.conversation_starts`
+  - `provider_name`
+  - `reasoning_effort` (optional)
+  - `reasoning_summary`
+  - `context_window` (optional)
+  - `max_output_tokens` (optional)
+  - `auto_compact_token_limit` (optional)
+  - `approval_policy`
+  - `sandbox_policy`
+  - `mcp_servers` (comma-separated list)
+  - `active_profile` (optional)
+- `codex.api_request`
+  - `attempt`
+  - `duration_ms`
+  - `http.response.status_code` (optional)
+  - `error.message` (failures)
+- `codex.sse_event`
+  - `event.kind`
+  - `duration_ms`
+  - `error.message` (failures)
+  - `input_token_count` (responses only)
+  - `output_token_count` (responses only)
+  - `cached_token_count` (responses only, optional)
+  - `reasoning_token_count` (responses only, optional)
+  - `tool_token_count` (responses only)
+- `codex.user_prompt`
+  - `prompt_length`
+  - `prompt` (redacted unless `log_user_prompt = true`)
+- `codex.tool_decision`
+  - `tool_name`
+  - `call_id`
+  - `decision` (`approved`, `approved_for_session`, `denied`, or `abort`)
+  - `source` (`config` or `user`)
+- `codex.tool_result`
+  - `tool_name`
+  - `call_id` (optional)
+  - `arguments` (optional)
+  - `duration_ms` (execution time for the tool)
+  - `success` (`"true"` or `"false"`)
+  - `output`
+
+These event shapes may change as we iterate.
+
+### Choosing an exporter
+
+Set `otel.exporter` to control where events go:
+
+- `none` – leaves instrumentation active but skips exporting. This is the
+  default.
+- `otlp-http` – posts OTLP log records to an OTLP/HTTP collector. Specify the
+  endpoint, protocol, and headers your collector expects:
+
+  ```toml
+  [otel]
+  exporter = { otlp-http = {
+    endpoint = "https://otel.example.com/v1/logs",
+    protocol = "binary",
+    headers = { "x-otlp-api-key" = "${OTLP_TOKEN}" }
+  }}
+  ```
+
+- `otlp-grpc` – streams OTLP log records over gRPC. Provide the endpoint and any
+  metadata headers:
+
+  ```toml
+  [otel]
+  exporter = { otlp-grpc = {
+    endpoint = "https://otel.example.com:4317",
+    headers = { "x-otlp-meta" = "abc123" }
+  }}
+  ```
+
+If the exporter is `none`, nothing is written anywhere; otherwise you must run or point to your
+own collector. All exporters run on a background batch worker that is flushed on
+shutdown.
+
+If you build Codex from source, the OTEL crate is still behind an `otel` feature
+flag; the official prebuilt binaries ship with the feature enabled. When the
+feature is disabled, the telemetry hooks become no-ops so the CLI continues to
+function without the extra dependencies.
+
 ## notify
 
 Specify a program that will be executed to get notified about events generated by Codex. Note that the program will receive the notification argument as a string of JSON, e.g.: