chore: use anyhow::Result for all app-server integration tests (#5836)

There's a lot of visual noise in app-server's integration tests due to
the number of `.expect("<some_msg>")` lines which are largely redundant
/ not very useful. Clean them up by using `anyhow::Result` + `?`
consistently.

Replaces the existing pattern of:
```
    let codex_home = TempDir::new().expect("create temp dir");
    create_config_toml(codex_home.path()).expect("write config.toml");

    let mut mcp = McpProcess::new(codex_home.path())
        .await
        .expect("spawn mcp process");
    timeout(DEFAULT_READ_TIMEOUT, mcp.initialize())
        .await
        .expect("initialize timeout")
        .expect("initialize request");
```

With:
```
    let codex_home = TempDir::new()?;
    create_config_toml(codex_home.path())?;

    let mut mcp = McpProcess::new(codex_home.path()).await?;
    timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
```
This commit is contained in:
Owen Lin
2025-10-28 08:10:23 -07:00
committed by GitHub
parent be4bdfec93
commit 266419217e
13 changed files with 414 additions and 765 deletions

View File

@@ -1,5 +1,4 @@
use std::path::Path; use anyhow::Result;
use app_test_support::McpProcess; use app_test_support::McpProcess;
use app_test_support::to_response; use app_test_support::to_response;
use codex_app_server_protocol::ArchiveConversationParams; use codex_app_server_protocol::ArchiveConversationParams;
@@ -9,45 +8,37 @@ use codex_app_server_protocol::NewConversationParams;
use codex_app_server_protocol::NewConversationResponse; use codex_app_server_protocol::NewConversationResponse;
use codex_app_server_protocol::RequestId; use codex_app_server_protocol::RequestId;
use codex_core::ARCHIVED_SESSIONS_SUBDIR; use codex_core::ARCHIVED_SESSIONS_SUBDIR;
use std::path::Path;
use tempfile::TempDir; use tempfile::TempDir;
use tokio::time::timeout; use tokio::time::timeout;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
#[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn archive_conversation_moves_rollout_into_archived_directory() { async fn archive_conversation_moves_rollout_into_archived_directory() -> Result<()> {
let codex_home = TempDir::new().expect("create temp dir"); let codex_home = TempDir::new()?;
create_config_toml(codex_home.path()).expect("write config.toml"); create_config_toml(codex_home.path())?;
let mut mcp = McpProcess::new(codex_home.path()) let mut mcp = McpProcess::new(codex_home.path()).await?;
.await timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
.expect("spawn mcp process");
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize())
.await
.expect("initialize timeout")
.expect("initialize request");
let new_request_id = mcp let new_request_id = mcp
.send_new_conversation_request(NewConversationParams { .send_new_conversation_request(NewConversationParams {
model: Some("mock-model".to_string()), model: Some("mock-model".to_string()),
..Default::default() ..Default::default()
}) })
.await .await?;
.expect("send newConversation");
let new_response: JSONRPCResponse = timeout( let new_response: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT, DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(new_request_id)), mcp.read_stream_until_response_message(RequestId::Integer(new_request_id)),
) )
.await .await??;
.expect("newConversation timeout")
.expect("newConversation response");
let NewConversationResponse { let NewConversationResponse {
conversation_id, conversation_id,
rollout_path, rollout_path,
.. ..
} = to_response::<NewConversationResponse>(new_response) } = to_response::<NewConversationResponse>(new_response)?;
.expect("deserialize newConversation response");
assert!( assert!(
rollout_path.exists(), rollout_path.exists(),
@@ -60,19 +51,15 @@ async fn archive_conversation_moves_rollout_into_archived_directory() {
conversation_id, conversation_id,
rollout_path: rollout_path.clone(), rollout_path: rollout_path.clone(),
}) })
.await .await?;
.expect("send archiveConversation");
let archive_response: JSONRPCResponse = timeout( let archive_response: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT, DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(archive_request_id)), mcp.read_stream_until_response_message(RequestId::Integer(archive_request_id)),
) )
.await .await??;
.expect("archiveConversation timeout")
.expect("archiveConversation response");
let _: ArchiveConversationResponse = let _: ArchiveConversationResponse =
to_response::<ArchiveConversationResponse>(archive_response) to_response::<ArchiveConversationResponse>(archive_response)?;
.expect("deserialize archiveConversation response");
let archived_directory = codex_home.path().join(ARCHIVED_SESSIONS_SUBDIR); let archived_directory = codex_home.path().join(ARCHIVED_SESSIONS_SUBDIR);
let archived_rollout_path = let archived_rollout_path =
@@ -90,6 +77,8 @@ async fn archive_conversation_moves_rollout_into_archived_directory() {
"expected archived rollout path {} to exist", "expected archived rollout path {} to exist",
archived_rollout_path.display() archived_rollout_path.display()
); );
Ok(())
} }
fn create_config_toml(codex_home: &Path) -> std::io::Result<()> { fn create_config_toml(codex_home: &Path) -> std::io::Result<()> {

View File

@@ -1,5 +1,4 @@
use std::path::Path; use anyhow::Result;
use app_test_support::McpProcess; use app_test_support::McpProcess;
use app_test_support::to_response; use app_test_support::to_response;
use codex_app_server_protocol::AuthMode; use codex_app_server_protocol::AuthMode;
@@ -11,6 +10,7 @@ use codex_app_server_protocol::LoginApiKeyParams;
use codex_app_server_protocol::LoginApiKeyResponse; use codex_app_server_protocol::LoginApiKeyResponse;
use codex_app_server_protocol::RequestId; use codex_app_server_protocol::RequestId;
use pretty_assertions::assert_eq; use pretty_assertions::assert_eq;
use std::path::Path;
use tempfile::TempDir; use tempfile::TempDir;
use tokio::time::timeout; use tokio::time::timeout;
@@ -71,125 +71,99 @@ forced_login_method = "{forced_method}"
std::fs::write(config_toml, contents) std::fs::write(config_toml, contents)
} }
async fn login_with_api_key_via_request(mcp: &mut McpProcess, api_key: &str) { async fn login_with_api_key_via_request(mcp: &mut McpProcess, api_key: &str) -> Result<()> {
let request_id = mcp let request_id = mcp
.send_login_api_key_request(LoginApiKeyParams { .send_login_api_key_request(LoginApiKeyParams {
api_key: api_key.to_string(), api_key: api_key.to_string(),
}) })
.await .await?;
.unwrap_or_else(|e| panic!("send loginApiKey: {e}"));
let resp: JSONRPCResponse = timeout( let resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT, DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)), mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
) )
.await .await??;
.unwrap_or_else(|e| panic!("loginApiKey timeout: {e}")) let _: LoginApiKeyResponse = to_response(resp)?;
.unwrap_or_else(|e| panic!("loginApiKey response: {e}")); Ok(())
let _: LoginApiKeyResponse =
to_response(resp).unwrap_or_else(|e| panic!("deserialize login response: {e}"));
} }
#[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn get_auth_status_no_auth() { async fn get_auth_status_no_auth() -> Result<()> {
let codex_home = TempDir::new().unwrap_or_else(|e| panic!("create tempdir: {e}")); let codex_home = TempDir::new()?;
create_config_toml(codex_home.path()).unwrap_or_else(|err| panic!("write config.toml: {err}")); create_config_toml(codex_home.path())?;
let mut mcp = McpProcess::new_with_env(codex_home.path(), &[("OPENAI_API_KEY", None)]) let mut mcp = McpProcess::new_with_env(codex_home.path(), &[("OPENAI_API_KEY", None)]).await?;
.await timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
.expect("spawn mcp process");
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize())
.await
.expect("init timeout")
.expect("init failed");
let request_id = mcp let request_id = mcp
.send_get_auth_status_request(GetAuthStatusParams { .send_get_auth_status_request(GetAuthStatusParams {
include_token: Some(true), include_token: Some(true),
refresh_token: Some(false), refresh_token: Some(false),
}) })
.await .await?;
.expect("send getAuthStatus");
let resp: JSONRPCResponse = timeout( let resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT, DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)), mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
) )
.await .await??;
.expect("getAuthStatus timeout") let status: GetAuthStatusResponse = to_response(resp)?;
.expect("getAuthStatus response");
let status: GetAuthStatusResponse = to_response(resp).expect("deserialize status");
assert_eq!(status.auth_method, None, "expected no auth method"); assert_eq!(status.auth_method, None, "expected no auth method");
assert_eq!(status.auth_token, None, "expected no token"); assert_eq!(status.auth_token, None, "expected no token");
Ok(())
} }
#[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn get_auth_status_with_api_key() { async fn get_auth_status_with_api_key() -> Result<()> {
let codex_home = TempDir::new().unwrap_or_else(|e| panic!("create tempdir: {e}")); let codex_home = TempDir::new()?;
create_config_toml(codex_home.path()).unwrap_or_else(|err| panic!("write config.toml: {err}")); create_config_toml(codex_home.path())?;
let mut mcp = McpProcess::new(codex_home.path()) let mut mcp = McpProcess::new(codex_home.path()).await?;
.await timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
.expect("spawn mcp process");
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize())
.await
.expect("init timeout")
.expect("init failed");
login_with_api_key_via_request(&mut mcp, "sk-test-key").await; login_with_api_key_via_request(&mut mcp, "sk-test-key").await?;
let request_id = mcp let request_id = mcp
.send_get_auth_status_request(GetAuthStatusParams { .send_get_auth_status_request(GetAuthStatusParams {
include_token: Some(true), include_token: Some(true),
refresh_token: Some(false), refresh_token: Some(false),
}) })
.await .await?;
.expect("send getAuthStatus");
let resp: JSONRPCResponse = timeout( let resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT, DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)), mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
) )
.await .await??;
.expect("getAuthStatus timeout") let status: GetAuthStatusResponse = to_response(resp)?;
.expect("getAuthStatus response");
let status: GetAuthStatusResponse = to_response(resp).expect("deserialize status");
assert_eq!(status.auth_method, Some(AuthMode::ApiKey)); assert_eq!(status.auth_method, Some(AuthMode::ApiKey));
assert_eq!(status.auth_token, Some("sk-test-key".to_string())); assert_eq!(status.auth_token, Some("sk-test-key".to_string()));
Ok(())
} }
#[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn get_auth_status_with_api_key_when_auth_not_required() { async fn get_auth_status_with_api_key_when_auth_not_required() -> Result<()> {
let codex_home = TempDir::new().unwrap_or_else(|e| panic!("create tempdir: {e}")); let codex_home = TempDir::new()?;
create_config_toml_custom_provider(codex_home.path(), false) create_config_toml_custom_provider(codex_home.path(), false)?;
.unwrap_or_else(|err| panic!("write config.toml: {err}"));
let mut mcp = McpProcess::new(codex_home.path()) let mut mcp = McpProcess::new(codex_home.path()).await?;
.await timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
.expect("spawn mcp process");
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize())
.await
.expect("init timeout")
.expect("init failed");
login_with_api_key_via_request(&mut mcp, "sk-test-key").await; login_with_api_key_via_request(&mut mcp, "sk-test-key").await?;
let request_id = mcp let request_id = mcp
.send_get_auth_status_request(GetAuthStatusParams { .send_get_auth_status_request(GetAuthStatusParams {
include_token: Some(true), include_token: Some(true),
refresh_token: Some(false), refresh_token: Some(false),
}) })
.await .await?;
.expect("send getAuthStatus");
let resp: JSONRPCResponse = timeout( let resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT, DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)), mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
) )
.await .await??;
.expect("getAuthStatus timeout") let status: GetAuthStatusResponse = to_response(resp)?;
.expect("getAuthStatus response");
let status: GetAuthStatusResponse = to_response(resp).expect("deserialize status");
assert_eq!(status.auth_method, None, "expected no auth method"); assert_eq!(status.auth_method, None, "expected no auth method");
assert_eq!(status.auth_token, None, "expected no token"); assert_eq!(status.auth_token, None, "expected no token");
assert_eq!( assert_eq!(
@@ -197,76 +171,60 @@ async fn get_auth_status_with_api_key_when_auth_not_required() {
Some(false), Some(false),
"requires_openai_auth should be false", "requires_openai_auth should be false",
); );
Ok(())
} }
#[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn get_auth_status_with_api_key_no_include_token() { async fn get_auth_status_with_api_key_no_include_token() -> Result<()> {
let codex_home = TempDir::new().unwrap_or_else(|e| panic!("create tempdir: {e}")); let codex_home = TempDir::new()?;
create_config_toml(codex_home.path()).unwrap_or_else(|err| panic!("write config.toml: {err}")); create_config_toml(codex_home.path())?;
let mut mcp = McpProcess::new(codex_home.path()) let mut mcp = McpProcess::new(codex_home.path()).await?;
.await timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
.expect("spawn mcp process");
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize())
.await
.expect("init timeout")
.expect("init failed");
login_with_api_key_via_request(&mut mcp, "sk-test-key").await; login_with_api_key_via_request(&mut mcp, "sk-test-key").await?;
// Build params via struct so None field is omitted in wire JSON. // Build params via struct so None field is omitted in wire JSON.
let params = GetAuthStatusParams { let params = GetAuthStatusParams {
include_token: None, include_token: None,
refresh_token: Some(false), refresh_token: Some(false),
}; };
let request_id = mcp let request_id = mcp.send_get_auth_status_request(params).await?;
.send_get_auth_status_request(params)
.await
.expect("send getAuthStatus");
let resp: JSONRPCResponse = timeout( let resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT, DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)), mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
) )
.await .await??;
.expect("getAuthStatus timeout") let status: GetAuthStatusResponse = to_response(resp)?;
.expect("getAuthStatus response");
let status: GetAuthStatusResponse = to_response(resp).expect("deserialize status");
assert_eq!(status.auth_method, Some(AuthMode::ApiKey)); assert_eq!(status.auth_method, Some(AuthMode::ApiKey));
assert!(status.auth_token.is_none(), "token must be omitted"); assert!(status.auth_token.is_none(), "token must be omitted");
Ok(())
} }
#[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn login_api_key_rejected_when_forced_chatgpt() { async fn login_api_key_rejected_when_forced_chatgpt() -> Result<()> {
let codex_home = TempDir::new().unwrap_or_else(|e| panic!("create tempdir: {e}")); let codex_home = TempDir::new()?;
create_config_toml_forced_login(codex_home.path(), "chatgpt") create_config_toml_forced_login(codex_home.path(), "chatgpt")?;
.unwrap_or_else(|err| panic!("write config.toml: {err}"));
let mut mcp = McpProcess::new(codex_home.path()) let mut mcp = McpProcess::new(codex_home.path()).await?;
.await timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
.expect("spawn mcp process");
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize())
.await
.expect("init timeout")
.expect("init failed");
let request_id = mcp let request_id = mcp
.send_login_api_key_request(LoginApiKeyParams { .send_login_api_key_request(LoginApiKeyParams {
api_key: "sk-test-key".to_string(), api_key: "sk-test-key".to_string(),
}) })
.await .await?;
.expect("send loginApiKey");
let err: JSONRPCError = timeout( let err: JSONRPCError = timeout(
DEFAULT_READ_TIMEOUT, DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_error_message(RequestId::Integer(request_id)), mcp.read_stream_until_error_message(RequestId::Integer(request_id)),
) )
.await .await??;
.expect("loginApiKey error timeout")
.expect("loginApiKey error");
assert_eq!( assert_eq!(
err.error.message, err.error.message,
"API key login is disabled. Use ChatGPT login instead." "API key login is disabled. Use ChatGPT login instead."
); );
Ok(())
} }

