diff --git a/codex-rs/app-server-protocol/src/protocol/common.rs b/codex-rs/app-server-protocol/src/protocol/common.rs
index 8ece9bbc..a7ae035d 100644
--- a/codex-rs/app-server-protocol/src/protocol/common.rs
+++ b/codex-rs/app-server-protocol/src/protocol/common.rs
@@ -133,6 +133,18 @@ client_request_definitions! {
         params: v2::ThreadCompactParams,
         response: v2::ThreadCompactResponse,
     },
+    #[serde(rename = "turn/start")]
+    #[ts(rename = "turn/start")]
+    TurnStart {
+        params: v2::TurnStartParams,
+        response: v2::TurnStartResponse,
+    },
+    #[serde(rename = "turn/interrupt")]
+    #[ts(rename = "turn/interrupt")]
+    TurnInterrupt {
+        params: v2::TurnInterruptParams,
+        response: v2::TurnInterruptResponse,
+    },
     #[serde(rename = "model/list")]
     #[ts(rename = "model/list")]
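To make the new wire surface concrete, here is a rough sketch of a `turn/start` request/response pair as it might appear over the app server's JSON-RPC transport. It is hand-written for illustration, not captured output: the ids, thread/turn identifiers, and message text are invented, the optional override fields are left out, and the field/variant names simply follow the serde camelCase renames declared in v2.rs below.

```rust
// Illustrative only: the assumed JSON shape of turn/start, not real server output.
use serde_json::json;

fn main() {
    // Client -> server request. `threadId` would come from a prior thread/start response.
    let request = json!({
        "jsonrpc": "2.0",
        "id": 7,
        "method": "turn/start",
        "params": {
            "threadId": "example-thread-id",
            // Optional overrides (cwd, approvalPolicy, sandboxPolicy, model, effort, summary)
            // are omitted from this sketch.
            "input": [{ "type": "text", "text": "Hello" }]
        }
    });

    // Server -> client response echoing the user input as the turn's first item.
    let response = json!({
        "jsonrpc": "2.0",
        "id": 7,
        "result": {
            "turn": {
                "id": "example-turn-id",
                "items": [{
                    "type": "userMessage",
                    "id": "example-turn-id",
                    "content": [{ "type": "text", "text": "Hello" }]
                }],
                "status": "inProgress",
                "error": null
            }
        }
    });

    println!("{request:#}");
    println!("{response:#}");
}
```

A `turn/started` notification carrying the same `turn` payload follows the response, as the handler in codex_message_processor.rs below shows.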
diff --git a/codex-rs/app-server-protocol/src/protocol/v2.rs b/codex-rs/app-server-protocol/src/protocol/v2.rs
index ccc55723..436585ba 100644
--- a/codex-rs/app-server-protocol/src/protocol/v2.rs
+++ b/codex-rs/app-server-protocol/src/protocol/v2.rs
@@ -5,8 +5,10 @@ use crate::protocol::common::AuthMode;
 use codex_protocol::ConversationId;
 use codex_protocol::account::PlanType;
 use codex_protocol::config_types::ReasoningEffort;
+use codex_protocol::config_types::ReasoningSummary;
 use codex_protocol::protocol::RateLimitSnapshot as CoreRateLimitSnapshot;
 use codex_protocol::protocol::RateLimitWindow as CoreRateLimitWindow;
+use codex_protocol::user_input::UserInput as CoreUserInput;
 use mcp_types::ContentBlock as McpContentBlock;
 use schemars::JsonSchema;
 use serde::Deserialize;
@@ -365,6 +367,13 @@ pub struct Turn {
     pub error: Option<TurnError>,
 }
 
+#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
+#[serde(rename_all = "camelCase")]
+#[ts(export_to = "v2/")]
+pub struct TurnError {
+    pub message: String,
+}
+
 #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
 #[serde(rename_all = "camelCase")]
 #[ts(export_to = "v2/")]
@@ -375,15 +384,51 @@ pub enum TurnStatus {
     InProgress,
 }
 
-#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
+// Turn APIs
+#[derive(Serialize, Deserialize, Debug, Default, Clone, PartialEq, JsonSchema, TS)]
 #[serde(rename_all = "camelCase")]
 #[ts(export_to = "v2/")]
-pub struct TurnError {
-    pub message: String,
+pub struct TurnStartParams {
+    pub thread_id: String,
+    pub input: Vec<UserInput>,
+    /// Override the working directory for this turn and subsequent turns.
+    pub cwd: Option<PathBuf>,
+    /// Override the approval policy for this turn and subsequent turns.
+    pub approval_policy: Option<AskForApproval>,
+    /// Override the sandbox policy for this turn and subsequent turns.
+    pub sandbox_policy: Option<SandboxPolicy>,
+    /// Override the model for this turn and subsequent turns.
+    pub model: Option<String>,
+    /// Override the reasoning effort for this turn and subsequent turns.
+    pub effort: Option<ReasoningEffort>,
+    /// Override the reasoning summary for this turn and subsequent turns.
+    pub summary: Option<ReasoningSummary>,
 }
 
+#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
+#[serde(rename_all = "camelCase")]
+#[ts(export_to = "v2/")]
+pub struct TurnStartResponse {
+    pub turn: Turn,
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
+#[serde(rename_all = "camelCase")]
+#[ts(export_to = "v2/")]
+pub struct TurnInterruptParams {
+    pub thread_id: String,
+    pub turn_id: String,
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
+#[serde(rename_all = "camelCase")]
+#[ts(export_to = "v2/")]
+pub struct TurnInterruptResponse {}
+
+// User input types
 #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
 #[serde(tag = "type", rename_all = "camelCase")]
+#[ts(tag = "type")]
 #[ts(export_to = "v2/")]
 pub enum UserInput {
     Text { text: String },
@@ -391,8 +436,19 @@ pub enum UserInput {
     LocalImage { path: PathBuf },
 }
 
+impl UserInput {
+    pub fn into_core(self) -> CoreUserInput {
+        match self {
+            UserInput::Text { text } => CoreUserInput::Text { text },
+            UserInput::Image { url } => CoreUserInput::Image { image_url: url },
+            UserInput::LocalImage { path } => CoreUserInput::LocalImage { path },
+        }
+    }
+}
+
 #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
 #[serde(tag = "type", rename_all = "camelCase")]
+#[ts(tag = "type")]
 #[ts(export_to = "v2/")]
 pub enum ThreadItem {
     UserMessage {
@@ -516,7 +572,7 @@ pub struct TodoItem {
 }
 
 // === Server Notifications ===
-
+// Thread/Turn lifecycle notifications and item progress events
 #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
 #[serde(rename_all = "camelCase")]
 #[ts(export_to = "v2/")]
@@ -545,6 +601,7 @@ pub struct Usage {
 #[ts(export_to = "v2/")]
 pub struct TurnCompletedNotification {
     pub turn: Turn,
+    // TODO: should usage be stored on the Turn object, and we return that instead?
pub usage: Usage, } @@ -562,6 +619,7 @@ pub struct ItemCompletedNotification { pub item: ThreadItem, } +// Item-specific progress notifications #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] #[serde(rename_all = "camelCase")] #[ts(export_to = "v2/")] diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index e280e563..ddf777de 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -13,6 +13,7 @@ use codex_app_server_protocol::ApplyPatchApprovalParams; use codex_app_server_protocol::ApplyPatchApprovalResponse; use codex_app_server_protocol::ArchiveConversationParams; use codex_app_server_protocol::ArchiveConversationResponse; +use codex_app_server_protocol::AskForApproval; use codex_app_server_protocol::AuthMode; use codex_app_server_protocol::AuthStatusChangeNotification; use codex_app_server_protocol::CancelLoginAccountParams; @@ -29,6 +30,8 @@ use codex_app_server_protocol::FeedbackUploadResponse; use codex_app_server_protocol::FuzzyFileSearchParams; use codex_app_server_protocol::FuzzyFileSearchResponse; use codex_app_server_protocol::GetAccountRateLimitsResponse; +use codex_app_server_protocol::GetAuthStatusParams; +use codex_app_server_protocol::GetAuthStatusResponse; use codex_app_server_protocol::GetConversationSummaryParams; use codex_app_server_protocol::GetConversationSummaryResponse; use codex_app_server_protocol::GetUserAgentResponse; @@ -45,6 +48,8 @@ use codex_app_server_protocol::LoginApiKeyParams; use codex_app_server_protocol::LoginApiKeyResponse; use codex_app_server_protocol::LoginChatGptCompleteNotification; use codex_app_server_protocol::LoginChatGptResponse; +use codex_app_server_protocol::LogoutAccountResponse; +use codex_app_server_protocol::LogoutChatGptResponse; use codex_app_server_protocol::ModelListParams; use codex_app_server_protocol::ModelListResponse; use codex_app_server_protocol::NewConversationParams; @@ -54,6 +59,8 @@ use codex_app_server_protocol::RemoveConversationSubscriptionResponse; use codex_app_server_protocol::RequestId; use codex_app_server_protocol::Result as JsonRpcResult; use codex_app_server_protocol::ResumeConversationParams; +use codex_app_server_protocol::ResumeConversationResponse; +use codex_app_server_protocol::SandboxMode; use codex_app_server_protocol::SendUserMessageParams; use codex_app_server_protocol::SendUserMessageResponse; use codex_app_server_protocol::SendUserTurnParams; @@ -66,6 +73,7 @@ use codex_app_server_protocol::SetDefaultModelResponse; use codex_app_server_protocol::Thread; use codex_app_server_protocol::ThreadArchiveParams; use codex_app_server_protocol::ThreadArchiveResponse; +use codex_app_server_protocol::ThreadItem; use codex_app_server_protocol::ThreadListParams; use codex_app_server_protocol::ThreadListResponse; use codex_app_server_protocol::ThreadResumeParams; @@ -73,7 +81,15 @@ use codex_app_server_protocol::ThreadResumeResponse; use codex_app_server_protocol::ThreadStartParams; use codex_app_server_protocol::ThreadStartResponse; use codex_app_server_protocol::ThreadStartedNotification; +use codex_app_server_protocol::Turn; +use codex_app_server_protocol::TurnInterruptParams; +use codex_app_server_protocol::TurnInterruptResponse; +use codex_app_server_protocol::TurnStartParams; +use codex_app_server_protocol::TurnStartResponse; +use codex_app_server_protocol::TurnStartedNotification; +use codex_app_server_protocol::TurnStatus; use 
codex_app_server_protocol::UserInfoResponse; +use codex_app_server_protocol::UserInput as V2UserInput; use codex_app_server_protocol::UserSavedConfig; use codex_backend_client::Client as BackendClient; use codex_core::AuthManager; @@ -136,6 +152,9 @@ use tracing::info; use tracing::warn; use uuid::Uuid; +type PendingInterruptQueue = Vec<(RequestId, ApiVersion)>; +type PendingInterrupts = Arc>>; + // Duration before a ChatGPT login attempt is abandoned. const LOGIN_CHATGPT_TIMEOUT: Duration = Duration::from_secs(10 * 60); struct ActiveLogin { @@ -159,11 +178,17 @@ pub(crate) struct CodexMessageProcessor { conversation_listeners: HashMap>, active_login: Arc>>, // Queue of pending interrupt requests per conversation. We reply when TurnAborted arrives. - pending_interrupts: Arc>>>, + pending_interrupts: PendingInterrupts, pending_fuzzy_searches: Arc>>>, feedback: CodexFeedback, } +#[derive(Clone, Copy, Debug)] +enum ApiVersion { + V1, + V2, +} + impl CodexMessageProcessor { pub fn new( auth_manager: Arc, @@ -192,7 +217,7 @@ impl CodexMessageProcessor { ClientRequest::Initialize { .. } => { panic!("Initialize should be handled in MessageProcessor"); } - // === v2 Thread APIs === + // === v2 Thread/Turn APIs === ClientRequest::ThreadStart { request_id, params } => { self.thread_start(request_id, params).await; } @@ -212,6 +237,12 @@ impl CodexMessageProcessor { self.send_unimplemented_error(request_id, "thread/compact") .await; } + ClientRequest::TurnStart { request_id, params } => { + self.turn_start(request_id, params).await; + } + ClientRequest::TurnInterrupt { request_id, params } => { + self.turn_interrupt(request_id, params).await; + } ClientRequest::NewConversation { request_id, params } => { // Do not tokio::spawn() to process new_conversation() // asynchronously because we need to ensure the conversation is @@ -731,10 +762,7 @@ impl CodexMessageProcessor { match self.logout_common().await { Ok(current_auth_method) => { self.outgoing - .send_response( - request_id, - codex_app_server_protocol::LogoutChatGptResponse {}, - ) + .send_response(request_id, LogoutChatGptResponse {}) .await; let payload = AuthStatusChangeNotification { @@ -754,10 +782,7 @@ impl CodexMessageProcessor { match self.logout_common().await { Ok(current_auth_method) => { self.outgoing - .send_response( - request_id, - codex_app_server_protocol::LogoutAccountResponse {}, - ) + .send_response(request_id, LogoutAccountResponse {}) .await; let payload_v2 = AccountUpdatedNotification { @@ -773,11 +798,7 @@ impl CodexMessageProcessor { } } - async fn get_auth_status( - &self, - request_id: RequestId, - params: codex_app_server_protocol::GetAuthStatusParams, - ) { + async fn get_auth_status(&self, request_id: RequestId, params: GetAuthStatusParams) { let include_token = params.include_token.unwrap_or(false); let do_refresh = params.refresh_token.unwrap_or(false); @@ -791,7 +812,7 @@ impl CodexMessageProcessor { let requires_openai_auth = self.config.model_provider.requires_openai_auth; let response = if !requires_openai_auth { - codex_app_server_protocol::GetAuthStatusResponse { + GetAuthStatusResponse { auth_method: None, auth_token: None, requires_openai_auth: Some(false), @@ -811,13 +832,13 @@ impl CodexMessageProcessor { (None, None) } }; - codex_app_server_protocol::GetAuthStatusResponse { + GetAuthStatusResponse { auth_method: reported_auth_method, auth_token: token_opt, requires_openai_auth: Some(true), } } - None => codex_app_server_protocol::GetAuthStatusResponse { + None => GetAuthStatusResponse { auth_method: 
None, auth_token: None, requires_openai_auth: Some(true), @@ -1101,12 +1122,8 @@ impl CodexMessageProcessor { let overrides = ConfigOverrides { model: params.model, cwd: params.cwd.map(PathBuf::from), - approval_policy: params - .approval_policy - .map(codex_app_server_protocol::AskForApproval::to_core), - sandbox_mode: params - .sandbox - .map(codex_app_server_protocol::SandboxMode::to_core), + approval_policy: params.approval_policy.map(AskForApproval::to_core), + sandbox_mode: params.sandbox.map(SandboxMode::to_core), model_provider: params.model_provider, codex_linux_sandbox_exe: self.codex_linux_sandbox_exe.clone(), base_instructions: params.base_instructions, @@ -1431,60 +1448,22 @@ impl CodexMessageProcessor { let ListConversationsParams { page_size, cursor, - model_providers: model_provider, + model_providers, } = params; - let page_size = page_size.unwrap_or(25); - let cursor_obj: Option = cursor.as_ref().and_then(|s| parse_cursor(s)); - let cursor_ref = cursor_obj.as_ref(); - let model_provider_filter = match model_provider { - Some(providers) => { - if providers.is_empty() { - None - } else { - Some(providers) - } - } - None => Some(vec![self.config.model_provider_id.clone()]), - }; - let model_provider_slice = model_provider_filter.as_deref(); - let fallback_provider = self.config.model_provider_id.clone(); + let page_size = page_size.unwrap_or(25).max(1); - let page = match RolloutRecorder::list_conversations( - &self.config.codex_home, - page_size, - cursor_ref, - INTERACTIVE_SESSION_SOURCES, - model_provider_slice, - fallback_provider.as_str(), - ) - .await + match self + .list_conversations_common(page_size, cursor, model_providers) + .await { - Ok(p) => p, - Err(err) => { - let error = JSONRPCErrorError { - code: INTERNAL_ERROR_CODE, - message: format!("failed to list conversations: {err}"), - data: None, - }; + Ok((items, next_cursor)) => { + let response = ListConversationsResponse { items, next_cursor }; + self.outgoing.send_response(request_id, response).await; + } + Err(error) => { self.outgoing.send_error(request_id, error).await; - return; } }; - - let items = page - .items - .into_iter() - .filter_map(|it| extract_conversation_summary(it.path, &it.head, &fallback_provider)) - .collect(); - - // Encode next_cursor as a plain string - let next_cursor = page - .next_cursor - .and_then(|cursor| serde_json::to_value(&cursor).ok()) - .and_then(|value| value.as_str().map(str::to_owned)); - - let response = ListConversationsResponse { items, next_cursor }; - self.outgoing.send_response(request_id, response).await; } async fn list_conversations_common( @@ -1534,6 +1513,7 @@ impl CodexMessageProcessor { .filter_map(|it| extract_conversation_summary(it.path, &it.head, &fallback_provider)) .collect::>(); + // Encode next_cursor as a plain string let next_cursor = page .next_cursor .and_then(|cursor| serde_json::to_value(&cursor).ok()) @@ -1557,8 +1537,8 @@ impl CodexMessageProcessor { return; } - let effective_page_size = limit.unwrap_or(total as u32).max(1) as usize; - let effective_page_size = effective_page_size.min(total); + let effective_limit = limit.unwrap_or(total as u32).max(1) as usize; + let effective_limit = effective_limit.min(total); let start = match cursor { Some(cursor) => match cursor.parse::() { Ok(idx) => idx, @@ -1585,7 +1565,7 @@ impl CodexMessageProcessor { return; } - let end = start.saturating_add(effective_page_size).min(total); + let end = start.saturating_add(effective_limit).min(total); let items = models[start..end].to_vec(); let next_cursor = if 
end < total { Some(end.to_string()) @@ -1620,7 +1600,7 @@ impl CodexMessageProcessor { profile, cwd, approval_policy, - sandbox, + sandbox: sandbox_mode, config: cli_overrides, base_instructions, developer_instructions, @@ -1633,7 +1613,7 @@ impl CodexMessageProcessor { config_profile: profile, cwd: cwd.map(PathBuf::from), approval_policy, - sandbox_mode: sandbox, + sandbox_mode, model_provider, codex_linux_sandbox_exe: self.codex_linux_sandbox_exe.clone(), base_instructions, @@ -1759,7 +1739,7 @@ impl CodexMessageProcessor { .map(|msgs| msgs.into_iter().collect()); // Reply with conversation id + model and initial messages (when present) - let response = codex_app_server_protocol::ResumeConversationResponse { + let response = ResumeConversationResponse { conversation_id, model: session_configured.model.clone(), initial_messages, @@ -2070,7 +2050,151 @@ impl CodexMessageProcessor { // Record the pending interrupt so we can reply when TurnAborted arrives. { let mut map = self.pending_interrupts.lock().await; - map.entry(conversation_id).or_default().push(request_id); + map.entry(conversation_id) + .or_default() + .push((request_id, ApiVersion::V1)); + } + + // Submit the interrupt; we'll respond upon TurnAborted. + let _ = conversation.submit(Op::Interrupt).await; + } + + async fn turn_start(&self, request_id: RequestId, params: TurnStartParams) { + // Resolve conversation id from v2 thread id string. + let conversation_id = match ConversationId::from_string(¶ms.thread_id) { + Ok(id) => id, + Err(err) => { + let error = JSONRPCErrorError { + code: INVALID_REQUEST_ERROR_CODE, + message: format!("invalid thread id: {err}"), + data: None, + }; + self.outgoing.send_error(request_id, error).await; + return; + } + }; + + let Ok(conversation) = self + .conversation_manager + .get_conversation(conversation_id) + .await + else { + let error = JSONRPCErrorError { + code: INVALID_REQUEST_ERROR_CODE, + message: format!("conversation not found: {conversation_id}"), + data: None, + }; + self.outgoing.send_error(request_id, error).await; + return; + }; + + // Keep a copy of v2 inputs for the notification payload. + let v2_inputs_for_notif = params.input.clone(); + + // Map v2 input items to core input items. + let mapped_items: Vec = params + .input + .into_iter() + .map(V2UserInput::into_core) + .collect(); + + let has_any_overrides = params.cwd.is_some() + || params.approval_policy.is_some() + || params.sandbox_policy.is_some() + || params.model.is_some() + || params.effort.is_some() + || params.summary.is_some(); + + // If any overrides are provided, update the session turn context first. + if has_any_overrides { + let _ = conversation + .submit(Op::OverrideTurnContext { + cwd: params.cwd, + approval_policy: params.approval_policy.map(AskForApproval::to_core), + sandbox_policy: params.sandbox_policy.map(|p| p.to_core()), + model: params.model, + effort: params.effort.map(Some), + summary: params.summary, + }) + .await; + } + + // Start the turn by submitting the user input. Return its submission id as turn_id. + let turn_id = conversation + .submit(Op::UserInput { + items: mapped_items, + }) + .await; + + match turn_id { + Ok(turn_id) => { + let turn = Turn { + id: turn_id.clone(), + items: vec![ThreadItem::UserMessage { + id: turn_id, + content: v2_inputs_for_notif, + }], + status: TurnStatus::InProgress, + error: None, + }; + + let response = TurnStartResponse { turn: turn.clone() }; + self.outgoing.send_response(request_id, response).await; + + // Emit v2 turn/started notification. 
+ let notif = TurnStartedNotification { turn }; + self.outgoing + .send_server_notification(ServerNotification::TurnStarted(notif)) + .await; + } + Err(err) => { + let error = JSONRPCErrorError { + code: INTERNAL_ERROR_CODE, + message: format!("failed to start turn: {err}"), + data: None, + }; + self.outgoing.send_error(request_id, error).await; + } + } + } + + async fn turn_interrupt(&mut self, request_id: RequestId, params: TurnInterruptParams) { + let TurnInterruptParams { thread_id, .. } = params; + + // Resolve conversation id from v2 thread id string. + let conversation_id = match ConversationId::from_string(&thread_id) { + Ok(id) => id, + Err(err) => { + let error = JSONRPCErrorError { + code: INVALID_REQUEST_ERROR_CODE, + message: format!("invalid thread id: {err}"), + data: None, + }; + self.outgoing.send_error(request_id, error).await; + return; + } + }; + + let Ok(conversation) = self + .conversation_manager + .get_conversation(conversation_id) + .await + else { + let error = JSONRPCErrorError { + code: INVALID_REQUEST_ERROR_CODE, + message: format!("conversation not found: {conversation_id}"), + data: None, + }; + self.outgoing.send_error(request_id, error).await; + return; + }; + + // Record the pending interrupt so we can reply when TurnAborted arrives. + { + let mut map = self.pending_interrupts.lock().await; + map.entry(conversation_id) + .or_default() + .push((request_id, ApiVersion::V2)); } // Submit the interrupt; we'll respond upon TurnAborted. @@ -2086,7 +2210,6 @@ impl CodexMessageProcessor { conversation_id, experimental_raw_events, } = params; - match self .attach_conversation_listener(conversation_id, experimental_raw_events) .await @@ -2213,7 +2336,6 @@ impl CodexMessageProcessor { } } }); - Ok(subscription_id) } @@ -2355,7 +2477,7 @@ async fn apply_bespoke_event_handling( conversation_id: ConversationId, conversation: Arc, outgoing: Arc, - pending_interrupts: Arc>>>, + pending_interrupts: PendingInterrupts, ) { let Event { id: event_id, msg } = event; match msg { @@ -2424,11 +2546,19 @@ async fn apply_bespoke_event_handling( map.remove(&conversation_id).unwrap_or_default() }; if !pending.is_empty() { - let response = InterruptConversationResponse { - abort_reason: turn_aborted_event.reason, - }; - for rid in pending { - outgoing.send_response(rid, response.clone()).await; + for (rid, ver) in pending { + match ver { + ApiVersion::V1 => { + let response = InterruptConversationResponse { + abort_reason: turn_aborted_event.reason.clone(), + }; + outgoing.send_response(rid, response).await; + } + ApiVersion::V2 => { + let response = TurnInterruptResponse {}; + outgoing.send_response(rid, response).await; + } + } } } } diff --git a/codex-rs/app-server/tests/common/mcp_process.rs b/codex-rs/app-server/tests/common/mcp_process.rs index 59a8f20d..b4404b47 100644 --- a/codex-rs/app-server/tests/common/mcp_process.rs +++ b/codex-rs/app-server/tests/common/mcp_process.rs @@ -22,11 +22,17 @@ use codex_app_server_protocol::FeedbackUploadParams; use codex_app_server_protocol::GetAuthStatusParams; use codex_app_server_protocol::InitializeParams; use codex_app_server_protocol::InterruptConversationParams; +use codex_app_server_protocol::JSONRPCError; +use codex_app_server_protocol::JSONRPCMessage; +use codex_app_server_protocol::JSONRPCNotification; +use codex_app_server_protocol::JSONRPCRequest; +use codex_app_server_protocol::JSONRPCResponse; use codex_app_server_protocol::ListConversationsParams; use codex_app_server_protocol::LoginApiKeyParams; use 
codex_app_server_protocol::ModelListParams; use codex_app_server_protocol::NewConversationParams; use codex_app_server_protocol::RemoveConversationListenerParams; +use codex_app_server_protocol::RequestId; use codex_app_server_protocol::ResumeConversationParams; use codex_app_server_protocol::SendUserMessageParams; use codex_app_server_protocol::SendUserTurnParams; @@ -36,13 +42,8 @@ use codex_app_server_protocol::ThreadArchiveParams; use codex_app_server_protocol::ThreadListParams; use codex_app_server_protocol::ThreadResumeParams; use codex_app_server_protocol::ThreadStartParams; - -use codex_app_server_protocol::JSONRPCError; -use codex_app_server_protocol::JSONRPCMessage; -use codex_app_server_protocol::JSONRPCNotification; -use codex_app_server_protocol::JSONRPCRequest; -use codex_app_server_protocol::JSONRPCResponse; -use codex_app_server_protocol::RequestId; +use codex_app_server_protocol::TurnInterruptParams; +use codex_app_server_protocol::TurnStartParams; use std::process::Command as StdCommand; use tokio::process::Command; @@ -249,7 +250,7 @@ impl McpProcess { } /// Send a `feedback/upload` JSON-RPC request. - pub async fn send_upload_feedback_request( + pub async fn send_feedback_upload_request( &mut self, params: FeedbackUploadParams, ) -> anyhow::Result { @@ -348,6 +349,24 @@ impl McpProcess { self.send_request("loginChatGpt", None).await } + /// Send a `turn/start` JSON-RPC request (v2). + pub async fn send_turn_start_request( + &mut self, + params: TurnStartParams, + ) -> anyhow::Result { + let params = Some(serde_json::to_value(params)?); + self.send_request("turn/start", params).await + } + + /// Send a `turn/interrupt` JSON-RPC request (v2). + pub async fn send_turn_interrupt_request( + &mut self, + params: TurnInterruptParams, + ) -> anyhow::Result { + let params = Some(serde_json::to_value(params)?); + self.send_request("turn/interrupt", params).await + } + /// Send a `cancelLoginChatGpt` JSON-RPC request. 
pub async fn send_cancel_login_chat_gpt_request( &mut self, diff --git a/codex-rs/app-server/tests/suite/interrupt.rs b/codex-rs/app-server/tests/suite/interrupt.rs index 1c4c05e3..86b0a3f3 100644 --- a/codex-rs/app-server/tests/suite/interrupt.rs +++ b/codex-rs/app-server/tests/suite/interrupt.rs @@ -146,7 +146,7 @@ fn create_config_toml(codex_home: &Path, server_uri: String) -> std::io::Result< r#" model = "mock-model" approval_policy = "never" -sandbox_mode = "danger-full-access" +sandbox_mode = "read-only" model_provider = "mock_provider" diff --git a/codex-rs/app-server/tests/suite/v2/mod.rs b/codex-rs/app-server/tests/suite/v2/mod.rs index ccaa9eda..df2f3d90 100644 --- a/codex-rs/app-server/tests/suite/v2/mod.rs +++ b/codex-rs/app-server/tests/suite/v2/mod.rs @@ -1,6 +1,7 @@ -// v2 test suite modules mod account; mod thread_archive; mod thread_list; mod thread_resume; mod thread_start; +mod turn_interrupt; +mod turn_start; diff --git a/codex-rs/app-server/tests/suite/v2/turn_interrupt.rs b/codex-rs/app-server/tests/suite/v2/turn_interrupt.rs new file mode 100644 index 00000000..d1deb608 --- /dev/null +++ b/codex-rs/app-server/tests/suite/v2/turn_interrupt.rs @@ -0,0 +1,128 @@ +#![cfg(unix)] + +use anyhow::Result; +use app_test_support::McpProcess; +use app_test_support::create_mock_chat_completions_server; +use app_test_support::create_shell_sse_response; +use app_test_support::to_response; +use codex_app_server_protocol::JSONRPCResponse; +use codex_app_server_protocol::RequestId; +use codex_app_server_protocol::ThreadStartParams; +use codex_app_server_protocol::ThreadStartResponse; +use codex_app_server_protocol::TurnInterruptParams; +use codex_app_server_protocol::TurnInterruptResponse; +use codex_app_server_protocol::TurnStartParams; +use codex_app_server_protocol::TurnStartResponse; +use codex_app_server_protocol::UserInput as V2UserInput; +use tempfile::TempDir; +use tokio::time::timeout; + +const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); + +#[tokio::test] +async fn turn_interrupt_aborts_running_turn() -> Result<()> { + // Use a portable sleep command to keep the turn running. + #[cfg(target_os = "windows")] + let shell_command = vec![ + "powershell".to_string(), + "-Command".to_string(), + "Start-Sleep -Seconds 10".to_string(), + ]; + #[cfg(not(target_os = "windows"))] + let shell_command = vec!["sleep".to_string(), "10".to_string()]; + + let tmp = TempDir::new()?; + let codex_home = tmp.path().join("codex_home"); + std::fs::create_dir(&codex_home)?; + let working_directory = tmp.path().join("workdir"); + std::fs::create_dir(&working_directory)?; + + // Mock server: long-running shell command then (after abort) nothing else needed. + let server = create_mock_chat_completions_server(vec![create_shell_sse_response( + shell_command.clone(), + Some(&working_directory), + Some(10_000), + "call_sleep", + )?]) + .await; + create_config_toml(&codex_home, &server.uri())?; + + let mut mcp = McpProcess::new(&codex_home).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + // Start a v2 thread and capture its id. 
+ let thread_req = mcp + .send_thread_start_request(ThreadStartParams { + model: Some("mock-model".to_string()), + ..Default::default() + }) + .await?; + let thread_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(thread_req)), + ) + .await??; + let ThreadStartResponse { thread } = to_response::(thread_resp)?; + + // Start a turn that triggers a long-running command. + let turn_req = mcp + .send_turn_start_request(TurnStartParams { + thread_id: thread.id.clone(), + input: vec![V2UserInput::Text { + text: "run sleep".to_string(), + }], + cwd: Some(working_directory.clone()), + ..Default::default() + }) + .await?; + let turn_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(turn_req)), + ) + .await??; + let TurnStartResponse { turn } = to_response::(turn_resp)?; + + // Give the command a brief moment to start. + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + + // Interrupt the in-progress turn by id (v2 API). + let interrupt_id = mcp + .send_turn_interrupt_request(TurnInterruptParams { + thread_id: thread.id, + turn_id: turn.id, + }) + .await?; + let interrupt_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(interrupt_id)), + ) + .await??; + let _resp: TurnInterruptResponse = to_response::(interrupt_resp)?; + + // No fields to assert on; successful deserialization confirms proper response shape. + Ok(()) +} + +// Helper to create a config.toml pointing at the mock model server. +fn create_config_toml(codex_home: &std::path::Path, server_uri: &str) -> std::io::Result<()> { + let config_toml = codex_home.join("config.toml"); + std::fs::write( + config_toml, + format!( + r#" +model = "mock-model" +approval_policy = "never" +sandbox_mode = "workspace-write" + +model_provider = "mock_provider" + +[model_providers.mock_provider] +name = "Mock provider for test" +base_url = "{server_uri}/v1" +wire_api = "chat" +request_max_retries = 0 +stream_max_retries = 0 +"# + ), + ) +} diff --git a/codex-rs/app-server/tests/suite/v2/turn_start.rs b/codex-rs/app-server/tests/suite/v2/turn_start.rs new file mode 100644 index 00000000..b26f01a9 --- /dev/null +++ b/codex-rs/app-server/tests/suite/v2/turn_start.rs @@ -0,0 +1,486 @@ +use anyhow::Result; +use app_test_support::McpProcess; +use app_test_support::create_final_assistant_message_sse_response; +use app_test_support::create_mock_chat_completions_server; +use app_test_support::create_mock_chat_completions_server_unchecked; +use app_test_support::create_shell_sse_response; +use app_test_support::to_response; +use codex_app_server_protocol::JSONRPCNotification; +use codex_app_server_protocol::JSONRPCResponse; +use codex_app_server_protocol::RequestId; +use codex_app_server_protocol::ServerRequest; +use codex_app_server_protocol::ThreadStartParams; +use codex_app_server_protocol::ThreadStartResponse; +use codex_app_server_protocol::TurnStartParams; +use codex_app_server_protocol::TurnStartResponse; +use codex_app_server_protocol::TurnStartedNotification; +use codex_app_server_protocol::UserInput as V2UserInput; +use codex_core::protocol_config_types::ReasoningEffort; +use codex_core::protocol_config_types::ReasoningSummary; +use codex_protocol::parse_command::ParsedCommand; +use codex_protocol::protocol::Event; +use codex_protocol::protocol::EventMsg; +use core_test_support::skip_if_no_network; +use pretty_assertions::assert_eq; +use 
std::path::Path; +use tempfile::TempDir; +use tokio::time::timeout; + +const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); + +#[tokio::test] +async fn turn_start_emits_notifications_and_accepts_model_override() -> Result<()> { + // Provide a mock server and config so model wiring is valid. + // Three Codex turns hit the mock model (session start + two turn/start calls). + let responses = vec![ + create_final_assistant_message_sse_response("Done")?, + create_final_assistant_message_sse_response("Done")?, + create_final_assistant_message_sse_response("Done")?, + ]; + let server = create_mock_chat_completions_server_unchecked(responses).await; + + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path(), &server.uri(), "never")?; + + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + // Start a thread (v2) and capture its id. + let thread_req = mcp + .send_thread_start_request(ThreadStartParams { + model: Some("mock-model".to_string()), + ..Default::default() + }) + .await?; + let thread_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(thread_req)), + ) + .await??; + let ThreadStartResponse { thread } = to_response::(thread_resp)?; + + // Start a turn with only input and thread_id set (no overrides). + let turn_req = mcp + .send_turn_start_request(TurnStartParams { + thread_id: thread.id.clone(), + input: vec![V2UserInput::Text { + text: "Hello".to_string(), + }], + ..Default::default() + }) + .await?; + let turn_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(turn_req)), + ) + .await??; + let TurnStartResponse { turn } = to_response::(turn_resp)?; + assert!(!turn.id.is_empty()); + + // Expect a turn/started notification. + let notif: JSONRPCNotification = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("turn/started"), + ) + .await??; + let started: TurnStartedNotification = + serde_json::from_value(notif.params.expect("params must be present"))?; + assert_eq!( + started.turn.status, + codex_app_server_protocol::TurnStatus::InProgress + ); + + // Send a second turn that exercises the overrides path: change the model. + let turn_req2 = mcp + .send_turn_start_request(TurnStartParams { + thread_id: thread.id.clone(), + input: vec![V2UserInput::Text { + text: "Second".to_string(), + }], + model: Some("mock-model-override".to_string()), + ..Default::default() + }) + .await?; + let turn_resp2: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(turn_req2)), + ) + .await??; + let TurnStartResponse { turn: turn2 } = to_response::(turn_resp2)?; + assert!(!turn2.id.is_empty()); + // Ensure the second turn has a different id than the first. + assert_ne!(turn.id, turn2.id); + + // Expect a second turn/started notification as well. + let _notif2: JSONRPCNotification = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("turn/started"), + ) + .await??; + + // And we should ultimately get a task_complete without having to add a + // legacy conversation listener explicitly (auto-attached by thread/start). 
+ let _task_complete: JSONRPCNotification = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("codex/event/task_complete"), + ) + .await??; + + Ok(()) +} + +#[tokio::test] +async fn turn_start_accepts_local_image_input() -> Result<()> { + // Two Codex turns hit the mock model (session start + turn/start). + let responses = vec![ + create_final_assistant_message_sse_response("Done")?, + create_final_assistant_message_sse_response("Done")?, + ]; + // Use the unchecked variant because the request payload includes a LocalImage + // which the strict matcher does not currently cover. + let server = create_mock_chat_completions_server_unchecked(responses).await; + + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path(), &server.uri(), "never")?; + + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let thread_req = mcp + .send_thread_start_request(ThreadStartParams { + model: Some("mock-model".to_string()), + ..Default::default() + }) + .await?; + let thread_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(thread_req)), + ) + .await??; + let ThreadStartResponse { thread } = to_response::(thread_resp)?; + + let image_path = codex_home.path().join("image.png"); + // No need to actually write the file; we just exercise the input path. + + let turn_req = mcp + .send_turn_start_request(TurnStartParams { + thread_id: thread.id.clone(), + input: vec![V2UserInput::LocalImage { path: image_path }], + ..Default::default() + }) + .await?; + let turn_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(turn_req)), + ) + .await??; + let TurnStartResponse { turn } = to_response::(turn_resp)?; + assert!(!turn.id.is_empty()); + + // This test only validates that turn/start responds and returns a turn. + Ok(()) +} + +#[tokio::test] +async fn turn_start_exec_approval_toggle_v2() -> Result<()> { + skip_if_no_network!(Ok(())); + + let tmp = TempDir::new()?; + let codex_home = tmp.path().to_path_buf(); + + // Mock server: first turn requests a shell call (elicitation), then completes. + // Second turn same, but we'll set approval_policy=never to avoid elicitation. + let responses = vec![ + create_shell_sse_response( + vec![ + "python3".to_string(), + "-c".to_string(), + "print(42)".to_string(), + ], + None, + Some(5000), + "call1", + )?, + create_final_assistant_message_sse_response("done 1")?, + create_shell_sse_response( + vec![ + "python3".to_string(), + "-c".to_string(), + "print(42)".to_string(), + ], + None, + Some(5000), + "call2", + )?, + create_final_assistant_message_sse_response("done 2")?, + ]; + let server = create_mock_chat_completions_server(responses).await; + // Default approval is untrusted to force elicitation on first turn. 
+ create_config_toml(codex_home.as_path(), &server.uri(), "untrusted")?; + + let mut mcp = McpProcess::new(codex_home.as_path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + // thread/start + let start_id = mcp + .send_thread_start_request(ThreadStartParams { + model: Some("mock-model".to_string()), + ..Default::default() + }) + .await?; + let start_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(start_id)), + ) + .await??; + let ThreadStartResponse { thread } = to_response::(start_resp)?; + + // turn/start — expect ExecCommandApproval request from server + let first_turn_id = mcp + .send_turn_start_request(TurnStartParams { + thread_id: thread.id.clone(), + input: vec![V2UserInput::Text { + text: "run python".to_string(), + }], + ..Default::default() + }) + .await?; + // Acknowledge RPC + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(first_turn_id)), + ) + .await??; + + // Receive elicitation + let server_req = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_request_message(), + ) + .await??; + let ServerRequest::ExecCommandApproval { request_id, params } = server_req else { + panic!("expected ExecCommandApproval request"); + }; + assert_eq!(params.call_id, "call1"); + assert_eq!( + params.parsed_cmd, + vec![ParsedCommand::Unknown { + cmd: "python3 -c 'print(42)'".to_string() + }] + ); + + // Approve and wait for task completion + mcp.send_response( + request_id, + serde_json::json!({ "decision": codex_core::protocol::ReviewDecision::Approved }), + ) + .await?; + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("codex/event/task_complete"), + ) + .await??; + + // Second turn with approval_policy=never should not elicit approval + let second_turn_id = mcp + .send_turn_start_request(TurnStartParams { + thread_id: thread.id.clone(), + input: vec![V2UserInput::Text { + text: "run python again".to_string(), + }], + approval_policy: Some(codex_app_server_protocol::AskForApproval::Never), + sandbox_policy: Some(codex_app_server_protocol::SandboxPolicy::DangerFullAccess), + model: Some("mock-model".to_string()), + effort: Some(ReasoningEffort::Medium), + summary: Some(ReasoningSummary::Auto), + ..Default::default() + }) + .await?; + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(second_turn_id)), + ) + .await??; + + // Ensure we do NOT receive an ExecCommandApproval request before task completes + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("codex/event/task_complete"), + ) + .await??; + + Ok(()) +} + +#[tokio::test] +async fn turn_start_updates_sandbox_and_cwd_between_turns_v2() -> Result<()> { + // When returning Result from a test, pass an Ok(()) to the skip macro + // so the early return type matches. The no-arg form returns unit. 
+ skip_if_no_network!(Ok(())); + + let tmp = TempDir::new()?; + let codex_home = tmp.path().join("codex_home"); + std::fs::create_dir(&codex_home)?; + let workspace_root = tmp.path().join("workspace"); + std::fs::create_dir(&workspace_root)?; + let first_cwd = workspace_root.join("turn1"); + let second_cwd = workspace_root.join("turn2"); + std::fs::create_dir(&first_cwd)?; + std::fs::create_dir(&second_cwd)?; + + let responses = vec![ + create_shell_sse_response( + vec![ + "bash".to_string(), + "-lc".to_string(), + "echo first turn".to_string(), + ], + None, + Some(5000), + "call-first", + )?, + create_final_assistant_message_sse_response("done first")?, + create_shell_sse_response( + vec![ + "bash".to_string(), + "-lc".to_string(), + "echo second turn".to_string(), + ], + None, + Some(5000), + "call-second", + )?, + create_final_assistant_message_sse_response("done second")?, + ]; + let server = create_mock_chat_completions_server(responses).await; + create_config_toml(&codex_home, &server.uri(), "untrusted")?; + + let mut mcp = McpProcess::new(&codex_home).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + // thread/start + let start_id = mcp + .send_thread_start_request(ThreadStartParams { + model: Some("mock-model".to_string()), + ..Default::default() + }) + .await?; + let start_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(start_id)), + ) + .await??; + let ThreadStartResponse { thread } = to_response::(start_resp)?; + + // first turn with workspace-write sandbox and first_cwd + let first_turn = mcp + .send_turn_start_request(TurnStartParams { + thread_id: thread.id.clone(), + input: vec![V2UserInput::Text { + text: "first turn".to_string(), + }], + cwd: Some(first_cwd.clone()), + approval_policy: Some(codex_app_server_protocol::AskForApproval::Never), + sandbox_policy: Some(codex_app_server_protocol::SandboxPolicy::WorkspaceWrite { + writable_roots: vec![first_cwd.clone()], + network_access: false, + exclude_tmpdir_env_var: false, + exclude_slash_tmp: false, + }), + model: Some("mock-model".to_string()), + effort: Some(ReasoningEffort::Medium), + summary: Some(ReasoningSummary::Auto), + }) + .await?; + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(first_turn)), + ) + .await??; + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("codex/event/task_complete"), + ) + .await??; + + // second turn with workspace-write and second_cwd, ensure exec begins in second_cwd + let second_turn = mcp + .send_turn_start_request(TurnStartParams { + thread_id: thread.id.clone(), + input: vec![V2UserInput::Text { + text: "second turn".to_string(), + }], + cwd: Some(second_cwd.clone()), + approval_policy: Some(codex_app_server_protocol::AskForApproval::Never), + sandbox_policy: Some(codex_app_server_protocol::SandboxPolicy::DangerFullAccess), + model: Some("mock-model".to_string()), + effort: Some(ReasoningEffort::Medium), + summary: Some(ReasoningSummary::Auto), + }) + .await?; + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(second_turn)), + ) + .await??; + + let exec_begin_notification = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("codex/event/exec_command_begin"), + ) + .await??; + let params = exec_begin_notification + .params + .clone() + .expect("exec_command_begin params"); + let event: Event = serde_json::from_value(params).expect("deserialize exec begin 
event"); + let exec_begin = match event.msg { + EventMsg::ExecCommandBegin(exec_begin) => exec_begin, + other => panic!("expected ExecCommandBegin event, got {other:?}"), + }; + assert_eq!(exec_begin.cwd, second_cwd); + assert_eq!( + exec_begin.command, + vec![ + "bash".to_string(), + "-lc".to_string(), + "echo second turn".to_string() + ] + ); + + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("codex/event/task_complete"), + ) + .await??; + + Ok(()) +} + +// Helper to create a config.toml pointing at the mock model server. +fn create_config_toml( + codex_home: &Path, + server_uri: &str, + approval_policy: &str, +) -> std::io::Result<()> { + let config_toml = codex_home.join("config.toml"); + std::fs::write( + config_toml, + format!( + r#" +model = "mock-model" +approval_policy = "{approval_policy}" +sandbox_mode = "read-only" + +model_provider = "mock_provider" + +[model_providers.mock_provider] +name = "Mock provider for test" +base_url = "{server_uri}/v1" +wire_api = "chat" +request_max_retries = 0 +stream_max_retries = 0 +"# + ), + ) +}