use codex_core::ConversationManager; use codex_core::ModelProviderInfo; use codex_core::NewConversation; use codex_core::WireApi; use codex_core::built_in_model_providers; use codex_core::protocol::EventMsg; use codex_core::protocol::InputItem; use codex_core::protocol::Op; use codex_core::spawn::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR; use codex_login::AuthMode; use codex_login::CodexAuth; use core_test_support::load_default_config_for_test; use core_test_support::load_sse_fixture_with_id; use core_test_support::wait_for_event; use tempfile::TempDir; use wiremock::Mock; use wiremock::MockServer; use wiremock::ResponseTemplate; use wiremock::matchers::header_regex; use wiremock::matchers::method; use wiremock::matchers::path; use wiremock::matchers::query_param; /// Build minimal SSE stream with completed marker using the JSON fixture. fn sse_completed(id: &str) -> String { load_sse_fixture_with_id("tests/fixtures/completed_template.json", id) } #[expect(clippy::unwrap_used)] fn assert_message_role(request_body: &serde_json::Value, role: &str) { assert_eq!(request_body["role"].as_str().unwrap(), role); } #[expect(clippy::expect_used)] fn assert_message_starts_with(request_body: &serde_json::Value, text: &str) { let content = request_body["content"][0]["text"] .as_str() .expect("invalid message content"); assert!( content.starts_with(text), "expected message content '{content}' to start with '{text}'" ); } #[expect(clippy::expect_used)] fn assert_message_ends_with(request_body: &serde_json::Value, text: &str) { let content = request_body["content"][0]["text"] .as_str() .expect("invalid message content"); assert!( content.ends_with(text), "expected message content '{content}' to end with '{text}'" ); } /// Writes an `auth.json` into the provided `codex_home` with the specified parameters. /// Returns the fake JWT string written to `tokens.id_token`. #[expect(clippy::unwrap_used)] fn write_auth_json( codex_home: &TempDir, openai_api_key: Option<&str>, chatgpt_plan_type: &str, access_token: &str, account_id: Option<&str>, ) -> String { use base64::Engine as _; use serde_json::json; let header = json!({ "alg": "none", "typ": "JWT" }); let payload = json!({ "email": "user@example.com", "https://api.openai.com/auth": { "chatgpt_plan_type": chatgpt_plan_type, "chatgpt_account_id": account_id.unwrap_or("acc-123") } }); let b64 = |b: &[u8]| base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(b); let header_b64 = b64(&serde_json::to_vec(&header).unwrap()); let payload_b64 = b64(&serde_json::to_vec(&payload).unwrap()); let signature_b64 = b64(b"sig"); let fake_jwt = format!("{header_b64}.{payload_b64}.{signature_b64}"); let mut tokens = json!({ "id_token": fake_jwt, "access_token": access_token, "refresh_token": "refresh-test", }); if let Some(acc) = account_id { tokens["account_id"] = json!(acc); } let auth_json = json!({ "OPENAI_API_KEY": openai_api_key, "tokens": tokens, // RFC3339 datetime; value doesn't matter for these tests "last_refresh": "2025-08-06T20:41:36.232376Z", }); std::fs::write( codex_home.path().join("auth.json"), serde_json::to_string_pretty(&auth_json).unwrap(), ) .unwrap(); fake_jwt } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn includes_session_id_and_model_headers_in_request() { if std::env::var(CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR).is_ok() { println!( "Skipping test because it cannot execute when network is disabled in a Codex sandbox." ); return; } // Mock server let server = MockServer::start().await; // First request – must NOT include `previous_response_id`. let first = ResponseTemplate::new(200) .insert_header("content-type", "text/event-stream") .set_body_raw(sse_completed("resp1"), "text/event-stream"); Mock::given(method("POST")) .and(path("/v1/responses")) .respond_with(first) .expect(1) .mount(&server) .await; let model_provider = ModelProviderInfo { base_url: Some(format!("{}/v1", server.uri())), ..built_in_model_providers()["openai"].clone() }; // Init session let codex_home = TempDir::new().unwrap(); let mut config = load_default_config_for_test(&codex_home); config.model_provider = model_provider; let conversation_manager = ConversationManager::default(); let NewConversation { conversation: codex, conversation_id, session_configured: _, } = conversation_manager .new_conversation_with_auth(config, Some(CodexAuth::from_api_key("Test API Key"))) .await .expect("create new conversation"); codex .submit(Op::UserInput { items: vec![InputItem::Text { text: "hello".into(), }], }) .await .unwrap(); wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; // get request from the server let request = &server.received_requests().await.unwrap()[0]; let request_session_id = request.headers.get("session_id").unwrap(); let request_authorization = request.headers.get("authorization").unwrap(); let request_originator = request.headers.get("originator").unwrap(); assert_eq!( request_session_id.to_str().unwrap(), conversation_id.to_string() ); assert_eq!(request_originator.to_str().unwrap(), "codex_cli_rs"); assert_eq!( request_authorization.to_str().unwrap(), "Bearer Test API Key" ); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn includes_base_instructions_override_in_request() { // Mock server let server = MockServer::start().await; // First request – must NOT include `previous_response_id`. let first = ResponseTemplate::new(200) .insert_header("content-type", "text/event-stream") .set_body_raw(sse_completed("resp1"), "text/event-stream"); Mock::given(method("POST")) .and(path("/v1/responses")) .respond_with(first) .expect(1) .mount(&server) .await; let model_provider = ModelProviderInfo { base_url: Some(format!("{}/v1", server.uri())), ..built_in_model_providers()["openai"].clone() }; let codex_home = TempDir::new().unwrap(); let mut config = load_default_config_for_test(&codex_home); config.base_instructions = Some("test instructions".to_string()); config.model_provider = model_provider; let conversation_manager = ConversationManager::default(); let codex = conversation_manager .new_conversation_with_auth(config, Some(CodexAuth::from_api_key("Test API Key"))) .await .expect("create new conversation") .conversation; codex .submit(Op::UserInput { items: vec![InputItem::Text { text: "hello".into(), }], }) .await .unwrap(); wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; let request = &server.received_requests().await.unwrap()[0]; let request_body = request.body_json::().unwrap(); assert!( request_body["instructions"] .as_str() .unwrap() .contains("test instructions") ); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn originator_config_override_is_used() { // Mock server let server = MockServer::start().await; let first = ResponseTemplate::new(200) .insert_header("content-type", "text/event-stream") .set_body_raw(sse_completed("resp1"), "text/event-stream"); Mock::given(method("POST")) .and(path("/v1/responses")) .respond_with(first) .expect(1) .mount(&server) .await; let model_provider = ModelProviderInfo { base_url: Some(format!("{}/v1", server.uri())), ..built_in_model_providers()["openai"].clone() }; let codex_home = TempDir::new().unwrap(); let mut config = load_default_config_for_test(&codex_home); config.model_provider = model_provider; config.internal_originator = Some("my_override".to_string()); let conversation_manager = ConversationManager::default(); let codex = conversation_manager .new_conversation_with_auth(config, Some(CodexAuth::from_api_key("Test API Key"))) .await .expect("create new conversation") .conversation; codex .submit(Op::UserInput { items: vec![InputItem::Text { text: "hello".into(), }], }) .await .unwrap(); wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; let request = &server.received_requests().await.unwrap()[0]; let request_originator = request.headers.get("originator").unwrap(); assert_eq!(request_originator.to_str().unwrap(), "my_override"); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn chatgpt_auth_sends_correct_request() { if std::env::var(CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR).is_ok() { println!( "Skipping test because it cannot execute when network is disabled in a Codex sandbox." ); return; } // Mock server let server = MockServer::start().await; // First request – must NOT include `previous_response_id`. let first = ResponseTemplate::new(200) .insert_header("content-type", "text/event-stream") .set_body_raw(sse_completed("resp1"), "text/event-stream"); Mock::given(method("POST")) .and(path("/api/codex/responses")) .respond_with(first) .expect(1) .mount(&server) .await; let model_provider = ModelProviderInfo { base_url: Some(format!("{}/api/codex", server.uri())), ..built_in_model_providers()["openai"].clone() }; // Init session let codex_home = TempDir::new().unwrap(); let mut config = load_default_config_for_test(&codex_home); config.model_provider = model_provider; let conversation_manager = ConversationManager::default(); let NewConversation { conversation: codex, conversation_id, session_configured: _, } = conversation_manager .new_conversation_with_auth(config, Some(create_dummy_codex_auth())) .await .expect("create new conversation"); codex .submit(Op::UserInput { items: vec![InputItem::Text { text: "hello".into(), }], }) .await .unwrap(); wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; // get request from the server let request = &server.received_requests().await.unwrap()[0]; let request_session_id = request.headers.get("session_id").unwrap(); let request_authorization = request.headers.get("authorization").unwrap(); let request_originator = request.headers.get("originator").unwrap(); let request_chatgpt_account_id = request.headers.get("chatgpt-account-id").unwrap(); let request_body = request.body_json::().unwrap(); assert_eq!( request_session_id.to_str().unwrap(), conversation_id.to_string() ); assert_eq!(request_originator.to_str().unwrap(), "codex_cli_rs"); assert_eq!( request_authorization.to_str().unwrap(), "Bearer Access Token" ); assert_eq!(request_chatgpt_account_id.to_str().unwrap(), "account_id"); assert!(!request_body["store"].as_bool().unwrap()); assert!(request_body["stream"].as_bool().unwrap()); assert_eq!( request_body["include"][0].as_str().unwrap(), "reasoning.encrypted_content" ); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn prefers_chatgpt_token_when_config_prefers_chatgpt() { if std::env::var(CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR).is_ok() { println!( "Skipping test because it cannot execute when network is disabled in a Codex sandbox." ); return; } // Mock server let server = MockServer::start().await; let first = ResponseTemplate::new(200) .insert_header("content-type", "text/event-stream") .set_body_raw(sse_completed("resp1"), "text/event-stream"); // Expect ChatGPT base path and correct headers Mock::given(method("POST")) .and(path("/v1/responses")) .and(header_regex("Authorization", r"Bearer Access-123")) .and(header_regex("chatgpt-account-id", r"acc-123")) .respond_with(first) .expect(1) .mount(&server) .await; let model_provider = ModelProviderInfo { base_url: Some(format!("{}/v1", server.uri())), ..built_in_model_providers()["openai"].clone() }; // Init session let codex_home = TempDir::new().unwrap(); // Write auth.json that contains both API key and ChatGPT tokens for a plan that should prefer ChatGPT. let _jwt = write_auth_json( &codex_home, Some("sk-test-key"), "pro", "Access-123", Some("acc-123"), ); let mut config = load_default_config_for_test(&codex_home); config.model_provider = model_provider; config.preferred_auth_method = AuthMode::ChatGPT; let conversation_manager = ConversationManager::default(); let NewConversation { conversation: codex, .. } = conversation_manager .new_conversation(config) .await .expect("create new conversation"); codex .submit(Op::UserInput { items: vec![InputItem::Text { text: "hello".into(), }], }) .await .unwrap(); wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; // verify request body flags let request = &server.received_requests().await.unwrap()[0]; let request_body = request.body_json::().unwrap(); assert!( !request_body["store"].as_bool().unwrap(), "store should be false for ChatGPT auth" ); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn prefers_apikey_when_config_prefers_apikey_even_with_chatgpt_tokens() { if std::env::var(CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR).is_ok() { println!( "Skipping test because it cannot execute when network is disabled in a Codex sandbox." ); return; } // Mock server let server = MockServer::start().await; let first = ResponseTemplate::new(200) .insert_header("content-type", "text/event-stream") .set_body_raw(sse_completed("resp1"), "text/event-stream"); // Expect API key header, no ChatGPT account header required. Mock::given(method("POST")) .and(path("/v1/responses")) .and(header_regex("Authorization", r"Bearer sk-test-key")) .respond_with(first) .expect(1) .mount(&server) .await; let model_provider = ModelProviderInfo { base_url: Some(format!("{}/v1", server.uri())), ..built_in_model_providers()["openai"].clone() }; // Init session let codex_home = TempDir::new().unwrap(); // Write auth.json that contains both API key and ChatGPT tokens for a plan that should prefer ChatGPT, // but config will force API key preference. let _jwt = write_auth_json( &codex_home, Some("sk-test-key"), "pro", "Access-123", Some("acc-123"), ); let mut config = load_default_config_for_test(&codex_home); config.model_provider = model_provider; config.preferred_auth_method = AuthMode::ApiKey; let conversation_manager = ConversationManager::default(); let NewConversation { conversation: codex, .. } = conversation_manager .new_conversation(config) .await .expect("create new conversation"); codex .submit(Op::UserInput { items: vec![InputItem::Text { text: "hello".into(), }], }) .await .unwrap(); wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; // verify request body flags let request = &server.received_requests().await.unwrap()[0]; let request_body = request.body_json::().unwrap(); assert!( request_body["store"].as_bool().unwrap(), "store should be true for API key auth" ); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn includes_user_instructions_message_in_request() { let server = MockServer::start().await; let first = ResponseTemplate::new(200) .insert_header("content-type", "text/event-stream") .set_body_raw(sse_completed("resp1"), "text/event-stream"); Mock::given(method("POST")) .and(path("/v1/responses")) .respond_with(first) .expect(1) .mount(&server) .await; let model_provider = ModelProviderInfo { base_url: Some(format!("{}/v1", server.uri())), ..built_in_model_providers()["openai"].clone() }; let codex_home = TempDir::new().unwrap(); let mut config = load_default_config_for_test(&codex_home); config.model_provider = model_provider; config.user_instructions = Some("be nice".to_string()); let conversation_manager = ConversationManager::default(); let codex = conversation_manager .new_conversation_with_auth(config, Some(CodexAuth::from_api_key("Test API Key"))) .await .expect("create new conversation") .conversation; codex .submit(Op::UserInput { items: vec![InputItem::Text { text: "hello".into(), }], }) .await .unwrap(); wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; let request = &server.received_requests().await.unwrap()[0]; let request_body = request.body_json::().unwrap(); assert!( !request_body["instructions"] .as_str() .unwrap() .contains("be nice") ); assert_message_role(&request_body["input"][0], "user"); assert_message_starts_with(&request_body["input"][0], ""); assert_message_ends_with(&request_body["input"][0], ""); assert_message_role(&request_body["input"][1], "user"); assert_message_starts_with(&request_body["input"][1], ""); assert_message_ends_with(&request_body["input"][1], ""); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn azure_overrides_assign_properties_used_for_responses_url() { let existing_env_var_with_random_value = if cfg!(windows) { "USERNAME" } else { "USER" }; // Mock server let server = MockServer::start().await; // First request – must NOT include `previous_response_id`. let first = ResponseTemplate::new(200) .insert_header("content-type", "text/event-stream") .set_body_raw(sse_completed("resp1"), "text/event-stream"); // Expect POST to /openai/responses with api-version query param Mock::given(method("POST")) .and(path("/openai/responses")) .and(query_param("api-version", "2025-04-01-preview")) .and(header_regex("Custom-Header", "Value")) .and(header_regex( "Authorization", format!( "Bearer {}", std::env::var(existing_env_var_with_random_value).unwrap() ) .as_str(), )) .respond_with(first) .expect(1) .mount(&server) .await; let provider = ModelProviderInfo { name: "custom".to_string(), base_url: Some(format!("{}/openai", server.uri())), // Reuse the existing environment variable to avoid using unsafe code env_key: Some(existing_env_var_with_random_value.to_string()), query_params: Some(std::collections::HashMap::from([( "api-version".to_string(), "2025-04-01-preview".to_string(), )])), env_key_instructions: None, wire_api: WireApi::Responses, http_headers: Some(std::collections::HashMap::from([( "Custom-Header".to_string(), "Value".to_string(), )])), env_http_headers: None, request_max_retries: None, stream_max_retries: None, stream_idle_timeout_ms: None, requires_openai_auth: false, }; // Init session let codex_home = TempDir::new().unwrap(); let mut config = load_default_config_for_test(&codex_home); config.model_provider = provider; let conversation_manager = ConversationManager::default(); let codex = conversation_manager .new_conversation_with_auth(config, None) .await .expect("create new conversation") .conversation; codex .submit(Op::UserInput { items: vec![InputItem::Text { text: "hello".into(), }], }) .await .unwrap(); wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn env_var_overrides_loaded_auth() { let existing_env_var_with_random_value = if cfg!(windows) { "USERNAME" } else { "USER" }; // Mock server let server = MockServer::start().await; // First request – must NOT include `previous_response_id`. let first = ResponseTemplate::new(200) .insert_header("content-type", "text/event-stream") .set_body_raw(sse_completed("resp1"), "text/event-stream"); // Expect POST to /openai/responses with api-version query param Mock::given(method("POST")) .and(path("/openai/responses")) .and(query_param("api-version", "2025-04-01-preview")) .and(header_regex("Custom-Header", "Value")) .and(header_regex( "Authorization", format!( "Bearer {}", std::env::var(existing_env_var_with_random_value).unwrap() ) .as_str(), )) .respond_with(first) .expect(1) .mount(&server) .await; let provider = ModelProviderInfo { name: "custom".to_string(), base_url: Some(format!("{}/openai", server.uri())), // Reuse the existing environment variable to avoid using unsafe code env_key: Some(existing_env_var_with_random_value.to_string()), query_params: Some(std::collections::HashMap::from([( "api-version".to_string(), "2025-04-01-preview".to_string(), )])), env_key_instructions: None, wire_api: WireApi::Responses, http_headers: Some(std::collections::HashMap::from([( "Custom-Header".to_string(), "Value".to_string(), )])), env_http_headers: None, request_max_retries: None, stream_max_retries: None, stream_idle_timeout_ms: None, requires_openai_auth: false, }; // Init session let codex_home = TempDir::new().unwrap(); let mut config = load_default_config_for_test(&codex_home); config.model_provider = provider; let conversation_manager = ConversationManager::default(); let codex = conversation_manager .new_conversation_with_auth(config, Some(create_dummy_codex_auth())) .await .expect("create new conversation") .conversation; codex .submit(Op::UserInput { items: vec![InputItem::Text { text: "hello".into(), }], }) .await .unwrap(); wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; } fn create_dummy_codex_auth() -> CodexAuth { CodexAuth::create_dummy_chatgpt_auth_for_testing() }