View File

@@ -1,5 +1,4 @@
use std::path::Path; use anyhow::Result;
use app_test_support::McpProcess; use app_test_support::McpProcess;
use app_test_support::create_final_assistant_message_sse_response; use app_test_support::create_final_assistant_message_sse_response;
use app_test_support::create_mock_chat_completions_server; use app_test_support::create_mock_chat_completions_server;
@@ -32,26 +31,27 @@ use codex_protocol::protocol::Event;
use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::EventMsg;
use pretty_assertions::assert_eq; use pretty_assertions::assert_eq;
use std::env; use std::env;
use std::path::Path;
use tempfile::TempDir; use tempfile::TempDir;
use tokio::time::timeout; use tokio::time::timeout;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
#[tokio::test(flavor = "multi_thread", worker_threads = 4)] #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_codex_jsonrpc_conversation_flow() { async fn test_codex_jsonrpc_conversation_flow() -> Result<()> {
if env::var(CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR).is_ok() { if env::var(CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR).is_ok() {
println!( println!(
"Skipping test because it cannot execute when network is disabled in a Codex sandbox." "Skipping test because it cannot execute when network is disabled in a Codex sandbox."
); );
return; return Ok(());
} }
let tmp = TempDir::new().expect("tmp dir"); let tmp = TempDir::new()?;
// Temporary Codex home with config pointing at the mock server. // Temporary Codex home with config pointing at the mock server.
let codex_home = tmp.path().join("codex_home"); let codex_home = tmp.path().join("codex_home");
std::fs::create_dir(&codex_home).expect("create codex home dir"); std::fs::create_dir(&codex_home)?;
let working_directory = tmp.path().join("workdir"); let working_directory = tmp.path().join("workdir");
std::fs::create_dir(&working_directory).expect("create working directory"); std::fs::create_dir(&working_directory)?;
// Create a mock model server that immediately ends each turn. // Create a mock model server that immediately ends each turn.
// Two turns are expected: initial session configure + one user message. // Two turns are expected: initial session configure + one user message.
@@ -61,20 +61,15 @@ async fn test_codex_jsonrpc_conversation_flow() {
Some(&working_directory), Some(&working_directory),
Some(5000), Some(5000),
"call1234", "call1234",
) )?,
.expect("create shell sse response"), create_final_assistant_message_sse_response("Enjoy your new git repo!")?,
create_final_assistant_message_sse_response("Enjoy your new git repo!")
.expect("create final assistant message"),
]; ];
let server = create_mock_chat_completions_server(responses).await; let server = create_mock_chat_completions_server(responses).await;
create_config_toml(&codex_home, &server.uri()).expect("write config"); create_config_toml(&codex_home, &server.uri())?;
// Start MCP server and initialize. // Start MCP server and initialize.
let mut mcp = McpProcess::new(&codex_home).await.expect("spawn mcp"); let mut mcp = McpProcess::new(&codex_home).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()) timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
.await
.expect("init timeout")
.expect("init error");
// 1) newConversation // 1) newConversation
let new_conv_id = mcp let new_conv_id = mcp
@@ -82,17 +77,13 @@ async fn test_codex_jsonrpc_conversation_flow() {
cwd: Some(working_directory.to_string_lossy().into_owned()), cwd: Some(working_directory.to_string_lossy().into_owned()),
..Default::default() ..Default::default()
}) })
.await .await?;
.expect("send newConversation");
let new_conv_resp: JSONRPCResponse = timeout( let new_conv_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT, DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(new_conv_id)), mcp.read_stream_until_response_message(RequestId::Integer(new_conv_id)),
) )
.await .await??;
.expect("newConversation timeout") let new_conv_resp = to_response::<NewConversationResponse>(new_conv_resp)?;
.expect("newConversation resp");
let new_conv_resp = to_response::<NewConversationResponse>(new_conv_resp)
.expect("deserialize newConversation response");
let NewConversationResponse { let NewConversationResponse {
conversation_id, conversation_id,
model, model,
@@ -107,18 +98,14 @@ async fn test_codex_jsonrpc_conversation_flow() {
conversation_id, conversation_id,
experimental_raw_events: false, experimental_raw_events: false,
}) })
.await .await?;
.expect("send addConversationListener");
let add_listener_resp: JSONRPCResponse = timeout( let add_listener_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT, DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(add_listener_id)), mcp.read_stream_until_response_message(RequestId::Integer(add_listener_id)),
) )
.await .await??;
.expect("addConversationListener timeout")
.expect("addConversationListener resp");
let AddConversationSubscriptionResponse { subscription_id } = let AddConversationSubscriptionResponse { subscription_id } =
to_response::<AddConversationSubscriptionResponse>(add_listener_resp) to_response::<AddConversationSubscriptionResponse>(add_listener_resp)?;
.expect("deserialize addConversationListener response");
// 3) sendUserMessage (should trigger notifications; we only validate an OK response) // 3) sendUserMessage (should trigger notifications; we only validate an OK response)
let send_user_id = mcp let send_user_id = mcp
@@ -128,17 +115,13 @@ async fn test_codex_jsonrpc_conversation_flow() {
text: "text".to_string(), text: "text".to_string(),
}], }],
}) })
.await .await?;
.expect("send sendUserMessage");
let send_user_resp: JSONRPCResponse = timeout( let send_user_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT, DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(send_user_id)), mcp.read_stream_until_response_message(RequestId::Integer(send_user_id)),
) )
.await .await??;
.expect("sendUserMessage timeout") let SendUserMessageResponse {} = to_response::<SendUserMessageResponse>(send_user_resp)?;
.expect("sendUserMessage resp");
let SendUserMessageResponse {} = to_response::<SendUserMessageResponse>(send_user_resp)
.expect("deserialize sendUserMessage response");
// Verify the task_finished notification is received. // Verify the task_finished notification is received.
// Note this also ensures that the final request to the server was made. // Note this also ensures that the final request to the server was made.
@@ -146,9 +129,7 @@ async fn test_codex_jsonrpc_conversation_flow() {
DEFAULT_READ_TIMEOUT, DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("codex/event/task_complete"), mcp.read_stream_until_notification_message("codex/event/task_complete"),
) )
.await .await??;
.expect("task_finished_notification timeout")
.expect("task_finished_notification resp");
let serde_json::Value::Object(map) = task_finished_notification let serde_json::Value::Object(map) = task_finished_notification
.params .params
.expect("notification should have params") .expect("notification should have params")
@@ -166,33 +147,31 @@ async fn test_codex_jsonrpc_conversation_flow() {
.send_remove_conversation_listener_request(RemoveConversationListenerParams { .send_remove_conversation_listener_request(RemoveConversationListenerParams {
subscription_id, subscription_id,
}) })
.await .await?;
.expect("send removeConversationListener");
let remove_listener_resp: JSONRPCResponse = timeout( let remove_listener_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT, DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(remove_listener_id)), mcp.read_stream_until_response_message(RequestId::Integer(remove_listener_id)),
) )
.await .await??;
.expect("removeConversationListener timeout") let RemoveConversationSubscriptionResponse {} = to_response(remove_listener_resp)?;
.expect("removeConversationListener resp");
let RemoveConversationSubscriptionResponse {} = Ok(())
to_response(remove_listener_resp).expect("deserialize removeConversationListener response");
} }
#[tokio::test(flavor = "multi_thread", worker_threads = 4)] #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_send_user_turn_changes_approval_policy_behavior() { async fn test_send_user_turn_changes_approval_policy_behavior() -> Result<()> {
if env::var(CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR).is_ok() { if env::var(CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR).is_ok() {
println!( println!(
"Skipping test because it cannot execute when network is disabled in a Codex sandbox." "Skipping test because it cannot execute when network is disabled in a Codex sandbox."
); );
return; return Ok(());
} }
let tmp = TempDir::new().expect("tmp dir"); let tmp = TempDir::new()?;
let codex_home = tmp.path().join("codex_home"); let codex_home = tmp.path().join("codex_home");
std::fs::create_dir(&codex_home).expect("create codex home dir"); std::fs::create_dir(&codex_home)?;
let working_directory = tmp.path().join("workdir"); let working_directory = tmp.path().join("workdir");
std::fs::create_dir(&working_directory).expect("create working directory"); std::fs::create_dir(&working_directory)?;
// Mock server will request a python shell call for the first and second turn, then finish. // Mock server will request a python shell call for the first and second turn, then finish.
let responses = vec![ let responses = vec![
@@ -205,10 +184,8 @@ async fn test_send_user_turn_changes_approval_policy_behavior() {
Some(&working_directory), Some(&working_directory),
Some(5000), Some(5000),
"call1", "call1",
) )?,
.expect("create first shell sse response"), create_final_assistant_message_sse_response("done 1")?,
create_final_assistant_message_sse_response("done 1")
.expect("create final assistant message 1"),
create_shell_sse_response( create_shell_sse_response(
vec![ vec![
"python3".to_string(), "python3".to_string(),
@@ -218,20 +195,15 @@ async fn test_send_user_turn_changes_approval_policy_behavior() {
Some(&working_directory), Some(&working_directory),
Some(5000), Some(5000),
"call2", "call2",
) )?,
.expect("create second shell sse response"), create_final_assistant_message_sse_response("done 2")?,
create_final_assistant_message_sse_response("done 2")
.expect("create final assistant message 2"),
]; ];
let server = create_mock_chat_completions_server(responses).await; let server = create_mock_chat_completions_server(responses).await;
create_config_toml(&codex_home, &server.uri()).expect("write config"); create_config_toml(&codex_home, &server.uri())?;
// Start MCP server and initialize. // Start MCP server and initialize.
let mut mcp = McpProcess::new(&codex_home).await.expect("spawn mcp"); let mut mcp = McpProcess::new(&codex_home).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()) timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
.await
.expect("init timeout")
.expect("init error");
// 1) Start conversation with approval_policy=untrusted // 1) Start conversation with approval_policy=untrusted
let new_conv_id = mcp let new_conv_id = mcp
@@ -239,19 +211,15 @@ async fn test_send_user_turn_changes_approval_policy_behavior() {
cwd: Some(working_directory.to_string_lossy().into_owned()), cwd: Some(working_directory.to_string_lossy().into_owned()),
..Default::default() ..Default::default()
}) })
.await .await?;
.expect("send newConversation");
let new_conv_resp: JSONRPCResponse = timeout( let new_conv_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT, DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(new_conv_id)), mcp.read_stream_until_response_message(RequestId::Integer(new_conv_id)),
) )
.await .await??;
.expect("newConversation timeout")
.expect("newConversation resp");
let NewConversationResponse { let NewConversationResponse {
conversation_id, .. conversation_id, ..
} = to_response::<NewConversationResponse>(new_conv_resp) } = to_response::<NewConversationResponse>(new_conv_resp)?;
.expect("deserialize newConversation response");
// 2) addConversationListener // 2) addConversationListener
let add_listener_id = mcp let add_listener_id = mcp
@@ -259,19 +227,14 @@ async fn test_send_user_turn_changes_approval_policy_behavior() {
conversation_id, conversation_id,
experimental_raw_events: false, experimental_raw_events: false,
}) })
.await .await?;
.expect("send addConversationListener"); let _: AddConversationSubscriptionResponse = to_response::<AddConversationSubscriptionResponse>(
let _: AddConversationSubscriptionResponse = timeout(
to_response::<AddConversationSubscriptionResponse>( DEFAULT_READ_TIMEOUT,
timeout( mcp.read_stream_until_response_message(RequestId::Integer(add_listener_id)),
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(add_listener_id)),
)
.await
.expect("addConversationListener timeout")
.expect("addConversationListener resp"),
) )
.expect("deserialize addConversationListener response"); .await??,
)?;
// 3) sendUserMessage triggers a shell call; approval policy is Untrusted so we should get an elicitation // 3) sendUserMessage triggers a shell call; approval policy is Untrusted so we should get an elicitation
let send_user_id = mcp let send_user_id = mcp
@@ -281,27 +244,21 @@ async fn test_send_user_turn_changes_approval_policy_behavior() {
text: "run python".to_string(), text: "run python".to_string(),
}], }],
}) })
.await .await?;
.expect("send sendUserMessage");
let _send_user_resp: SendUserMessageResponse = to_response::<SendUserMessageResponse>( let _send_user_resp: SendUserMessageResponse = to_response::<SendUserMessageResponse>(
timeout( timeout(
DEFAULT_READ_TIMEOUT, DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(send_user_id)), mcp.read_stream_until_response_message(RequestId::Integer(send_user_id)),
) )
.await .await??,
.expect("sendUserMessage timeout") )?;
.expect("sendUserMessage resp"),
)
.expect("deserialize sendUserMessage response");
// Expect an ExecCommandApproval request (elicitation) // Expect an ExecCommandApproval request (elicitation)
let request = timeout( let request = timeout(
DEFAULT_READ_TIMEOUT, DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_request_message(), mcp.read_stream_until_request_message(),
) )
.await .await??;
.expect("waiting for exec approval request timeout")
.expect("exec approval request");
let ServerRequest::ExecCommandApproval { request_id, params } = request else { let ServerRequest::ExecCommandApproval { request_id, params } = request else {
panic!("expected ExecCommandApproval request, got: {request:?}"); panic!("expected ExecCommandApproval request, got: {request:?}");
}; };
@@ -330,17 +287,14 @@ async fn test_send_user_turn_changes_approval_policy_behavior() {
request_id, request_id,
serde_json::json!({ "decision": codex_core::protocol::ReviewDecision::Approved }), serde_json::json!({ "decision": codex_core::protocol::ReviewDecision::Approved }),
) )
.await .await?;
.expect("send approval response");
// Wait for first TaskComplete // Wait for first TaskComplete
let _ = timeout( let _ = timeout(
DEFAULT_READ_TIMEOUT, DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("codex/event/task_complete"), mcp.read_stream_until_notification_message("codex/event/task_complete"),
) )
.await .await??;
.expect("task_complete 1 timeout")
.expect("task_complete 1 notification");
// 4) sendUserTurn with approval_policy=never should run without elicitation // 4) sendUserTurn with approval_policy=never should run without elicitation
let send_turn_id = mcp let send_turn_id = mcp
@@ -356,19 +310,15 @@ async fn test_send_user_turn_changes_approval_policy_behavior() {
effort: Some(ReasoningEffort::Medium), effort: Some(ReasoningEffort::Medium),
summary: ReasoningSummary::Auto, summary: ReasoningSummary::Auto,
}) })
.await .await?;
.expect("send sendUserTurn");
// Acknowledge sendUserTurn // Acknowledge sendUserTurn
let _send_turn_resp: SendUserTurnResponse = to_response::<SendUserTurnResponse>( let _send_turn_resp: SendUserTurnResponse = to_response::<SendUserTurnResponse>(
timeout( timeout(
DEFAULT_READ_TIMEOUT, DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(send_turn_id)), mcp.read_stream_until_response_message(RequestId::Integer(send_turn_id)),
) )
.await .await??,
.expect("sendUserTurn timeout") )?;
.expect("sendUserTurn resp"),
)
.expect("deserialize sendUserTurn response");
// Ensure we do NOT receive an ExecCommandApproval request before the task completes. // Ensure we do NOT receive an ExecCommandApproval request before the task completes.
// If any Request is seen while waiting for task_complete, the helper will error and the test fails. // If any Request is seen while waiting for task_complete, the helper will error and the test fails.
@@ -376,31 +326,31 @@ async fn test_send_user_turn_changes_approval_policy_behavior() {
DEFAULT_READ_TIMEOUT, DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("codex/event/task_complete"), mcp.read_stream_until_notification_message("codex/event/task_complete"),
) )
.await .await??;
.expect("task_complete 2 timeout")
.expect("task_complete 2 notification"); Ok(())
} }
// Helper: minimal config.toml pointing at mock provider. // Helper: minimal config.toml pointing at mock provider.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)] #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_send_user_turn_updates_sandbox_and_cwd_between_turns() { async fn test_send_user_turn_updates_sandbox_and_cwd_between_turns() -> Result<()> {
if env::var(CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR).is_ok() { if env::var(CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR).is_ok() {
println!( println!(
"Skipping test because it cannot execute when network is disabled in a Codex sandbox." "Skipping test because it cannot execute when network is disabled in a Codex sandbox."
); );
return; return Ok(());
} }
let tmp = TempDir::new().expect("tmp dir"); let tmp = TempDir::new()?;
let codex_home = tmp.path().join("codex_home"); let codex_home = tmp.path().join("codex_home");
std::fs::create_dir(&codex_home).expect("create codex home dir"); std::fs::create_dir(&codex_home)?;
let workspace_root = tmp.path().join("workspace"); let workspace_root = tmp.path().join("workspace");
std::fs::create_dir(&workspace_root).expect("create workspace root"); std::fs::create_dir(&workspace_root)?;
let first_cwd = workspace_root.join("turn1"); let first_cwd = workspace_root.join("turn1");
let second_cwd = workspace_root.join("turn2"); let second_cwd = workspace_root.join("turn2");
std::fs::create_dir(&first_cwd).expect("create first cwd"); std::fs::create_dir(&first_cwd)?;
std::fs::create_dir(&second_cwd).expect("create second cwd"); std::fs::create_dir(&second_cwd)?;
let responses = vec![ let responses = vec![
create_shell_sse_response( create_shell_sse_response(
@@ -412,10 +362,8 @@ async fn test_send_user_turn_updates_sandbox_and_cwd_between_turns() {
None, None,
Some(5000), Some(5000),
"call-first", "call-first",
) )?,
.expect("create first shell response"), create_final_assistant_message_sse_response("done first")?,
create_final_assistant_message_sse_response("done first")
.expect("create first final assistant message"),
create_shell_sse_response( create_shell_sse_response(
vec![ vec![
"bash".to_string(), "bash".to_string(),
@@ -425,21 +373,14 @@ async fn test_send_user_turn_updates_sandbox_and_cwd_between_turns() {
None, None,
Some(5000), Some(5000),
"call-second", "call-second",
) )?,
.expect("create second shell response"), create_final_assistant_message_sse_response("done second")?,
create_final_assistant_message_sse_response("done second")
.expect("create second final assistant message"),
]; ];
let server = create_mock_chat_completions_server(responses).await; let server = create_mock_chat_completions_server(responses).await;
create_config_toml(&codex_home, &server.uri()).expect("write config"); create_config_toml(&codex_home, &server.uri())?;
let mut mcp = McpProcess::new(&codex_home) let mut mcp = McpProcess::new(&codex_home).await?;
.await timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
.expect("spawn mcp process");
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize())
.await
.expect("init timeout")
.expect("init failed");
let new_conv_id = mcp let new_conv_id = mcp
.send_new_conversation_request(NewConversationParams { .send_new_conversation_request(NewConversationParams {
@@ -448,36 +389,29 @@ async fn test_send_user_turn_updates_sandbox_and_cwd_between_turns() {
sandbox: Some(SandboxMode::WorkspaceWrite), sandbox: Some(SandboxMode::WorkspaceWrite),
..Default::default() ..Default::default()
}) })
.await .await?;
.expect("send newConversation");
let new_conv_resp: JSONRPCResponse = timeout( let new_conv_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT, DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(new_conv_id)), mcp.read_stream_until_response_message(RequestId::Integer(new_conv_id)),
) )
.await .await??;
.expect("newConversation timeout")
.expect("newConversation resp");
let NewConversationResponse { let NewConversationResponse {
conversation_id, conversation_id,
model, model,
.. ..
} = to_response::<NewConversationResponse>(new_conv_resp) } = to_response::<NewConversationResponse>(new_conv_resp)?;
.expect("deserialize newConversation response");
let add_listener_id = mcp let add_listener_id = mcp
.send_add_conversation_listener_request(AddConversationListenerParams { .send_add_conversation_listener_request(AddConversationListenerParams {
conversation_id, conversation_id,
experimental_raw_events: false, experimental_raw_events: false,
}) })
.await .await?;
.expect("send addConversationListener");
timeout( timeout(
DEFAULT_READ_TIMEOUT, DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(add_listener_id)), mcp.read_stream_until_response_message(RequestId::Integer(add_listener_id)),
) )
.await .await??;
.expect("addConversationListener timeout")
.expect("addConversationListener resp");
let first_turn_id = mcp let first_turn_id = mcp
.send_send_user_turn_request(SendUserTurnParams { .send_send_user_turn_request(SendUserTurnParams {
@@ -497,22 +431,17 @@ async fn test_send_user_turn_updates_sandbox_and_cwd_between_turns() {
effort: Some(ReasoningEffort::Medium), effort: Some(ReasoningEffort::Medium),
summary: ReasoningSummary::Auto, summary: ReasoningSummary::Auto,
}) })
.await .await?;
.expect("send first sendUserTurn");
timeout( timeout(
DEFAULT_READ_TIMEOUT, DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(first_turn_id)), mcp.read_stream_until_response_message(RequestId::Integer(first_turn_id)),
) )
.await .await??;
.expect("sendUserTurn 1 timeout")
.expect("sendUserTurn 1 resp");
timeout( timeout(
DEFAULT_READ_TIMEOUT, DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("codex/event/task_complete"), mcp.read_stream_until_notification_message("codex/event/task_complete"),
) )
.await .await??;
.expect("task_complete 1 timeout")
.expect("task_complete 1 notification");
let second_turn_id = mcp let second_turn_id = mcp
.send_send_user_turn_request(SendUserTurnParams { .send_send_user_turn_request(SendUserTurnParams {
@@ -527,23 +456,18 @@ async fn test_send_user_turn_updates_sandbox_and_cwd_between_turns() {
effort: Some(ReasoningEffort::Medium), effort: Some(ReasoningEffort::Medium),
summary: ReasoningSummary::Auto, summary: ReasoningSummary::Auto,
}) })
.await .await?;
.expect("send second sendUserTurn");
timeout( timeout(
DEFAULT_READ_TIMEOUT, DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(second_turn_id)), mcp.read_stream_until_response_message(RequestId::Integer(second_turn_id)),
) )
.await .await??;
.expect("sendUserTurn 2 timeout")
.expect("sendUserTurn 2 resp");
let exec_begin_notification = timeout( let exec_begin_notification = timeout(
DEFAULT_READ_TIMEOUT, DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("codex/event/exec_command_begin"), mcp.read_stream_until_notification_message("codex/event/exec_command_begin"),
) )
.await .await??;
.expect("exec_command_begin timeout")
.expect("exec_command_begin notification");
let params = exec_begin_notification let params = exec_begin_notification
.params .params
.clone() .clone()
@@ -571,9 +495,9 @@ async fn test_send_user_turn_updates_sandbox_and_cwd_between_turns() {
DEFAULT_READ_TIMEOUT, DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("codex/event/task_complete"), mcp.read_stream_until_notification_message("codex/event/task_complete"),
) )
.await .await??;
.expect("task_complete 2 timeout")
.expect("task_complete 2 notification"); Ok(())
} }
fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> { fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> {

View File

@@ -1,6 +1,4 @@
use std::collections::HashMap; use anyhow::Result;
use std::path::Path;
use app_test_support::McpProcess; use app_test_support::McpProcess;
use app_test_support::to_response; use app_test_support::to_response;
use codex_app_server_protocol::GetUserSavedConfigResponse; use codex_app_server_protocol::GetUserSavedConfigResponse;
@@ -17,6 +15,8 @@ use codex_protocol::config_types::ReasoningSummary;
use codex_protocol::config_types::SandboxMode; use codex_protocol::config_types::SandboxMode;
use codex_protocol::config_types::Verbosity; use codex_protocol::config_types::Verbosity;
use pretty_assertions::assert_eq; use pretty_assertions::assert_eq;
use std::collections::HashMap;
use std::path::Path;
use tempfile::TempDir; use tempfile::TempDir;
use tokio::time::timeout; use tokio::time::timeout;
@@ -60,31 +60,21 @@ chatgpt_base_url = "https://api.chatgpt.com"
} }
#[tokio::test(flavor = "multi_thread", worker_threads = 4)] #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn get_config_toml_parses_all_fields() { async fn get_config_toml_parses_all_fields() -> Result<()> {
let codex_home = TempDir::new().unwrap_or_else(|e| panic!("create tempdir: {e}")); let codex_home = TempDir::new()?;
create_config_toml(codex_home.path()).expect("write config.toml"); create_config_toml(codex_home.path())?;
let mut mcp = McpProcess::new(codex_home.path()) let mut mcp = McpProcess::new(codex_home.path()).await?;
.await timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
.expect("spawn mcp process");
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize())
.await
.expect("init timeout")
.expect("init failed");
let request_id = mcp let request_id = mcp.send_get_user_saved_config_request().await?;
.send_get_user_saved_config_request()
.await
.expect("send getUserSavedConfig");
let resp: JSONRPCResponse = timeout( let resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT, DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)), mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
) )
.await .await??;
.expect("getUserSavedConfig timeout")
.expect("getUserSavedConfig response");
let config: GetUserSavedConfigResponse = to_response(resp).expect("deserialize config"); let config: GetUserSavedConfigResponse = to_response(resp)?;
let expected = GetUserSavedConfigResponse { let expected = GetUserSavedConfigResponse {
config: UserSavedConfig { config: UserSavedConfig {
approval_policy: Some(AskForApproval::OnRequest), approval_policy: Some(AskForApproval::OnRequest),
@@ -122,33 +112,24 @@ async fn get_config_toml_parses_all_fields() {
}; };
assert_eq!(config, expected); assert_eq!(config, expected);
Ok(())
} }
#[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn get_config_toml_empty() { async fn get_config_toml_empty() -> Result<()> {
let codex_home = TempDir::new().unwrap_or_else(|e| panic!("create tempdir: {e}")); let codex_home = TempDir::new()?;
let mut mcp = McpProcess::new(codex_home.path()) let mut mcp = McpProcess::new(codex_home.path()).await?;
.await timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
.expect("spawn mcp process");
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize())
.await
.expect("init timeout")
.expect("init failed");
let request_id = mcp let request_id = mcp.send_get_user_saved_config_request().await?;
.send_get_user_saved_config_request()
.await
.expect("send getUserSavedConfig");
let resp: JSONRPCResponse = timeout( let resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT, DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)), mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
) )
.await .await??;
.expect("getUserSavedConfig timeout")
.expect("getUserSavedConfig response");
let config: GetUserSavedConfigResponse = to_response(resp).expect("deserialize config"); let config: GetUserSavedConfigResponse = to_response(resp)?;
let expected = GetUserSavedConfigResponse { let expected = GetUserSavedConfigResponse {
config: UserSavedConfig { config: UserSavedConfig {
approval_policy: None, approval_policy: None,
@@ -167,4 +148,5 @@ async fn get_config_toml_empty() {
}; };
assert_eq!(config, expected); assert_eq!(config, expected);
Ok(())
} }

View File

@@ -1,5 +1,4 @@
use std::path::Path; use anyhow::Result;
use app_test_support::McpProcess; use app_test_support::McpProcess;
use app_test_support::create_final_assistant_message_sse_response; use app_test_support::create_final_assistant_message_sse_response;
use app_test_support::create_mock_chat_completions_server; use app_test_support::create_mock_chat_completions_server;
@@ -15,31 +14,25 @@ use codex_app_server_protocol::SendUserMessageParams;
use codex_app_server_protocol::SendUserMessageResponse; use codex_app_server_protocol::SendUserMessageResponse;
use pretty_assertions::assert_eq; use pretty_assertions::assert_eq;
use serde_json::json; use serde_json::json;
use std::path::Path;
use tempfile::TempDir; use tempfile::TempDir;
use tokio::time::timeout; use tokio::time::timeout;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
#[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_conversation_create_and_send_message_ok() { async fn test_conversation_create_and_send_message_ok() -> Result<()> {
// Mock server we won't strictly rely on it, but provide one to satisfy any model wiring. // Mock server we won't strictly rely on it, but provide one to satisfy any model wiring.
let responses = vec![ let responses = vec![create_final_assistant_message_sse_response("Done")?];
create_final_assistant_message_sse_response("Done").expect("build mock assistant message"),
];
let server = create_mock_chat_completions_server(responses).await; let server = create_mock_chat_completions_server(responses).await;
// Temporary Codex home with config pointing at the mock server. // Temporary Codex home with config pointing at the mock server.
let codex_home = TempDir::new().expect("create temp dir"); let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri()).expect("write config.toml"); create_config_toml(codex_home.path(), &server.uri())?;
// Start MCP server process and initialize. // Start MCP server process and initialize.
let mut mcp = McpProcess::new(codex_home.path()) let mut mcp = McpProcess::new(codex_home.path()).await?;
.await timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
.expect("spawn mcp process");
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize())
.await
.expect("init timeout")
.expect("init failed");
// Create a conversation via the new JSON-RPC API. // Create a conversation via the new JSON-RPC API.
let new_conv_id = mcp let new_conv_id = mcp
@@ -47,22 +40,18 @@ async fn test_conversation_create_and_send_message_ok() {
model: Some("o3".to_string()), model: Some("o3".to_string()),
..Default::default() ..Default::default()
}) })
.await .await?;
.expect("send newConversation");
let new_conv_resp: JSONRPCResponse = timeout( let new_conv_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT, DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(new_conv_id)), mcp.read_stream_until_response_message(RequestId::Integer(new_conv_id)),
) )
.await .await??;
.expect("newConversation timeout")
.expect("newConversation resp");
let NewConversationResponse { let NewConversationResponse {
conversation_id, conversation_id,
model, model,
reasoning_effort: _, reasoning_effort: _,
rollout_path: _, rollout_path: _,
} = to_response::<NewConversationResponse>(new_conv_resp) } = to_response::<NewConversationResponse>(new_conv_resp)?;
.expect("deserialize newConversation response");
assert_eq!(model, "o3"); assert_eq!(model, "o3");
// Add a listener so we receive notifications for this conversation (not strictly required for this test). // Add a listener so we receive notifications for this conversation (not strictly required for this test).
@@ -71,19 +60,15 @@ async fn test_conversation_create_and_send_message_ok() {
conversation_id, conversation_id,
experimental_raw_events: false, experimental_raw_events: false,
}) })
.await .await?;
.expect("send addConversationListener");
let _sub: AddConversationSubscriptionResponse = let _sub: AddConversationSubscriptionResponse =
to_response::<AddConversationSubscriptionResponse>( to_response::<AddConversationSubscriptionResponse>(
timeout( timeout(
DEFAULT_READ_TIMEOUT, DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(add_listener_id)), mcp.read_stream_until_response_message(RequestId::Integer(add_listener_id)),
) )
.await .await??,
.expect("addConversationListener timeout") )?;
.expect("addConversationListener resp"),
)
.expect("deserialize addConversationListener response");
// Now send a user message via the wire API and expect an OK (empty object) result. // Now send a user message via the wire API and expect an OK (empty object) result.
let send_id = mcp let send_id = mcp
@@ -93,36 +78,32 @@ async fn test_conversation_create_and_send_message_ok() {
text: "Hello".to_string(), text: "Hello".to_string(),
}], }],
}) })
.await .await?;
.expect("send sendUserMessage");
let send_resp: JSONRPCResponse = timeout( let send_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT, DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(send_id)), mcp.read_stream_until_response_message(RequestId::Integer(send_id)),
) )
.await .await??;
.expect("sendUserMessage timeout") let _ok: SendUserMessageResponse = to_response::<SendUserMessageResponse>(send_resp)?;
.expect("sendUserMessage resp");
let _ok: SendUserMessageResponse = to_response::<SendUserMessageResponse>(send_resp)
.expect("deserialize sendUserMessage response");
// avoid race condition by waiting for the mock server to receive the chat.completions request // avoid race condition by waiting for the mock server to receive the chat.completions request
let deadline = std::time::Instant::now() + DEFAULT_READ_TIMEOUT; let deadline = std::time::Instant::now() + DEFAULT_READ_TIMEOUT;
loop { let requests = loop {
let requests = server.received_requests().await.unwrap_or_default(); let requests = server.received_requests().await.unwrap_or_default();
if !requests.is_empty() { if !requests.is_empty() {
break; break requests;
} }
if std::time::Instant::now() >= deadline { if std::time::Instant::now() >= deadline {
panic!("mock server did not receive the chat.completions request in time"); panic!("mock server did not receive the chat.completions request in time");
} }
tokio::time::sleep(std::time::Duration::from_millis(10)).await; tokio::time::sleep(std::time::Duration::from_millis(10)).await;
} };
// Verify the outbound request body matches expectations for Chat Completions. // Verify the outbound request body matches expectations for Chat Completions.
let request = &server.received_requests().await.unwrap()[0]; let request = requests
let body = request .first()
.body_json::<serde_json::Value>() .expect("mock server should have received at least one request");
.expect("parse request body as JSON"); let body = request.body_json::<serde_json::Value>()?;
assert_eq!(body["model"], json!("o3")); assert_eq!(body["model"], json!("o3"));
assert!(body["stream"].as_bool().unwrap_or(false)); assert!(body["stream"].as_bool().unwrap_or(false));
let messages = body["messages"] let messages = body["messages"]
@@ -133,6 +114,7 @@ async fn test_conversation_create_and_send_message_ok() {
assert_eq!(last["content"], json!("Hello")); assert_eq!(last["content"], json!("Hello"));
drop(server); drop(server);
Ok(())
} }
// Helper to create a config.toml pointing at the mock model server. // Helper to create a config.toml pointing at the mock model server.

