Files
llmx/codex-rs/core/tests/client.rs
Michael Bolin 08ed618f72 chore: introduce ConversationManager as a clearinghouse for all conversations (#2240)
This PR does two things because after I got deep into the first one I
started pulling on the thread to the second:

- Makes `ConversationManager` the place where all in-memory
conversations are created and stored. Previously, `MessageProcessor` in
the `codex-mcp-server` crate was doing this via its `session_map`, but
this is something that should be done in `codex-core`.
- It unwinds the `ctrl_c: tokio::sync::Notify` that was threaded
throughout our code. I think this made sense at one time, but now that
we handle Ctrl-C within the TUI and have a proper `Op::Interrupt` event,
I don't think this was quite right, so I removed it. For `codex exec`
and `codex proto`, we now use `tokio::signal::ctrl_c()` directly, but we
no longer make `Notify` a field of `Codex` or `CodexConversation`.

Changes of note:

- Adds the files `conversation_manager.rs` and `codex_conversation.rs`
to `codex-core`.
- `Codex` and `CodexSpawnOk` are no longer exported from `codex-core`:
other crates must use `CodexConversation` instead (which is created via
`ConversationManager`).
- `core/src/codex_wrapper.rs` has been deleted in favor of
`ConversationManager`.
- `ConversationManager::new_conversation()` returns `NewConversation`,
which is in line with the `new_conversation` tool we want to add to the
MCP server. Note `NewConversation` includes `SessionConfiguredEvent`, so
we eliminate checks in cases like `codex-rs/core/tests/client.rs` to
verify `SessionConfiguredEvent` is the first event because that is now
internal to `ConversationManager`.
- Quite a bit of code was deleted from
`codex-rs/mcp-server/src/message_processor.rs` since it no longer has to
manage multiple conversations itself: it goes through
`ConversationManager` instead.
- `core/tests/live_agent.rs` has been deleted because I had to update a
bunch of tests and all the tests in here were ignored, and I don't think
anyone ever ran them, so this was just technical debt, at this point.
- Removed `notify_on_sigint()` from `util.rs` (and in a follow-up, I
hope to refactor the blandly-named `util.rs` into more descriptive
files).
- In general, I started renaming local variables named `codex` to
`conversation`, where appropriate, though admittedly I didn't do it
through all the integration tests because that would have added a lot of
noise to this PR.




---
[//]: # (BEGIN SAPLING FOOTER)
Stack created with [Sapling](https://sapling-scm.com). Best reviewed
with [ReviewStack](https://reviewstack.dev/openai/codex/pull/2240).
* #2264
* #2263
* __->__ #2240
2025-08-13 13:38:18 -07:00

542 lines
18 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#![allow(clippy::expect_used, clippy::unwrap_used)]
use codex_core::ConversationManager;
use codex_core::ModelProviderInfo;
use codex_core::NewConversation;
use codex_core::WireApi;
use codex_core::built_in_model_providers;
use codex_core::protocol::EventMsg;
use codex_core::protocol::InputItem;
use codex_core::protocol::Op;
use codex_core::spawn::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR;
use codex_login::CodexAuth;
use core_test_support::load_default_config_for_test;
use core_test_support::load_sse_fixture_with_id;
use core_test_support::wait_for_event;
use tempfile::TempDir;
use wiremock::Mock;
use wiremock::MockServer;
use wiremock::ResponseTemplate;
use wiremock::matchers::header_regex;
use wiremock::matchers::method;
use wiremock::matchers::path;
use wiremock::matchers::query_param;
/// Builds a minimal SSE stream with a completed marker by loading the
/// `completed_template.json` fixture with the given response `id`.
fn sse_completed(id: &str) -> String {
    let fixture_path = "tests/fixtures/completed_template.json";
    load_sse_fixture_with_id(fixture_path, id)
}
/// Asserts that the `role` field of a request input message equals `role`.
///
/// Panics with "invalid message role" when the field is absent or not a
/// string, mirroring the diagnostics of the sibling content helpers.
fn assert_message_role(request_body: &serde_json::Value, role: &str) {
    let actual_role = request_body["role"]
        .as_str()
        .expect("invalid message role");
    assert_eq!(actual_role, role);
}
/// Asserts that the `text` of the first `content` item of a request input
/// message begins with `text`.
///
/// Panics with "invalid message content" when that text is not a string.
fn assert_message_starts_with(request_body: &serde_json::Value, text: &str) {
    let first_item = &request_body["content"][0];
    let content = first_item["text"]
        .as_str()
        .expect("invalid message content");
    let has_prefix = content.starts_with(text);
    assert!(
        has_prefix,
        "expected message content '{content}' to start with '{text}'"
    );
}
/// Asserts that the `text` of the first `content` item of a request input
/// message finishes with `text`.
///
/// Panics with "invalid message content" when that text is not a string.
fn assert_message_ends_with(request_body: &serde_json::Value, text: &str) {
    let first_item = &request_body["content"][0];
    let content = first_item["text"]
        .as_str()
        .expect("invalid message content");
    let has_suffix = content.ends_with(text);
    assert!(
        has_suffix,
        "expected message content '{content}' to end with '{text}'"
    );
}
/// Checks that a request made with API-key auth carries the `session_id`,
/// `originator`, and `authorization` headers with the expected values.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn includes_session_id_and_model_headers_in_request() {
    let network_disabled = std::env::var(CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR).is_ok();
    if network_disabled {
        println!(
            "Skipping test because it cannot execute when network is disabled in a Codex sandbox."
        );
        return;
    }

    // Stub the Responses endpoint with one completed SSE stream; exactly one
    // POST is expected.
    let server = MockServer::start().await;
    let sse_response = ResponseTemplate::new(200)
        .insert_header("content-type", "text/event-stream")
        .set_body_raw(sse_completed("resp1"), "text/event-stream");
    Mock::given(method("POST"))
        .and(path("/v1/responses"))
        .respond_with(sse_response)
        .expect(1)
        .mount(&server)
        .await;

    // Built-in OpenAI provider, redirected to the mock server.
    let base_url = format!("{}/v1", server.uri());
    let model_provider = ModelProviderInfo {
        base_url: Some(base_url),
        ..built_in_model_providers()["openai"].clone()
    };

    // Start a conversation backed by a throwaway Codex home directory.
    let codex_home = TempDir::new().unwrap();
    let mut config = load_default_config_for_test(&codex_home);
    config.model_provider = model_provider;

    let manager = ConversationManager::default();
    let api_key_auth = CodexAuth::from_api_key("Test API Key");
    let NewConversation {
        conversation,
        conversation_id,
        ..
    } = manager
        .new_conversation_with_auth(config, Some(api_key_auth))
        .await
        .expect("create new conversation");

    let user_turn = Op::UserInput {
        items: vec![InputItem::Text {
            text: "hello".into(),
        }],
    };
    conversation.submit(user_turn).await.unwrap();
    wait_for_event(&conversation, |event| {
        matches!(event, EventMsg::TaskComplete(_))
    })
    .await;

    // Inspect the headers of the single request the mock server recorded.
    let received = server.received_requests().await.unwrap();
    let headers = &received[0].headers;
    let session_id_header = headers.get("session_id").unwrap();
    let authorization_header = headers.get("authorization").unwrap();
    let originator_header = headers.get("originator").unwrap();

    assert_eq!(
        session_id_header.to_str().unwrap(),
        conversation_id.to_string()
    );
    assert_eq!(originator_header.to_str().unwrap(), "codex_cli_rs");
    assert_eq!(
        authorization_header.to_str().unwrap(),
        "Bearer Test API Key"
    );
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn includes_base_instructions_override_in_request() {
#![allow(clippy::unwrap_used)]
// Mock server
let server = MockServer::start().await;
// First request must NOT include `previous_response_id`.
let first = ResponseTemplate::new(200)
.insert_header("content-type", "text/event-stream")
.set_body_raw(sse_completed("resp1"), "text/event-stream");
Mock::given(method("POST"))
.and(path("/v1/responses"))
.respond_with(first)
.expect(1)
.mount(&server)
.await;
let model_provider = ModelProviderInfo {
base_url: Some(format!("{}/v1", server.uri())),
..built_in_model_providers()["openai"].clone()
};
let codex_home = TempDir::new().unwrap();
let mut config = load_default_config_for_test(&codex_home);
config.base_instructions = Some("test instructions".to_string());
config.model_provider = model_provider;
let conversation_manager = ConversationManager::default();
let codex = conversation_manager
.new_conversation_with_auth(config, Some(CodexAuth::from_api_key("Test API Key")))
.await
.expect("create new conversation")
.conversation;
codex
.submit(Op::UserInput {
items: vec![InputItem::Text {
text: "hello".into(),
}],
})
.await
.unwrap();
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
let request = &server.received_requests().await.unwrap()[0];
let request_body = request.body_json::<serde_json::Value>().unwrap();
assert!(
request_body["instructions"]
.as_str()
.unwrap()
.contains("test instructions")
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn originator_config_override_is_used() {
#![allow(clippy::unwrap_used)]
// Mock server
let server = MockServer::start().await;
let first = ResponseTemplate::new(200)
.insert_header("content-type", "text/event-stream")
.set_body_raw(sse_completed("resp1"), "text/event-stream");
Mock::given(method("POST"))
.and(path("/v1/responses"))
.respond_with(first)
.expect(1)
.mount(&server)
.await;
let model_provider = ModelProviderInfo {
base_url: Some(format!("{}/v1", server.uri())),
..built_in_model_providers()["openai"].clone()
};
let codex_home = TempDir::new().unwrap();
let mut config = load_default_config_for_test(&codex_home);
config.model_provider = model_provider;
config.internal_originator = Some("my_override".to_string());
let conversation_manager = ConversationManager::default();
let codex = conversation_manager
.new_conversation_with_auth(config, Some(CodexAuth::from_api_key("Test API Key")))
.await
.expect("create new conversation")
.conversation;
codex
.submit(Op::UserInput {
items: vec![InputItem::Text {
text: "hello".into(),
}],
})
.await
.unwrap();
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
let request = &server.received_requests().await.unwrap()[0];
let request_originator = request.headers.get("originator").unwrap();
assert_eq!(request_originator.to_str().unwrap(), "my_override");
}
/// Checks the headers and body fields of a request made with the dummy
/// ChatGPT auth against an `/api/codex` base URL.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn chatgpt_auth_sends_correct_request() {
    let network_disabled = std::env::var(CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR).is_ok();
    if network_disabled {
        println!(
            "Skipping test because it cannot execute when network is disabled in a Codex sandbox."
        );
        return;
    }

    // Stub the ChatGPT-style responses endpoint with one completed SSE stream;
    // exactly one POST is expected.
    let server = MockServer::start().await;
    let sse_response = ResponseTemplate::new(200)
        .insert_header("content-type", "text/event-stream")
        .set_body_raw(sse_completed("resp1"), "text/event-stream");
    Mock::given(method("POST"))
        .and(path("/api/codex/responses"))
        .respond_with(sse_response)
        .expect(1)
        .mount(&server)
        .await;

    // Built-in OpenAI provider, redirected to the mock server.
    let base_url = format!("{}/api/codex", server.uri());
    let model_provider = ModelProviderInfo {
        base_url: Some(base_url),
        ..built_in_model_providers()["openai"].clone()
    };

    // Start a conversation backed by a throwaway Codex home directory.
    let codex_home = TempDir::new().unwrap();
    let mut config = load_default_config_for_test(&codex_home);
    config.model_provider = model_provider;

    let manager = ConversationManager::default();
    let NewConversation {
        conversation,
        conversation_id,
        ..
    } = manager
        .new_conversation_with_auth(config, Some(create_dummy_codex_auth()))
        .await
        .expect("create new conversation");

    let user_turn = Op::UserInput {
        items: vec![InputItem::Text {
            text: "hello".into(),
        }],
    };
    conversation.submit(user_turn).await.unwrap();
    wait_for_event(&conversation, |event| {
        matches!(event, EventMsg::TaskComplete(_))
    })
    .await;

    // Pull the headers and JSON body out of the single recorded request.
    let received = server.received_requests().await.unwrap();
    let request = &received[0];
    let headers = &request.headers;
    let session_id_header = headers.get("session_id").unwrap();
    let authorization_header = headers.get("authorization").unwrap();
    let originator_header = headers.get("originator").unwrap();
    let account_id_header = headers.get("chatgpt-account-id").unwrap();
    let body = request.body_json::<serde_json::Value>().unwrap();

    assert_eq!(
        session_id_header.to_str().unwrap(),
        conversation_id.to_string()
    );
    assert_eq!(originator_header.to_str().unwrap(), "codex_cli_rs");
    assert_eq!(
        authorization_header.to_str().unwrap(),
        "Bearer Access Token"
    );
    assert_eq!(account_id_header.to_str().unwrap(), "account_id");

    // `store` must be false, `stream` true, and encrypted reasoning content
    // must be the first `include` entry.
    let store = body["store"].as_bool().unwrap();
    assert!(!store);
    let stream = body["stream"].as_bool().unwrap();
    assert!(stream);
    assert_eq!(
        body["include"][0].as_str().unwrap(),
        "reasoning.encrypted_content"
    );
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn includes_user_instructions_message_in_request() {
#![allow(clippy::unwrap_used)]
let server = MockServer::start().await;
let first = ResponseTemplate::new(200)
.insert_header("content-type", "text/event-stream")
.set_body_raw(sse_completed("resp1"), "text/event-stream");
Mock::given(method("POST"))
.and(path("/v1/responses"))
.respond_with(first)
.expect(1)
.mount(&server)
.await;
let model_provider = ModelProviderInfo {
base_url: Some(format!("{}/v1", server.uri())),
..built_in_model_providers()["openai"].clone()
};
let codex_home = TempDir::new().unwrap();
let mut config = load_default_config_for_test(&codex_home);
config.model_provider = model_provider;
config.user_instructions = Some("be nice".to_string());
let conversation_manager = ConversationManager::default();
let codex = conversation_manager
.new_conversation_with_auth(config, Some(CodexAuth::from_api_key("Test API Key")))
.await
.expect("create new conversation")
.conversation;
codex
.submit(Op::UserInput {
items: vec![InputItem::Text {
text: "hello".into(),
}],
})
.await
.unwrap();
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
let request = &server.received_requests().await.unwrap()[0];
let request_body = request.body_json::<serde_json::Value>().unwrap();
assert!(
!request_body["instructions"]
.as_str()
.unwrap()
.contains("be nice")
);
assert_message_role(&request_body["input"][0], "user");
assert_message_starts_with(&request_body["input"][0], "<environment_context>\n\n");
assert_message_ends_with(&request_body["input"][0], "</environment_context>");
assert_message_role(&request_body["input"][1], "user");
assert_message_starts_with(&request_body["input"][1], "<user_instructions>\n\n");
assert_message_ends_with(&request_body["input"][1], "</user_instructions>");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn azure_overrides_assign_properties_used_for_responses_url() {
#![allow(clippy::unwrap_used)]
let existing_env_var_with_random_value = if cfg!(windows) { "USERNAME" } else { "USER" };
// Mock server
let server = MockServer::start().await;
// First request must NOT include `previous_response_id`.
let first = ResponseTemplate::new(200)
.insert_header("content-type", "text/event-stream")
.set_body_raw(sse_completed("resp1"), "text/event-stream");
// Expect POST to /openai/responses with api-version query param
Mock::given(method("POST"))
.and(path("/openai/responses"))
.and(query_param("api-version", "2025-04-01-preview"))
.and(header_regex("Custom-Header", "Value"))
.and(header_regex(
"Authorization",
format!(
"Bearer {}",
std::env::var(existing_env_var_with_random_value).unwrap()
)
.as_str(),
))
.respond_with(first)
.expect(1)
.mount(&server)
.await;
let provider = ModelProviderInfo {
name: "custom".to_string(),
base_url: Some(format!("{}/openai", server.uri())),
// Reuse the existing environment variable to avoid using unsafe code
env_key: Some(existing_env_var_with_random_value.to_string()),
query_params: Some(std::collections::HashMap::from([(
"api-version".to_string(),
"2025-04-01-preview".to_string(),
)])),
env_key_instructions: None,
wire_api: WireApi::Responses,
http_headers: Some(std::collections::HashMap::from([(
"Custom-Header".to_string(),
"Value".to_string(),
)])),
env_http_headers: None,
request_max_retries: None,
stream_max_retries: None,
stream_idle_timeout_ms: None,
requires_openai_auth: false,
};
// Init session
let codex_home = TempDir::new().unwrap();
let mut config = load_default_config_for_test(&codex_home);
config.model_provider = provider;
let conversation_manager = ConversationManager::default();
let codex = conversation_manager
.new_conversation_with_auth(config, None)
.await
.expect("create new conversation")
.conversation;
codex
.submit(Op::UserInput {
items: vec![InputItem::Text {
text: "hello".into(),
}],
})
.await
.unwrap();
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn env_var_overrides_loaded_auth() {
#![allow(clippy::unwrap_used)]
let existing_env_var_with_random_value = if cfg!(windows) { "USERNAME" } else { "USER" };
// Mock server
let server = MockServer::start().await;
// First request must NOT include `previous_response_id`.
let first = ResponseTemplate::new(200)
.insert_header("content-type", "text/event-stream")
.set_body_raw(sse_completed("resp1"), "text/event-stream");
// Expect POST to /openai/responses with api-version query param
Mock::given(method("POST"))
.and(path("/openai/responses"))
.and(query_param("api-version", "2025-04-01-preview"))
.and(header_regex("Custom-Header", "Value"))
.and(header_regex(
"Authorization",
format!(
"Bearer {}",
std::env::var(existing_env_var_with_random_value).unwrap()
)
.as_str(),
))
.respond_with(first)
.expect(1)
.mount(&server)
.await;
let provider = ModelProviderInfo {
name: "custom".to_string(),
base_url: Some(format!("{}/openai", server.uri())),
// Reuse the existing environment variable to avoid using unsafe code
env_key: Some(existing_env_var_with_random_value.to_string()),
query_params: Some(std::collections::HashMap::from([(
"api-version".to_string(),
"2025-04-01-preview".to_string(),
)])),
env_key_instructions: None,
wire_api: WireApi::Responses,
http_headers: Some(std::collections::HashMap::from([(
"Custom-Header".to_string(),
"Value".to_string(),
)])),
env_http_headers: None,
request_max_retries: None,
stream_max_retries: None,
stream_idle_timeout_ms: None,
requires_openai_auth: false,
};
// Init session
let codex_home = TempDir::new().unwrap();
let mut config = load_default_config_for_test(&codex_home);
config.model_provider = provider;
let conversation_manager = ConversationManager::default();
let codex = conversation_manager
.new_conversation_with_auth(config, Some(create_dummy_codex_auth()))
.await
.expect("create new conversation")
.conversation;
codex
.submit(Op::UserInput {
items: vec![InputItem::Text {
text: "hello".into(),
}],
})
.await
.unwrap();
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
}
/// Returns the `CodexAuth` produced by
/// `CodexAuth::create_dummy_chatgpt_auth_for_testing`, used by the tests in
/// this file that pass ChatGPT-style auth to a new conversation.
fn create_dummy_codex_auth() -> CodexAuth {
CodexAuth::create_dummy_chatgpt_auth_for_testing()
}