From fac548e430f62efc6baa8de92613a15f82800a86 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Thu, 30 Oct 2025 02:49:40 -0700 Subject: [PATCH] Send delegate header (#5942) Send delegate type header --- codex-rs/core/src/chat_completions.rs | 20 +++-- codex-rs/core/src/client.rs | 19 ++-- codex-rs/core/tests/responses_headers.rs | 108 +++++++++++++++++++++-- codex-rs/protocol/src/protocol.rs | 2 + 4 files changed, 129 insertions(+), 20 deletions(-) diff --git a/codex-rs/core/src/chat_completions.rs b/codex-rs/core/src/chat_completions.rs index 9b01a6be..abb27d9b 100644 --- a/codex-rs/core/src/chat_completions.rs +++ b/codex-rs/core/src/chat_completions.rs @@ -21,6 +21,7 @@ use codex_protocol::models::FunctionCallOutputContentItem; use codex_protocol::models::ReasoningItemContent; use codex_protocol::models::ResponseItem; use codex_protocol::protocol::SessionSource; +use codex_protocol::protocol::SubAgentSource; use eventsource_stream::Eventsource; use futures::Stream; use futures::StreamExt; @@ -347,13 +348,18 @@ pub(crate) async fn stream_chat_completions( let mut req_builder = provider.create_request_builder(client, &None).await?; - // Include session source for backend telemetry and routing. - let task_type = match serde_json::to_value(session_source) { - Ok(serde_json::Value::String(s)) => s, - Ok(other) => other.to_string(), - Err(_) => "unknown".to_string(), - }; - req_builder = req_builder.header("Codex-Task-Type", task_type); + // Include subagent header only for subagent sessions. + if let SessionSource::SubAgent(sub) = session_source.clone() { + let subagent = if let SubAgentSource::Other(label) = sub { + label + } else { + serde_json::to_value(&sub) + .ok() + .and_then(|v| v.as_str().map(std::string::ToString::to_string)) + .unwrap_or_else(|| "other".to_string()) + }; + req_builder = req_builder.header("x-openai-subagent", subagent); + } let res = otel_event_manager .log_request(attempt, || { diff --git a/codex-rs/core/src/client.rs b/codex-rs/core/src/client.rs index 45c84d90..683091b1 100644 --- a/codex-rs/core/src/client.rs +++ b/codex-rs/core/src/client.rs @@ -303,13 +303,18 @@ impl ModelClient { .await .map_err(StreamAttemptError::Fatal)?; - // Include session source for backend telemetry and routing. - let task_type = match serde_json::to_value(&self.session_source) { - Ok(serde_json::Value::String(s)) => s, - Ok(other) => other.to_string(), - Err(_) => "unknown".to_string(), - }; - req_builder = req_builder.header("Codex-Task-Type", task_type); + // Include subagent header only for subagent sessions. + if let SessionSource::SubAgent(sub) = &self.session_source { + let subagent = if let crate::protocol::SubAgentSource::Other(label) = sub { + label.clone() + } else { + serde_json::to_value(sub) + .ok() + .and_then(|v| v.as_str().map(std::string::ToString::to_string)) + .unwrap_or_else(|| "other".to_string()) + }; + req_builder = req_builder.header("x-openai-subagent", subagent); + } req_builder = req_builder // Send session_id for compatibility. diff --git a/codex-rs/core/tests/responses_headers.rs b/codex-rs/core/tests/responses_headers.rs index 66df0aef..7b6f645f 100644 --- a/codex-rs/core/tests/responses_headers.rs +++ b/codex-rs/core/tests/responses_headers.rs @@ -18,7 +18,7 @@ use tempfile::TempDir; use wiremock::matchers::header; #[tokio::test] -async fn responses_stream_includes_task_type_header() { +async fn responses_stream_includes_subagent_header_on_review() { core_test_support::skip_if_no_network!(); let server = responses::start_mock_server().await; @@ -27,9 +27,12 @@ async fn responses_stream_includes_task_type_header() { responses::ev_completed("resp-1"), ]); - let request_recorder = - responses::mount_sse_once_match(&server, header("Codex-Task-Type", "exec"), response_body) - .await; + let request_recorder = responses::mount_sse_once_match( + &server, + header("x-openai-subagent", "review"), + response_body, + ) + .await; let provider = ModelProviderInfo { name: "mock".into(), @@ -76,7 +79,7 @@ async fn responses_stream_includes_task_type_header() { effort, summary, conversation_id, - SessionSource::Exec, + SessionSource::SubAgent(codex_protocol::protocol::SubAgentSource::Review), ); let mut prompt = Prompt::default(); @@ -96,5 +99,98 @@ async fn responses_stream_includes_task_type_header() { } let request = request_recorder.single_request(); - assert_eq!(request.header("Codex-Task-Type").as_deref(), Some("exec")); + assert_eq!( + request.header("x-openai-subagent").as_deref(), + Some("review") + ); +} + +#[tokio::test] +async fn responses_stream_includes_subagent_header_on_other() { + core_test_support::skip_if_no_network!(); + + let server = responses::start_mock_server().await; + let response_body = responses::sse(vec![ + responses::ev_response_created("resp-1"), + responses::ev_completed("resp-1"), + ]); + + let request_recorder = responses::mount_sse_once_match( + &server, + header("x-openai-subagent", "my-task"), + response_body, + ) + .await; + + let provider = ModelProviderInfo { + name: "mock".into(), + base_url: Some(format!("{}/v1", server.uri())), + env_key: None, + env_key_instructions: None, + experimental_bearer_token: None, + wire_api: WireApi::Responses, + query_params: None, + http_headers: None, + env_http_headers: None, + request_max_retries: Some(0), + stream_max_retries: Some(0), + stream_idle_timeout_ms: Some(5_000), + requires_openai_auth: false, + }; + + let codex_home = TempDir::new().expect("failed to create TempDir"); + let mut config = load_default_config_for_test(&codex_home); + config.model_provider_id = provider.name.clone(); + config.model_provider = provider.clone(); + let effort = config.model_reasoning_effort; + let summary = config.model_reasoning_summary; + let config = Arc::new(config); + + let conversation_id = ConversationId::new(); + + let otel_event_manager = OtelEventManager::new( + conversation_id, + config.model.as_str(), + config.model_family.slug.as_str(), + None, + Some("test@test.com".to_string()), + Some(AuthMode::ChatGPT), + false, + "test".to_string(), + ); + + let client = ModelClient::new( + Arc::clone(&config), + None, + otel_event_manager, + provider, + effort, + summary, + conversation_id, + SessionSource::SubAgent(codex_protocol::protocol::SubAgentSource::Other( + "my-task".to_string(), + )), + ); + + let mut prompt = Prompt::default(); + prompt.input = vec![ResponseItem::Message { + id: None, + role: "user".into(), + content: vec![ContentItem::InputText { + text: "hello".into(), + }], + }]; + + let mut stream = client.stream(&prompt).await.expect("stream failed"); + while let Some(event) = stream.next().await { + if matches!(event, Ok(ResponseEvent::Completed { .. })) { + break; + } + } + + let request = request_recorder.single_request(); + assert_eq!( + request.header("x-openai-subagent").as_deref(), + Some("my-task") + ); } diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index d09ecde7..b16baab9 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -1058,6 +1058,8 @@ pub enum SessionSource { } #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, JsonSchema, TS)] +#[serde(rename_all = "snake_case")] +#[ts(rename_all = "snake_case")] pub enum SubAgentSource { Review, Compact,