View File

@@ -1,5 +1,5 @@
use anyhow::Context;
use anyhow::Result; use anyhow::Result;
use anyhow::anyhow;
use app_test_support::McpProcess; use app_test_support::McpProcess;
use codex_app_server_protocol::JSONRPCResponse; use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId; use codex_app_server_protocol::RequestId;
@@ -13,48 +13,39 @@ const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs
#[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_fuzzy_file_search_sorts_and_includes_indices() -> Result<()> { async fn test_fuzzy_file_search_sorts_and_includes_indices() -> Result<()> {
// Prepare a temporary Codex home and a separate root with test files. // Prepare a temporary Codex home and a separate root with test files.
let codex_home = TempDir::new().context("create temp codex home")?; let codex_home = TempDir::new()?;
let root = TempDir::new().context("create temp search root")?; let root = TempDir::new()?;
// Create files designed to have deterministic ordering for query "abe". // Create files designed to have deterministic ordering for query "abe".
std::fs::write(root.path().join("abc"), "x").context("write file abc")?; std::fs::write(root.path().join("abc"), "x")?;
std::fs::write(root.path().join("abcde"), "x").context("write file abcde")?; std::fs::write(root.path().join("abcde"), "x")?;
std::fs::write(root.path().join("abexy"), "x").context("write file abexy")?; std::fs::write(root.path().join("abexy"), "x")?;
std::fs::write(root.path().join("zzz.txt"), "x").context("write file zzz")?; std::fs::write(root.path().join("zzz.txt"), "x")?;
let sub_dir = root.path().join("sub"); let sub_dir = root.path().join("sub");
std::fs::create_dir_all(&sub_dir).context("create sub dir")?; std::fs::create_dir_all(&sub_dir)?;
let sub_abce_path = sub_dir.join("abce"); let sub_abce_path = sub_dir.join("abce");
std::fs::write(&sub_abce_path, "x").context("write file sub/abce")?; std::fs::write(&sub_abce_path, "x")?;
let sub_abce_rel = sub_abce_path let sub_abce_rel = sub_abce_path
.strip_prefix(root.path()) .strip_prefix(root.path())?
.context("strip root prefix from sub/abce")?
.to_string_lossy() .to_string_lossy()
.to_string(); .to_string();
// Start MCP server and initialize. // Start MCP server and initialize.
let mut mcp = McpProcess::new(codex_home.path()) let mut mcp = McpProcess::new(codex_home.path()).await?;
.await timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
.context("spawn mcp")?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize())
.await
.context("init timeout")?
.context("init failed")?;
let root_path = root.path().to_string_lossy().to_string(); let root_path = root.path().to_string_lossy().to_string();
// Send fuzzyFileSearch request. // Send fuzzyFileSearch request.
let request_id = mcp let request_id = mcp
.send_fuzzy_file_search_request("abe", vec![root_path.clone()], None) .send_fuzzy_file_search_request("abe", vec![root_path.clone()], None)
.await .await?;
.context("send fuzzyFileSearch")?;
// Read response and verify shape and ordering. // Read response and verify shape and ordering.
let resp: JSONRPCResponse = timeout( let resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT, DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)), mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
) )
.await .await??;
.context("fuzzyFileSearch timeout")?
.context("fuzzyFileSearch resp")?;
let value = resp.result; let value = resp.result;
// The path separator on Windows affects the score. // The path separator on Windows affects the score.
@@ -94,24 +85,18 @@ async fn test_fuzzy_file_search_sorts_and_includes_indices() -> Result<()> {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_fuzzy_file_search_accepts_cancellation_token() -> Result<()> { async fn test_fuzzy_file_search_accepts_cancellation_token() -> Result<()> {
let codex_home = TempDir::new().context("create temp codex home")?; let codex_home = TempDir::new()?;
let root = TempDir::new().context("create temp search root")?; let root = TempDir::new()?;
std::fs::write(root.path().join("alpha.txt"), "contents").context("write alpha")?; std::fs::write(root.path().join("alpha.txt"), "contents")?;
let mut mcp = McpProcess::new(codex_home.path()) let mut mcp = McpProcess::new(codex_home.path()).await?;
.await timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
.context("spawn mcp")?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize())
.await
.context("init timeout")?
.context("init failed")?;
let root_path = root.path().to_string_lossy().to_string(); let root_path = root.path().to_string_lossy().to_string();
let request_id = mcp let request_id = mcp
.send_fuzzy_file_search_request("alp", vec![root_path.clone()], None) .send_fuzzy_file_search_request("alp", vec![root_path.clone()], None)
.await .await?;
.context("send fuzzyFileSearch")?;
let request_id_2 = mcp let request_id_2 = mcp
.send_fuzzy_file_search_request( .send_fuzzy_file_search_request(
@@ -119,23 +104,20 @@ async fn test_fuzzy_file_search_accepts_cancellation_token() -> Result<()> {
vec![root_path.clone()], vec![root_path.clone()],
Some(request_id.to_string()), Some(request_id.to_string()),
) )
.await .await?;
.context("send fuzzyFileSearch")?;
let resp: JSONRPCResponse = timeout( let resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT, DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id_2)), mcp.read_stream_until_response_message(RequestId::Integer(request_id_2)),
) )
.await .await??;
.context("fuzzyFileSearch timeout")?
.context("fuzzyFileSearch resp")?;
let files = resp let files = resp
.result .result
.get("files") .get("files")
.context("files key missing")? .ok_or_else(|| anyhow!("files key missing"))?
.as_array() .as_array()
.context("files not array")? .ok_or_else(|| anyhow!("files not array"))?
.clone(); .clone();
assert_eq!(files.len(), 1); assert_eq!(files.len(), 1);

