From 2ab1650d4d73b789cdd5b76a0debb8d9d8f0e7d0 Mon Sep 17 00:00:00 2001 From: Owen Lin Date: Wed, 5 Nov 2025 12:28:43 -0800 Subject: [PATCH] [app-server] feat: v2 Thread APIs (#6214) Implements: ``` thread/list thread/start thread/resume thread/archive ``` along with their integration tests. These are relatively light wrappers around the existing core logic, and changes to core logic are minimal. However, an improvement made for developer ergonomics: - `thread/start` and `thread/resume` automatically attaches a conversation listener internally, so clients don't have to make a separate `AddConversationListener` call like they do today. For consistency, also updated `model/list` and `feedback/upload` (naming conventions, list API params). --- codex-rs/Cargo.lock | 2 + .../src/protocol/common.rs | 55 +- .../app-server-protocol/src/protocol/v2.rs | 218 ++++- .../app-server/src/codex_message_processor.rs | 762 ++++++++++++++---- codex-rs/app-server/tests/common/Cargo.toml | 2 + codex-rs/app-server/tests/common/lib.rs | 3 + .../app-server/tests/common/mcp_process.rs | 48 +- .../tests/common/mock_model_server.rs | 19 + codex-rs/app-server/tests/common/rollout.rs | 82 ++ .../app-server/tests/suite/list_resume.rs | 72 +- codex-rs/app-server/tests/suite/model_list.rs | 37 +- codex-rs/app-server/tests/suite/v2/mod.rs | 4 + .../tests/suite/v2/thread_archive.rs | 93 +++ .../app-server/tests/suite/v2/thread_list.rs | 205 +++++ .../tests/suite/v2/thread_resume.rs | 79 ++ .../app-server/tests/suite/v2/thread_start.rs | 81 ++ codex-rs/core/src/lib.rs | 1 + codex-rs/core/src/rollout/list.rs | 2 +- .../core/tests/suite/user_notification.rs | 4 + 19 files changed, 1482 insertions(+), 287 deletions(-) create mode 100644 codex-rs/app-server/tests/common/rollout.rs create mode 100644 codex-rs/app-server/tests/suite/v2/thread_archive.rs create mode 100644 codex-rs/app-server/tests/suite/v2/thread_list.rs create mode 100644 codex-rs/app-server/tests/suite/v2/thread_resume.rs create mode 
100644 codex-rs/app-server/tests/suite/v2/thread_start.rs diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index f11a1658..c4121da2 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -186,9 +186,11 @@ dependencies = [ "chrono", "codex-app-server-protocol", "codex-core", + "codex-protocol", "serde", "serde_json", "tokio", + "uuid", "wiremock", ] diff --git a/codex-rs/app-server-protocol/src/protocol/common.rs b/codex-rs/app-server-protocol/src/protocol/common.rs index 75618905..275d294a 100644 --- a/codex-rs/app-server-protocol/src/protocol/common.rs +++ b/codex-rs/app-server-protocol/src/protocol/common.rs @@ -1,4 +1,5 @@ use std::collections::HashMap; +use std::path::Path; use std::path::PathBuf; use crate::JSONRPCNotification; @@ -101,11 +102,43 @@ macro_rules! client_request_definitions { client_request_definitions! { /// NEW APIs + // Thread lifecycle + #[serde(rename = "thread/start")] + #[ts(rename = "thread/start")] + ThreadStart { + params: v2::ThreadStartParams, + response: v2::ThreadStartResponse, + }, + #[serde(rename = "thread/resume")] + #[ts(rename = "thread/resume")] + ThreadResume { + params: v2::ThreadResumeParams, + response: v2::ThreadResumeResponse, + }, + #[serde(rename = "thread/archive")] + #[ts(rename = "thread/archive")] + ThreadArchive { + params: v2::ThreadArchiveParams, + response: v2::ThreadArchiveResponse, + }, + #[serde(rename = "thread/list")] + #[ts(rename = "thread/list")] + ThreadList { + params: v2::ThreadListParams, + response: v2::ThreadListResponse, + }, + #[serde(rename = "thread/compact")] + #[ts(rename = "thread/compact")] + ThreadCompact { + params: v2::ThreadCompactParams, + response: v2::ThreadCompactResponse, + }, + #[serde(rename = "model/list")] #[ts(rename = "model/list")] - ListModels { - params: v2::ListModelsParams, - response: v2::ListModelsResponse, + ModelList { + params: v2::ModelListParams, + response: v2::ModelListResponse, }, #[serde(rename = "account/login")] @@ -131,9 +164,9 @@ 
client_request_definitions! { #[serde(rename = "feedback/upload")] #[ts(rename = "feedback/upload")] - UploadFeedback { - params: v2::UploadFeedbackParams, - response: v2::UploadFeedbackResponse, + FeedbackUpload { + params: v2::FeedbackUploadParams, + response: v2::FeedbackUploadResponse, }, #[serde(rename = "account/read")] @@ -292,7 +325,7 @@ macro_rules! server_request_definitions { #[allow(clippy::vec_init_then_push)] pub fn export_server_response_schemas( - out_dir: &::std::path::Path, + out_dir: &Path, ) -> ::anyhow::Result> { let mut schemas = Vec::new(); paste! { @@ -303,7 +336,7 @@ macro_rules! server_request_definitions { #[allow(clippy::vec_init_then_push)] pub fn export_server_param_schemas( - out_dir: &::std::path::Path, + out_dir: &Path, ) -> ::anyhow::Result> { let mut schemas = Vec::new(); paste! { @@ -741,16 +774,16 @@ mod tests { #[test] fn serialize_list_models() -> Result<()> { - let request = ClientRequest::ListModels { + let request = ClientRequest::ModelList { request_id: RequestId::Integer(6), - params: v2::ListModelsParams::default(), + params: v2::ModelListParams::default(), }; assert_eq!( json!({ "method": "model/list", "id": 6, "params": { - "pageSize": null, + "limit": null, "cursor": null } }), diff --git a/codex-rs/app-server-protocol/src/protocol/v2.rs b/codex-rs/app-server-protocol/src/protocol/v2.rs index 96cb2d76..f96bbf43 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2.rs @@ -1,3 +1,6 @@ +use std::collections::HashMap; +use std::path::PathBuf; + use crate::protocol::common::AuthMode; use codex_protocol::ConversationId; use codex_protocol::account::PlanType; @@ -9,10 +12,109 @@ use schemars::JsonSchema; use serde::Deserialize; use serde::Serialize; use serde_json::Value as JsonValue; -use std::path::PathBuf; use ts_rs::TS; use uuid::Uuid; +// Macro to declare a camelCased API v2 enum mirroring a core enum which +// tends to use kebab-case. +macro_rules! 
v2_enum_from_core { + ( + pub enum $Name:ident from $Src:path { $( $Variant:ident ),+ $(,)? } + ) => { + #[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq, JsonSchema, TS)] + #[serde(rename_all = "camelCase")] + #[ts(export_to = "v2/")] + pub enum $Name { $( $Variant ),+ } + + impl $Name { + pub fn to_core(self) -> $Src { + match self { $( $Name::$Variant => <$Src>::$Variant ),+ } + } + } + + impl From<$Src> for $Name { + fn from(value: $Src) -> Self { + match value { $( <$Src>::$Variant => $Name::$Variant ),+ } + } + } + }; +} + +v2_enum_from_core!( + pub enum AskForApproval from codex_protocol::protocol::AskForApproval { + UnlessTrusted, OnFailure, OnRequest, Never + } +); + +v2_enum_from_core!( + pub enum SandboxMode from codex_protocol::config_types::SandboxMode { + ReadOnly, WorkspaceWrite, DangerFullAccess + } +); + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, JsonSchema, TS)] +#[serde(tag = "mode", rename_all = "camelCase")] +#[ts(tag = "mode")] +#[ts(export_to = "v2/")] +pub enum SandboxPolicy { + DangerFullAccess, + ReadOnly, + WorkspaceWrite { + #[serde(default)] + writable_roots: Vec, + #[serde(default)] + network_access: bool, + #[serde(default)] + exclude_tmpdir_env_var: bool, + #[serde(default)] + exclude_slash_tmp: bool, + }, +} + +impl SandboxPolicy { + pub fn to_core(&self) -> codex_protocol::protocol::SandboxPolicy { + match self { + SandboxPolicy::DangerFullAccess => { + codex_protocol::protocol::SandboxPolicy::DangerFullAccess + } + SandboxPolicy::ReadOnly => codex_protocol::protocol::SandboxPolicy::ReadOnly, + SandboxPolicy::WorkspaceWrite { + writable_roots, + network_access, + exclude_tmpdir_env_var, + exclude_slash_tmp, + } => codex_protocol::protocol::SandboxPolicy::WorkspaceWrite { + writable_roots: writable_roots.clone(), + network_access: *network_access, + exclude_tmpdir_env_var: *exclude_tmpdir_env_var, + exclude_slash_tmp: *exclude_slash_tmp, + }, + } + } +} + +impl From for SandboxPolicy { + fn 
from(value: codex_protocol::protocol::SandboxPolicy) -> Self { + match value { + codex_protocol::protocol::SandboxPolicy::DangerFullAccess => { + SandboxPolicy::DangerFullAccess + } + codex_protocol::protocol::SandboxPolicy::ReadOnly => SandboxPolicy::ReadOnly, + codex_protocol::protocol::SandboxPolicy::WorkspaceWrite { + writable_roots, + network_access, + exclude_tmpdir_env_var, + exclude_slash_tmp, + } => SandboxPolicy::WorkspaceWrite { + writable_roots, + network_access, + exclude_tmpdir_env_var, + exclude_slash_tmp, + }, + } + } +} + #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] #[serde(tag = "type", rename_all = "camelCase")] #[ts(tag = "type")] @@ -82,11 +184,11 @@ pub struct GetAccountResponse { #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, JsonSchema, TS)] #[serde(rename_all = "camelCase")] #[ts(export_to = "v2/")] -pub struct ListModelsParams { - /// Optional page size; defaults to a reasonable server-side value. - pub page_size: Option, +pub struct ModelListParams { /// Opaque pagination cursor returned by a previous call. pub cursor: Option, + /// Optional page size; defaults to a reasonable server-side value. + pub limit: Option, } #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] @@ -114,17 +216,17 @@ pub struct ReasoningEffortOption { #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] #[serde(rename_all = "camelCase")] #[ts(export_to = "v2/")] -pub struct ListModelsResponse { - pub items: Vec, +pub struct ModelListResponse { + pub data: Vec, /// Opaque cursor to pass to the next call to continue after the last item. - /// if None, there are no more items to return. + /// If None, there are no more items to return. 
pub next_cursor: Option, } #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] #[serde(rename_all = "camelCase")] #[ts(export_to = "v2/")] -pub struct UploadFeedbackParams { +pub struct FeedbackUploadParams { pub classification: String, pub reason: Option, pub conversation_id: Option, @@ -134,10 +236,101 @@ pub struct UploadFeedbackParams { #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] #[serde(rename_all = "camelCase")] #[ts(export_to = "v2/")] -pub struct UploadFeedbackResponse { +pub struct FeedbackUploadResponse { pub thread_id: String, } +// === Threads, Turns, and Items === +// Thread APIs +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct ThreadStartParams { + pub model: Option, + pub model_provider: Option, + pub cwd: Option, + pub approval_policy: Option, + pub sandbox: Option, + pub config: Option>, + pub base_instructions: Option, + pub developer_instructions: Option, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct ThreadStartResponse { + pub thread: Thread, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct ThreadResumeParams { + pub thread_id: String, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct ThreadResumeResponse { + pub thread: Thread, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct ThreadArchiveParams { + pub thread_id: String, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] 
+pub struct ThreadArchiveResponse {} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct ThreadListParams { + /// Opaque pagination cursor returned by a previous call. + pub cursor: Option, + /// Optional page size; defaults to a reasonable server-side value. + pub limit: Option, + /// Optional provider filter; when set, only sessions recorded under these + /// providers are returned. When present but empty, includes all providers. + pub model_providers: Option>, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct ThreadListResponse { + pub data: Vec, + /// Opaque cursor to pass to the next call to continue after the last item. + /// if None, there are no more items to return. + pub next_cursor: Option, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct ThreadCompactParams { + pub thread_id: String, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct ThreadCompactResponse {} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct Thread { + pub id: String, +} + #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] #[serde(rename_all = "camelCase")] #[ts(export_to = "v2/")] @@ -145,13 +338,6 @@ pub struct AccountUpdatedNotification { pub auth_method: Option, } -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] -#[serde(rename_all = "camelCase")] -#[ts(export_to = "v2/")] -pub struct Thread { - pub id: String, -} - #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] #[serde(rename_all = "camelCase")] 
#[ts(export_to = "v2/")] diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index 3609b044..1d7c0a48 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -20,6 +20,8 @@ use codex_app_server_protocol::ExecCommandApprovalParams; use codex_app_server_protocol::ExecCommandApprovalResponse; use codex_app_server_protocol::ExecOneOffCommandParams; use codex_app_server_protocol::ExecOneOffCommandResponse; +use codex_app_server_protocol::FeedbackUploadParams; +use codex_app_server_protocol::FeedbackUploadResponse; use codex_app_server_protocol::FuzzyFileSearchParams; use codex_app_server_protocol::FuzzyFileSearchResponse; use codex_app_server_protocol::GetAccountRateLimitsResponse; @@ -34,12 +36,12 @@ use codex_app_server_protocol::InterruptConversationResponse; use codex_app_server_protocol::JSONRPCErrorError; use codex_app_server_protocol::ListConversationsParams; use codex_app_server_protocol::ListConversationsResponse; -use codex_app_server_protocol::ListModelsParams; -use codex_app_server_protocol::ListModelsResponse; use codex_app_server_protocol::LoginApiKeyParams; use codex_app_server_protocol::LoginApiKeyResponse; use codex_app_server_protocol::LoginChatGptCompleteNotification; use codex_app_server_protocol::LoginChatGptResponse; +use codex_app_server_protocol::ModelListParams; +use codex_app_server_protocol::ModelListResponse; use codex_app_server_protocol::NewConversationParams; use codex_app_server_protocol::NewConversationResponse; use codex_app_server_protocol::RemoveConversationListenerParams; @@ -56,8 +58,16 @@ use codex_app_server_protocol::ServerRequestPayload; use codex_app_server_protocol::SessionConfiguredNotification; use codex_app_server_protocol::SetDefaultModelParams; use codex_app_server_protocol::SetDefaultModelResponse; -use codex_app_server_protocol::UploadFeedbackParams; -use 
codex_app_server_protocol::UploadFeedbackResponse; +use codex_app_server_protocol::Thread; +use codex_app_server_protocol::ThreadArchiveParams; +use codex_app_server_protocol::ThreadArchiveResponse; +use codex_app_server_protocol::ThreadListParams; +use codex_app_server_protocol::ThreadListResponse; +use codex_app_server_protocol::ThreadResumeParams; +use codex_app_server_protocol::ThreadResumeResponse; +use codex_app_server_protocol::ThreadStartParams; +use codex_app_server_protocol::ThreadStartResponse; +use codex_app_server_protocol::ThreadStartedNotification; use codex_app_server_protocol::UserInfoResponse; use codex_app_server_protocol::UserSavedConfig; use codex_backend_client::Client as BackendClient; @@ -83,6 +93,7 @@ use codex_core::exec_env::create_env; use codex_core::find_conversation_path_by_id_str; use codex_core::get_platform_sandbox; use codex_core::git_info::git_diff_to_remote; +use codex_core::parse_cursor; use codex_core::protocol::ApplyPatchApprovalRequestEvent; use codex_core::protocol::Event; use codex_core::protocol::EventMsg; @@ -98,7 +109,7 @@ use codex_protocol::ConversationId; use codex_protocol::config_types::ForcedLoginMethod; use codex_protocol::items::TurnItem; use codex_protocol::models::ResponseItem; -use codex_protocol::protocol::RateLimitSnapshot; +use codex_protocol::protocol::RateLimitSnapshot as CoreRateLimitSnapshot; use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::USER_MESSAGE_BEGIN; use codex_protocol::user_input::UserInput as CoreInputItem; @@ -176,6 +187,26 @@ impl CodexMessageProcessor { ClientRequest::Initialize { .. 
} => { panic!("Initialize should be handled in MessageProcessor"); } + // === v2 Thread APIs === + ClientRequest::ThreadStart { request_id, params } => { + self.thread_start(request_id, params).await; + } + ClientRequest::ThreadResume { request_id, params } => { + self.thread_resume(request_id, params).await; + } + ClientRequest::ThreadArchive { request_id, params } => { + self.thread_archive(request_id, params).await; + } + ClientRequest::ThreadList { request_id, params } => { + self.thread_list(request_id, params).await; + } + ClientRequest::ThreadCompact { + request_id, + params: _, + } => { + self.send_unimplemented_error(request_id, "thread/compact") + .await; + } ClientRequest::NewConversation { request_id, params } => { // Do not tokio::spawn() to process new_conversation() // asynchronously because we need to ensure the conversation is @@ -188,7 +219,7 @@ impl CodexMessageProcessor { ClientRequest::ListConversations { request_id, params } => { self.handle_list_conversations(request_id, params).await; } - ClientRequest::ListModels { request_id, params } => { + ClientRequest::ModelList { request_id, params } => { self.list_models(request_id, params).await; } ClientRequest::LoginAccount { @@ -289,7 +320,7 @@ impl CodexMessageProcessor { } => { self.get_account_rate_limits(request_id).await; } - ClientRequest::UploadFeedback { request_id, params } => { + ClientRequest::FeedbackUpload { request_id, params } => { self.upload_feedback(request_id, params).await; } } @@ -637,7 +668,7 @@ impl CodexMessageProcessor { } } - async fn fetch_account_rate_limits(&self) -> Result { + async fn fetch_account_rate_limits(&self) -> Result { let Some(auth) = self.auth_manager.auth() else { return Err(JSONRPCErrorError { code: INVALID_REQUEST_ERROR_CODE, @@ -816,19 +847,47 @@ impl CodexMessageProcessor { } async fn process_new_conversation(&self, request_id: RequestId, params: NewConversationParams) { - let config = - match derive_config_from_params(params, 
self.codex_linux_sandbox_exe.clone()).await { - Ok(config) => config, - Err(err) => { - let error = JSONRPCErrorError { - code: INVALID_REQUEST_ERROR_CODE, - message: format!("error deriving config: {err}"), - data: None, - }; - self.outgoing.send_error(request_id, error).await; - return; - } - }; + let NewConversationParams { + model, + model_provider, + profile, + cwd, + approval_policy, + sandbox: sandbox_mode, + config: cli_overrides, + base_instructions, + developer_instructions, + compact_prompt, + include_apply_patch_tool, + } = params; + + let overrides = ConfigOverrides { + model, + config_profile: profile, + cwd: cwd.map(PathBuf::from), + approval_policy, + sandbox_mode, + model_provider, + codex_linux_sandbox_exe: self.codex_linux_sandbox_exe.clone(), + base_instructions, + developer_instructions, + compact_prompt, + include_apply_patch_tool, + ..Default::default() + }; + + let config = match derive_config_from_params(overrides, cli_overrides).await { + Ok(config) => config, + Err(err) => { + let error = JSONRPCErrorError { + code: INVALID_REQUEST_ERROR_CODE, + message: format!("error deriving config: {err}"), + data: None, + }; + self.outgoing.send_error(request_id, error).await; + return; + } + }; match self.conversation_manager.new_conversation(config).await { Ok(conversation_id) => { @@ -856,6 +915,276 @@ impl CodexMessageProcessor { } } + async fn thread_start(&mut self, request_id: RequestId, params: ThreadStartParams) { + // Build ConfigOverrides directly from ThreadStartParams for config derivation. 
+ let cli_overrides = params.config; + let overrides = ConfigOverrides { + model: params.model, + cwd: params.cwd.map(PathBuf::from), + approval_policy: params + .approval_policy + .map(codex_app_server_protocol::AskForApproval::to_core), + sandbox_mode: params + .sandbox + .map(codex_app_server_protocol::SandboxMode::to_core), + model_provider: params.model_provider, + codex_linux_sandbox_exe: self.codex_linux_sandbox_exe.clone(), + base_instructions: params.base_instructions, + developer_instructions: params.developer_instructions, + ..Default::default() + }; + + let config = match derive_config_from_params(overrides, cli_overrides).await { + Ok(config) => config, + Err(err) => { + let error = JSONRPCErrorError { + code: INVALID_REQUEST_ERROR_CODE, + message: format!("error deriving config: {err}"), + data: None, + }; + self.outgoing.send_error(request_id, error).await; + return; + } + }; + + match self.conversation_manager.new_conversation(config).await { + Ok(new_conv) => { + let thread = Thread { + id: new_conv.conversation_id.to_string(), + }; + + let response = ThreadStartResponse { + thread: thread.clone(), + }; + + // Auto-attach a conversation listener when starting a thread. + // Use the same behavior as the v1 API with experimental_raw_events=false. 
+ if let Err(err) = self + .attach_conversation_listener(new_conv.conversation_id, false) + .await + { + tracing::warn!( + "failed to attach listener for conversation {}: {}", + new_conv.conversation_id, + err.message + ); + } + + self.outgoing.send_response(request_id, response).await; + + let notif = ThreadStartedNotification { thread }; + self.outgoing + .send_server_notification(ServerNotification::ThreadStarted(notif)) + .await; + } + Err(err) => { + let error = JSONRPCErrorError { + code: INTERNAL_ERROR_CODE, + message: format!("error creating thread: {err}"), + data: None, + }; + self.outgoing.send_error(request_id, error).await; + } + } + } + + async fn thread_archive(&mut self, request_id: RequestId, params: ThreadArchiveParams) { + let conversation_id = match ConversationId::from_string(¶ms.thread_id) { + Ok(id) => id, + Err(err) => { + let error = JSONRPCErrorError { + code: INVALID_REQUEST_ERROR_CODE, + message: format!("invalid thread id: {err}"), + data: None, + }; + self.outgoing.send_error(request_id, error).await; + return; + } + }; + + let rollout_path = match find_conversation_path_by_id_str( + &self.config.codex_home, + &conversation_id.to_string(), + ) + .await + { + Ok(Some(p)) => p, + Ok(None) => { + let error = JSONRPCErrorError { + code: INVALID_REQUEST_ERROR_CODE, + message: format!("no rollout found for conversation id {conversation_id}"), + data: None, + }; + self.outgoing.send_error(request_id, error).await; + return; + } + Err(err) => { + let error = JSONRPCErrorError { + code: INVALID_REQUEST_ERROR_CODE, + message: format!("failed to locate conversation id {conversation_id}: {err}"), + data: None, + }; + self.outgoing.send_error(request_id, error).await; + return; + } + }; + + match self + .archive_conversation_common(conversation_id, &rollout_path) + .await + { + Ok(()) => { + let response = ThreadArchiveResponse {}; + self.outgoing.send_response(request_id, response).await; + } + Err(err) => { + self.outgoing.send_error(request_id, 
err).await; + } + } + } + + async fn thread_list(&self, request_id: RequestId, params: ThreadListParams) { + let ThreadListParams { + cursor, + limit, + model_providers, + } = params; + + let page_size = limit.unwrap_or(25).max(1) as usize; + + let (summaries, next_cursor) = match self + .list_conversations_common(page_size, cursor, model_providers) + .await + { + Ok(r) => r, + Err(error) => { + self.outgoing.send_error(request_id, error).await; + return; + } + }; + + let data = summaries + .into_iter() + .map(|s| Thread { + id: s.conversation_id.to_string(), + }) + .collect(); + + let response = ThreadListResponse { data, next_cursor }; + self.outgoing.send_response(request_id, response).await; + } + + async fn thread_resume(&mut self, request_id: RequestId, params: ThreadResumeParams) { + let conversation_id = match ConversationId::from_string(¶ms.thread_id) { + Ok(id) => id, + Err(err) => { + let error = JSONRPCErrorError { + code: INVALID_REQUEST_ERROR_CODE, + message: format!("invalid thread id: {err}"), + data: None, + }; + self.outgoing.send_error(request_id, error).await; + return; + } + }; + + let path = match find_conversation_path_by_id_str( + &self.config.codex_home, + &conversation_id.to_string(), + ) + .await + { + Ok(Some(p)) => p, + Ok(None) => { + let error = JSONRPCErrorError { + code: INVALID_REQUEST_ERROR_CODE, + message: format!("no rollout found for conversation id {conversation_id}"), + data: None, + }; + self.outgoing.send_error(request_id, error).await; + return; + } + Err(err) => { + let error = JSONRPCErrorError { + code: INVALID_REQUEST_ERROR_CODE, + message: format!("failed to locate conversation id {conversation_id}: {err}"), + data: None, + }; + self.outgoing.send_error(request_id, error).await; + return; + } + }; + + let fallback_provider = self.config.model_provider_id.as_str(); + let summary = match read_summary_from_rollout(&path, fallback_provider).await { + Ok(s) => s, + Err(err) => { + let error = JSONRPCErrorError { + code: 
INVALID_REQUEST_ERROR_CODE, + message: format!("failed to load rollout `{}`: {err}", path.display()), + data: None, + }; + self.outgoing.send_error(request_id, error).await; + return; + } + }; + + let initial_history = match RolloutRecorder::get_rollout_history(&summary.path).await { + Ok(initial_history) => initial_history, + Err(err) => { + let error = JSONRPCErrorError { + code: INVALID_REQUEST_ERROR_CODE, + message: format!( + "failed to load rollout `{}` for conversation {conversation_id}: {err}", + summary.path.display() + ), + data: None, + }; + self.outgoing.send_error(request_id, error).await; + return; + } + }; + + match self + .conversation_manager + .resume_conversation_with_history( + self.config.as_ref().clone(), + initial_history, + self.auth_manager.clone(), + ) + .await + { + Ok(_) => { + // Auto-attach a conversation listener when resuming a thread. + if let Err(err) = self + .attach_conversation_listener(conversation_id, false) + .await + { + tracing::warn!( + "failed to attach listener for conversation {}: {}", + conversation_id, + err.message + ); + } + + let response = ThreadResumeResponse { + thread: Thread { + id: conversation_id.to_string(), + }, + }; + self.outgoing.send_response(request_id, response).await; + } + Err(err) => { + let error = JSONRPCErrorError { + code: INTERNAL_ERROR_CODE, + message: format!("error resuming thread: {err}"), + data: None, + }; + self.outgoing.send_error(request_id, error).await; + } + } + } + async fn get_conversation_summary( &self, request_id: RequestId, @@ -925,11 +1254,7 @@ impl CodexMessageProcessor { model_providers: model_provider, } = params; let page_size = page_size.unwrap_or(25); - // Decode the optional cursor string to a Cursor via serde (Cursor implements Deserialize from string) - let cursor_obj: Option = match cursor { - Some(s) => serde_json::from_str::(&format!("\"{s}\"")).ok(), - None => None, - }; + let cursor_obj: Option = cursor.as_ref().and_then(|s| parse_cursor(s)); let cursor_ref = 
cursor_obj.as_ref(); let model_provider_filter = match model_provider { Some(providers) => { @@ -973,33 +1298,86 @@ impl CodexMessageProcessor { .collect(); // Encode next_cursor as a plain string - let next_cursor = match page.next_cursor { - Some(c) => match serde_json::to_value(&c) { - Ok(serde_json::Value::String(s)) => Some(s), - _ => None, - }, - None => None, - }; + let next_cursor = page + .next_cursor + .and_then(|cursor| serde_json::to_value(&cursor).ok()) + .and_then(|value| value.as_str().map(str::to_owned)); let response = ListConversationsResponse { items, next_cursor }; self.outgoing.send_response(request_id, response).await; } - async fn list_models(&self, request_id: RequestId, params: ListModelsParams) { - let ListModelsParams { page_size, cursor } = params; + async fn list_conversations_common( + &self, + page_size: usize, + cursor: Option, + model_providers: Option>, + ) -> Result<(Vec, Option), JSONRPCErrorError> { + let cursor_obj: Option = cursor.as_ref().and_then(|s| parse_cursor(s)); + let cursor_ref = cursor_obj.as_ref(); + + let model_provider_filter = match model_providers { + Some(providers) => { + if providers.is_empty() { + None + } else { + Some(providers) + } + } + None => Some(vec![self.config.model_provider_id.clone()]), + }; + let fallback_provider = self.config.model_provider_id.clone(); + + let page = match RolloutRecorder::list_conversations( + &self.config.codex_home, + page_size, + cursor_ref, + INTERACTIVE_SESSION_SOURCES, + model_provider_filter.as_deref(), + fallback_provider.as_str(), + ) + .await + { + Ok(p) => p, + Err(err) => { + return Err(JSONRPCErrorError { + code: INTERNAL_ERROR_CODE, + message: format!("failed to list conversations: {err}"), + data: None, + }); + } + }; + + let items = page + .items + .into_iter() + .filter_map(|it| extract_conversation_summary(it.path, &it.head, &fallback_provider)) + .collect::>(); + + let next_cursor = page + .next_cursor + .and_then(|cursor| 
serde_json::to_value(&cursor).ok()) + .and_then(|value| value.as_str().map(str::to_owned)); + + Ok((items, next_cursor)) + } + + async fn list_models(&self, request_id: RequestId, params: ModelListParams) { + let ModelListParams { limit, cursor } = params; let models = supported_models(); let total = models.len(); if total == 0 { - let response = ListModelsResponse { - items: Vec::new(), + let response = ModelListResponse { + data: Vec::new(), next_cursor: None, }; self.outgoing.send_response(request_id, response).await; return; } - let effective_page_size = page_size.unwrap_or(total).max(1).min(total); + let effective_page_size = limit.unwrap_or(total as u32).max(1) as usize; + let effective_page_size = effective_page_size.min(total); let start = match cursor { Some(cursor) => match cursor.parse::() { Ok(idx) => idx, @@ -1033,7 +1411,10 @@ impl CodexMessageProcessor { } else { None }; - let response = ListModelsResponse { items, next_cursor }; + let response = ModelListResponse { + data: items, + next_cursor, + }; self.outgoing.send_response(request_id, response).await; } @@ -1052,7 +1433,36 @@ impl CodexMessageProcessor { // Derive a Config using the same logic as new conversation, honoring overrides if provided. 
let config = match overrides { Some(overrides) => { - derive_config_from_params(overrides, self.codex_linux_sandbox_exe.clone()).await + let NewConversationParams { + model, + model_provider, + profile, + cwd, + approval_policy, + sandbox, + config: cli_overrides, + base_instructions, + developer_instructions, + compact_prompt, + include_apply_patch_tool, + } = overrides; + + let overrides = ConfigOverrides { + model, + config_profile: profile, + cwd: cwd.map(PathBuf::from), + approval_policy, + sandbox_mode: sandbox, + model_provider, + codex_linux_sandbox_exe: self.codex_linux_sandbox_exe.clone(), + base_instructions, + developer_instructions, + compact_prompt, + include_apply_patch_tool, + ..Default::default() + }; + + derive_config_from_params(overrides, cli_overrides).await } None => Ok(self.config.as_ref().clone()), }; @@ -1196,82 +1606,103 @@ impl CodexMessageProcessor { self.outgoing.send_error(request_id, error).await; } - async fn archive_conversation(&self, request_id: RequestId, params: ArchiveConversationParams) { + async fn archive_conversation( + &mut self, + request_id: RequestId, + params: ArchiveConversationParams, + ) { let ArchiveConversationParams { conversation_id, rollout_path, } = params; - // Verify that the rollout path is in the sessions directory or else - // a malicious client could specify an arbitrary path. 
+ match self + .archive_conversation_common(conversation_id, &rollout_path) + .await + { + Ok(()) => { + tracing::info!("thread/archive succeeded for {conversation_id}"); + let response = ArchiveConversationResponse {}; + self.outgoing.send_response(request_id, response).await; + } + Err(err) => { + tracing::warn!( + "thread/archive failed for {conversation_id}: {}", + err.message + ); + self.outgoing.send_error(request_id, err).await; + } + } + } + + async fn archive_conversation_common( + &mut self, + conversation_id: ConversationId, + rollout_path: &Path, + ) -> Result<(), JSONRPCErrorError> { + // Verify rollout_path is under sessions dir. let rollout_folder = self.config.codex_home.join(codex_core::SESSIONS_SUBDIR); + let canonical_sessions_dir = match tokio::fs::canonicalize(&rollout_folder).await { Ok(path) => path, Err(err) => { - let error = JSONRPCErrorError { + return Err(JSONRPCErrorError { code: INTERNAL_ERROR_CODE, message: format!( "failed to archive conversation: unable to resolve sessions directory: {err}" ), data: None, - }; - self.outgoing.send_error(request_id, error).await; - return; + }); } }; - let canonical_rollout_path = tokio::fs::canonicalize(&rollout_path).await; + let canonical_rollout_path = tokio::fs::canonicalize(rollout_path).await; let canonical_rollout_path = if let Ok(path) = canonical_rollout_path && path.starts_with(&canonical_sessions_dir) { path } else { - let error = JSONRPCErrorError { + return Err(JSONRPCErrorError { code: INVALID_REQUEST_ERROR_CODE, message: format!( "rollout path `{}` must be in sessions directory", rollout_path.display() ), data: None, - }; - self.outgoing.send_error(request_id, error).await; - return; + }); }; + // Verify file name matches conversation id. 
let required_suffix = format!("{conversation_id}.jsonl"); let Some(file_name) = canonical_rollout_path.file_name().map(OsStr::to_owned) else { - let error = JSONRPCErrorError { + return Err(JSONRPCErrorError { code: INVALID_REQUEST_ERROR_CODE, message: format!( "rollout path `{}` missing file name", rollout_path.display() ), data: None, - }; - self.outgoing.send_error(request_id, error).await; - return; + }); }; - if !file_name .to_string_lossy() .ends_with(required_suffix.as_str()) { - let error = JSONRPCErrorError { + return Err(JSONRPCErrorError { code: INVALID_REQUEST_ERROR_CODE, message: format!( "rollout path `{}` does not match conversation id {conversation_id}", rollout_path.display() ), data: None, - }; - self.outgoing.send_error(request_id, error).await; - return; + }); } - let removed_conversation = self + // If the conversation is active, request shutdown and wait briefly. + if let Some(conversation) = self .conversation_manager .remove_conversation(&conversation_id) - .await; - if let Some(conversation) = removed_conversation { + .await + { info!("conversation {conversation_id} was active; shutting down"); let conversation_clone = conversation.clone(); let notify = Arc::new(tokio::sync::Notify::new()); @@ -1280,20 +1711,24 @@ impl CodexMessageProcessor { // Establish the listener for ShutdownComplete before submitting // Shutdown so it is not missed. let is_shutdown = tokio::spawn(async move { + // Create the notified future outside the loop to avoid losing notifications. + let notified = notify_clone.notified(); + tokio::pin!(notified); loop { select! { - _ = notify_clone.notified() => { - break; - } + _ = &mut notified => { break; } event = conversation_clone.next_event() => { - if let Ok(event) = event && matches!(event.msg, EventMsg::ShutdownComplete) { - break; + match event { + Ok(event) => { + if matches!(event.msg, EventMsg::ShutdownComplete) { break; } + } + // Break on errors to avoid tight loops when the agent loop has exited. 
+ Err(_) => { break; } } } } } }); - // Request shutdown. match conversation.submit(Op::Shutdown).await { Ok(_) => { @@ -1304,20 +1739,21 @@ impl CodexMessageProcessor { } _ = tokio::time::sleep(Duration::from_secs(10)) => { warn!("conversation {conversation_id} shutdown timed out; proceeding with archive"); - notify.notify_one(); + // Wake any waiter; use notify_waiters to avoid missing the signal. + notify.notify_waiters(); + // Perhaps we lost a shutdown race, so let's continue to + // clean up the .jsonl file. } } } Err(err) => { error!("failed to submit Shutdown to conversation {conversation_id}: {err}"); - notify.notify_one(); - // Perhaps we lost a shutdown race, so let's continue to - // clean up the .jsonl file. + notify.notify_waiters(); } } } - // Move the .jsonl file to the archived sessions subdir. + // Move the rollout file to archived. let result: std::io::Result<()> = async { let archive_folder = self .config @@ -1329,20 +1765,11 @@ impl CodexMessageProcessor { } .await; - match result { - Ok(()) => { - let response = ArchiveConversationResponse {}; - self.outgoing.send_response(request_id, response).await; - } - Err(err) => { - let error = JSONRPCErrorError { - code: INTERNAL_ERROR_CODE, - message: format!("failed to archive conversation: {err}"), - data: None, - }; - self.outgoing.send_error(request_id, error).await; - } - } + result.map_err(|err| JSONRPCErrorError { + code: INTERNAL_ERROR_CODE, + message: format!("failed to archive conversation: {err}"), + data: None, + }) } async fn send_user_message(&self, request_id: RequestId, params: SendUserMessageParams) { @@ -1478,24 +1905,70 @@ impl CodexMessageProcessor { conversation_id, experimental_raw_events, } = params; - let Ok(conversation) = self + + match self + .attach_conversation_listener(conversation_id, experimental_raw_events) + .await + { + Ok(subscription_id) => { + let response = AddConversationSubscriptionResponse { subscription_id }; + self.outgoing.send_response(request_id, 
response).await; + } + Err(err) => { + self.outgoing.send_error(request_id, err).await; + } + } + } + + async fn remove_conversation_listener( + &mut self, + request_id: RequestId, + params: RemoveConversationListenerParams, + ) { + let RemoveConversationListenerParams { subscription_id } = params; + match self.conversation_listeners.remove(&subscription_id) { + Some(sender) => { + // Signal the spawned task to exit and acknowledge. + let _ = sender.send(()); + let response = RemoveConversationSubscriptionResponse {}; + self.outgoing.send_response(request_id, response).await; + } + None => { + let error = JSONRPCErrorError { + code: INVALID_REQUEST_ERROR_CODE, + message: format!("subscription not found: {subscription_id}"), + data: None, + }; + self.outgoing.send_error(request_id, error).await; + } + } + } + + async fn attach_conversation_listener( + &mut self, + conversation_id: ConversationId, + experimental_raw_events: bool, + ) -> Result { + let conversation = match self .conversation_manager .get_conversation(conversation_id) .await - else { - let error = JSONRPCErrorError { - code: INVALID_REQUEST_ERROR_CODE, - message: format!("conversation not found: {conversation_id}"), - data: None, - }; - self.outgoing.send_error(request_id, error).await; - return; + { + Ok(conv) => conv, + Err(_) => { + return Err(JSONRPCErrorError { + code: INVALID_REQUEST_ERROR_CODE, + message: format!("conversation not found: {conversation_id}"), + data: None, + }); + } }; let subscription_id = Uuid::new_v4(); let (cancel_tx, mut cancel_rx) = oneshot::channel(); self.conversation_listeners .insert(subscription_id, cancel_tx); + let outgoing_for_task = self.outgoing.clone(); let pending_interrupts = self.pending_interrupts.clone(); tokio::spawn(async move { @@ -1535,45 +2008,32 @@ impl CodexMessageProcessor { continue; } }; - params.insert("conversationId".to_string(), conversation_id.to_string().into()); + params.insert( + "conversationId".to_string(), + 
conversation_id.to_string().into(), + ); - outgoing_for_task.send_notification(OutgoingNotification { - method, - params: Some(params.into()), - }) + outgoing_for_task + .send_notification(OutgoingNotification { + method, + params: Some(params.into()), + }) + .await; + + apply_bespoke_event_handling( + event.clone(), + conversation_id, + conversation.clone(), + outgoing_for_task.clone(), + pending_interrupts.clone(), + ) .await; - - apply_bespoke_event_handling(event.clone(), conversation_id, conversation.clone(), outgoing_for_task.clone(), pending_interrupts.clone()).await; } } } }); - let response = AddConversationSubscriptionResponse { subscription_id }; - self.outgoing.send_response(request_id, response).await; - } - async fn remove_conversation_listener( - &mut self, - request_id: RequestId, - params: RemoveConversationListenerParams, - ) { - let RemoveConversationListenerParams { subscription_id } = params; - match self.conversation_listeners.remove(&subscription_id) { - Some(sender) => { - // Signal the spawned task to exit and acknowledge. 
- let _ = sender.send(()); - let response = RemoveConversationSubscriptionResponse {}; - self.outgoing.send_response(request_id, response).await; - } - None => { - let error = JSONRPCErrorError { - code: INVALID_REQUEST_ERROR_CODE, - message: format!("subscription not found: {subscription_id}"), - data: None, - }; - self.outgoing.send_error(request_id, error).await; - } - } + Ok(subscription_id) } async fn git_diff_to_origin(&self, request_id: RequestId, cwd: PathBuf) { @@ -1637,8 +2097,8 @@ impl CodexMessageProcessor { self.outgoing.send_response(request_id, response).await; } - async fn upload_feedback(&self, request_id: RequestId, params: UploadFeedbackParams) { - let UploadFeedbackParams { + async fn upload_feedback(&self, request_id: RequestId, params: FeedbackUploadParams) { + let FeedbackUploadParams { classification, reason, conversation_id, @@ -1683,7 +2143,7 @@ impl CodexMessageProcessor { match upload_result { Ok(()) => { - let response = UploadFeedbackResponse { thread_id }; + let response = FeedbackUploadResponse { thread_id }; self.outgoing.send_response(request_id, response).await; } Err(err) => { @@ -1797,41 +2257,9 @@ async fn apply_bespoke_event_handling( } async fn derive_config_from_params( - params: NewConversationParams, - codex_linux_sandbox_exe: Option, + overrides: ConfigOverrides, + cli_overrides: Option>, ) -> std::io::Result { - let NewConversationParams { - model, - model_provider, - profile, - cwd, - approval_policy, - sandbox: sandbox_mode, - config: cli_overrides, - base_instructions, - developer_instructions, - compact_prompt, - include_apply_patch_tool, - } = params; - let overrides = ConfigOverrides { - model, - review_model: None, - config_profile: profile, - cwd: cwd.map(PathBuf::from), - approval_policy, - sandbox_mode, - model_provider, - codex_linux_sandbox_exe, - base_instructions, - developer_instructions, - compact_prompt, - include_apply_patch_tool, - show_raw_agent_reasoning: None, - tools_web_search_request: None, - 
experimental_sandbox_command_assessment: None, - additional_writable_roots: Vec::new(), - }; - let cli_overrides = cli_overrides .unwrap_or_default() .into_iter() diff --git a/codex-rs/app-server/tests/common/Cargo.toml b/codex-rs/app-server/tests/common/Cargo.toml index ece8174d..6240f755 100644 --- a/codex-rs/app-server/tests/common/Cargo.toml +++ b/codex-rs/app-server/tests/common/Cargo.toml @@ -13,6 +13,7 @@ base64 = { workspace = true } chrono = { workspace = true } codex-app-server-protocol = { workspace = true } codex-core = { workspace = true } +codex-protocol = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } tokio = { workspace = true, features = [ @@ -21,4 +22,5 @@ tokio = { workspace = true, features = [ "process", "rt-multi-thread", ] } +uuid = { workspace = true } wiremock = { workspace = true } diff --git a/codex-rs/app-server/tests/common/lib.rs b/codex-rs/app-server/tests/common/lib.rs index 71b27190..dc3d24cc 100644 --- a/codex-rs/app-server/tests/common/lib.rs +++ b/codex-rs/app-server/tests/common/lib.rs @@ -2,6 +2,7 @@ mod auth_fixtures; mod mcp_process; mod mock_model_server; mod responses; +mod rollout; pub use auth_fixtures::ChatGptAuthFixture; pub use auth_fixtures::ChatGptIdTokenClaims; @@ -10,9 +11,11 @@ pub use auth_fixtures::write_chatgpt_auth; use codex_app_server_protocol::JSONRPCResponse; pub use mcp_process::McpProcess; pub use mock_model_server::create_mock_chat_completions_server; +pub use mock_model_server::create_mock_chat_completions_server_unchecked; pub use responses::create_apply_patch_sse_response; pub use responses::create_final_assistant_message_sse_response; pub use responses::create_shell_sse_response; +pub use rollout::create_fake_rollout; use serde::de::DeserializeOwned; pub fn to_response(response: JSONRPCResponse) -> anyhow::Result { diff --git a/codex-rs/app-server/tests/common/mcp_process.rs b/codex-rs/app-server/tests/common/mcp_process.rs index 3c1fbf77..254f121a 100644 --- 
a/codex-rs/app-server/tests/common/mcp_process.rs +++ b/codex-rs/app-server/tests/common/mcp_process.rs @@ -17,12 +17,13 @@ use codex_app_server_protocol::ArchiveConversationParams; use codex_app_server_protocol::CancelLoginChatGptParams; use codex_app_server_protocol::ClientInfo; use codex_app_server_protocol::ClientNotification; +use codex_app_server_protocol::FeedbackUploadParams; use codex_app_server_protocol::GetAuthStatusParams; use codex_app_server_protocol::InitializeParams; use codex_app_server_protocol::InterruptConversationParams; use codex_app_server_protocol::ListConversationsParams; -use codex_app_server_protocol::ListModelsParams; use codex_app_server_protocol::LoginApiKeyParams; +use codex_app_server_protocol::ModelListParams; use codex_app_server_protocol::NewConversationParams; use codex_app_server_protocol::RemoveConversationListenerParams; use codex_app_server_protocol::ResumeConversationParams; @@ -30,7 +31,10 @@ use codex_app_server_protocol::SendUserMessageParams; use codex_app_server_protocol::SendUserTurnParams; use codex_app_server_protocol::ServerRequest; use codex_app_server_protocol::SetDefaultModelParams; -use codex_app_server_protocol::UploadFeedbackParams; +use codex_app_server_protocol::ThreadArchiveParams; +use codex_app_server_protocol::ThreadListParams; +use codex_app_server_protocol::ThreadResumeParams; +use codex_app_server_protocol::ThreadStartParams; use codex_app_server_protocol::JSONRPCError; use codex_app_server_protocol::JSONRPCMessage; @@ -246,7 +250,7 @@ impl McpProcess { /// Send a `feedback/upload` JSON-RPC request. pub async fn send_upload_feedback_request( &mut self, - params: UploadFeedbackParams, + params: FeedbackUploadParams, ) -> anyhow::Result { let params = Some(serde_json::to_value(params)?); self.send_request("feedback/upload", params).await @@ -275,10 +279,46 @@ impl McpProcess { self.send_request("listConversations", params).await } + /// Send a `thread/start` JSON-RPC request. 
+ pub async fn send_thread_start_request( + &mut self, + params: ThreadStartParams, + ) -> anyhow::Result { + let params = Some(serde_json::to_value(params)?); + self.send_request("thread/start", params).await + } + + /// Send a `thread/resume` JSON-RPC request. + pub async fn send_thread_resume_request( + &mut self, + params: ThreadResumeParams, + ) -> anyhow::Result { + let params = Some(serde_json::to_value(params)?); + self.send_request("thread/resume", params).await + } + + /// Send a `thread/archive` JSON-RPC request. + pub async fn send_thread_archive_request( + &mut self, + params: ThreadArchiveParams, + ) -> anyhow::Result { + let params = Some(serde_json::to_value(params)?); + self.send_request("thread/archive", params).await + } + + /// Send a `thread/list` JSON-RPC request. + pub async fn send_thread_list_request( + &mut self, + params: ThreadListParams, + ) -> anyhow::Result { + let params = Some(serde_json::to_value(params)?); + self.send_request("thread/list", params).await + } + /// Send a `model/list` JSON-RPC request. pub async fn send_list_models_request( &mut self, - params: ListModelsParams, + params: ModelListParams, ) -> anyhow::Result { let params = Some(serde_json::to_value(params)?); self.send_request("model/list", params).await diff --git a/codex-rs/app-server/tests/common/mock_model_server.rs b/codex-rs/app-server/tests/common/mock_model_server.rs index be7f3eb5..08765338 100644 --- a/codex-rs/app-server/tests/common/mock_model_server.rs +++ b/codex-rs/app-server/tests/common/mock_model_server.rs @@ -29,6 +29,25 @@ pub async fn create_mock_chat_completions_server(responses: Vec) -> Mock server } +/// Same as `create_mock_chat_completions_server` but does not enforce an +/// expectation on the number of calls. 
+pub async fn create_mock_chat_completions_server_unchecked(responses: Vec) -> MockServer { + let server = MockServer::start().await; + + let seq_responder = SeqResponder { + num_calls: AtomicUsize::new(0), + responses, + }; + + Mock::given(method("POST")) + .and(path("/v1/chat/completions")) + .respond_with(seq_responder) + .mount(&server) + .await; + + server +} + struct SeqResponder { num_calls: AtomicUsize, responses: Vec, diff --git a/codex-rs/app-server/tests/common/rollout.rs b/codex-rs/app-server/tests/common/rollout.rs new file mode 100644 index 00000000..c8197a04 --- /dev/null +++ b/codex-rs/app-server/tests/common/rollout.rs @@ -0,0 +1,82 @@ +use anyhow::Result; +use codex_protocol::ConversationId; +use codex_protocol::protocol::SessionMeta; +use codex_protocol::protocol::SessionSource; +use serde_json::json; +use std::fs; +use std::path::Path; +use std::path::PathBuf; +use uuid::Uuid; + +/// Create a minimal rollout file under `CODEX_HOME/sessions/YYYY/MM/DD/`. +/// +/// - `filename_ts` is the filename timestamp component in `YYYY-MM-DDThh-mm-ss` format. +/// - `meta_rfc3339` is the envelope timestamp used in JSON lines. +/// - `preview` is the user message preview text. +/// - `model_provider` optionally sets the provider in the session meta payload. +/// +/// Returns the generated conversation/session UUID as a string. 
+pub fn create_fake_rollout( + codex_home: &Path, + filename_ts: &str, + meta_rfc3339: &str, + preview: &str, + model_provider: Option<&str>, +) -> Result { + let uuid = Uuid::new_v4(); + let uuid_str = uuid.to_string(); + let conversation_id = ConversationId::from_string(&uuid_str)?; + + // sessions/YYYY/MM/DD derived from filename_ts (YYYY-MM-DDThh-mm-ss) + let year = &filename_ts[0..4]; + let month = &filename_ts[5..7]; + let day = &filename_ts[8..10]; + let dir = codex_home.join("sessions").join(year).join(month).join(day); + fs::create_dir_all(&dir)?; + + let file_path = dir.join(format!("rollout-{filename_ts}-{uuid}.jsonl")); + + // Build JSONL lines + let payload = serde_json::to_value(SessionMeta { + id: conversation_id, + timestamp: meta_rfc3339.to_string(), + cwd: PathBuf::from("/"), + originator: "codex".to_string(), + cli_version: "0.0.0".to_string(), + instructions: None, + source: SessionSource::Cli, + model_provider: model_provider.map(str::to_string), + })?; + + let lines = [ + json!({ + "timestamp": meta_rfc3339, + "type": "session_meta", + "payload": payload + }) + .to_string(), + json!({ + "timestamp": meta_rfc3339, + "type":"response_item", + "payload": { + "type":"message", + "role":"user", + "content":[{"type":"input_text","text": preview}] + } + }) + .to_string(), + json!({ + "timestamp": meta_rfc3339, + "type":"event_msg", + "payload": { + "type":"user_message", + "message": preview, + "kind": "plain" + } + }) + .to_string(), + ]; + + fs::write(file_path, lines.join("\n") + "\n")?; + Ok(uuid_str) +} diff --git a/codex-rs/app-server/tests/suite/list_resume.rs b/codex-rs/app-server/tests/suite/list_resume.rs index 76b92b3e..30be93a2 100644 --- a/codex-rs/app-server/tests/suite/list_resume.rs +++ b/codex-rs/app-server/tests/suite/list_resume.rs @@ -1,5 +1,6 @@ use anyhow::Result; use app_test_support::McpProcess; +use app_test_support::create_fake_rollout; use app_test_support::to_response; use codex_app_server_protocol::JSONRPCNotification; 
use codex_app_server_protocol::JSONRPCResponse; @@ -15,12 +16,8 @@ use codex_core::protocol::EventMsg; use codex_protocol::models::ContentItem; use codex_protocol::models::ResponseItem; use pretty_assertions::assert_eq; -use serde_json::json; -use std::fs; -use std::path::Path; use tempfile::TempDir; use tokio::time::timeout; -use uuid::Uuid; const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); @@ -357,70 +354,3 @@ async fn test_list_and_resume_conversations() -> Result<()> { Ok(()) } - -fn create_fake_rollout( - codex_home: &Path, - filename_ts: &str, - meta_rfc3339: &str, - preview: &str, - model_provider: Option<&str>, -) -> Result<()> { - let uuid = Uuid::new_v4(); - // sessions/YYYY/MM/DD/ derived from filename_ts (YYYY-MM-DDThh-mm-ss) - let year = &filename_ts[0..4]; - let month = &filename_ts[5..7]; - let day = &filename_ts[8..10]; - let dir = codex_home.join("sessions").join(year).join(month).join(day); - fs::create_dir_all(&dir)?; - - let file_path = dir.join(format!("rollout-{filename_ts}-{uuid}.jsonl")); - let mut lines = Vec::new(); - // Meta line with timestamp (flattened meta in payload for new schema) - let mut payload = json!({ - "id": uuid, - "timestamp": meta_rfc3339, - "cwd": "/", - "originator": "codex", - "cli_version": "0.0.0", - "instructions": null, - }); - if let Some(provider) = model_provider { - payload["model_provider"] = json!(provider); - } - lines.push( - json!({ - "timestamp": meta_rfc3339, - "type": "session_meta", - "payload": payload - }) - .to_string(), - ); - // Minimal user message entry as a persisted response item (with envelope timestamp) - lines.push( - json!({ - "timestamp": meta_rfc3339, - "type":"response_item", - "payload": { - "type":"message", - "role":"user", - "content":[{"type":"input_text","text": preview}] - } - }) - .to_string(), - ); - // Add a matching user message event line to satisfy filters - lines.push( - json!({ - "timestamp": meta_rfc3339, - "type":"event_msg", - 
"payload": { - "type":"user_message", - "message": preview, - "kind": "plain" - } - }) - .to_string(), - ); - fs::write(file_path, lines.join("\n") + "\n")?; - Ok(()) -} diff --git a/codex-rs/app-server/tests/suite/model_list.rs b/codex-rs/app-server/tests/suite/model_list.rs index 3472e0d3..b652121f 100644 --- a/codex-rs/app-server/tests/suite/model_list.rs +++ b/codex-rs/app-server/tests/suite/model_list.rs @@ -6,9 +6,9 @@ use app_test_support::McpProcess; use app_test_support::to_response; use codex_app_server_protocol::JSONRPCError; use codex_app_server_protocol::JSONRPCResponse; -use codex_app_server_protocol::ListModelsParams; -use codex_app_server_protocol::ListModelsResponse; use codex_app_server_protocol::Model; +use codex_app_server_protocol::ModelListParams; +use codex_app_server_protocol::ModelListResponse; use codex_app_server_protocol::ReasoningEffortOption; use codex_app_server_protocol::RequestId; use codex_protocol::config_types::ReasoningEffort; @@ -27,8 +27,8 @@ async fn list_models_returns_all_models_with_large_limit() -> Result<()> { timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??; let request_id = mcp - .send_list_models_request(ListModelsParams { - page_size: Some(100), + .send_list_models_request(ModelListParams { + limit: Some(100), cursor: None, }) .await?; @@ -39,7 +39,10 @@ async fn list_models_returns_all_models_with_large_limit() -> Result<()> { ) .await??; - let ListModelsResponse { items, next_cursor } = to_response::(response)?; + let ModelListResponse { + data: items, + next_cursor, + } = to_response::(response)?; let expected_models = vec![ Model { @@ -111,8 +114,8 @@ async fn list_models_pagination_works() -> Result<()> { timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??; let first_request = mcp - .send_list_models_request(ListModelsParams { - page_size: Some(1), + .send_list_models_request(ModelListParams { + limit: Some(1), cursor: None, }) .await?; @@ -123,18 +126,18 @@ async fn list_models_pagination_works() -> Result<()> 
{ ) .await??; - let ListModelsResponse { - items: first_items, + let ModelListResponse { + data: first_items, next_cursor: first_cursor, - } = to_response::(first_response)?; + } = to_response::(first_response)?; assert_eq!(first_items.len(), 1); assert_eq!(first_items[0].id, "gpt-5-codex"); let next_cursor = first_cursor.ok_or_else(|| anyhow!("cursor for second page"))?; let second_request = mcp - .send_list_models_request(ListModelsParams { - page_size: Some(1), + .send_list_models_request(ModelListParams { + limit: Some(1), cursor: Some(next_cursor.clone()), }) .await?; @@ -145,10 +148,10 @@ async fn list_models_pagination_works() -> Result<()> { ) .await??; - let ListModelsResponse { - items: second_items, + let ModelListResponse { + data: second_items, next_cursor: second_cursor, - } = to_response::(second_response)?; + } = to_response::(second_response)?; assert_eq!(second_items.len(), 1); assert_eq!(second_items[0].id, "gpt-5"); @@ -164,8 +167,8 @@ async fn list_models_rejects_invalid_cursor() -> Result<()> { timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??; let request_id = mcp - .send_list_models_request(ListModelsParams { - page_size: None, + .send_list_models_request(ModelListParams { + limit: None, cursor: Some("invalid".to_string()), }) .await?; diff --git a/codex-rs/app-server/tests/suite/v2/mod.rs b/codex-rs/app-server/tests/suite/v2/mod.rs index bd7a1467..ccaa9eda 100644 --- a/codex-rs/app-server/tests/suite/v2/mod.rs +++ b/codex-rs/app-server/tests/suite/v2/mod.rs @@ -1,2 +1,6 @@ // v2 test suite modules mod account; +mod thread_archive; +mod thread_list; +mod thread_resume; +mod thread_start; diff --git a/codex-rs/app-server/tests/suite/v2/thread_archive.rs b/codex-rs/app-server/tests/suite/v2/thread_archive.rs new file mode 100644 index 00000000..083f3da9 --- /dev/null +++ b/codex-rs/app-server/tests/suite/v2/thread_archive.rs @@ -0,0 +1,93 @@ +use anyhow::Result; +use app_test_support::McpProcess; +use app_test_support::to_response; +use 
codex_app_server_protocol::JSONRPCResponse; +use codex_app_server_protocol::RequestId; +use codex_app_server_protocol::ThreadArchiveParams; +use codex_app_server_protocol::ThreadArchiveResponse; +use codex_app_server_protocol::ThreadStartParams; +use codex_app_server_protocol::ThreadStartResponse; +use codex_core::ARCHIVED_SESSIONS_SUBDIR; +use codex_core::find_conversation_path_by_id_str; +use std::path::Path; +use tempfile::TempDir; +use tokio::time::timeout; + +const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); + +#[tokio::test] +async fn thread_archive_moves_rollout_into_archived_directory() -> Result<()> { + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path())?; + + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + // Start a thread. + let start_id = mcp + .send_thread_start_request(ThreadStartParams { + model: Some("mock-model".to_string()), + ..Default::default() + }) + .await?; + let start_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(start_id)), + ) + .await??; + let ThreadStartResponse { thread } = to_response::(start_resp)?; + assert!(!thread.id.is_empty()); + + // Locate the rollout path recorded for this thread id. + let rollout_path = find_conversation_path_by_id_str(codex_home.path(), &thread.id) + .await? + .expect("expected rollout path for thread id to exist"); + assert!( + rollout_path.exists(), + "expected {} to exist", + rollout_path.display() + ); + + // Archive the thread. + let archive_id = mcp + .send_thread_archive_request(ThreadArchiveParams { + thread_id: thread.id.clone(), + }) + .await?; + let archive_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(archive_id)), + ) + .await??; + let _: ThreadArchiveResponse = to_response::(archive_resp)?; + + // Verify file moved. 
+ let archived_directory = codex_home.path().join(ARCHIVED_SESSIONS_SUBDIR); + // The archived file keeps the original filename (rollout-...-.jsonl). + let archived_rollout_path = + archived_directory.join(rollout_path.file_name().expect("rollout file name")); + assert!( + !rollout_path.exists(), + "expected rollout path {} to be moved", + rollout_path.display() + ); + assert!( + archived_rollout_path.exists(), + "expected archived rollout path {} to exist", + archived_rollout_path.display() + ); + + Ok(()) +} + +fn create_config_toml(codex_home: &Path) -> std::io::Result<()> { + let config_toml = codex_home.join("config.toml"); + std::fs::write(config_toml, config_contents()) +} + +fn config_contents() -> &'static str { + r#"model = "mock-model" +approval_policy = "never" +sandbox_mode = "read-only" +"# +} diff --git a/codex-rs/app-server/tests/suite/v2/thread_list.rs b/codex-rs/app-server/tests/suite/v2/thread_list.rs new file mode 100644 index 00000000..43e243b0 --- /dev/null +++ b/codex-rs/app-server/tests/suite/v2/thread_list.rs @@ -0,0 +1,205 @@ +use anyhow::Result; +use app_test_support::McpProcess; +use app_test_support::create_fake_rollout; +use app_test_support::to_response; +use codex_app_server_protocol::JSONRPCResponse; +use codex_app_server_protocol::RequestId; +use codex_app_server_protocol::ThreadListParams; +use codex_app_server_protocol::ThreadListResponse; +use serde_json::json; +use tempfile::TempDir; +use tokio::time::timeout; +use uuid::Uuid; + +const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); + +#[tokio::test] +async fn thread_list_basic_empty() -> Result<()> { + let codex_home = TempDir::new()?; + create_minimal_config(codex_home.path())?; + + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + // List threads in an empty CODEX_HOME; should return an empty page with nextCursor: null. 
+ let list_id = mcp + .send_thread_list_request(ThreadListParams { + cursor: None, + limit: Some(10), + model_providers: None, + }) + .await?; + let list_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(list_id)), + ) + .await??; + let ThreadListResponse { data, next_cursor } = to_response::(list_resp)?; + assert!(data.is_empty()); + assert_eq!(next_cursor, None); + + Ok(()) +} + +// Minimal config.toml for listing. +fn create_minimal_config(codex_home: &std::path::Path) -> std::io::Result<()> { + let config_toml = codex_home.join("config.toml"); + std::fs::write( + config_toml, + r#" +model = "mock-model" +approval_policy = "never" +"#, + ) +} + +#[tokio::test] +async fn thread_list_pagination_next_cursor_none_on_last_page() -> Result<()> { + let codex_home = TempDir::new()?; + create_minimal_config(codex_home.path())?; + + // Create three rollouts so we can paginate with limit=2. + let _a = create_fake_rollout( + codex_home.path(), + "2025-01-02T12-00-00", + "2025-01-02T12:00:00Z", + "Hello", + Some("mock_provider"), + )?; + let _b = create_fake_rollout( + codex_home.path(), + "2025-01-01T13-00-00", + "2025-01-01T13:00:00Z", + "Hello", + Some("mock_provider"), + )?; + let _c = create_fake_rollout( + codex_home.path(), + "2025-01-01T12-00-00", + "2025-01-01T12:00:00Z", + "Hello", + Some("mock_provider"), + )?; + + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + // Page 1: limit 2 → expect next_cursor Some. 
+ let page1_id = mcp + .send_thread_list_request(ThreadListParams { + cursor: None, + limit: Some(2), + model_providers: Some(vec!["mock_provider".to_string()]), + }) + .await?; + let page1_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(page1_id)), + ) + .await??; + let ThreadListResponse { + data: data1, + next_cursor: cursor1, + } = to_response::(page1_resp)?; + assert_eq!(data1.len(), 2); + let cursor1 = cursor1.expect("expected nextCursor on first page"); + + // Page 2: with cursor → expect next_cursor None when no more results. + let page2_id = mcp + .send_thread_list_request(ThreadListParams { + cursor: Some(cursor1), + limit: Some(2), + model_providers: Some(vec!["mock_provider".to_string()]), + }) + .await?; + let page2_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(page2_id)), + ) + .await??; + let ThreadListResponse { + data: data2, + next_cursor: cursor2, + } = to_response::(page2_resp)?; + assert!(data2.len() <= 2); + assert_eq!(cursor2, None, "expected nextCursor to be null on last page"); + + Ok(()) +} + +#[tokio::test] +async fn thread_list_respects_provider_filter() -> Result<()> { + let codex_home = TempDir::new()?; + create_minimal_config(codex_home.path())?; + + // Create rollouts under two providers. 
+ let _a = create_fake_rollout( + codex_home.path(), + "2025-01-02T10-00-00", + "2025-01-02T10:00:00Z", + "X", + Some("mock_provider"), + )?; // mock_provider + // one with a different provider + let uuid = Uuid::new_v4(); + let dir = codex_home + .path() + .join("sessions") + .join("2025") + .join("01") + .join("02"); + std::fs::create_dir_all(&dir)?; + let file_path = dir.join(format!("rollout-2025-01-02T11-00-00-{uuid}.jsonl")); + let lines = [ + json!({ + "timestamp": "2025-01-02T11:00:00Z", + "type": "session_meta", + "payload": { + "id": uuid, + "timestamp": "2025-01-02T11:00:00Z", + "cwd": "/", + "originator": "codex", + "cli_version": "0.0.0", + "instructions": null, + "source": "vscode", + "model_provider": "other_provider" + } + }) + .to_string(), + json!({ + "timestamp": "2025-01-02T11:00:00Z", + "type":"response_item", + "payload": {"type":"message","role":"user","content":[{"type":"input_text","text":"X"}]} + }) + .to_string(), + json!({ + "timestamp": "2025-01-02T11:00:00Z", + "type":"event_msg", + "payload": {"type":"user_message","message":"X","kind":"plain"} + }) + .to_string(), + ]; + std::fs::write(file_path, lines.join("\n") + "\n")?; + + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + // Filter to only other_provider; expect 1 item, nextCursor None. 
+ let list_id = mcp + .send_thread_list_request(ThreadListParams { + cursor: None, + limit: Some(10), + model_providers: Some(vec!["other_provider".to_string()]), + }) + .await?; + let resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(list_id)), + ) + .await??; + let ThreadListResponse { data, next_cursor } = to_response::<ThreadListResponse>(resp)?; + assert_eq!(data.len(), 1); + assert_eq!(next_cursor, None); + + Ok(()) +} diff --git a/codex-rs/app-server/tests/suite/v2/thread_resume.rs b/codex-rs/app-server/tests/suite/v2/thread_resume.rs new file mode 100644 index 00000000..039f542f --- /dev/null +++ b/codex-rs/app-server/tests/suite/v2/thread_resume.rs @@ -0,0 +1,79 @@ +use anyhow::Result; +use app_test_support::McpProcess; +use app_test_support::create_mock_chat_completions_server; +use app_test_support::to_response; +use codex_app_server_protocol::JSONRPCResponse; +use codex_app_server_protocol::RequestId; +use codex_app_server_protocol::ThreadResumeParams; +use codex_app_server_protocol::ThreadResumeResponse; +use codex_app_server_protocol::ThreadStartParams; +use codex_app_server_protocol::ThreadStartResponse; +use tempfile::TempDir; +use tokio::time::timeout; + +const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); + +#[tokio::test] +async fn thread_resume_returns_existing_thread() -> Result<()> { + let server = create_mock_chat_completions_server(vec![]).await; + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path(), &server.uri())?; + + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + // Start a thread.
+ let start_id = mcp + .send_thread_start_request(ThreadStartParams { + model: Some("gpt-5-codex".to_string()), + ..Default::default() + }) + .await?; + let start_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(start_id)), + ) + .await??; + let ThreadStartResponse { thread } = to_response::(start_resp)?; + + // Resume it via v2 API. + let resume_id = mcp + .send_thread_resume_request(ThreadResumeParams { + thread_id: thread.id.clone(), + }) + .await?; + let resume_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(resume_id)), + ) + .await??; + let ThreadResumeResponse { thread: resumed } = + to_response::(resume_resp)?; + assert_eq!(resumed.id, thread.id); + + Ok(()) +} + +// Helper to create a config.toml pointing at the mock model server. +fn create_config_toml(codex_home: &std::path::Path, server_uri: &str) -> std::io::Result<()> { + let config_toml = codex_home.join("config.toml"); + std::fs::write( + config_toml, + format!( + r#" +model = "mock-model" +approval_policy = "never" +sandbox_mode = "read-only" + +model_provider = "mock_provider" + +[model_providers.mock_provider] +name = "Mock provider for test" +base_url = "{server_uri}/v1" +wire_api = "chat" +request_max_retries = 0 +stream_max_retries = 0 +"# + ), + ) +} diff --git a/codex-rs/app-server/tests/suite/v2/thread_start.rs b/codex-rs/app-server/tests/suite/v2/thread_start.rs new file mode 100644 index 00000000..8b4cd557 --- /dev/null +++ b/codex-rs/app-server/tests/suite/v2/thread_start.rs @@ -0,0 +1,81 @@ +use anyhow::Result; +use app_test_support::McpProcess; +use app_test_support::create_mock_chat_completions_server; +use app_test_support::to_response; +use codex_app_server_protocol::JSONRPCNotification; +use codex_app_server_protocol::JSONRPCResponse; +use codex_app_server_protocol::RequestId; +use codex_app_server_protocol::ThreadStartParams; +use 
codex_app_server_protocol::ThreadStartResponse; +use codex_app_server_protocol::ThreadStartedNotification; +use std::path::Path; +use tempfile::TempDir; +use tokio::time::timeout; + +const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); + +#[tokio::test] +async fn thread_start_creates_thread_and_emits_started() -> Result<()> { + // Provide a mock server and config so model wiring is valid. + let server = create_mock_chat_completions_server(vec![]).await; + + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path(), &server.uri())?; + + // Start server and initialize. + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + // Start a v2 thread with an explicit model override. + let req_id = mcp + .send_thread_start_request(ThreadStartParams { + model: Some("gpt-5".to_string()), + ..Default::default() + }) + .await?; + + // Expect a proper JSON-RPC response with a thread id. + let resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(req_id)), + ) + .await??; + let ThreadStartResponse { thread } = to_response::(resp)?; + assert!(!thread.id.is_empty(), "thread id should not be empty"); + + // A corresponding thread/started notification should arrive. + let notif: JSONRPCNotification = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("thread/started"), + ) + .await??; + let started: ThreadStartedNotification = + serde_json::from_value(notif.params.expect("params must be present"))?; + assert_eq!(started.thread.id, thread.id); + + Ok(()) +} + +// Helper to create a config.toml pointing at the mock model server. 
+fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> { + let config_toml = codex_home.join("config.toml"); + std::fs::write( + config_toml, + format!( + r#" +model = "mock-model" +approval_policy = "never" +sandbox_mode = "read-only" + +model_provider = "mock_provider" + +[model_providers.mock_provider] +name = "Mock provider for test" +base_url = "{server_uri}/v1" +wire_api = "chat" +request_max_retries = 0 +stream_max_retries = 0 +"# + ), + ) +} diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs index a045f210..93a31770 100644 --- a/codex-rs/core/src/lib.rs +++ b/codex-rs/core/src/lib.rs @@ -75,6 +75,7 @@ pub use rollout::find_conversation_path_by_id_str; pub use rollout::list::ConversationItem; pub use rollout::list::ConversationsPage; pub use rollout::list::Cursor; +pub use rollout::list::parse_cursor; pub use rollout::list::read_head_for_summary; mod function_tool; mod state; diff --git a/codex-rs/core/src/rollout/list.rs b/codex-rs/core/src/rollout/list.rs index d1bc82e0..781e3fe3 100644 --- a/codex-rs/core/src/rollout/list.rs +++ b/codex-rs/core/src/rollout/list.rs @@ -273,7 +273,7 @@ async fn traverse_directories_for_paths( /// Pagination cursor token format: "|" where `file_ts` matches the /// filename timestamp portion (YYYY-MM-DDThh-mm-ss) used in rollout filenames. /// The cursor orders files by timestamp desc, then UUID desc. 
-fn parse_cursor(token: &str) -> Option<Cursor> { +pub fn parse_cursor(token: &str) -> Option<Cursor> { let (file_ts, uuid_str) = token.split_once('|')?; let Ok(uuid) = Uuid::parse_str(uuid_str) else { diff --git a/codex-rs/core/tests/suite/user_notification.rs b/codex-rs/core/tests/suite/user_notification.rs index 6fc31370..74b59736 100644 --- a/codex-rs/core/tests/suite/user_notification.rs +++ b/codex-rs/core/tests/suite/user_notification.rs @@ -24,6 +24,10 @@ use responses::start_mock_server; use std::time::Duration; #[tokio::test(flavor = "multi_thread", worker_threads = 2)] +#[ignore = "flaky on ubuntu-24.04-arm - aarch64-unknown-linux-gnu"] +// The notify script gets far enough to create (and therefore surface) the file, +// but hasn’t flushed the JSON yet. Reading an empty file produces EOF while parsing +// a value at line 1 column 0. May be caused by a slow runner. async fn summarize_context_three_requests_and_instructions() -> anyhow::Result<()> { skip_if_no_network!(Ok(()));