use anyhow::Result; use app_test_support::McpProcess; use app_test_support::to_response; use codex_app_server_protocol::AuthMode; use codex_app_server_protocol::GetAuthStatusParams; use codex_app_server_protocol::GetAuthStatusResponse; use codex_app_server_protocol::JSONRPCError; use codex_app_server_protocol::JSONRPCResponse; use codex_app_server_protocol::LoginApiKeyParams; use codex_app_server_protocol::LoginApiKeyResponse; use codex_app_server_protocol::RequestId; use pretty_assertions::assert_eq; use std::path::Path; use tempfile::TempDir; use tokio::time::timeout; const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); fn create_config_toml_custom_provider( codex_home: &Path, requires_openai_auth: bool, ) -> std::io::Result<()> { let config_toml = codex_home.join("config.toml"); let requires_line = if requires_openai_auth { "requires_openai_auth = true\n" } else { "" }; let contents = format!( r#" model = "mock-model" approval_policy = "never" sandbox_mode = "danger-full-access" model_provider = "mock_provider" [model_providers.mock_provider] name = "Mock provider for test" base_url = "http://127.0.0.1:0/v1" wire_api = "chat" request_max_retries = 0 stream_max_retries = 0 {requires_line} "# ); std::fs::write(config_toml, contents) } fn create_config_toml(codex_home: &Path) -> std::io::Result<()> { let config_toml = codex_home.join("config.toml"); std::fs::write( config_toml, r#" model = "mock-model" approval_policy = "never" sandbox_mode = "danger-full-access" "#, ) } fn create_config_toml_forced_login(codex_home: &Path, forced_method: &str) -> std::io::Result<()> { let config_toml = codex_home.join("config.toml"); let contents = format!( r#" model = "mock-model" approval_policy = "never" sandbox_mode = "danger-full-access" forced_login_method = "{forced_method}" "# ); std::fs::write(config_toml, contents) } async fn login_with_api_key_via_request(mcp: &mut McpProcess, api_key: &str) -> Result<()> { let request_id = mcp .send_login_api_key_request(LoginApiKeyParams { api_key: api_key.to_string(), }) .await?; let resp: JSONRPCResponse = timeout( DEFAULT_READ_TIMEOUT, mcp.read_stream_until_response_message(RequestId::Integer(request_id)), ) .await??; let _: LoginApiKeyResponse = to_response(resp)?; Ok(()) } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn get_auth_status_no_auth() -> Result<()> { let codex_home = TempDir::new()?; create_config_toml(codex_home.path())?; let mut mcp = McpProcess::new_with_env(codex_home.path(), &[("OPENAI_API_KEY", None)]).await?; timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; let request_id = mcp .send_get_auth_status_request(GetAuthStatusParams { include_token: Some(true), refresh_token: Some(false), }) .await?; let resp: JSONRPCResponse = timeout( DEFAULT_READ_TIMEOUT, mcp.read_stream_until_response_message(RequestId::Integer(request_id)), ) .await??; let status: GetAuthStatusResponse = to_response(resp)?; assert_eq!(status.auth_method, None, "expected no auth method"); assert_eq!(status.auth_token, None, "expected no token"); Ok(()) } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn get_auth_status_with_api_key() -> Result<()> { let codex_home = TempDir::new()?; create_config_toml(codex_home.path())?; let mut mcp = McpProcess::new(codex_home.path()).await?; timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; login_with_api_key_via_request(&mut mcp, "sk-test-key").await?; let request_id = mcp .send_get_auth_status_request(GetAuthStatusParams { include_token: Some(true), refresh_token: Some(false), }) .await?; let resp: JSONRPCResponse = timeout( DEFAULT_READ_TIMEOUT, mcp.read_stream_until_response_message(RequestId::Integer(request_id)), ) .await??; let status: GetAuthStatusResponse = to_response(resp)?; assert_eq!(status.auth_method, Some(AuthMode::ApiKey)); assert_eq!(status.auth_token, Some("sk-test-key".to_string())); Ok(()) } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn get_auth_status_with_api_key_when_auth_not_required() -> Result<()> { let codex_home = TempDir::new()?; create_config_toml_custom_provider(codex_home.path(), false)?; let mut mcp = McpProcess::new(codex_home.path()).await?; timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; login_with_api_key_via_request(&mut mcp, "sk-test-key").await?; let request_id = mcp .send_get_auth_status_request(GetAuthStatusParams { include_token: Some(true), refresh_token: Some(false), }) .await?; let resp: JSONRPCResponse = timeout( DEFAULT_READ_TIMEOUT, mcp.read_stream_until_response_message(RequestId::Integer(request_id)), ) .await??; let status: GetAuthStatusResponse = to_response(resp)?; assert_eq!(status.auth_method, None, "expected no auth method"); assert_eq!(status.auth_token, None, "expected no token"); assert_eq!( status.requires_openai_auth, Some(false), "requires_openai_auth should be false", ); Ok(()) } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn get_auth_status_with_api_key_no_include_token() -> Result<()> { let codex_home = TempDir::new()?; create_config_toml(codex_home.path())?; let mut mcp = McpProcess::new(codex_home.path()).await?; timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; login_with_api_key_via_request(&mut mcp, "sk-test-key").await?; // Build params via struct so None field is omitted in wire JSON. let params = GetAuthStatusParams { include_token: None, refresh_token: Some(false), }; let request_id = mcp.send_get_auth_status_request(params).await?; let resp: JSONRPCResponse = timeout( DEFAULT_READ_TIMEOUT, mcp.read_stream_until_response_message(RequestId::Integer(request_id)), ) .await??; let status: GetAuthStatusResponse = to_response(resp)?; assert_eq!(status.auth_method, Some(AuthMode::ApiKey)); assert!(status.auth_token.is_none(), "token must be omitted"); Ok(()) } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn login_api_key_rejected_when_forced_chatgpt() -> Result<()> { let codex_home = TempDir::new()?; create_config_toml_forced_login(codex_home.path(), "chatgpt")?; let mut mcp = McpProcess::new(codex_home.path()).await?; timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; let request_id = mcp .send_login_api_key_request(LoginApiKeyParams { api_key: "sk-test-key".to_string(), }) .await?; let err: JSONRPCError = timeout( DEFAULT_READ_TIMEOUT, mcp.read_stream_until_error_message(RequestId::Integer(request_id)), ) .await??; assert_eq!( err.error.message, "API key login is disabled. Use ChatGPT login instead." ); Ok(()) }