View File

@@ -1,6 +1,4 @@
use std::fs; use anyhow::Result;
use std::path::Path;
use app_test_support::McpProcess; use app_test_support::McpProcess;
use app_test_support::to_response; use app_test_support::to_response;
use codex_app_server_protocol::JSONRPCNotification; use codex_app_server_protocol::JSONRPCNotification;
@@ -15,6 +13,8 @@ use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::SessionConfiguredNotification; use codex_app_server_protocol::SessionConfiguredNotification;
use pretty_assertions::assert_eq; use pretty_assertions::assert_eq;
use serde_json::json; use serde_json::json;
use std::fs;
use std::path::Path;
use tempfile::TempDir; use tempfile::TempDir;
use tokio::time::timeout; use tokio::time::timeout;
use uuid::Uuid; use uuid::Uuid;
@@ -22,38 +22,33 @@ use uuid::Uuid;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
#[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_list_and_resume_conversations() { async fn test_list_and_resume_conversations() -> Result<()> {
// Prepare a temporary CODEX_HOME with a few fake rollout files. // Prepare a temporary CODEX_HOME with a few fake rollout files.
let codex_home = TempDir::new().expect("create temp dir"); let codex_home = TempDir::new()?;
create_fake_rollout( create_fake_rollout(
codex_home.path(), codex_home.path(),
"2025-01-02T12-00-00", "2025-01-02T12-00-00",
"2025-01-02T12:00:00Z", "2025-01-02T12:00:00Z",
"Hello A", "Hello A",
Some("openai"), Some("openai"),
); )?;
create_fake_rollout( create_fake_rollout(
codex_home.path(), codex_home.path(),
"2025-01-01T13-00-00", "2025-01-01T13-00-00",
"2025-01-01T13:00:00Z", "2025-01-01T13:00:00Z",
"Hello B", "Hello B",
Some("openai"), Some("openai"),
); )?;
create_fake_rollout( create_fake_rollout(
codex_home.path(), codex_home.path(),
"2025-01-01T12-00-00", "2025-01-01T12-00-00",
"2025-01-01T12:00:00Z", "2025-01-01T12:00:00Z",
"Hello C", "Hello C",
None, None,
); )?;
let mut mcp = McpProcess::new(codex_home.path()) let mut mcp = McpProcess::new(codex_home.path()).await?;
.await timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
.expect("spawn mcp process");
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize())
.await
.expect("init timeout")
.expect("init failed");
// Request first page with size 2 // Request first page with size 2
let req_id = mcp let req_id = mcp
@@ -62,17 +57,14 @@ async fn test_list_and_resume_conversations() {
cursor: None, cursor: None,
model_providers: None, model_providers: None,
}) })
.await .await?;
.expect("send listConversations");
let resp: JSONRPCResponse = timeout( let resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT, DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(req_id)), mcp.read_stream_until_response_message(RequestId::Integer(req_id)),
) )
.await .await??;
.expect("listConversations timeout")
.expect("listConversations resp");
let ListConversationsResponse { items, next_cursor } = let ListConversationsResponse { items, next_cursor } =
to_response::<ListConversationsResponse>(resp).expect("deserialize response"); to_response::<ListConversationsResponse>(resp)?;
assert_eq!(items.len(), 2); assert_eq!(items.len(), 2);
// Newest first; preview text should match // Newest first; preview text should match
@@ -90,20 +82,17 @@ async fn test_list_and_resume_conversations() {
cursor: next_cursor, cursor: next_cursor,
model_providers: None, model_providers: None,
}) })
.await .await?;
.expect("send listConversations page 2");
let resp2: JSONRPCResponse = timeout( let resp2: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT, DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(req_id2)), mcp.read_stream_until_response_message(RequestId::Integer(req_id2)),
) )
.await .await??;
.expect("listConversations page 2 timeout")
.expect("listConversations page 2 resp");
let ListConversationsResponse { let ListConversationsResponse {
items: items2, items: items2,
next_cursor: next2, next_cursor: next2,
.. ..
} = to_response::<ListConversationsResponse>(resp2).expect("deserialize response"); } = to_response::<ListConversationsResponse>(resp2)?;
assert_eq!(items2.len(), 1); assert_eq!(items2.len(), 1);
assert_eq!(items2[0].preview, "Hello C"); assert_eq!(items2[0].preview, "Hello C");
assert_eq!(items2[0].model_provider, "openai"); assert_eq!(items2[0].model_provider, "openai");
@@ -116,7 +105,7 @@ async fn test_list_and_resume_conversations() {
"2025-01-01T11:30:00Z", "2025-01-01T11:30:00Z",
"Hello TP", "Hello TP",
Some("test-provider"), Some("test-provider"),
); )?;
// Filtering by model provider should return only matching sessions. // Filtering by model provider should return only matching sessions.
let filter_req_id = mcp let filter_req_id = mcp
@@ -125,19 +114,16 @@ async fn test_list_and_resume_conversations() {
cursor: None, cursor: None,
model_providers: Some(vec!["test-provider".to_string()]), model_providers: Some(vec!["test-provider".to_string()]),
}) })
.await .await?;
.expect("send listConversations filtered");
let filter_resp: JSONRPCResponse = timeout( let filter_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT, DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(filter_req_id)), mcp.read_stream_until_response_message(RequestId::Integer(filter_req_id)),
) )
.await .await??;
.expect("listConversations filtered timeout")
.expect("listConversations filtered resp");
let ListConversationsResponse { let ListConversationsResponse {
items: filtered_items, items: filtered_items,
next_cursor: filtered_next, next_cursor: filtered_next,
} = to_response::<ListConversationsResponse>(filter_resp).expect("deserialize filtered"); } = to_response::<ListConversationsResponse>(filter_resp)?;
assert_eq!(filtered_items.len(), 1); assert_eq!(filtered_items.len(), 1);
assert_eq!(filtered_next, None); assert_eq!(filtered_next, None);
assert_eq!(filtered_items[0].preview, "Hello TP"); assert_eq!(filtered_items[0].preview, "Hello TP");
@@ -150,20 +136,16 @@ async fn test_list_and_resume_conversations() {
cursor: None, cursor: None,
model_providers: Some(Vec::new()), model_providers: Some(Vec::new()),
}) })
.await .await?;
.expect("send listConversations unfiltered");
let unfiltered_resp: JSONRPCResponse = timeout( let unfiltered_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT, DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(unfiltered_req_id)), mcp.read_stream_until_response_message(RequestId::Integer(unfiltered_req_id)),
) )
.await .await??;
.expect("listConversations unfiltered timeout")
.expect("listConversations unfiltered resp");
let ListConversationsResponse { let ListConversationsResponse {
items: unfiltered_items, items: unfiltered_items,
next_cursor: unfiltered_next, next_cursor: unfiltered_next,
} = to_response::<ListConversationsResponse>(unfiltered_resp) } = to_response::<ListConversationsResponse>(unfiltered_resp)?;
.expect("deserialize unfiltered response");
assert_eq!(unfiltered_items.len(), 4); assert_eq!(unfiltered_items.len(), 4);
assert!(unfiltered_next.is_none()); assert!(unfiltered_next.is_none());
@@ -173,19 +155,16 @@ async fn test_list_and_resume_conversations() {
cursor: None, cursor: None,
model_providers: Some(vec!["other".to_string()]), model_providers: Some(vec!["other".to_string()]),
}) })
.await .await?;
.expect("send listConversations filtered empty");
let empty_resp: JSONRPCResponse = timeout( let empty_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT, DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(empty_req_id)), mcp.read_stream_until_response_message(RequestId::Integer(empty_req_id)),
) )
.await .await??;
.expect("listConversations filtered empty timeout")
.expect("listConversations filtered empty resp");
let ListConversationsResponse { let ListConversationsResponse {
items: empty_items, items: empty_items,
next_cursor: empty_next, next_cursor: empty_next,
} = to_response::<ListConversationsResponse>(empty_resp).expect("deserialize filtered empty"); } = to_response::<ListConversationsResponse>(empty_resp)?;
assert!(empty_items.is_empty()); assert!(empty_items.is_empty());
assert!(empty_next.is_none()); assert!(empty_next.is_none());
@@ -198,20 +177,15 @@ async fn test_list_and_resume_conversations() {
..Default::default() ..Default::default()
}), }),
}) })
.await .await?;
.expect("send resumeConversation");
// Expect a codex/event notification with msg.type == sessionConfigured // Expect a codex/event notification with msg.type == sessionConfigured
let notification: JSONRPCNotification = timeout( let notification: JSONRPCNotification = timeout(
DEFAULT_READ_TIMEOUT, DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("sessionConfigured"), mcp.read_stream_until_notification_message("sessionConfigured"),
) )
.await .await??;
.expect("sessionConfigured notification timeout") let session_configured: ServerNotification = notification.try_into()?;
.expect("sessionConfigured notification");
let session_configured: ServerNotification = notification
.try_into()
.expect("deserialize sessionConfigured notification");
// Basic shape assertion: ensure event type is sessionConfigured // Basic shape assertion: ensure event type is sessionConfigured
let ServerNotification::SessionConfigured(SessionConfiguredNotification { let ServerNotification::SessionConfigured(SessionConfiguredNotification {
model, model,
@@ -229,15 +203,14 @@ async fn test_list_and_resume_conversations() {
DEFAULT_READ_TIMEOUT, DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(resume_req_id)), mcp.read_stream_until_response_message(RequestId::Integer(resume_req_id)),
) )
.await .await??;
.expect("resumeConversation timeout")
.expect("resumeConversation resp");
let ResumeConversationResponse { let ResumeConversationResponse {
conversation_id, .. conversation_id, ..
} = to_response::<ResumeConversationResponse>(resume_resp) } = to_response::<ResumeConversationResponse>(resume_resp)?;
.expect("deserialize resumeConversation response");
// conversation id should be a valid UUID // conversation id should be a valid UUID
assert!(!conversation_id.to_string().is_empty()); assert!(!conversation_id.to_string().is_empty());
Ok(())
} }
fn create_fake_rollout( fn create_fake_rollout(
@@ -246,14 +219,14 @@ fn create_fake_rollout(
meta_rfc3339: &str, meta_rfc3339: &str,
preview: &str, preview: &str,
model_provider: Option<&str>, model_provider: Option<&str>,
) { ) -> Result<()> {
let uuid = Uuid::new_v4(); let uuid = Uuid::new_v4();
// sessions/YYYY/MM/DD/ derived from filename_ts (YYYY-MM-DDThh-mm-ss) // sessions/YYYY/MM/DD/ derived from filename_ts (YYYY-MM-DDThh-mm-ss)
let year = &filename_ts[0..4]; let year = &filename_ts[0..4];
let month = &filename_ts[5..7]; let month = &filename_ts[5..7];
let day = &filename_ts[8..10]; let day = &filename_ts[8..10];
let dir = codex_home.join("sessions").join(year).join(month).join(day); let dir = codex_home.join("sessions").join(year).join(month).join(day);
fs::create_dir_all(&dir).unwrap_or_else(|e| panic!("create sessions dir: {e}")); fs::create_dir_all(&dir)?;
let file_path = dir.join(format!("rollout-{filename_ts}-{uuid}.jsonl")); let file_path = dir.join(format!("rollout-{filename_ts}-{uuid}.jsonl"));
let mut lines = Vec::new(); let mut lines = Vec::new();
@@ -303,6 +276,6 @@ fn create_fake_rollout(
}) })
.to_string(), .to_string(),
); );
fs::write(file_path, lines.join("\n") + "\n") fs::write(file_path, lines.join("\n") + "\n")?;
.unwrap_or_else(|e| panic!("write rollout file: {e}")); Ok(())
} }

