[app-server] feat: v2 Thread APIs (#6214)

Implements:
```
thread/list
thread/start
thread/resume
thread/archive
```

along with their integration tests. These are relatively light wrappers
around the existing core logic, and changes to core logic are minimal.

However, one improvement was made for developer ergonomics:
- `thread/start` and `thread/resume` automatically attach a
conversation listener internally, so clients don't have to make a
separate `AddConversationListener` call like they do today.

For consistency, also updated `model/list` and `feedback/upload` (naming
conventions, list API params).
This commit is contained in:
Owen Lin
2025-11-05 12:28:43 -08:00
committed by GitHub
parent 79aa83ee39
commit 2ab1650d4d
19 changed files with 1482 additions and 287 deletions

2
codex-rs/Cargo.lock generated
View File

@@ -186,9 +186,11 @@ dependencies = [
"chrono", "chrono",
"codex-app-server-protocol", "codex-app-server-protocol",
"codex-core", "codex-core",
"codex-protocol",
"serde", "serde",
"serde_json", "serde_json",
"tokio", "tokio",
"uuid",
"wiremock", "wiremock",
] ]

View File

@@ -1,4 +1,5 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::path::Path;
use std::path::PathBuf; use std::path::PathBuf;
use crate::JSONRPCNotification; use crate::JSONRPCNotification;
@@ -101,11 +102,43 @@ macro_rules! client_request_definitions {
client_request_definitions! { client_request_definitions! {
/// NEW APIs /// NEW APIs
// Thread lifecycle
#[serde(rename = "thread/start")]
#[ts(rename = "thread/start")]
ThreadStart {
params: v2::ThreadStartParams,
response: v2::ThreadStartResponse,
},
#[serde(rename = "thread/resume")]
#[ts(rename = "thread/resume")]
ThreadResume {
params: v2::ThreadResumeParams,
response: v2::ThreadResumeResponse,
},
#[serde(rename = "thread/archive")]
#[ts(rename = "thread/archive")]
ThreadArchive {
params: v2::ThreadArchiveParams,
response: v2::ThreadArchiveResponse,
},
#[serde(rename = "thread/list")]
#[ts(rename = "thread/list")]
ThreadList {
params: v2::ThreadListParams,
response: v2::ThreadListResponse,
},
#[serde(rename = "thread/compact")]
#[ts(rename = "thread/compact")]
ThreadCompact {
params: v2::ThreadCompactParams,
response: v2::ThreadCompactResponse,
},
#[serde(rename = "model/list")] #[serde(rename = "model/list")]
#[ts(rename = "model/list")] #[ts(rename = "model/list")]
ListModels { ModelList {
params: v2::ListModelsParams, params: v2::ModelListParams,
response: v2::ListModelsResponse, response: v2::ModelListResponse,
}, },
#[serde(rename = "account/login")] #[serde(rename = "account/login")]
@@ -131,9 +164,9 @@ client_request_definitions! {
#[serde(rename = "feedback/upload")] #[serde(rename = "feedback/upload")]
#[ts(rename = "feedback/upload")] #[ts(rename = "feedback/upload")]
UploadFeedback { FeedbackUpload {
params: v2::UploadFeedbackParams, params: v2::FeedbackUploadParams,
response: v2::UploadFeedbackResponse, response: v2::FeedbackUploadResponse,
}, },
#[serde(rename = "account/read")] #[serde(rename = "account/read")]
@@ -292,7 +325,7 @@ macro_rules! server_request_definitions {
#[allow(clippy::vec_init_then_push)] #[allow(clippy::vec_init_then_push)]
pub fn export_server_response_schemas( pub fn export_server_response_schemas(
out_dir: &::std::path::Path, out_dir: &Path,
) -> ::anyhow::Result<Vec<GeneratedSchema>> { ) -> ::anyhow::Result<Vec<GeneratedSchema>> {
let mut schemas = Vec::new(); let mut schemas = Vec::new();
paste! { paste! {
@@ -303,7 +336,7 @@ macro_rules! server_request_definitions {
#[allow(clippy::vec_init_then_push)] #[allow(clippy::vec_init_then_push)]
pub fn export_server_param_schemas( pub fn export_server_param_schemas(
out_dir: &::std::path::Path, out_dir: &Path,
) -> ::anyhow::Result<Vec<GeneratedSchema>> { ) -> ::anyhow::Result<Vec<GeneratedSchema>> {
let mut schemas = Vec::new(); let mut schemas = Vec::new();
paste! { paste! {
@@ -741,16 +774,16 @@ mod tests {
#[test] #[test]
fn serialize_list_models() -> Result<()> { fn serialize_list_models() -> Result<()> {
let request = ClientRequest::ListModels { let request = ClientRequest::ModelList {
request_id: RequestId::Integer(6), request_id: RequestId::Integer(6),
params: v2::ListModelsParams::default(), params: v2::ModelListParams::default(),
}; };
assert_eq!( assert_eq!(
json!({ json!({
"method": "model/list", "method": "model/list",
"id": 6, "id": 6,
"params": { "params": {
"pageSize": null, "limit": null,
"cursor": null "cursor": null
} }
}), }),

View File

@@ -1,3 +1,6 @@
use std::collections::HashMap;
use std::path::PathBuf;
use crate::protocol::common::AuthMode; use crate::protocol::common::AuthMode;
use codex_protocol::ConversationId; use codex_protocol::ConversationId;
use codex_protocol::account::PlanType; use codex_protocol::account::PlanType;
@@ -9,10 +12,109 @@ use schemars::JsonSchema;
use serde::Deserialize; use serde::Deserialize;
use serde::Serialize; use serde::Serialize;
use serde_json::Value as JsonValue; use serde_json::Value as JsonValue;
use std::path::PathBuf;
use ts_rs::TS; use ts_rs::TS;
use uuid::Uuid; use uuid::Uuid;
// Macro to declare a camelCased API v2 enum mirroring a core enum which
// tends to use kebab-case.
macro_rules! v2_enum_from_core {
    (
        // `$Name` is the v2 enum to declare; `$Src` is the path to the core
        // enum it mirrors. The variant identifiers must match one-to-one.
        pub enum $Name:ident from $Src:path { $( $Variant:ident ),+ $(,)? }
    ) => {
        // Wire format is camelCase; TS bindings are exported under v2/.
        #[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq, JsonSchema, TS)]
        #[serde(rename_all = "camelCase")]
        #[ts(export_to = "v2/")]
        pub enum $Name { $( $Variant ),+ }
        impl $Name {
            // Infallible conversion into the mirrored core enum.
            pub fn to_core(self) -> $Src {
                match self { $( $Name::$Variant => <$Src>::$Variant ),+ }
            }
        }
        // Reverse mapping, core -> v2, also infallible.
        impl From<$Src> for $Name {
            fn from(value: $Src) -> Self {
                match value { $( <$Src>::$Variant => $Name::$Variant ),+ }
            }
        }
    };
}
// v2 mirror of the core approval-policy enum.
v2_enum_from_core!(
    pub enum AskForApproval from codex_protocol::protocol::AskForApproval {
        UnlessTrusted, OnFailure, OnRequest, Never
    }
);
// v2 mirror of the core sandbox-mode enum.
v2_enum_from_core!(
    pub enum SandboxMode from codex_protocol::config_types::SandboxMode {
        ReadOnly, WorkspaceWrite, DangerFullAccess
    }
);
/// API-level sandbox policy, mirroring `codex_protocol::protocol::SandboxPolicy`.
/// Serialized with an internal `"mode"` tag and camelCase variant names.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, JsonSchema, TS)]
#[serde(tag = "mode", rename_all = "camelCase")]
#[ts(tag = "mode")]
#[ts(export_to = "v2/")]
pub enum SandboxPolicy {
    DangerFullAccess,
    ReadOnly,
    WorkspaceWrite {
        // All fields default when omitted from the JSON payload.
        /// Additional writable roots (semantics defined by the core policy).
        #[serde(default)]
        writable_roots: Vec<PathBuf>,
        /// Whether network access is allowed — presumably inside the sandbox;
        /// semantics live in the core policy.
        #[serde(default)]
        network_access: bool,
        #[serde(default)]
        exclude_tmpdir_env_var: bool,
        #[serde(default)]
        exclude_slash_tmp: bool,
    },
}
impl SandboxPolicy {
    /// Translate this wire-level sandbox policy into the core protocol type.
    pub fn to_core(&self) -> codex_protocol::protocol::SandboxPolicy {
        use codex_protocol::protocol::SandboxPolicy as Core;
        match self {
            Self::DangerFullAccess => Core::DangerFullAccess,
            Self::ReadOnly => Core::ReadOnly,
            Self::WorkspaceWrite {
                writable_roots,
                network_access,
                exclude_tmpdir_env_var,
                exclude_slash_tmp,
            } => Core::WorkspaceWrite {
                // Borrowed receiver, so the path list must be cloned; the
                // remaining fields are Copy and can simply be dereferenced.
                writable_roots: writable_roots.clone(),
                network_access: *network_access,
                exclude_tmpdir_env_var: *exclude_tmpdir_env_var,
                exclude_slash_tmp: *exclude_slash_tmp,
            },
        }
    }
}
impl From<codex_protocol::protocol::SandboxPolicy> for SandboxPolicy {
fn from(value: codex_protocol::protocol::SandboxPolicy) -> Self {
match value {
codex_protocol::protocol::SandboxPolicy::DangerFullAccess => {
SandboxPolicy::DangerFullAccess
}
codex_protocol::protocol::SandboxPolicy::ReadOnly => SandboxPolicy::ReadOnly,
codex_protocol::protocol::SandboxPolicy::WorkspaceWrite {
writable_roots,
network_access,
exclude_tmpdir_env_var,
exclude_slash_tmp,
} => SandboxPolicy::WorkspaceWrite {
writable_roots,
network_access,
exclude_tmpdir_env_var,
exclude_slash_tmp,
},
}
}
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(tag = "type", rename_all = "camelCase")] #[serde(tag = "type", rename_all = "camelCase")]
#[ts(tag = "type")] #[ts(tag = "type")]
@@ -82,11 +184,11 @@ pub struct GetAccountResponse {
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, JsonSchema, TS)] #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, JsonSchema, TS)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")] #[ts(export_to = "v2/")]
pub struct ListModelsParams { pub struct ModelListParams {
/// Optional page size; defaults to a reasonable server-side value.
pub page_size: Option<usize>,
/// Opaque pagination cursor returned by a previous call. /// Opaque pagination cursor returned by a previous call.
pub cursor: Option<String>, pub cursor: Option<String>,
/// Optional page size; defaults to a reasonable server-side value.
pub limit: Option<u32>,
} }
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
@@ -114,17 +216,17 @@ pub struct ReasoningEffortOption {
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")] #[ts(export_to = "v2/")]
pub struct ListModelsResponse { pub struct ModelListResponse {
pub items: Vec<Model>, pub data: Vec<Model>,
/// Opaque cursor to pass to the next call to continue after the last item. /// Opaque cursor to pass to the next call to continue after the last item.
/// if None, there are no more items to return. /// If None, there are no more items to return.
pub next_cursor: Option<String>, pub next_cursor: Option<String>,
} }
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")] #[ts(export_to = "v2/")]
pub struct UploadFeedbackParams { pub struct FeedbackUploadParams {
pub classification: String, pub classification: String,
pub reason: Option<String>, pub reason: Option<String>,
pub conversation_id: Option<ConversationId>, pub conversation_id: Option<ConversationId>,
@@ -134,10 +236,101 @@ pub struct UploadFeedbackParams {
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")] #[ts(export_to = "v2/")]
pub struct UploadFeedbackResponse { pub struct FeedbackUploadResponse {
pub thread_id: String, pub thread_id: String,
} }
// === Threads, Turns, and Items ===
// Thread APIs
/// Parameters for `thread/start`. Every field is an optional override
/// applied on top of the server's default configuration when deriving the
/// new thread's config.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadStartParams {
    /// Model to use for this thread; server default applies when None.
    pub model: Option<String>,
    /// Model provider id; server default applies when None.
    pub model_provider: Option<String>,
    /// Working directory for the thread, as a path string.
    pub cwd: Option<String>,
    /// Approval policy for the thread.
    pub approval_policy: Option<AskForApproval>,
    /// Sandbox mode for the thread.
    pub sandbox: Option<SandboxMode>,
    /// Ad-hoc config overrides, forwarded as CLI-style key/value overrides
    /// during config derivation.
    pub config: Option<HashMap<String, serde_json::Value>>,
    /// Base instructions override for the thread.
    pub base_instructions: Option<String>,
    /// Developer instructions for the thread.
    pub developer_instructions: Option<String>,
}
/// Response for `thread/start`: the newly created thread.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadStartResponse {
    pub thread: Thread,
}
/// Parameters for `thread/resume`.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadResumeParams {
    /// Id of the thread to resume (a conversation id rendered as a string).
    pub thread_id: String,
}
/// Response for `thread/resume`: the resumed thread.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadResumeResponse {
    pub thread: Thread,
}
/// Parameters for `thread/archive`.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadArchiveParams {
    /// Id of the thread to archive (a conversation id rendered as a string).
    pub thread_id: String,
}
/// Response for `thread/archive`. Intentionally empty: success carries no data.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadArchiveResponse {}
/// Parameters for `thread/list` (cursor-based pagination).
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadListParams {
    /// Opaque pagination cursor returned by a previous call.
    pub cursor: Option<String>,
    /// Optional page size; defaults to a reasonable server-side value.
    pub limit: Option<u32>,
    /// Optional provider filter; when set, only sessions recorded under these
    /// providers are returned. When present but empty, includes all providers.
    pub model_providers: Option<Vec<String>>,
}
/// Response for `thread/list`: one page of threads.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadListResponse {
    pub data: Vec<Thread>,
    /// Opaque cursor to pass to the next call to continue after the last item.
    /// If None, there are no more items to return.
    pub next_cursor: Option<String>,
}
/// Parameters for `thread/compact`.
/// NOTE(review): the server currently answers `thread/compact` with an
/// "unimplemented" error; this type exists for forward compatibility.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadCompactParams {
    /// Id of the thread to compact (a conversation id rendered as a string).
    pub thread_id: String,
}
/// Response for `thread/compact`. Intentionally empty: success carries no data.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadCompactResponse {}
/// A lightweight handle to a thread — the v2 name for a conversation.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct Thread {
    /// The conversation id rendered as a string.
    pub id: String,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")] #[ts(export_to = "v2/")]
@@ -145,13 +338,6 @@ pub struct AccountUpdatedNotification {
pub auth_method: Option<AuthMode>, pub auth_method: Option<AuthMode>,
} }
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct Thread {
pub id: String,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")] #[ts(export_to = "v2/")]

View File

@@ -20,6 +20,8 @@ use codex_app_server_protocol::ExecCommandApprovalParams;
use codex_app_server_protocol::ExecCommandApprovalResponse; use codex_app_server_protocol::ExecCommandApprovalResponse;
use codex_app_server_protocol::ExecOneOffCommandParams; use codex_app_server_protocol::ExecOneOffCommandParams;
use codex_app_server_protocol::ExecOneOffCommandResponse; use codex_app_server_protocol::ExecOneOffCommandResponse;
use codex_app_server_protocol::FeedbackUploadParams;
use codex_app_server_protocol::FeedbackUploadResponse;
use codex_app_server_protocol::FuzzyFileSearchParams; use codex_app_server_protocol::FuzzyFileSearchParams;
use codex_app_server_protocol::FuzzyFileSearchResponse; use codex_app_server_protocol::FuzzyFileSearchResponse;
use codex_app_server_protocol::GetAccountRateLimitsResponse; use codex_app_server_protocol::GetAccountRateLimitsResponse;
@@ -34,12 +36,12 @@ use codex_app_server_protocol::InterruptConversationResponse;
use codex_app_server_protocol::JSONRPCErrorError; use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::ListConversationsParams; use codex_app_server_protocol::ListConversationsParams;
use codex_app_server_protocol::ListConversationsResponse; use codex_app_server_protocol::ListConversationsResponse;
use codex_app_server_protocol::ListModelsParams;
use codex_app_server_protocol::ListModelsResponse;
use codex_app_server_protocol::LoginApiKeyParams; use codex_app_server_protocol::LoginApiKeyParams;
use codex_app_server_protocol::LoginApiKeyResponse; use codex_app_server_protocol::LoginApiKeyResponse;
use codex_app_server_protocol::LoginChatGptCompleteNotification; use codex_app_server_protocol::LoginChatGptCompleteNotification;
use codex_app_server_protocol::LoginChatGptResponse; use codex_app_server_protocol::LoginChatGptResponse;
use codex_app_server_protocol::ModelListParams;
use codex_app_server_protocol::ModelListResponse;
use codex_app_server_protocol::NewConversationParams; use codex_app_server_protocol::NewConversationParams;
use codex_app_server_protocol::NewConversationResponse; use codex_app_server_protocol::NewConversationResponse;
use codex_app_server_protocol::RemoveConversationListenerParams; use codex_app_server_protocol::RemoveConversationListenerParams;
@@ -56,8 +58,16 @@ use codex_app_server_protocol::ServerRequestPayload;
use codex_app_server_protocol::SessionConfiguredNotification; use codex_app_server_protocol::SessionConfiguredNotification;
use codex_app_server_protocol::SetDefaultModelParams; use codex_app_server_protocol::SetDefaultModelParams;
use codex_app_server_protocol::SetDefaultModelResponse; use codex_app_server_protocol::SetDefaultModelResponse;
use codex_app_server_protocol::UploadFeedbackParams; use codex_app_server_protocol::Thread;
use codex_app_server_protocol::UploadFeedbackResponse; use codex_app_server_protocol::ThreadArchiveParams;
use codex_app_server_protocol::ThreadArchiveResponse;
use codex_app_server_protocol::ThreadListParams;
use codex_app_server_protocol::ThreadListResponse;
use codex_app_server_protocol::ThreadResumeParams;
use codex_app_server_protocol::ThreadResumeResponse;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ThreadStartedNotification;
use codex_app_server_protocol::UserInfoResponse; use codex_app_server_protocol::UserInfoResponse;
use codex_app_server_protocol::UserSavedConfig; use codex_app_server_protocol::UserSavedConfig;
use codex_backend_client::Client as BackendClient; use codex_backend_client::Client as BackendClient;
@@ -83,6 +93,7 @@ use codex_core::exec_env::create_env;
use codex_core::find_conversation_path_by_id_str; use codex_core::find_conversation_path_by_id_str;
use codex_core::get_platform_sandbox; use codex_core::get_platform_sandbox;
use codex_core::git_info::git_diff_to_remote; use codex_core::git_info::git_diff_to_remote;
use codex_core::parse_cursor;
use codex_core::protocol::ApplyPatchApprovalRequestEvent; use codex_core::protocol::ApplyPatchApprovalRequestEvent;
use codex_core::protocol::Event; use codex_core::protocol::Event;
use codex_core::protocol::EventMsg; use codex_core::protocol::EventMsg;
@@ -98,7 +109,7 @@ use codex_protocol::ConversationId;
use codex_protocol::config_types::ForcedLoginMethod; use codex_protocol::config_types::ForcedLoginMethod;
use codex_protocol::items::TurnItem; use codex_protocol::items::TurnItem;
use codex_protocol::models::ResponseItem; use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::RateLimitSnapshot; use codex_protocol::protocol::RateLimitSnapshot as CoreRateLimitSnapshot;
use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::USER_MESSAGE_BEGIN; use codex_protocol::protocol::USER_MESSAGE_BEGIN;
use codex_protocol::user_input::UserInput as CoreInputItem; use codex_protocol::user_input::UserInput as CoreInputItem;
@@ -176,6 +187,26 @@ impl CodexMessageProcessor {
ClientRequest::Initialize { .. } => { ClientRequest::Initialize { .. } => {
panic!("Initialize should be handled in MessageProcessor"); panic!("Initialize should be handled in MessageProcessor");
} }
// === v2 Thread APIs ===
ClientRequest::ThreadStart { request_id, params } => {
self.thread_start(request_id, params).await;
}
ClientRequest::ThreadResume { request_id, params } => {
self.thread_resume(request_id, params).await;
}
ClientRequest::ThreadArchive { request_id, params } => {
self.thread_archive(request_id, params).await;
}
ClientRequest::ThreadList { request_id, params } => {
self.thread_list(request_id, params).await;
}
ClientRequest::ThreadCompact {
request_id,
params: _,
} => {
self.send_unimplemented_error(request_id, "thread/compact")
.await;
}
ClientRequest::NewConversation { request_id, params } => { ClientRequest::NewConversation { request_id, params } => {
// Do not tokio::spawn() to process new_conversation() // Do not tokio::spawn() to process new_conversation()
// asynchronously because we need to ensure the conversation is // asynchronously because we need to ensure the conversation is
@@ -188,7 +219,7 @@ impl CodexMessageProcessor {
ClientRequest::ListConversations { request_id, params } => { ClientRequest::ListConversations { request_id, params } => {
self.handle_list_conversations(request_id, params).await; self.handle_list_conversations(request_id, params).await;
} }
ClientRequest::ListModels { request_id, params } => { ClientRequest::ModelList { request_id, params } => {
self.list_models(request_id, params).await; self.list_models(request_id, params).await;
} }
ClientRequest::LoginAccount { ClientRequest::LoginAccount {
@@ -289,7 +320,7 @@ impl CodexMessageProcessor {
} => { } => {
self.get_account_rate_limits(request_id).await; self.get_account_rate_limits(request_id).await;
} }
ClientRequest::UploadFeedback { request_id, params } => { ClientRequest::FeedbackUpload { request_id, params } => {
self.upload_feedback(request_id, params).await; self.upload_feedback(request_id, params).await;
} }
} }
@@ -637,7 +668,7 @@ impl CodexMessageProcessor {
} }
} }
async fn fetch_account_rate_limits(&self) -> Result<RateLimitSnapshot, JSONRPCErrorError> { async fn fetch_account_rate_limits(&self) -> Result<CoreRateLimitSnapshot, JSONRPCErrorError> {
let Some(auth) = self.auth_manager.auth() else { let Some(auth) = self.auth_manager.auth() else {
return Err(JSONRPCErrorError { return Err(JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE, code: INVALID_REQUEST_ERROR_CODE,
@@ -816,8 +847,36 @@ impl CodexMessageProcessor {
} }
async fn process_new_conversation(&self, request_id: RequestId, params: NewConversationParams) { async fn process_new_conversation(&self, request_id: RequestId, params: NewConversationParams) {
let config = let NewConversationParams {
match derive_config_from_params(params, self.codex_linux_sandbox_exe.clone()).await { model,
model_provider,
profile,
cwd,
approval_policy,
sandbox: sandbox_mode,
config: cli_overrides,
base_instructions,
developer_instructions,
compact_prompt,
include_apply_patch_tool,
} = params;
let overrides = ConfigOverrides {
model,
config_profile: profile,
cwd: cwd.map(PathBuf::from),
approval_policy,
sandbox_mode,
model_provider,
codex_linux_sandbox_exe: self.codex_linux_sandbox_exe.clone(),
base_instructions,
developer_instructions,
compact_prompt,
include_apply_patch_tool,
..Default::default()
};
let config = match derive_config_from_params(overrides, cli_overrides).await {
Ok(config) => config, Ok(config) => config,
Err(err) => { Err(err) => {
let error = JSONRPCErrorError { let error = JSONRPCErrorError {
@@ -856,6 +915,276 @@ impl CodexMessageProcessor {
} }
} }
/// Handle `thread/start` (v2): derive a per-thread `Config` from the params,
/// create a new conversation, auto-attach a conversation listener, reply with
/// the new `Thread`, and broadcast a `ThreadStarted` notification.
async fn thread_start(&mut self, request_id: RequestId, params: ThreadStartParams) {
    // Build ConfigOverrides directly from ThreadStartParams for config derivation.
    let cli_overrides = params.config;
    let overrides = ConfigOverrides {
        model: params.model,
        cwd: params.cwd.map(PathBuf::from),
        // v2 enums are camelCase mirrors of the core types; convert before use.
        approval_policy: params
            .approval_policy
            .map(codex_app_server_protocol::AskForApproval::to_core),
        sandbox_mode: params
            .sandbox
            .map(codex_app_server_protocol::SandboxMode::to_core),
        model_provider: params.model_provider,
        codex_linux_sandbox_exe: self.codex_linux_sandbox_exe.clone(),
        base_instructions: params.base_instructions,
        developer_instructions: params.developer_instructions,
        ..Default::default()
    };
    let config = match derive_config_from_params(overrides, cli_overrides).await {
        Ok(config) => config,
        Err(err) => {
            // Bad overrides are a client error, not a server fault.
            let error = JSONRPCErrorError {
                code: INVALID_REQUEST_ERROR_CODE,
                message: format!("error deriving config: {err}"),
                data: None,
            };
            self.outgoing.send_error(request_id, error).await;
            return;
        }
    };
    match self.conversation_manager.new_conversation(config).await {
        Ok(new_conv) => {
            let thread = Thread {
                id: new_conv.conversation_id.to_string(),
            };
            let response = ThreadStartResponse {
                thread: thread.clone(),
            };
            // Auto-attach a conversation listener when starting a thread.
            // Use the same behavior as the v1 API with experimental_raw_events=false.
            if let Err(err) = self
                .attach_conversation_listener(new_conv.conversation_id, false)
                .await
            {
                // Listener failure is non-fatal: the thread was created, so
                // still report success below and only log the problem.
                tracing::warn!(
                    "failed to attach listener for conversation {}: {}",
                    new_conv.conversation_id,
                    err.message
                );
            }
            self.outgoing.send_response(request_id, response).await;
            // Notify after the response so the caller sees the reply first.
            let notif = ThreadStartedNotification { thread };
            self.outgoing
                .send_server_notification(ServerNotification::ThreadStarted(notif))
                .await;
        }
        Err(err) => {
            let error = JSONRPCErrorError {
                code: INTERNAL_ERROR_CODE,
                message: format!("error creating thread: {err}"),
                data: None,
            };
            self.outgoing.send_error(request_id, error).await;
        }
    }
}
/// Handle `thread/archive` (v2): parse the thread id, locate its rollout
/// file under `codex_home`, and archive it via the shared helper.
async fn thread_archive(&mut self, request_id: RequestId, params: ThreadArchiveParams) {
    let conversation_id = match ConversationId::from_string(&params.thread_id) {
        Ok(id) => id,
        Err(err) => {
            let error = JSONRPCErrorError {
                code: INVALID_REQUEST_ERROR_CODE,
                message: format!("invalid thread id: {err}"),
                data: None,
            };
            self.outgoing.send_error(request_id, error).await;
            return;
        }
    };
    // Threads are persisted as rollout files; a missing file means the
    // thread id is unknown.
    let rollout_path = match find_conversation_path_by_id_str(
        &self.config.codex_home,
        &conversation_id.to_string(),
    )
    .await
    {
        Ok(Some(p)) => p,
        Ok(None) => {
            let error = JSONRPCErrorError {
                code: INVALID_REQUEST_ERROR_CODE,
                message: format!("no rollout found for conversation id {conversation_id}"),
                data: None,
            };
            self.outgoing.send_error(request_id, error).await;
            return;
        }
        Err(err) => {
            let error = JSONRPCErrorError {
                code: INVALID_REQUEST_ERROR_CODE,
                message: format!("failed to locate conversation id {conversation_id}: {err}"),
                data: None,
            };
            self.outgoing.send_error(request_id, error).await;
            return;
        }
    };
    // Delegate the actual archival so v1 and v2 share one code path.
    match self
        .archive_conversation_common(conversation_id, &rollout_path)
        .await
    {
        Ok(()) => {
            let response = ThreadArchiveResponse {};
            self.outgoing.send_response(request_id, response).await;
        }
        Err(err) => {
            self.outgoing.send_error(request_id, err).await;
        }
    }
}
/// Handle `thread/list` (v2): page through recorded conversations and
/// return them as lightweight `Thread` handles.
async fn thread_list(&self, request_id: RequestId, params: ThreadListParams) {
    let ThreadListParams {
        cursor,
        limit,
        model_providers,
    } = params;
    // Default page size is 25; clamp to at least 1 so a client-supplied 0
    // cannot request an empty page.
    let page_size = limit.unwrap_or(25).max(1) as usize;
    let (summaries, next_cursor) = match self
        .list_conversations_common(page_size, cursor, model_providers)
        .await
    {
        Ok(r) => r,
        Err(error) => {
            self.outgoing.send_error(request_id, error).await;
            return;
        }
    };
    // v2 exposes only the thread id; richer metadata stays in the summary.
    let data = summaries
        .into_iter()
        .map(|s| Thread {
            id: s.conversation_id.to_string(),
        })
        .collect();
    let response = ThreadListResponse { data, next_cursor };
    self.outgoing.send_response(request_id, response).await;
}
/// Handle `thread/resume` (v2): reload a persisted thread's rollout from
/// disk, resume it in the conversation manager, auto-attach a listener,
/// and reply with the resumed `Thread`.
async fn thread_resume(&mut self, request_id: RequestId, params: ThreadResumeParams) {
    let conversation_id = match ConversationId::from_string(&params.thread_id) {
        Ok(id) => id,
        Err(err) => {
            let error = JSONRPCErrorError {
                code: INVALID_REQUEST_ERROR_CODE,
                message: format!("invalid thread id: {err}"),
                data: None,
            };
            self.outgoing.send_error(request_id, error).await;
            return;
        }
    };
    // Locate the rollout file that backs this thread.
    let path = match find_conversation_path_by_id_str(
        &self.config.codex_home,
        &conversation_id.to_string(),
    )
    .await
    {
        Ok(Some(p)) => p,
        Ok(None) => {
            let error = JSONRPCErrorError {
                code: INVALID_REQUEST_ERROR_CODE,
                message: format!("no rollout found for conversation id {conversation_id}"),
                data: None,
            };
            self.outgoing.send_error(request_id, error).await;
            return;
        }
        Err(err) => {
            let error = JSONRPCErrorError {
                code: INVALID_REQUEST_ERROR_CODE,
                message: format!("failed to locate conversation id {conversation_id}: {err}"),
                data: None,
            };
            self.outgoing.send_error(request_id, error).await;
            return;
        }
    };
    // Read the rollout summary (only its path is used below — presumably it
    // also validates the file is a readable rollout).
    let fallback_provider = self.config.model_provider_id.as_str();
    let summary = match read_summary_from_rollout(&path, fallback_provider).await {
        Ok(s) => s,
        Err(err) => {
            let error = JSONRPCErrorError {
                code: INVALID_REQUEST_ERROR_CODE,
                message: format!("failed to load rollout `{}`: {err}", path.display()),
                data: None,
            };
            self.outgoing.send_error(request_id, error).await;
            return;
        }
    };
    // Replay the recorded history so the resumed thread has full context.
    let initial_history = match RolloutRecorder::get_rollout_history(&summary.path).await {
        Ok(initial_history) => initial_history,
        Err(err) => {
            let error = JSONRPCErrorError {
                code: INVALID_REQUEST_ERROR_CODE,
                message: format!(
                    "failed to load rollout `{}` for conversation {conversation_id}: {err}",
                    summary.path.display()
                ),
                data: None,
            };
            self.outgoing.send_error(request_id, error).await;
            return;
        }
    };
    // NOTE(review): resumes under the server's current default config
    // (self.config), not the thread's original settings — confirm intended.
    match self
        .conversation_manager
        .resume_conversation_with_history(
            self.config.as_ref().clone(),
            initial_history,
            self.auth_manager.clone(),
        )
        .await
    {
        // The resumed conversation handle is discarded here; presumably the
        // manager tracks it by id for the listener attachment below.
        Ok(_) => {
            // Auto-attach a conversation listener when resuming a thread.
            if let Err(err) = self
                .attach_conversation_listener(conversation_id, false)
                .await
            {
                // Non-fatal: the thread resumed, so still report success.
                tracing::warn!(
                    "failed to attach listener for conversation {}: {}",
                    conversation_id,
                    err.message
                );
            }
            let response = ThreadResumeResponse {
                thread: Thread {
                    id: conversation_id.to_string(),
                },
            };
            self.outgoing.send_response(request_id, response).await;
        }
        Err(err) => {
            let error = JSONRPCErrorError {
                code: INTERNAL_ERROR_CODE,
                message: format!("error resuming thread: {err}"),
                data: None,
            };
            self.outgoing.send_error(request_id, error).await;
        }
    }
}
async fn get_conversation_summary( async fn get_conversation_summary(
&self, &self,
request_id: RequestId, request_id: RequestId,
@@ -925,11 +1254,7 @@ impl CodexMessageProcessor {
model_providers: model_provider, model_providers: model_provider,
} = params; } = params;
let page_size = page_size.unwrap_or(25); let page_size = page_size.unwrap_or(25);
// Decode the optional cursor string to a Cursor via serde (Cursor implements Deserialize from string) let cursor_obj: Option<RolloutCursor> = cursor.as_ref().and_then(|s| parse_cursor(s));
let cursor_obj: Option<RolloutCursor> = match cursor {
Some(s) => serde_json::from_str::<RolloutCursor>(&format!("\"{s}\"")).ok(),
None => None,
};
let cursor_ref = cursor_obj.as_ref(); let cursor_ref = cursor_obj.as_ref();
let model_provider_filter = match model_provider { let model_provider_filter = match model_provider {
Some(providers) => { Some(providers) => {
@@ -973,33 +1298,86 @@ impl CodexMessageProcessor {
.collect(); .collect();
// Encode next_cursor as a plain string // Encode next_cursor as a plain string
let next_cursor = match page.next_cursor { let next_cursor = page
Some(c) => match serde_json::to_value(&c) { .next_cursor
Ok(serde_json::Value::String(s)) => Some(s), .and_then(|cursor| serde_json::to_value(&cursor).ok())
_ => None, .and_then(|value| value.as_str().map(str::to_owned));
},
None => None,
};
let response = ListConversationsResponse { items, next_cursor }; let response = ListConversationsResponse { items, next_cursor };
self.outgoing.send_response(request_id, response).await; self.outgoing.send_response(request_id, response).await;
} }
async fn list_models(&self, request_id: RequestId, params: ListModelsParams) { async fn list_conversations_common(
let ListModelsParams { page_size, cursor } = params; &self,
page_size: usize,
cursor: Option<String>,
model_providers: Option<Vec<String>>,
) -> Result<(Vec<ConversationSummary>, Option<String>), JSONRPCErrorError> {
let cursor_obj: Option<RolloutCursor> = cursor.as_ref().and_then(|s| parse_cursor(s));
let cursor_ref = cursor_obj.as_ref();
let model_provider_filter = match model_providers {
Some(providers) => {
if providers.is_empty() {
None
} else {
Some(providers)
}
}
None => Some(vec![self.config.model_provider_id.clone()]),
};
let fallback_provider = self.config.model_provider_id.clone();
let page = match RolloutRecorder::list_conversations(
&self.config.codex_home,
page_size,
cursor_ref,
INTERACTIVE_SESSION_SOURCES,
model_provider_filter.as_deref(),
fallback_provider.as_str(),
)
.await
{
Ok(p) => p,
Err(err) => {
return Err(JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to list conversations: {err}"),
data: None,
});
}
};
let items = page
.items
.into_iter()
.filter_map(|it| extract_conversation_summary(it.path, &it.head, &fallback_provider))
.collect::<Vec<_>>();
let next_cursor = page
.next_cursor
.and_then(|cursor| serde_json::to_value(&cursor).ok())
.and_then(|value| value.as_str().map(str::to_owned));
Ok((items, next_cursor))
}
async fn list_models(&self, request_id: RequestId, params: ModelListParams) {
let ModelListParams { limit, cursor } = params;
let models = supported_models(); let models = supported_models();
let total = models.len(); let total = models.len();
if total == 0 { if total == 0 {
let response = ListModelsResponse { let response = ModelListResponse {
items: Vec::new(), data: Vec::new(),
next_cursor: None, next_cursor: None,
}; };
self.outgoing.send_response(request_id, response).await; self.outgoing.send_response(request_id, response).await;
return; return;
} }
let effective_page_size = page_size.unwrap_or(total).max(1).min(total); let effective_page_size = limit.unwrap_or(total as u32).max(1) as usize;
let effective_page_size = effective_page_size.min(total);
let start = match cursor { let start = match cursor {
Some(cursor) => match cursor.parse::<usize>() { Some(cursor) => match cursor.parse::<usize>() {
Ok(idx) => idx, Ok(idx) => idx,
@@ -1033,7 +1411,10 @@ impl CodexMessageProcessor {
} else { } else {
None None
}; };
let response = ListModelsResponse { items, next_cursor }; let response = ModelListResponse {
data: items,
next_cursor,
};
self.outgoing.send_response(request_id, response).await; self.outgoing.send_response(request_id, response).await;
} }
@@ -1052,7 +1433,36 @@ impl CodexMessageProcessor {
// Derive a Config using the same logic as new conversation, honoring overrides if provided. // Derive a Config using the same logic as new conversation, honoring overrides if provided.
let config = match overrides { let config = match overrides {
Some(overrides) => { Some(overrides) => {
derive_config_from_params(overrides, self.codex_linux_sandbox_exe.clone()).await let NewConversationParams {
model,
model_provider,
profile,
cwd,
approval_policy,
sandbox,
config: cli_overrides,
base_instructions,
developer_instructions,
compact_prompt,
include_apply_patch_tool,
} = overrides;
let overrides = ConfigOverrides {
model,
config_profile: profile,
cwd: cwd.map(PathBuf::from),
approval_policy,
sandbox_mode: sandbox,
model_provider,
codex_linux_sandbox_exe: self.codex_linux_sandbox_exe.clone(),
base_instructions,
developer_instructions,
compact_prompt,
include_apply_patch_tool,
..Default::default()
};
derive_config_from_params(overrides, cli_overrides).await
} }
None => Ok(self.config.as_ref().clone()), None => Ok(self.config.as_ref().clone()),
}; };
@@ -1196,82 +1606,103 @@ impl CodexMessageProcessor {
self.outgoing.send_error(request_id, error).await; self.outgoing.send_error(request_id, error).await;
} }
async fn archive_conversation(&self, request_id: RequestId, params: ArchiveConversationParams) { async fn archive_conversation(
&mut self,
request_id: RequestId,
params: ArchiveConversationParams,
) {
let ArchiveConversationParams { let ArchiveConversationParams {
conversation_id, conversation_id,
rollout_path, rollout_path,
} = params; } = params;
// Verify that the rollout path is in the sessions directory or else match self
// a malicious client could specify an arbitrary path. .archive_conversation_common(conversation_id, &rollout_path)
.await
{
Ok(()) => {
tracing::info!("thread/archive succeeded for {conversation_id}");
let response = ArchiveConversationResponse {};
self.outgoing.send_response(request_id, response).await;
}
Err(err) => {
tracing::warn!(
"thread/archive failed for {conversation_id}: {}",
err.message
);
self.outgoing.send_error(request_id, err).await;
}
}
}
async fn archive_conversation_common(
&mut self,
conversation_id: ConversationId,
rollout_path: &Path,
) -> Result<(), JSONRPCErrorError> {
// Verify rollout_path is under sessions dir.
let rollout_folder = self.config.codex_home.join(codex_core::SESSIONS_SUBDIR); let rollout_folder = self.config.codex_home.join(codex_core::SESSIONS_SUBDIR);
let canonical_sessions_dir = match tokio::fs::canonicalize(&rollout_folder).await { let canonical_sessions_dir = match tokio::fs::canonicalize(&rollout_folder).await {
Ok(path) => path, Ok(path) => path,
Err(err) => { Err(err) => {
let error = JSONRPCErrorError { return Err(JSONRPCErrorError {
code: INTERNAL_ERROR_CODE, code: INTERNAL_ERROR_CODE,
message: format!( message: format!(
"failed to archive conversation: unable to resolve sessions directory: {err}" "failed to archive conversation: unable to resolve sessions directory: {err}"
), ),
data: None, data: None,
}; });
self.outgoing.send_error(request_id, error).await;
return;
} }
}; };
let canonical_rollout_path = tokio::fs::canonicalize(&rollout_path).await; let canonical_rollout_path = tokio::fs::canonicalize(rollout_path).await;
let canonical_rollout_path = if let Ok(path) = canonical_rollout_path let canonical_rollout_path = if let Ok(path) = canonical_rollout_path
&& path.starts_with(&canonical_sessions_dir) && path.starts_with(&canonical_sessions_dir)
{ {
path path
} else { } else {
let error = JSONRPCErrorError { return Err(JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE, code: INVALID_REQUEST_ERROR_CODE,
message: format!( message: format!(
"rollout path `{}` must be in sessions directory", "rollout path `{}` must be in sessions directory",
rollout_path.display() rollout_path.display()
), ),
data: None, data: None,
}; });
self.outgoing.send_error(request_id, error).await;
return;
}; };
// Verify file name matches conversation id.
let required_suffix = format!("{conversation_id}.jsonl"); let required_suffix = format!("{conversation_id}.jsonl");
let Some(file_name) = canonical_rollout_path.file_name().map(OsStr::to_owned) else { let Some(file_name) = canonical_rollout_path.file_name().map(OsStr::to_owned) else {
let error = JSONRPCErrorError { return Err(JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE, code: INVALID_REQUEST_ERROR_CODE,
message: format!( message: format!(
"rollout path `{}` missing file name", "rollout path `{}` missing file name",
rollout_path.display() rollout_path.display()
), ),
data: None, data: None,
});
}; };
self.outgoing.send_error(request_id, error).await;
return;
};
if !file_name if !file_name
.to_string_lossy() .to_string_lossy()
.ends_with(required_suffix.as_str()) .ends_with(required_suffix.as_str())
{ {
let error = JSONRPCErrorError { return Err(JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE, code: INVALID_REQUEST_ERROR_CODE,
message: format!( message: format!(
"rollout path `{}` does not match conversation id {conversation_id}", "rollout path `{}` does not match conversation id {conversation_id}",
rollout_path.display() rollout_path.display()
), ),
data: None, data: None,
}; });
self.outgoing.send_error(request_id, error).await;
return;
} }
let removed_conversation = self // If the conversation is active, request shutdown and wait briefly.
if let Some(conversation) = self
.conversation_manager .conversation_manager
.remove_conversation(&conversation_id) .remove_conversation(&conversation_id)
.await; .await
if let Some(conversation) = removed_conversation { {
info!("conversation {conversation_id} was active; shutting down"); info!("conversation {conversation_id} was active; shutting down");
let conversation_clone = conversation.clone(); let conversation_clone = conversation.clone();
let notify = Arc::new(tokio::sync::Notify::new()); let notify = Arc::new(tokio::sync::Notify::new());
@@ -1280,20 +1711,24 @@ impl CodexMessageProcessor {
// Establish the listener for ShutdownComplete before submitting // Establish the listener for ShutdownComplete before submitting
// Shutdown so it is not missed. // Shutdown so it is not missed.
let is_shutdown = tokio::spawn(async move { let is_shutdown = tokio::spawn(async move {
// Create the notified future outside the loop to avoid losing notifications.
let notified = notify_clone.notified();
tokio::pin!(notified);
loop { loop {
select! { select! {
_ = notify_clone.notified() => { _ = &mut notified => { break; }
break;
}
event = conversation_clone.next_event() => { event = conversation_clone.next_event() => {
if let Ok(event) = event && matches!(event.msg, EventMsg::ShutdownComplete) { match event {
break; Ok(event) => {
if matches!(event.msg, EventMsg::ShutdownComplete) { break; }
}
// Break on errors to avoid tight loops when the agent loop has exited.
Err(_) => { break; }
} }
} }
} }
} }
}); });
// Request shutdown. // Request shutdown.
match conversation.submit(Op::Shutdown).await { match conversation.submit(Op::Shutdown).await {
Ok(_) => { Ok(_) => {
@@ -1304,20 +1739,21 @@ impl CodexMessageProcessor {
} }
_ = tokio::time::sleep(Duration::from_secs(10)) => { _ = tokio::time::sleep(Duration::from_secs(10)) => {
warn!("conversation {conversation_id} shutdown timed out; proceeding with archive"); warn!("conversation {conversation_id} shutdown timed out; proceeding with archive");
notify.notify_one(); // Wake any waiter; use notify_waiters to avoid missing the signal.
} notify.notify_waiters();
}
}
Err(err) => {
error!("failed to submit Shutdown to conversation {conversation_id}: {err}");
notify.notify_one();
// Perhaps we lost a shutdown race, so let's continue to // Perhaps we lost a shutdown race, so let's continue to
// clean up the .jsonl file. // clean up the .jsonl file.
} }
} }
} }
Err(err) => {
error!("failed to submit Shutdown to conversation {conversation_id}: {err}");
notify.notify_waiters();
}
}
}
// Move the .jsonl file to the archived sessions subdir. // Move the rollout file to archived.
let result: std::io::Result<()> = async { let result: std::io::Result<()> = async {
let archive_folder = self let archive_folder = self
.config .config
@@ -1329,20 +1765,11 @@ impl CodexMessageProcessor {
} }
.await; .await;
match result { result.map_err(|err| JSONRPCErrorError {
Ok(()) => {
let response = ArchiveConversationResponse {};
self.outgoing.send_response(request_id, response).await;
}
Err(err) => {
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE, code: INTERNAL_ERROR_CODE,
message: format!("failed to archive conversation: {err}"), message: format!("failed to archive conversation: {err}"),
data: None, data: None,
}; })
self.outgoing.send_error(request_id, error).await;
}
}
} }
async fn send_user_message(&self, request_id: RequestId, params: SendUserMessageParams) { async fn send_user_message(&self, request_id: RequestId, params: SendUserMessageParams) {
@@ -1478,24 +1905,70 @@ impl CodexMessageProcessor {
conversation_id, conversation_id,
experimental_raw_events, experimental_raw_events,
} = params; } = params;
let Ok(conversation) = self
.conversation_manager match self
.get_conversation(conversation_id) .attach_conversation_listener(conversation_id, experimental_raw_events)
.await .await
else { {
Ok(subscription_id) => {
let response = AddConversationSubscriptionResponse { subscription_id };
self.outgoing.send_response(request_id, response).await;
}
Err(err) => {
self.outgoing.send_error(request_id, err).await;
}
}
}
async fn remove_conversation_listener(
&mut self,
request_id: RequestId,
params: RemoveConversationListenerParams,
) {
let RemoveConversationListenerParams { subscription_id } = params;
match self.conversation_listeners.remove(&subscription_id) {
Some(sender) => {
// Signal the spawned task to exit and acknowledge.
let _ = sender.send(());
let response = RemoveConversationSubscriptionResponse {};
self.outgoing.send_response(request_id, response).await;
}
None => {
let error = JSONRPCErrorError { let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE, code: INVALID_REQUEST_ERROR_CODE,
message: format!("conversation not found: {conversation_id}"), message: format!("subscription not found: {subscription_id}"),
data: None, data: None,
}; };
self.outgoing.send_error(request_id, error).await; self.outgoing.send_error(request_id, error).await;
return; }
}
}
async fn attach_conversation_listener(
&mut self,
conversation_id: ConversationId,
experimental_raw_events: bool,
) -> Result<Uuid, JSONRPCErrorError> {
let conversation = match self
.conversation_manager
.get_conversation(conversation_id)
.await
{
Ok(conv) => conv,
Err(_) => {
return Err(JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("conversation not found: {conversation_id}"),
data: None,
});
}
}; };
let subscription_id = Uuid::new_v4(); let subscription_id = Uuid::new_v4();
let (cancel_tx, mut cancel_rx) = oneshot::channel(); let (cancel_tx, mut cancel_rx) = oneshot::channel();
self.conversation_listeners self.conversation_listeners
.insert(subscription_id, cancel_tx); .insert(subscription_id, cancel_tx);
let outgoing_for_task = self.outgoing.clone(); let outgoing_for_task = self.outgoing.clone();
let pending_interrupts = self.pending_interrupts.clone(); let pending_interrupts = self.pending_interrupts.clone();
tokio::spawn(async move { tokio::spawn(async move {
@@ -1535,45 +2008,32 @@ impl CodexMessageProcessor {
continue; continue;
} }
}; };
params.insert("conversationId".to_string(), conversation_id.to_string().into()); params.insert(
"conversationId".to_string(),
conversation_id.to_string().into(),
);
outgoing_for_task.send_notification(OutgoingNotification { outgoing_for_task
.send_notification(OutgoingNotification {
method, method,
params: Some(params.into()), params: Some(params.into()),
}) })
.await; .await;
apply_bespoke_event_handling(event.clone(), conversation_id, conversation.clone(), outgoing_for_task.clone(), pending_interrupts.clone()).await; apply_bespoke_event_handling(
event.clone(),
conversation_id,
conversation.clone(),
outgoing_for_task.clone(),
pending_interrupts.clone(),
)
.await;
} }
} }
} }
}); });
let response = AddConversationSubscriptionResponse { subscription_id };
self.outgoing.send_response(request_id, response).await;
}
async fn remove_conversation_listener( Ok(subscription_id)
&mut self,
request_id: RequestId,
params: RemoveConversationListenerParams,
) {
let RemoveConversationListenerParams { subscription_id } = params;
match self.conversation_listeners.remove(&subscription_id) {
Some(sender) => {
// Signal the spawned task to exit and acknowledge.
let _ = sender.send(());
let response = RemoveConversationSubscriptionResponse {};
self.outgoing.send_response(request_id, response).await;
}
None => {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("subscription not found: {subscription_id}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
}
}
} }
async fn git_diff_to_origin(&self, request_id: RequestId, cwd: PathBuf) { async fn git_diff_to_origin(&self, request_id: RequestId, cwd: PathBuf) {
@@ -1637,8 +2097,8 @@ impl CodexMessageProcessor {
self.outgoing.send_response(request_id, response).await; self.outgoing.send_response(request_id, response).await;
} }
async fn upload_feedback(&self, request_id: RequestId, params: UploadFeedbackParams) { async fn upload_feedback(&self, request_id: RequestId, params: FeedbackUploadParams) {
let UploadFeedbackParams { let FeedbackUploadParams {
classification, classification,
reason, reason,
conversation_id, conversation_id,
@@ -1683,7 +2143,7 @@ impl CodexMessageProcessor {
match upload_result { match upload_result {
Ok(()) => { Ok(()) => {
let response = UploadFeedbackResponse { thread_id }; let response = FeedbackUploadResponse { thread_id };
self.outgoing.send_response(request_id, response).await; self.outgoing.send_response(request_id, response).await;
} }
Err(err) => { Err(err) => {
@@ -1797,41 +2257,9 @@ async fn apply_bespoke_event_handling(
} }
async fn derive_config_from_params( async fn derive_config_from_params(
params: NewConversationParams, overrides: ConfigOverrides,
codex_linux_sandbox_exe: Option<PathBuf>, cli_overrides: Option<std::collections::HashMap<String, serde_json::Value>>,
) -> std::io::Result<Config> { ) -> std::io::Result<Config> {
let NewConversationParams {
model,
model_provider,
profile,
cwd,
approval_policy,
sandbox: sandbox_mode,
config: cli_overrides,
base_instructions,
developer_instructions,
compact_prompt,
include_apply_patch_tool,
} = params;
let overrides = ConfigOverrides {
model,
review_model: None,
config_profile: profile,
cwd: cwd.map(PathBuf::from),
approval_policy,
sandbox_mode,
model_provider,
codex_linux_sandbox_exe,
base_instructions,
developer_instructions,
compact_prompt,
include_apply_patch_tool,
show_raw_agent_reasoning: None,
tools_web_search_request: None,
experimental_sandbox_command_assessment: None,
additional_writable_roots: Vec::new(),
};
let cli_overrides = cli_overrides let cli_overrides = cli_overrides
.unwrap_or_default() .unwrap_or_default()
.into_iter() .into_iter()

View File

@@ -13,6 +13,7 @@ base64 = { workspace = true }
chrono = { workspace = true } chrono = { workspace = true }
codex-app-server-protocol = { workspace = true } codex-app-server-protocol = { workspace = true }
codex-core = { workspace = true } codex-core = { workspace = true }
codex-protocol = { workspace = true }
serde = { workspace = true } serde = { workspace = true }
serde_json = { workspace = true } serde_json = { workspace = true }
tokio = { workspace = true, features = [ tokio = { workspace = true, features = [
@@ -21,4 +22,5 @@ tokio = { workspace = true, features = [
"process", "process",
"rt-multi-thread", "rt-multi-thread",
] } ] }
uuid = { workspace = true }
wiremock = { workspace = true } wiremock = { workspace = true }

View File

@@ -2,6 +2,7 @@ mod auth_fixtures;
mod mcp_process; mod mcp_process;
mod mock_model_server; mod mock_model_server;
mod responses; mod responses;
mod rollout;
pub use auth_fixtures::ChatGptAuthFixture; pub use auth_fixtures::ChatGptAuthFixture;
pub use auth_fixtures::ChatGptIdTokenClaims; pub use auth_fixtures::ChatGptIdTokenClaims;
@@ -10,9 +11,11 @@ pub use auth_fixtures::write_chatgpt_auth;
use codex_app_server_protocol::JSONRPCResponse; use codex_app_server_protocol::JSONRPCResponse;
pub use mcp_process::McpProcess; pub use mcp_process::McpProcess;
pub use mock_model_server::create_mock_chat_completions_server; pub use mock_model_server::create_mock_chat_completions_server;
pub use mock_model_server::create_mock_chat_completions_server_unchecked;
pub use responses::create_apply_patch_sse_response; pub use responses::create_apply_patch_sse_response;
pub use responses::create_final_assistant_message_sse_response; pub use responses::create_final_assistant_message_sse_response;
pub use responses::create_shell_sse_response; pub use responses::create_shell_sse_response;
pub use rollout::create_fake_rollout;
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
pub fn to_response<T: DeserializeOwned>(response: JSONRPCResponse) -> anyhow::Result<T> { pub fn to_response<T: DeserializeOwned>(response: JSONRPCResponse) -> anyhow::Result<T> {

View File

@@ -17,12 +17,13 @@ use codex_app_server_protocol::ArchiveConversationParams;
use codex_app_server_protocol::CancelLoginChatGptParams; use codex_app_server_protocol::CancelLoginChatGptParams;
use codex_app_server_protocol::ClientInfo; use codex_app_server_protocol::ClientInfo;
use codex_app_server_protocol::ClientNotification; use codex_app_server_protocol::ClientNotification;
use codex_app_server_protocol::FeedbackUploadParams;
use codex_app_server_protocol::GetAuthStatusParams; use codex_app_server_protocol::GetAuthStatusParams;
use codex_app_server_protocol::InitializeParams; use codex_app_server_protocol::InitializeParams;
use codex_app_server_protocol::InterruptConversationParams; use codex_app_server_protocol::InterruptConversationParams;
use codex_app_server_protocol::ListConversationsParams; use codex_app_server_protocol::ListConversationsParams;
use codex_app_server_protocol::ListModelsParams;
use codex_app_server_protocol::LoginApiKeyParams; use codex_app_server_protocol::LoginApiKeyParams;
use codex_app_server_protocol::ModelListParams;
use codex_app_server_protocol::NewConversationParams; use codex_app_server_protocol::NewConversationParams;
use codex_app_server_protocol::RemoveConversationListenerParams; use codex_app_server_protocol::RemoveConversationListenerParams;
use codex_app_server_protocol::ResumeConversationParams; use codex_app_server_protocol::ResumeConversationParams;
@@ -30,7 +31,10 @@ use codex_app_server_protocol::SendUserMessageParams;
use codex_app_server_protocol::SendUserTurnParams; use codex_app_server_protocol::SendUserTurnParams;
use codex_app_server_protocol::ServerRequest; use codex_app_server_protocol::ServerRequest;
use codex_app_server_protocol::SetDefaultModelParams; use codex_app_server_protocol::SetDefaultModelParams;
use codex_app_server_protocol::UploadFeedbackParams; use codex_app_server_protocol::ThreadArchiveParams;
use codex_app_server_protocol::ThreadListParams;
use codex_app_server_protocol::ThreadResumeParams;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::JSONRPCError; use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCMessage; use codex_app_server_protocol::JSONRPCMessage;
@@ -246,7 +250,7 @@ impl McpProcess {
/// Send a `feedback/upload` JSON-RPC request. /// Send a `feedback/upload` JSON-RPC request.
pub async fn send_upload_feedback_request( pub async fn send_upload_feedback_request(
&mut self, &mut self,
params: UploadFeedbackParams, params: FeedbackUploadParams,
) -> anyhow::Result<i64> { ) -> anyhow::Result<i64> {
let params = Some(serde_json::to_value(params)?); let params = Some(serde_json::to_value(params)?);
self.send_request("feedback/upload", params).await self.send_request("feedback/upload", params).await
@@ -275,10 +279,46 @@ impl McpProcess {
self.send_request("listConversations", params).await self.send_request("listConversations", params).await
} }
/// Issue a `thread/start` JSON-RPC request and return the outgoing request id.
pub async fn send_thread_start_request(
    &mut self,
    params: ThreadStartParams,
) -> anyhow::Result<i64> {
    let serialized = serde_json::to_value(params)?;
    self.send_request("thread/start", Some(serialized)).await
}
/// Issue a `thread/resume` JSON-RPC request and return the outgoing request id.
pub async fn send_thread_resume_request(
    &mut self,
    params: ThreadResumeParams,
) -> anyhow::Result<i64> {
    let serialized = serde_json::to_value(params)?;
    self.send_request("thread/resume", Some(serialized)).await
}
/// Issue a `thread/archive` JSON-RPC request and return the outgoing request id.
pub async fn send_thread_archive_request(
    &mut self,
    params: ThreadArchiveParams,
) -> anyhow::Result<i64> {
    let serialized = serde_json::to_value(params)?;
    self.send_request("thread/archive", Some(serialized)).await
}
/// Issue a `thread/list` JSON-RPC request and return the outgoing request id.
pub async fn send_thread_list_request(
    &mut self,
    params: ThreadListParams,
) -> anyhow::Result<i64> {
    let serialized = serde_json::to_value(params)?;
    self.send_request("thread/list", Some(serialized)).await
}
/// Send a `model/list` JSON-RPC request. /// Send a `model/list` JSON-RPC request.
pub async fn send_list_models_request( pub async fn send_list_models_request(
&mut self, &mut self,
params: ListModelsParams, params: ModelListParams,
) -> anyhow::Result<i64> { ) -> anyhow::Result<i64> {
let params = Some(serde_json::to_value(params)?); let params = Some(serde_json::to_value(params)?);
self.send_request("model/list", params).await self.send_request("model/list", params).await

View File

@@ -29,6 +29,25 @@ pub async fn create_mock_chat_completions_server(responses: Vec<String>) -> Mock
server server
} }
/// Variant of `create_mock_chat_completions_server` that places no
/// expectation on how many times the endpoint is called.
pub async fn create_mock_chat_completions_server_unchecked(responses: Vec<String>) -> MockServer {
    let mock_server = MockServer::start().await;
    // Sequential responder starting from call index zero; replays `responses`
    // in order, just like the checked variant.
    let responder = SeqResponder {
        num_calls: AtomicUsize::new(0),
        responses,
    };
    Mock::given(method("POST"))
        .and(path("/v1/chat/completions"))
        .respond_with(responder)
        .mount(&mock_server)
        .await;
    mock_server
}
struct SeqResponder { struct SeqResponder {
num_calls: AtomicUsize, num_calls: AtomicUsize,
responses: Vec<String>, responses: Vec<String>,

View File

@@ -0,0 +1,82 @@
use anyhow::Result;
use codex_protocol::ConversationId;
use codex_protocol::protocol::SessionMeta;
use codex_protocol::protocol::SessionSource;
use serde_json::json;
use std::fs;
use std::path::Path;
use std::path::PathBuf;
use uuid::Uuid;
/// Write a minimal fake rollout file under `CODEX_HOME/sessions/YYYY/MM/DD/`.
///
/// - `filename_ts` supplies the `YYYY-MM-DDThh-mm-ss` timestamp used in the
///   file name; the `YYYY/MM/DD` directory components are sliced from it.
/// - `meta_rfc3339` is the envelope timestamp stamped on every JSON line.
/// - `preview` is the user-message preview text recorded in the rollout.
/// - `model_provider` optionally sets the provider in the session meta payload.
///
/// Returns the freshly generated conversation/session UUID as a string.
pub fn create_fake_rollout(
    codex_home: &Path,
    filename_ts: &str,
    meta_rfc3339: &str,
    preview: &str,
    model_provider: Option<&str>,
) -> Result<String> {
    let session_uuid = Uuid::new_v4();
    let session_uuid_str = session_uuid.to_string();
    let conversation_id = ConversationId::from_string(&session_uuid_str)?;

    // Derive sessions/YYYY/MM/DD from the filename timestamp (YYYY-MM-DDThh-mm-ss).
    let (year, month, day) = (&filename_ts[0..4], &filename_ts[5..7], &filename_ts[8..10]);
    let day_dir = codex_home.join("sessions").join(year).join(month).join(day);
    fs::create_dir_all(&day_dir)?;
    let rollout_path = day_dir.join(format!("rollout-{filename_ts}-{session_uuid}.jsonl"));

    // Serialize the session meta through the real protocol type so the fixture
    // schema stays in sync with production.
    let meta_payload = serde_json::to_value(SessionMeta {
        id: conversation_id,
        timestamp: meta_rfc3339.to_string(),
        cwd: PathBuf::from("/"),
        originator: "codex".to_string(),
        cli_version: "0.0.0".to_string(),
        instructions: None,
        source: SessionSource::Cli,
        model_provider: model_provider.map(str::to_string),
    })?;

    // Assemble the JSONL body: session meta, one persisted user message
    // response item, and a matching user_message event line.
    let mut jsonl_lines: Vec<String> = Vec::with_capacity(3);
    jsonl_lines.push(
        json!({
            "timestamp": meta_rfc3339,
            "type": "session_meta",
            "payload": meta_payload
        })
        .to_string(),
    );
    jsonl_lines.push(
        json!({
            "timestamp": meta_rfc3339,
            "type":"response_item",
            "payload": {
                "type":"message",
                "role":"user",
                "content":[{"type":"input_text","text": preview}]
            }
        })
        .to_string(),
    );
    jsonl_lines.push(
        json!({
            "timestamp": meta_rfc3339,
            "type":"event_msg",
            "payload": {
                "type":"user_message",
                "message": preview,
                "kind": "plain"
            }
        })
        .to_string(),
    );

    fs::write(rollout_path, jsonl_lines.join("\n") + "\n")?;
    Ok(session_uuid_str)
}

View File

@@ -1,5 +1,6 @@
use anyhow::Result; use anyhow::Result;
use app_test_support::McpProcess; use app_test_support::McpProcess;
use app_test_support::create_fake_rollout;
use app_test_support::to_response; use app_test_support::to_response;
use codex_app_server_protocol::JSONRPCNotification; use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCResponse; use codex_app_server_protocol::JSONRPCResponse;
@@ -15,12 +16,8 @@ use codex_core::protocol::EventMsg;
use codex_protocol::models::ContentItem; use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem; use codex_protocol::models::ResponseItem;
use pretty_assertions::assert_eq; use pretty_assertions::assert_eq;
use serde_json::json;
use std::fs;
use std::path::Path;
use tempfile::TempDir; use tempfile::TempDir;
use tokio::time::timeout; use tokio::time::timeout;
use uuid::Uuid;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
@@ -357,70 +354,3 @@ async fn test_list_and_resume_conversations() -> Result<()> {
Ok(()) Ok(())
} }
fn create_fake_rollout(
codex_home: &Path,
filename_ts: &str,
meta_rfc3339: &str,
preview: &str,
model_provider: Option<&str>,
) -> Result<()> {
let uuid = Uuid::new_v4();
// sessions/YYYY/MM/DD/ derived from filename_ts (YYYY-MM-DDThh-mm-ss)
let year = &filename_ts[0..4];
let month = &filename_ts[5..7];
let day = &filename_ts[8..10];
let dir = codex_home.join("sessions").join(year).join(month).join(day);
fs::create_dir_all(&dir)?;
let file_path = dir.join(format!("rollout-{filename_ts}-{uuid}.jsonl"));
let mut lines = Vec::new();
// Meta line with timestamp (flattened meta in payload for new schema)
let mut payload = json!({
"id": uuid,
"timestamp": meta_rfc3339,
"cwd": "/",
"originator": "codex",
"cli_version": "0.0.0",
"instructions": null,
});
if let Some(provider) = model_provider {
payload["model_provider"] = json!(provider);
}
lines.push(
json!({
"timestamp": meta_rfc3339,
"type": "session_meta",
"payload": payload
})
.to_string(),
);
// Minimal user message entry as a persisted response item (with envelope timestamp)
lines.push(
json!({
"timestamp": meta_rfc3339,
"type":"response_item",
"payload": {
"type":"message",
"role":"user",
"content":[{"type":"input_text","text": preview}]
}
})
.to_string(),
);
// Add a matching user message event line to satisfy filters
lines.push(
json!({
"timestamp": meta_rfc3339,
"type":"event_msg",
"payload": {
"type":"user_message",
"message": preview,
"kind": "plain"
}
})
.to_string(),
);
fs::write(file_path, lines.join("\n") + "\n")?;
Ok(())
}

View File

@@ -6,9 +6,9 @@ use app_test_support::McpProcess;
use app_test_support::to_response; use app_test_support::to_response;
use codex_app_server_protocol::JSONRPCError; use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCResponse; use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::ListModelsParams;
use codex_app_server_protocol::ListModelsResponse;
use codex_app_server_protocol::Model; use codex_app_server_protocol::Model;
use codex_app_server_protocol::ModelListParams;
use codex_app_server_protocol::ModelListResponse;
use codex_app_server_protocol::ReasoningEffortOption; use codex_app_server_protocol::ReasoningEffortOption;
use codex_app_server_protocol::RequestId; use codex_app_server_protocol::RequestId;
use codex_protocol::config_types::ReasoningEffort; use codex_protocol::config_types::ReasoningEffort;
@@ -27,8 +27,8 @@ async fn list_models_returns_all_models_with_large_limit() -> Result<()> {
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??; timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
let request_id = mcp let request_id = mcp
.send_list_models_request(ListModelsParams { .send_list_models_request(ModelListParams {
page_size: Some(100), limit: Some(100),
cursor: None, cursor: None,
}) })
.await?; .await?;
@@ -39,7 +39,10 @@ async fn list_models_returns_all_models_with_large_limit() -> Result<()> {
) )
.await??; .await??;
let ListModelsResponse { items, next_cursor } = to_response::<ListModelsResponse>(response)?; let ModelListResponse {
data: items,
next_cursor,
} = to_response::<ModelListResponse>(response)?;
let expected_models = vec![ let expected_models = vec![
Model { Model {
@@ -111,8 +114,8 @@ async fn list_models_pagination_works() -> Result<()> {
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??; timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
let first_request = mcp let first_request = mcp
.send_list_models_request(ListModelsParams { .send_list_models_request(ModelListParams {
page_size: Some(1), limit: Some(1),
cursor: None, cursor: None,
}) })
.await?; .await?;
@@ -123,18 +126,18 @@ async fn list_models_pagination_works() -> Result<()> {
) )
.await??; .await??;
let ListModelsResponse { let ModelListResponse {
items: first_items, data: first_items,
next_cursor: first_cursor, next_cursor: first_cursor,
} = to_response::<ListModelsResponse>(first_response)?; } = to_response::<ModelListResponse>(first_response)?;
assert_eq!(first_items.len(), 1); assert_eq!(first_items.len(), 1);
assert_eq!(first_items[0].id, "gpt-5-codex"); assert_eq!(first_items[0].id, "gpt-5-codex");
let next_cursor = first_cursor.ok_or_else(|| anyhow!("cursor for second page"))?; let next_cursor = first_cursor.ok_or_else(|| anyhow!("cursor for second page"))?;
let second_request = mcp let second_request = mcp
.send_list_models_request(ListModelsParams { .send_list_models_request(ModelListParams {
page_size: Some(1), limit: Some(1),
cursor: Some(next_cursor.clone()), cursor: Some(next_cursor.clone()),
}) })
.await?; .await?;
@@ -145,10 +148,10 @@ async fn list_models_pagination_works() -> Result<()> {
) )
.await??; .await??;
let ListModelsResponse { let ModelListResponse {
items: second_items, data: second_items,
next_cursor: second_cursor, next_cursor: second_cursor,
} = to_response::<ListModelsResponse>(second_response)?; } = to_response::<ModelListResponse>(second_response)?;
assert_eq!(second_items.len(), 1); assert_eq!(second_items.len(), 1);
assert_eq!(second_items[0].id, "gpt-5"); assert_eq!(second_items[0].id, "gpt-5");
@@ -164,8 +167,8 @@ async fn list_models_rejects_invalid_cursor() -> Result<()> {
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??; timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
let request_id = mcp let request_id = mcp
.send_list_models_request(ListModelsParams { .send_list_models_request(ModelListParams {
page_size: None, limit: None,
cursor: Some("invalid".to_string()), cursor: Some("invalid".to_string()),
}) })
.await?; .await?;

View File

@@ -1,2 +1,6 @@
// v2 test suite modules // v2 test suite modules
mod account; mod account;
mod thread_archive;
mod thread_list;
mod thread_resume;
mod thread_start;

View File

@@ -0,0 +1,93 @@
use anyhow::Result;
use app_test_support::McpProcess;
use app_test_support::to_response;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ThreadArchiveParams;
use codex_app_server_protocol::ThreadArchiveResponse;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_core::ARCHIVED_SESSIONS_SUBDIR;
use codex_core::find_conversation_path_by_id_str;
use std::path::Path;
use tempfile::TempDir;
use tokio::time::timeout;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
/// End-to-end check that `thread/archive` physically relocates a thread's
/// rollout file out of the live sessions tree and into the archived-sessions
/// directory, preserving the original `rollout-...-<id>.jsonl` filename.
#[tokio::test]
async fn thread_archive_moves_rollout_into_archived_directory() -> Result<()> {
    let codex_home = TempDir::new()?;
    create_config_toml(codex_home.path())?;

    // Boot the app server against the temporary CODEX_HOME and complete the
    // initialize handshake before issuing any requests.
    let mut mcp = McpProcess::new(codex_home.path()).await?;
    timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;

    // Start a thread.
    let start_id = mcp
        .send_thread_start_request(ThreadStartParams {
            model: Some("mock-model".to_string()),
            ..Default::default()
        })
        .await?;
    let start_resp: JSONRPCResponse = timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_response_message(RequestId::Integer(start_id)),
    )
    .await??;
    let ThreadStartResponse { thread } = to_response::<ThreadStartResponse>(start_resp)?;
    assert!(!thread.id.is_empty());

    // Locate the rollout path recorded for this thread id.
    let rollout_path = find_conversation_path_by_id_str(codex_home.path(), &thread.id)
        .await?
        .expect("expected rollout path for thread id to exist");
    assert!(
        rollout_path.exists(),
        "expected {} to exist",
        rollout_path.display()
    );

    // Archive the thread.
    let archive_id = mcp
        .send_thread_archive_request(ThreadArchiveParams {
            thread_id: thread.id.clone(),
        })
        .await?;
    let archive_resp: JSONRPCResponse = timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_response_message(RequestId::Integer(archive_id)),
    )
    .await??;
    // We only care that the response decodes as ThreadArchiveResponse.
    let _: ThreadArchiveResponse = to_response::<ThreadArchiveResponse>(archive_resp)?;

    // Verify file moved.
    let archived_directory = codex_home.path().join(ARCHIVED_SESSIONS_SUBDIR);
    // The archived file keeps the original filename (rollout-...-<id>.jsonl).
    let archived_rollout_path =
        archived_directory.join(rollout_path.file_name().expect("rollout file name"));
    assert!(
        !rollout_path.exists(),
        "expected rollout path {} to be moved",
        rollout_path.display()
    );
    assert!(
        archived_rollout_path.exists(),
        "expected archived rollout path {} to exist",
        archived_rollout_path.display()
    );
    Ok(())
}
/// Writes a minimal `config.toml` (model, approval policy, sandbox mode)
/// into `codex_home` for the archive test.
fn create_config_toml(codex_home: &Path) -> std::io::Result<()> {
    std::fs::write(codex_home.join("config.toml"), config_contents())
}

/// The fixed TOML body written by [`create_config_toml`].
fn config_contents() -> &'static str {
    r#"model = "mock-model"
approval_policy = "never"
sandbox_mode = "read-only"
"#
}

View File

@@ -0,0 +1,205 @@
use anyhow::Result;
use app_test_support::McpProcess;
use app_test_support::create_fake_rollout;
use app_test_support::to_response;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ThreadListParams;
use codex_app_server_protocol::ThreadListResponse;
use serde_json::json;
use tempfile::TempDir;
use tokio::time::timeout;
use uuid::Uuid;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
/// `thread/list` against a fresh CODEX_HOME must return an empty page with
/// `nextCursor: null`.
#[tokio::test]
async fn thread_list_basic_empty() -> Result<()> {
    let codex_home = TempDir::new()?;
    create_minimal_config(codex_home.path())?;

    let mut mcp = McpProcess::new(codex_home.path()).await?;
    timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;

    // List threads in an empty CODEX_HOME; should return an empty page with nextCursor: null.
    let params = ThreadListParams {
        cursor: None,
        limit: Some(10),
        model_providers: None,
    };
    let request_id = mcp.send_thread_list_request(params).await?;
    let response: JSONRPCResponse = timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
    )
    .await??;

    let ThreadListResponse { data, next_cursor } = to_response::<ThreadListResponse>(response)?;
    assert!(data.is_empty());
    assert_eq!(next_cursor, None);
    Ok(())
}
/// Writes the smallest `config.toml` that the listing tests need: just a
/// model name and an approval policy, with no provider section.
fn create_minimal_config(codex_home: &std::path::Path) -> std::io::Result<()> {
    let contents = r#"
model = "mock-model"
approval_policy = "never"
"#;
    std::fs::write(codex_home.join("config.toml"), contents)
}
/// Creates three fake rollouts and pages through them with `limit = 2`:
/// the first page must carry a `nextCursor`, and the second (final) page
/// must return `nextCursor: null`.
#[tokio::test]
async fn thread_list_pagination_next_cursor_none_on_last_page() -> Result<()> {
    let codex_home = TempDir::new()?;
    create_minimal_config(codex_home.path())?;

    // Create three rollouts so we can paginate with limit=2.
    // Timestamps descend from _a to _c; listing is expected to follow that
    // newest-first order, so the first page holds _a and _b.
    let _a = create_fake_rollout(
        codex_home.path(),
        "2025-01-02T12-00-00",
        "2025-01-02T12:00:00Z",
        "Hello",
        Some("mock_provider"),
    )?;
    let _b = create_fake_rollout(
        codex_home.path(),
        "2025-01-01T13-00-00",
        "2025-01-01T13:00:00Z",
        "Hello",
        Some("mock_provider"),
    )?;
    let _c = create_fake_rollout(
        codex_home.path(),
        "2025-01-01T12-00-00",
        "2025-01-01T12:00:00Z",
        "Hello",
        Some("mock_provider"),
    )?;

    let mut mcp = McpProcess::new(codex_home.path()).await?;
    timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;

    // Page 1: limit 2 → expect next_cursor Some.
    let page1_id = mcp
        .send_thread_list_request(ThreadListParams {
            cursor: None,
            limit: Some(2),
            model_providers: Some(vec!["mock_provider".to_string()]),
        })
        .await?;
    let page1_resp: JSONRPCResponse = timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_response_message(RequestId::Integer(page1_id)),
    )
    .await??;
    let ThreadListResponse {
        data: data1,
        next_cursor: cursor1,
    } = to_response::<ThreadListResponse>(page1_resp)?;
    assert_eq!(data1.len(), 2);
    let cursor1 = cursor1.expect("expected nextCursor on first page");

    // Page 2: with cursor → expect next_cursor None when no more results.
    let page2_id = mcp
        .send_thread_list_request(ThreadListParams {
            cursor: Some(cursor1),
            limit: Some(2),
            model_providers: Some(vec!["mock_provider".to_string()]),
        })
        .await?;
    let page2_resp: JSONRPCResponse = timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_response_message(RequestId::Integer(page2_id)),
    )
    .await??;
    let ThreadListResponse {
        data: data2,
        next_cursor: cursor2,
    } = to_response::<ThreadListResponse>(page2_resp)?;
    // Only one rollout remains, so the page is short and the cursor is gone.
    assert!(data2.len() <= 2);
    assert_eq!(cursor2, None, "expected nextCursor to be null on last page");
    Ok(())
}
/// Writes one rollout under `mock_provider` and one hand-rolled rollout whose
/// session_meta carries `model_provider: "other_provider"`, then verifies that
/// `thread/list` filtered to `other_provider` returns exactly one thread.
#[tokio::test]
async fn thread_list_respects_provider_filter() -> Result<()> {
    let codex_home = TempDir::new()?;
    create_minimal_config(codex_home.path())?;

    // Create rollouts under two providers.
    let _a = create_fake_rollout(
        codex_home.path(),
        "2025-01-02T10-00-00",
        "2025-01-02T10:00:00Z",
        "X",
        Some("mock_provider"),
    )?; // mock_provider

    // one with a different provider
    // Built by hand (not via the helper) so the session_meta payload can name
    // "other_provider"; the directory layout mirrors sessions/<YYYY>/<MM>/<DD>.
    let uuid = Uuid::new_v4();
    let dir = codex_home
        .path()
        .join("sessions")
        .join("2025")
        .join("01")
        .join("02");
    std::fs::create_dir_all(&dir)?;
    let file_path = dir.join(format!("rollout-2025-01-02T11-00-00-{uuid}.jsonl"));
    // One JSON line each for session_meta, a response_item, and an event_msg —
    // the minimal shape a rollout file needs to be listed.
    let lines = [
        json!({
            "timestamp": "2025-01-02T11:00:00Z",
            "type": "session_meta",
            "payload": {
                "id": uuid,
                "timestamp": "2025-01-02T11:00:00Z",
                "cwd": "/",
                "originator": "codex",
                "cli_version": "0.0.0",
                "instructions": null,
                "source": "vscode",
                "model_provider": "other_provider"
            }
        })
        .to_string(),
        json!({
            "timestamp": "2025-01-02T11:00:00Z",
            "type":"response_item",
            "payload": {"type":"message","role":"user","content":[{"type":"input_text","text":"X"}]}
        })
        .to_string(),
        json!({
            "timestamp": "2025-01-02T11:00:00Z",
            "type":"event_msg",
            "payload": {"type":"user_message","message":"X","kind":"plain"}
        })
        .to_string(),
    ];
    std::fs::write(file_path, lines.join("\n") + "\n")?;

    let mut mcp = McpProcess::new(codex_home.path()).await?;
    timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;

    // Filter to only other_provider; expect 1 item, nextCursor None.
    let list_id = mcp
        .send_thread_list_request(ThreadListParams {
            cursor: None,
            limit: Some(10),
            model_providers: Some(vec!["other_provider".to_string()]),
        })
        .await?;
    let resp: JSONRPCResponse = timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_response_message(RequestId::Integer(list_id)),
    )
    .await??;
    let ThreadListResponse { data, next_cursor } = to_response::<ThreadListResponse>(resp)?;
    assert_eq!(data.len(), 1);
    assert_eq!(next_cursor, None);
    Ok(())
}

View File

@@ -0,0 +1,79 @@
use anyhow::Result;
use app_test_support::McpProcess;
use app_test_support::create_mock_chat_completions_server;
use app_test_support::to_response;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ThreadResumeParams;
use codex_app_server_protocol::ThreadResumeResponse;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use tempfile::TempDir;
use tokio::time::timeout;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
/// Resuming a freshly started thread via `thread/resume` must return a
/// thread whose id matches the one `thread/start` handed back.
#[tokio::test]
async fn thread_resume_returns_existing_thread() -> Result<()> {
    let server = create_mock_chat_completions_server(vec![]).await;
    let codex_home = TempDir::new()?;
    create_config_toml(codex_home.path(), &server.uri())?;

    let mut mcp = McpProcess::new(codex_home.path()).await?;
    timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;

    // Start a thread.
    let start_params = ThreadStartParams {
        model: Some("gpt-5-codex".to_string()),
        ..Default::default()
    };
    let start_request_id = mcp.send_thread_start_request(start_params).await?;
    let start_response: JSONRPCResponse = timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_response_message(RequestId::Integer(start_request_id)),
    )
    .await??;
    let ThreadStartResponse { thread: started } =
        to_response::<ThreadStartResponse>(start_response)?;

    // Resume it via v2 API.
    let resume_params = ThreadResumeParams {
        thread_id: started.id.clone(),
    };
    let resume_request_id = mcp.send_thread_resume_request(resume_params).await?;
    let resume_response: JSONRPCResponse = timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_response_message(RequestId::Integer(resume_request_id)),
    )
    .await??;
    let ThreadResumeResponse { thread: resumed } =
        to_response::<ThreadResumeResponse>(resume_response)?;

    // Same logical thread before and after resume.
    assert_eq!(resumed.id, started.id);
    Ok(())
}
/// Writes a `config.toml` into `codex_home` that routes model traffic to the
/// mock server at `server_uri` (chat wire API, retries disabled).
fn create_config_toml(codex_home: &std::path::Path, server_uri: &str) -> std::io::Result<()> {
    let contents = format!(
        r#"
model = "mock-model"
approval_policy = "never"
sandbox_mode = "read-only"
model_provider = "mock_provider"
[model_providers.mock_provider]
name = "Mock provider for test"
base_url = "{server_uri}/v1"
wire_api = "chat"
request_max_retries = 0
stream_max_retries = 0
"#
    );
    std::fs::write(codex_home.join("config.toml"), contents)
}

View File

@@ -0,0 +1,81 @@
use anyhow::Result;
use app_test_support::McpProcess;
use app_test_support::create_mock_chat_completions_server;
use app_test_support::to_response;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ThreadStartedNotification;
use std::path::Path;
use tempfile::TempDir;
use tokio::time::timeout;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
/// `thread/start` must both return the new thread in its JSON-RPC response
/// and emit a `thread/started` notification carrying the same thread id
/// (no separate listener-registration call is made here).
#[tokio::test]
async fn thread_start_creates_thread_and_emits_started() -> Result<()> {
    // Provide a mock server and config so model wiring is valid.
    let server = create_mock_chat_completions_server(vec![]).await;
    let codex_home = TempDir::new()?;
    create_config_toml(codex_home.path(), &server.uri())?;

    // Start server and initialize.
    let mut mcp = McpProcess::new(codex_home.path()).await?;
    timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;

    // Start a v2 thread with an explicit model override.
    let req_id = mcp
        .send_thread_start_request(ThreadStartParams {
            model: Some("gpt-5".to_string()),
            ..Default::default()
        })
        .await?;

    // Expect a proper JSON-RPC response with a thread id.
    let resp: JSONRPCResponse = timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_response_message(RequestId::Integer(req_id)),
    )
    .await??;
    let ThreadStartResponse { thread } = to_response::<ThreadStartResponse>(resp)?;
    assert!(!thread.id.is_empty(), "thread id should not be empty");

    // A corresponding thread/started notification should arrive.
    let notif: JSONRPCNotification = timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_notification_message("thread/started"),
    )
    .await??;
    let started: ThreadStartedNotification =
        serde_json::from_value(notif.params.expect("params must be present"))?;
    // The notification must reference the same thread the response returned.
    assert_eq!(started.thread.id, thread.id);
    Ok(())
}
/// Writes a `config.toml` into `codex_home` pointing model traffic at the
/// mock chat-completions server at `server_uri` (chat wire API, no retries).
fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> {
    let contents = format!(
        r#"
model = "mock-model"
approval_policy = "never"
sandbox_mode = "read-only"
model_provider = "mock_provider"
[model_providers.mock_provider]
name = "Mock provider for test"
base_url = "{server_uri}/v1"
wire_api = "chat"
request_max_retries = 0
stream_max_retries = 0
"#
    );
    std::fs::write(codex_home.join("config.toml"), contents)
}

View File

@@ -75,6 +75,7 @@ pub use rollout::find_conversation_path_by_id_str;
pub use rollout::list::ConversationItem; pub use rollout::list::ConversationItem;
pub use rollout::list::ConversationsPage; pub use rollout::list::ConversationsPage;
pub use rollout::list::Cursor; pub use rollout::list::Cursor;
pub use rollout::list::parse_cursor;
pub use rollout::list::read_head_for_summary; pub use rollout::list::read_head_for_summary;
mod function_tool; mod function_tool;
mod state; mod state;

View File

@@ -273,7 +273,7 @@ async fn traverse_directories_for_paths(
/// Pagination cursor token format: "<file_ts>|<uuid>" where `file_ts` matches the /// Pagination cursor token format: "<file_ts>|<uuid>" where `file_ts` matches the
/// filename timestamp portion (YYYY-MM-DDThh-mm-ss) used in rollout filenames. /// filename timestamp portion (YYYY-MM-DDThh-mm-ss) used in rollout filenames.
/// The cursor orders files by timestamp desc, then UUID desc. /// The cursor orders files by timestamp desc, then UUID desc.
fn parse_cursor(token: &str) -> Option<Cursor> { pub fn parse_cursor(token: &str) -> Option<Cursor> {
let (file_ts, uuid_str) = token.split_once('|')?; let (file_ts, uuid_str) = token.split_once('|')?;
let Ok(uuid) = Uuid::parse_str(uuid_str) else { let Ok(uuid) = Uuid::parse_str(uuid_str) else {

View File

@@ -24,6 +24,10 @@ use responses::start_mock_server;
use std::time::Duration; use std::time::Duration;
#[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[ignore = "flaky on ubuntu-24.04-arm - aarch64-unknown-linux-gnu"]
// The notify script gets far enough to create (and therefore surface) the file,
// but hasnt flushed the JSON yet. Reading an empty file produces EOF while parsing
// a value at line 1 column 0. May be caused by a slow runner.
async fn summarize_context_three_requests_and_instructions() -> anyhow::Result<()> { async fn summarize_context_three_requests_and_instructions() -> anyhow::Result<()> {
skip_if_no_network!(Ok(())); skip_if_no_network!(Ok(()));