View File

@@ -1,6 +1,4 @@
use std::path::Path; use anyhow::Result;
use std::time::Duration;
use app_test_support::McpProcess; use app_test_support::McpProcess;
use app_test_support::to_response; use app_test_support::to_response;
use codex_app_server_protocol::CancelLoginChatGptParams; use codex_app_server_protocol::CancelLoginChatGptParams;
@@ -15,6 +13,8 @@ use codex_app_server_protocol::RequestId;
use codex_core::auth::AuthCredentialsStoreMode; use codex_core::auth::AuthCredentialsStoreMode;
use codex_login::login_with_api_key; use codex_login::login_with_api_key;
use serial_test::serial; use serial_test::serial;
use std::path::Path;
use std::time::Duration;
use tempfile::TempDir; use tempfile::TempDir;
use tokio::time::timeout; use tokio::time::timeout;
@@ -43,37 +43,26 @@ stream_max_retries = 0
} }
#[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn logout_chatgpt_removes_auth() { async fn logout_chatgpt_removes_auth() -> Result<()> {
let codex_home = TempDir::new().unwrap_or_else(|e| panic!("create tempdir: {e}")); let codex_home = TempDir::new()?;
create_config_toml(codex_home.path()).expect("write config.toml"); create_config_toml(codex_home.path())?;
login_with_api_key( login_with_api_key(
codex_home.path(), codex_home.path(),
"sk-test-key", "sk-test-key",
AuthCredentialsStoreMode::File, AuthCredentialsStoreMode::File,
) )?;
.expect("seed api key");
assert!(codex_home.path().join("auth.json").exists()); assert!(codex_home.path().join("auth.json").exists());
let mut mcp = McpProcess::new_with_env(codex_home.path(), &[("OPENAI_API_KEY", None)]) let mut mcp = McpProcess::new_with_env(codex_home.path(), &[("OPENAI_API_KEY", None)]).await?;
.await timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
.expect("spawn mcp process");
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize())
.await
.expect("init timeout")
.expect("init failed");
let id = mcp let id = mcp.send_logout_chat_gpt_request().await?;
.send_logout_chat_gpt_request()
.await
.expect("send logoutChatGpt");
let resp: JSONRPCResponse = timeout( let resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT, DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(id)), mcp.read_stream_until_response_message(RequestId::Integer(id)),
) )
.await .await??;
.expect("logoutChatGpt timeout") let _ok: LogoutChatGptResponse = to_response(resp)?;
.expect("logoutChatGpt response");
let _ok: LogoutChatGptResponse = to_response(resp).expect("deserialize logout response");
assert!( assert!(
!codex_home.path().join("auth.json").exists(), !codex_home.path().join("auth.json").exists(),
@@ -86,63 +75,47 @@ async fn logout_chatgpt_removes_auth() {
include_token: Some(true), include_token: Some(true),
refresh_token: Some(false), refresh_token: Some(false),
}) })
.await .await?;
.expect("send getAuthStatus");
let status_resp: JSONRPCResponse = timeout( let status_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT, DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(status_id)), mcp.read_stream_until_response_message(RequestId::Integer(status_id)),
) )
.await .await??;
.expect("getAuthStatus timeout") let status: GetAuthStatusResponse = to_response(status_resp)?;
.expect("getAuthStatus response");
let status: GetAuthStatusResponse = to_response(status_resp).expect("deserialize status");
assert_eq!(status.auth_method, None); assert_eq!(status.auth_method, None);
assert_eq!(status.auth_token, None); assert_eq!(status.auth_token, None);
Ok(())
} }
#[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
// Serialize tests that launch the login server since it binds to a fixed port. // Serialize tests that launch the login server since it binds to a fixed port.
#[serial(login_port)] #[serial(login_port)]
async fn login_and_cancel_chatgpt() { async fn login_and_cancel_chatgpt() -> Result<()> {
let codex_home = TempDir::new().unwrap_or_else(|e| panic!("create tempdir: {e}")); let codex_home = TempDir::new()?;
create_config_toml(codex_home.path()).unwrap_or_else(|err| panic!("write config.toml: {err}")); create_config_toml(codex_home.path())?;
let mut mcp = McpProcess::new(codex_home.path()) let mut mcp = McpProcess::new(codex_home.path()).await?;
.await timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
.expect("spawn mcp process");
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize())
.await
.expect("init timeout")
.expect("init failed");
let login_id = mcp let login_id = mcp.send_login_chat_gpt_request().await?;
.send_login_chat_gpt_request()
.await
.expect("send loginChatGpt");
let login_resp: JSONRPCResponse = timeout( let login_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT, DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(login_id)), mcp.read_stream_until_response_message(RequestId::Integer(login_id)),
) )
.await .await??;
.expect("loginChatGpt timeout") let login: LoginChatGptResponse = to_response(login_resp)?;
.expect("loginChatGpt response");
let login: LoginChatGptResponse = to_response(login_resp).expect("deserialize login resp");
let cancel_id = mcp let cancel_id = mcp
.send_cancel_login_chat_gpt_request(CancelLoginChatGptParams { .send_cancel_login_chat_gpt_request(CancelLoginChatGptParams {
login_id: login.login_id, login_id: login.login_id,
}) })
.await .await?;
.expect("send cancelLoginChatGpt");
let cancel_resp: JSONRPCResponse = timeout( let cancel_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT, DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(cancel_id)), mcp.read_stream_until_response_message(RequestId::Integer(cancel_id)),
) )
.await .await??;
.expect("cancelLoginChatGpt timeout") let _ok: CancelLoginChatGptResponse = to_response(cancel_resp)?;
.expect("cancelLoginChatGpt response");
let _ok: CancelLoginChatGptResponse =
to_response(cancel_resp).expect("deserialize cancel response");
// Optionally observe the completion notification; do not fail if it races. // Optionally observe the completion notification; do not fail if it races.
let maybe_note = timeout( let maybe_note = timeout(
@@ -153,6 +126,7 @@ async fn login_and_cancel_chatgpt() {
if maybe_note.is_err() { if maybe_note.is_err() {
eprintln!("warning: did not observe login_chat_gpt_complete notification after cancel"); eprintln!("warning: did not observe login_chat_gpt_complete notification after cancel");
} }
Ok(())
} }
fn create_config_toml_forced_login(codex_home: &Path, forced_method: &str) -> std::io::Result<()> { fn create_config_toml_forced_login(codex_home: &Path, forced_method: &str) -> std::io::Result<()> {
@@ -185,68 +159,48 @@ forced_chatgpt_workspace_id = "{workspace_id}"
} }
#[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn login_chatgpt_rejected_when_forced_api() { async fn login_chatgpt_rejected_when_forced_api() -> Result<()> {
let codex_home = TempDir::new().unwrap_or_else(|e| panic!("create tempdir: {e}")); let codex_home = TempDir::new()?;
create_config_toml_forced_login(codex_home.path(), "api") create_config_toml_forced_login(codex_home.path(), "api")?;
.unwrap_or_else(|err| panic!("write config.toml: {err}"));
let mut mcp = McpProcess::new(codex_home.path()) let mut mcp = McpProcess::new(codex_home.path()).await?;
.await timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
.expect("spawn mcp process");
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize())
.await
.expect("init timeout")
.expect("init failed");
let request_id = mcp let request_id = mcp.send_login_chat_gpt_request().await?;
.send_login_chat_gpt_request()
.await
.expect("send loginChatGpt");
let err: JSONRPCError = timeout( let err: JSONRPCError = timeout(
DEFAULT_READ_TIMEOUT, DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_error_message(RequestId::Integer(request_id)), mcp.read_stream_until_error_message(RequestId::Integer(request_id)),
) )
.await .await??;
.expect("loginChatGpt error timeout")
.expect("loginChatGpt error");
assert_eq!( assert_eq!(
err.error.message, err.error.message,
"ChatGPT login is disabled. Use API key login instead." "ChatGPT login is disabled. Use API key login instead."
); );
Ok(())
} }
#[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
// Serialize tests that launch the login server since it binds to a fixed port. // Serialize tests that launch the login server since it binds to a fixed port.
#[serial(login_port)] #[serial(login_port)]
async fn login_chatgpt_includes_forced_workspace_query_param() { async fn login_chatgpt_includes_forced_workspace_query_param() -> Result<()> {
let codex_home = TempDir::new().unwrap_or_else(|e| panic!("create tempdir: {e}")); let codex_home = TempDir::new()?;
create_config_toml_forced_workspace(codex_home.path(), "ws-forced") create_config_toml_forced_workspace(codex_home.path(), "ws-forced")?;
.unwrap_or_else(|err| panic!("write config.toml: {err}"));
let mut mcp = McpProcess::new(codex_home.path()) let mut mcp = McpProcess::new(codex_home.path()).await?;
.await timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
.expect("spawn mcp process");
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize())
.await
.expect("init timeout")
.expect("init failed");
let request_id = mcp let request_id = mcp.send_login_chat_gpt_request().await?;
.send_login_chat_gpt_request()
.await
.expect("send loginChatGpt");
let resp: JSONRPCResponse = timeout( let resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT, DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)), mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
) )
.await .await??;
.expect("loginChatGpt timeout")
.expect("loginChatGpt response");
let login: LoginChatGptResponse = to_response(resp).expect("deserialize login resp"); let login: LoginChatGptResponse = to_response(resp)?;
assert!( assert!(
login.auth_url.contains("allowed_workspace_id=ws-forced"), login.auth_url.contains("allowed_workspace_id=ws-forced"),
"auth URL should include forced workspace" "auth URL should include forced workspace"
); );
Ok(())
} }

View File

@@ -1,4 +1,3 @@
use anyhow::Context;
use anyhow::Result; use anyhow::Result;
use app_test_support::ChatGptAuthFixture; use app_test_support::ChatGptAuthFixture;
use app_test_support::McpProcess; use app_test_support::McpProcess;
@@ -29,28 +28,18 @@ const INVALID_REQUEST_ERROR_CODE: i64 = -32600;
#[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn get_account_rate_limits_requires_auth() -> Result<()> { async fn get_account_rate_limits_requires_auth() -> Result<()> {
let codex_home = TempDir::new().context("create codex home tempdir")?; let codex_home = TempDir::new()?;
let mut mcp = McpProcess::new_with_env(codex_home.path(), &[("OPENAI_API_KEY", None)]) let mut mcp = McpProcess::new_with_env(codex_home.path(), &[("OPENAI_API_KEY", None)]).await?;
.await timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
.context("spawn mcp process")?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize())
.await
.context("initialize timeout")?
.context("initialize request")?;
let request_id = mcp let request_id = mcp.send_get_account_rate_limits_request().await?;
.send_get_account_rate_limits_request()
.await
.context("send account/rateLimits/read")?;
let error: JSONRPCError = timeout( let error: JSONRPCError = timeout(
DEFAULT_READ_TIMEOUT, DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_error_message(RequestId::Integer(request_id)), mcp.read_stream_until_error_message(RequestId::Integer(request_id)),
) )
.await .await??;
.context("account/rateLimits/read timeout")?
.context("account/rateLimits/read error")?;
assert_eq!(error.id, RequestId::Integer(request_id)); assert_eq!(error.id, RequestId::Integer(request_id));
assert_eq!(error.error.code, INVALID_REQUEST_ERROR_CODE); assert_eq!(error.error.code, INVALID_REQUEST_ERROR_CODE);
@@ -64,30 +53,20 @@ async fn get_account_rate_limits_requires_auth() -> Result<()> {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn get_account_rate_limits_requires_chatgpt_auth() -> Result<()> { async fn get_account_rate_limits_requires_chatgpt_auth() -> Result<()> {
let codex_home = TempDir::new().context("create codex home tempdir")?; let codex_home = TempDir::new()?;
let mut mcp = McpProcess::new(codex_home.path()) let mut mcp = McpProcess::new(codex_home.path()).await?;
.await timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
.context("spawn mcp process")?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize())
.await
.context("initialize timeout")?
.context("initialize request")?;
login_with_api_key(&mut mcp, "sk-test-key").await?; login_with_api_key(&mut mcp, "sk-test-key").await?;
let request_id = mcp let request_id = mcp.send_get_account_rate_limits_request().await?;
.send_get_account_rate_limits_request()
.await
.context("send account/rateLimits/read")?;
let error: JSONRPCError = timeout( let error: JSONRPCError = timeout(
DEFAULT_READ_TIMEOUT, DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_error_message(RequestId::Integer(request_id)), mcp.read_stream_until_error_message(RequestId::Integer(request_id)),
) )
.await .await??;
.context("account/rateLimits/read timeout")?
.context("account/rateLimits/read error")?;
assert_eq!(error.id, RequestId::Integer(request_id)); assert_eq!(error.id, RequestId::Integer(request_id));
assert_eq!(error.error.code, INVALID_REQUEST_ERROR_CODE); assert_eq!(error.error.code, INVALID_REQUEST_ERROR_CODE);
@@ -101,19 +80,18 @@ async fn get_account_rate_limits_requires_chatgpt_auth() -> Result<()> {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn get_account_rate_limits_returns_snapshot() -> Result<()> { async fn get_account_rate_limits_returns_snapshot() -> Result<()> {
let codex_home = TempDir::new().context("create codex home tempdir")?; let codex_home = TempDir::new()?;
write_chatgpt_auth( write_chatgpt_auth(
codex_home.path(), codex_home.path(),
ChatGptAuthFixture::new("chatgpt-token") ChatGptAuthFixture::new("chatgpt-token")
.account_id("account-123") .account_id("account-123")
.plan_type("pro"), .plan_type("pro"),
AuthCredentialsStoreMode::File, AuthCredentialsStoreMode::File,
) )?;
.context("write chatgpt auth")?;
let server = MockServer::start().await; let server = MockServer::start().await;
let server_url = server.uri(); let server_url = server.uri();
write_chatgpt_base_url(codex_home.path(), &server_url).context("write chatgpt base url")?; write_chatgpt_base_url(codex_home.path(), &server_url)?;
let primary_reset_timestamp = chrono::DateTime::parse_from_rfc3339("2025-01-01T00:02:00Z") let primary_reset_timestamp = chrono::DateTime::parse_from_rfc3339("2025-01-01T00:02:00Z")
.expect("parse primary reset timestamp") .expect("parse primary reset timestamp")
@@ -149,29 +127,18 @@ async fn get_account_rate_limits_returns_snapshot() -> Result<()> {
.mount(&server) .mount(&server)
.await; .await;
let mut mcp = McpProcess::new_with_env(codex_home.path(), &[("OPENAI_API_KEY", None)]) let mut mcp = McpProcess::new_with_env(codex_home.path(), &[("OPENAI_API_KEY", None)]).await?;
.await timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
.context("spawn mcp process")?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize())
.await
.context("initialize timeout")?
.context("initialize request")?;
let request_id = mcp let request_id = mcp.send_get_account_rate_limits_request().await?;
.send_get_account_rate_limits_request()
.await
.context("send account/rateLimits/read")?;
let response: JSONRPCResponse = timeout( let response: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT, DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)), mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
) )
.await .await??;
.context("account/rateLimits/read timeout")?
.context("account/rateLimits/read response")?;
let received: GetAccountRateLimitsResponse = let received: GetAccountRateLimitsResponse = to_response(response)?;
to_response(response).context("deserialize rate limit response")?;
let expected = GetAccountRateLimitsResponse { let expected = GetAccountRateLimitsResponse {
rate_limits: RateLimitSnapshot { rate_limits: RateLimitSnapshot {
@@ -197,16 +164,13 @@ async fn login_with_api_key(mcp: &mut McpProcess, api_key: &str) -> Result<()> {
.send_login_api_key_request(LoginApiKeyParams { .send_login_api_key_request(LoginApiKeyParams {
api_key: api_key.to_string(), api_key: api_key.to_string(),
}) })
.await .await?;
.context("send loginApiKey")?;
timeout( timeout(
DEFAULT_READ_TIMEOUT, DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)), mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
) )
.await .await??;
.context("loginApiKey timeout")?
.context("loginApiKey response")?;
Ok(()) Ok(())
} }

View File

@@ -1,5 +1,4 @@
use std::path::Path; use anyhow::Result;
use app_test_support::McpProcess; use app_test_support::McpProcess;
use app_test_support::create_final_assistant_message_sse_response; use app_test_support::create_final_assistant_message_sse_response;
use app_test_support::create_mock_chat_completions_server; use app_test_support::create_mock_chat_completions_server;
@@ -18,49 +17,42 @@ use codex_protocol::ConversationId;
use codex_protocol::models::ContentItem; use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem; use codex_protocol::models::ResponseItem;
use pretty_assertions::assert_eq; use pretty_assertions::assert_eq;
use std::path::Path;
use tempfile::TempDir; use tempfile::TempDir;
use tokio::time::timeout; use tokio::time::timeout;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
#[tokio::test] #[tokio::test]
async fn test_send_message_success() { async fn test_send_message_success() -> Result<()> {
// Spin up a mock completions server that immediately ends the Codex turn. // Spin up a mock completions server that immediately ends the Codex turn.
// Two Codex turns hit the mock model (session start + send-user-message). Provide two SSE responses. // Two Codex turns hit the mock model (session start + send-user-message). Provide two SSE responses.
let responses = vec![ let responses = vec![
create_final_assistant_message_sse_response("Done").expect("build mock assistant message"), create_final_assistant_message_sse_response("Done")?,
create_final_assistant_message_sse_response("Done").expect("build mock assistant message"), create_final_assistant_message_sse_response("Done")?,
]; ];
let server = create_mock_chat_completions_server(responses).await; let server = create_mock_chat_completions_server(responses).await;
// Create a temporary Codex home with config pointing at the mock server. // Create a temporary Codex home with config pointing at the mock server.
let codex_home = TempDir::new().expect("create temp dir"); let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri()).expect("write config.toml"); create_config_toml(codex_home.path(), &server.uri())?;
// Start MCP server process and initialize. // Start MCP server process and initialize.
let mut mcp = McpProcess::new(codex_home.path()) let mut mcp = McpProcess::new(codex_home.path()).await?;
.await timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
.expect("spawn mcp process");
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize())
.await
.expect("init timed out")
.expect("init failed");
// Start a conversation using the new wire API. // Start a conversation using the new wire API.
let new_conv_id = mcp let new_conv_id = mcp
.send_new_conversation_request(NewConversationParams::default()) .send_new_conversation_request(NewConversationParams::default())
.await .await?;
.expect("send newConversation");
let new_conv_resp: JSONRPCResponse = timeout( let new_conv_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT, DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(new_conv_id)), mcp.read_stream_until_response_message(RequestId::Integer(new_conv_id)),
) )
.await .await??;
.expect("newConversation timeout")
.expect("newConversation resp");
let NewConversationResponse { let NewConversationResponse {
conversation_id, .. conversation_id, ..
} = to_response::<_>(new_conv_resp).expect("deserialize newConversation response"); } = to_response::<_>(new_conv_resp)?;
// 2) addConversationListener // 2) addConversationListener
let add_listener_id = mcp let add_listener_id = mcp
@@ -68,25 +60,27 @@ async fn test_send_message_success() {
conversation_id, conversation_id,
experimental_raw_events: false, experimental_raw_events: false,
}) })
.await .await?;
.expect("send addConversationListener");
let add_listener_resp: JSONRPCResponse = timeout( let add_listener_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT, DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(add_listener_id)), mcp.read_stream_until_response_message(RequestId::Integer(add_listener_id)),
) )
.await .await??;
.expect("addConversationListener timeout")
.expect("addConversationListener resp");
let AddConversationSubscriptionResponse { subscription_id: _ } = let AddConversationSubscriptionResponse { subscription_id: _ } =
to_response::<_>(add_listener_resp).expect("deserialize addConversationListener response"); to_response::<_>(add_listener_resp)?;
// Now exercise sendUserMessage twice. // Now exercise sendUserMessage twice.
send_message("Hello", conversation_id, &mut mcp).await; send_message("Hello", conversation_id, &mut mcp).await?;
send_message("Hello again", conversation_id, &mut mcp).await; send_message("Hello again", conversation_id, &mut mcp).await?;
Ok(())
} }
#[expect(clippy::expect_used)] #[expect(clippy::expect_used)]
async fn send_message(message: &str, conversation_id: ConversationId, mcp: &mut McpProcess) { async fn send_message(
message: &str,
conversation_id: ConversationId,
mcp: &mut McpProcess,
) -> Result<()> {
// Now exercise sendUserMessage. // Now exercise sendUserMessage.
let send_id = mcp let send_id = mcp
.send_send_user_message_request(SendUserMessageParams { .send_send_user_message_request(SendUserMessageParams {
@@ -95,19 +89,15 @@ async fn send_message(message: &str, conversation_id: ConversationId, mcp: &mut
text: message.to_string(), text: message.to_string(),
}], }],
}) })
.await .await?;
.expect("send sendUserMessage");
let response: JSONRPCResponse = timeout( let response: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT, DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(send_id)), mcp.read_stream_until_response_message(RequestId::Integer(send_id)),
) )
.await .await??;
.expect("sendUserMessage response timeout")
.expect("sendUserMessage response error");
let _ok: SendUserMessageResponse = to_response::<SendUserMessageResponse>(response) let _ok: SendUserMessageResponse = to_response::<SendUserMessageResponse>(response)?;
.expect("deserialize sendUserMessage response");
// Verify the task_finished notification is received. // Verify the task_finished notification is received.
// Note this also ensures that the final request to the server was made. // Note this also ensures that the final request to the server was made.
@@ -115,9 +105,7 @@ async fn send_message(message: &str, conversation_id: ConversationId, mcp: &mut
DEFAULT_READ_TIMEOUT, DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("codex/event/task_complete"), mcp.read_stream_until_notification_message("codex/event/task_complete"),
) )
.await .await??;
.expect("task_finished_notification timeout")
.expect("task_finished_notification resp");
let serde_json::Value::Object(map) = task_finished_notification let serde_json::Value::Object(map) = task_finished_notification
.params .params
.expect("notification should have params") .expect("notification should have params")
@@ -139,57 +127,45 @@ async fn send_message(message: &str, conversation_id: ConversationId, mcp: &mut
raw_attempt.is_err(), raw_attempt.is_err(),
"unexpected raw item notification when not opted in" "unexpected raw item notification when not opted in"
); );
Ok(())
} }
#[tokio::test] #[tokio::test]
async fn test_send_message_raw_notifications_opt_in() { async fn test_send_message_raw_notifications_opt_in() -> Result<()> {
let responses = vec![ let responses = vec![create_final_assistant_message_sse_response("Done")?];
create_final_assistant_message_sse_response("Done").expect("build mock assistant message"),
];
let server = create_mock_chat_completions_server(responses).await; let server = create_mock_chat_completions_server(responses).await;
let codex_home = TempDir::new().expect("create temp dir"); let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri()).expect("write config.toml"); create_config_toml(codex_home.path(), &server.uri())?;
let mut mcp = McpProcess::new(codex_home.path()) let mut mcp = McpProcess::new(codex_home.path()).await?;
.await timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
.expect("spawn mcp process");
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize())
.await
.expect("init timed out")
.expect("init failed");
let new_conv_id = mcp let new_conv_id = mcp
.send_new_conversation_request(NewConversationParams::default()) .send_new_conversation_request(NewConversationParams::default())
.await .await?;
.expect("send newConversation");
let new_conv_resp: JSONRPCResponse = timeout( let new_conv_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT, DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(new_conv_id)), mcp.read_stream_until_response_message(RequestId::Integer(new_conv_id)),
) )
.await .await??;
.expect("newConversation timeout")
.expect("newConversation resp");
let NewConversationResponse { let NewConversationResponse {
conversation_id, .. conversation_id, ..
} = to_response::<_>(new_conv_resp).expect("deserialize newConversation response"); } = to_response::<_>(new_conv_resp)?;
let add_listener_id = mcp let add_listener_id = mcp
.send_add_conversation_listener_request(AddConversationListenerParams { .send_add_conversation_listener_request(AddConversationListenerParams {
conversation_id, conversation_id,
experimental_raw_events: true, experimental_raw_events: true,
}) })
.await .await?;
.expect("send addConversationListener");
let add_listener_resp: JSONRPCResponse = timeout( let add_listener_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT, DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(add_listener_id)), mcp.read_stream_until_response_message(RequestId::Integer(add_listener_id)),
) )
.await .await??;
.expect("addConversationListener timeout")
.expect("addConversationListener resp");
let AddConversationSubscriptionResponse { subscription_id: _ } = let AddConversationSubscriptionResponse { subscription_id: _ } =
to_response::<_>(add_listener_resp).expect("deserialize addConversationListener response"); to_response::<_>(add_listener_resp)?;
let send_id = mcp let send_id = mcp
.send_send_user_message_request(SendUserMessageParams { .send_send_user_message_request(SendUserMessageParams {
@@ -198,8 +174,7 @@ async fn test_send_message_raw_notifications_opt_in() {
text: "Hello".to_string(), text: "Hello".to_string(),
}], }],
}) })
.await .await?;
.expect("send sendUserMessage");
let instructions = read_raw_response_item(&mut mcp, conversation_id).await; let instructions = read_raw_response_item(&mut mcp, conversation_id).await;
assert_instructions_message(&instructions); assert_instructions_message(&instructions);
@@ -211,11 +186,8 @@ async fn test_send_message_raw_notifications_opt_in() {
DEFAULT_READ_TIMEOUT, DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(send_id)), mcp.read_stream_until_response_message(RequestId::Integer(send_id)),
) )
.await .await??;
.expect("sendUserMessage response timeout") let _ok: SendUserMessageResponse = to_response::<SendUserMessageResponse>(response)?;
.expect("sendUserMessage response error");
let _ok: SendUserMessageResponse = to_response::<SendUserMessageResponse>(response)
.expect("deserialize sendUserMessage response");
let user_message = read_raw_response_item(&mut mcp, conversation_id).await; let user_message = read_raw_response_item(&mut mcp, conversation_id).await;
assert_user_message(&user_message, "Hello"); assert_user_message(&user_message, "Hello");
@@ -228,17 +200,16 @@ async fn test_send_message_raw_notifications_opt_in() {
mcp.read_stream_until_notification_message("codex/event/task_complete"), mcp.read_stream_until_notification_message("codex/event/task_complete"),
) )
.await; .await;
Ok(())
} }
#[tokio::test] #[tokio::test]
async fn test_send_message_session_not_found() { async fn test_send_message_session_not_found() -> Result<()> {
// Start MCP without creating a Codex session // Start MCP without creating a Codex session
let codex_home = TempDir::new().expect("tempdir"); let codex_home = TempDir::new()?;
let mut mcp = McpProcess::new(codex_home.path()).await.expect("spawn"); let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()) timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
.await
.expect("timeout")
.expect("init");
let unknown = ConversationId::new(); let unknown = ConversationId::new();
let req_id = mcp let req_id = mcp
@@ -248,18 +219,16 @@ async fn test_send_message_session_not_found() {
text: "ping".to_string(), text: "ping".to_string(),
}], }],
}) })
.await .await?;
.expect("send sendUserMessage");
// Expect an error response for unknown conversation. // Expect an error response for unknown conversation.
let err = timeout( let err = timeout(
DEFAULT_READ_TIMEOUT, DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_error_message(RequestId::Integer(req_id)), mcp.read_stream_until_error_message(RequestId::Integer(req_id)),
) )
.await .await??;
.expect("timeout")
.expect("error");
assert_eq!(err.id, RequestId::Integer(req_id)); assert_eq!(err.id, RequestId::Integer(req_id));
Ok(())
} }
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------

View File

@@ -1,5 +1,4 @@
use std::path::Path; use anyhow::Result;
use app_test_support::McpProcess; use app_test_support::McpProcess;
use app_test_support::to_response; use app_test_support::to_response;
use codex_app_server_protocol::JSONRPCResponse; use codex_app_server_protocol::JSONRPCResponse;
@@ -8,50 +7,38 @@ use codex_app_server_protocol::SetDefaultModelParams;
use codex_app_server_protocol::SetDefaultModelResponse; use codex_app_server_protocol::SetDefaultModelResponse;
use codex_core::config::ConfigToml; use codex_core::config::ConfigToml;
use pretty_assertions::assert_eq; use pretty_assertions::assert_eq;
use std::path::Path;
use tempfile::TempDir; use tempfile::TempDir;
use tokio::time::timeout; use tokio::time::timeout;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
#[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn set_default_model_persists_overrides() { async fn set_default_model_persists_overrides() -> Result<()> {
let codex_home = TempDir::new().expect("create tempdir"); let codex_home = TempDir::new()?;
create_config_toml(codex_home.path()).expect("write config.toml"); create_config_toml(codex_home.path())?;
let mut mcp = McpProcess::new(codex_home.path()) let mut mcp = McpProcess::new(codex_home.path()).await?;
.await timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
.expect("spawn mcp process");
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize())
.await
.expect("init timeout")
.expect("init failed");
let params = SetDefaultModelParams { let params = SetDefaultModelParams {
model: Some("gpt-4.1".to_string()), model: Some("gpt-4.1".to_string()),
reasoning_effort: None, reasoning_effort: None,
}; };
let request_id = mcp let request_id = mcp.send_set_default_model_request(params).await?;
.send_set_default_model_request(params)
.await
.expect("send setDefaultModel");
let resp: JSONRPCResponse = timeout( let resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT, DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)), mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
) )
.await .await??;
.expect("setDefaultModel timeout")
.expect("setDefaultModel response");
let _: SetDefaultModelResponse = let _: SetDefaultModelResponse = to_response(resp)?;
to_response(resp).expect("deserialize setDefaultModel response");
let config_path = codex_home.path().join("config.toml"); let config_path = codex_home.path().join("config.toml");
let config_contents = tokio::fs::read_to_string(&config_path) let config_contents = tokio::fs::read_to_string(&config_path).await?;
.await let config_toml: ConfigToml = toml::from_str(&config_contents)?;
.expect("read config.toml");
let config_toml: ConfigToml = toml::from_str(&config_contents).expect("parse config.toml");
assert_eq!( assert_eq!(
ConfigToml { ConfigToml {
@@ -61,6 +48,7 @@ async fn set_default_model_persists_overrides() {
}, },
config_toml, config_toml,
); );
Ok(())
} }
// Helper to create a config.toml; mirrors create_conversation.rs // Helper to create a config.toml; mirrors create_conversation.rs

View File

@@ -1,3 +1,4 @@
use anyhow::Result;
use app_test_support::McpProcess; use app_test_support::McpProcess;
use app_test_support::to_response; use app_test_support::to_response;
use codex_app_server_protocol::GetUserAgentResponse; use codex_app_server_protocol::GetUserAgentResponse;
@@ -10,28 +11,18 @@ use tokio::time::timeout;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
#[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn get_user_agent_returns_current_codex_user_agent() { async fn get_user_agent_returns_current_codex_user_agent() -> Result<()> {
let codex_home = TempDir::new().unwrap_or_else(|err| panic!("create tempdir: {err}")); let codex_home = TempDir::new()?;
let mut mcp = McpProcess::new(codex_home.path()) let mut mcp = McpProcess::new(codex_home.path()).await?;
.await timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
.expect("spawn mcp process");
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize())
.await
.expect("initialize timeout")
.expect("initialize request");
let request_id = mcp let request_id = mcp.send_get_user_agent_request().await?;
.send_get_user_agent_request()
.await
.expect("send getUserAgent");
let response: JSONRPCResponse = timeout( let response: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT, DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)), mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
) )
.await .await??;
.expect("getUserAgent timeout")
.expect("getUserAgent response");
let os_info = os_info::get(); let os_info = os_info::get();
let user_agent = format!( let user_agent = format!(
@@ -42,9 +33,9 @@ async fn get_user_agent_returns_current_codex_user_agent() {
codex_core::terminal::user_agent() codex_core::terminal::user_agent()
); );
let received: GetUserAgentResponse = let received: GetUserAgentResponse = to_response(response)?;
to_response(response).expect("deserialize getUserAgent response");
let expected = GetUserAgentResponse { user_agent }; let expected = GetUserAgentResponse { user_agent };
assert_eq!(received, expected); assert_eq!(received, expected);
Ok(())
} }

View File

@@ -1,5 +1,4 @@
use std::time::Duration; use anyhow::Result;
use app_test_support::ChatGptAuthFixture; use app_test_support::ChatGptAuthFixture;
use app_test_support::McpProcess; use app_test_support::McpProcess;
use app_test_support::to_response; use app_test_support::to_response;
@@ -9,14 +8,15 @@ use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::UserInfoResponse; use codex_app_server_protocol::UserInfoResponse;
use codex_core::auth::AuthCredentialsStoreMode; use codex_core::auth::AuthCredentialsStoreMode;
use pretty_assertions::assert_eq; use pretty_assertions::assert_eq;
use std::time::Duration;
use tempfile::TempDir; use tempfile::TempDir;
use tokio::time::timeout; use tokio::time::timeout;
const DEFAULT_READ_TIMEOUT: Duration = Duration::from_secs(10); const DEFAULT_READ_TIMEOUT: Duration = Duration::from_secs(10);
#[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn user_info_returns_email_from_auth_json() { async fn user_info_returns_email_from_auth_json() -> Result<()> {
let codex_home = TempDir::new().expect("create tempdir"); let codex_home = TempDir::new()?;
write_chatgpt_auth( write_chatgpt_auth(
codex_home.path(), codex_home.path(),
@@ -24,30 +24,23 @@ async fn user_info_returns_email_from_auth_json() {
.refresh_token("refresh") .refresh_token("refresh")
.email("user@example.com"), .email("user@example.com"),
AuthCredentialsStoreMode::File, AuthCredentialsStoreMode::File,
) )?;
.expect("write chatgpt auth");
let mut mcp = McpProcess::new(codex_home.path()) let mut mcp = McpProcess::new(codex_home.path()).await?;
.await timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
.expect("spawn mcp process");
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize())
.await
.expect("initialize timeout")
.expect("initialize request");
let request_id = mcp.send_user_info_request().await.expect("send userInfo"); let request_id = mcp.send_user_info_request().await?;
let response: JSONRPCResponse = timeout( let response: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT, DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)), mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
) )
.await .await??;
.expect("userInfo timeout")
.expect("userInfo response");
let received: UserInfoResponse = to_response(response).expect("deserialize userInfo response"); let received: UserInfoResponse = to_response(response)?;
let expected = UserInfoResponse { let expected = UserInfoResponse {
alleged_user_email: Some("user@example.com".to_string()), alleged_user_email: Some("user@example.com".to_string()),
}; };
assert_eq!(received, expected); assert_eq!(received, expected);
Ok(())
} }