diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 020acd04..932c1fb0 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -14,6 +14,8 @@ use codex_apply_patch::ApplyPatchAction; use codex_apply_patch::MaybeApplyPatchVerified; use codex_apply_patch::maybe_parse_apply_patch_verified; use codex_login::CodexAuth; +use codex_protocol::protocol::TurnAbortReason; +use codex_protocol::protocol::TurnAbortedEvent; use futures::prelude::*; use mcp_types::CallToolResult; use serde::Serialize; @@ -535,7 +537,7 @@ impl Session { pub fn set_task(&self, task: AgentTask) { let mut state = self.state.lock_unchecked(); if let Some(current_task) = state.current_task.take() { - current_task.abort(); + current_task.abort(TurnAbortReason::Replaced); } state.current_task = Some(task); } @@ -852,13 +854,13 @@ impl Session { .await } - fn abort(&self) { - info!("Aborting existing session"); + fn interrupt_task(&self) { + info!("interrupt received: abort current task, if any"); let mut state = self.state.lock_unchecked(); state.pending_approvals.clear(); state.pending_input.clear(); if let Some(task) = state.current_task.take() { - task.abort(); + task.abort(TurnAbortReason::Interrupted); } } @@ -894,7 +896,7 @@ impl Session { impl Drop for Session { fn drop(&mut self) { - self.abort(); + self.interrupt_task(); } } @@ -964,14 +966,13 @@ impl AgentTask { } } - fn abort(self) { + fn abort(self, reason: TurnAbortReason) { + // TOCTOU? if !self.handle.is_finished() { self.handle.abort(); let event = Event { id: self.sub_id, - msg: EventMsg::Error(ErrorEvent { - message: " Turn interrupted".to_string(), - }), + msg: EventMsg::TurnAborted(TurnAbortedEvent { reason }), }; let tx_event = self.sess.tx_event.clone(); tokio::spawn(async move { @@ -994,7 +995,7 @@ async fn submission_loop( debug!(?sub, "Submission"); match sub.op { Op::Interrupt => { - sess.abort(); + sess.interrupt_task(); } Op::UserInput { items } => { // attempt to inject input into current task @@ -1065,13 +1066,13 @@ async fn submission_loop( } Op::ExecApproval { id, decision } => match decision { ReviewDecision::Abort => { - sess.abort(); + sess.interrupt_task(); } other => sess.notify_approval(&id, other), }, Op::PatchApproval { id, decision } => match decision { ReviewDecision::Abort => { - sess.abort(); + sess.interrupt_task(); } other => sess.notify_approval(&id, other), }, diff --git a/codex-rs/exec/src/event_processor_with_human_output.rs b/codex-rs/exec/src/event_processor_with_human_output.rs index 5d64fe62..98d7a1fb 100644 --- a/codex-rs/exec/src/event_processor_with_human_output.rs +++ b/codex-rs/exec/src/event_processor_with_human_output.rs @@ -21,6 +21,7 @@ use codex_core::protocol::PatchApplyBeginEvent; use codex_core::protocol::PatchApplyEndEvent; use codex_core::protocol::SessionConfiguredEvent; use codex_core::protocol::TaskCompleteEvent; +use codex_core::protocol::TurnAbortReason; use codex_core::protocol::TurnDiffEvent; use owo_colors::OwoColorize; use owo_colors::Style; @@ -522,6 +523,14 @@ impl EventProcessor for EventProcessorWithHumanOutput { EventMsg::GetHistoryEntryResponse(_) => { // Currently ignored in exec output. } + EventMsg::TurnAborted(abort_reason) => match abort_reason.reason { + TurnAbortReason::Interrupted => { + ts_println!(self, "task interrupted"); + } + TurnAbortReason::Replaced => { + ts_println!(self, "task aborted: replaced by a new task"); + } + }, EventMsg::ShutdownComplete => return CodexStatus::Shutdown, } CodexStatus::Running diff --git a/codex-rs/mcp-server/src/codex_message_processor.rs b/codex-rs/mcp-server/src/codex_message_processor.rs index 3a859fbe..c90fab86 100644 --- a/codex-rs/mcp-server/src/codex_message_processor.rs +++ b/codex-rs/mcp-server/src/codex_message_processor.rs @@ -77,6 +77,8 @@ pub(crate) struct CodexMessageProcessor { codex_linux_sandbox_exe: Option, conversation_listeners: HashMap>, active_login: Arc>>, + // Queue of pending interrupt requests per conversation. We reply when TurnAborted arrives. + pending_interrupts: Arc>>>, } impl CodexMessageProcessor { @@ -91,6 +93,7 @@ impl CodexMessageProcessor { codex_linux_sandbox_exe, conversation_listeners: HashMap::new(), active_login: Arc::new(Mutex::new(None)), + pending_interrupts: Arc::new(Mutex::new(HashMap::new())), } } @@ -399,13 +402,14 @@ impl CodexMessageProcessor { return; }; - let _ = conversation.submit(Op::Interrupt).await; + // Record the pending interrupt so we can reply when TurnAborted arrives. + { + let mut map = self.pending_interrupts.lock().await; + map.entry(conversation_id.0).or_default().push(request_id); + } - // Apparently CodexConversation does not send an ack for Op::Interrupt, - // so we can reply to the request right away. - self.outgoing - .send_response(request_id, InterruptConversationResponse {}) - .await; + // Submit the interrupt; we'll respond upon TurnAborted. + let _ = conversation.submit(Op::Interrupt).await; } async fn add_conversation_listener( @@ -433,6 +437,7 @@ impl CodexMessageProcessor { self.conversation_listeners .insert(subscription_id, cancel_tx); let outgoing_for_task = self.outgoing.clone(); + let pending_interrupts = self.pending_interrupts.clone(); tokio::spawn(async move { loop { tokio::select! { @@ -473,7 +478,7 @@ impl CodexMessageProcessor { }) .await; - apply_bespoke_event_handling(event, conversation_id, conversation.clone(), outgoing_for_task.clone()).await; + apply_bespoke_event_handling(event.clone(), conversation_id, conversation.clone(), outgoing_for_task.clone(), pending_interrupts.clone()).await; } } } @@ -512,6 +517,7 @@ async fn apply_bespoke_event_handling( conversation_id: ConversationId, conversation: Arc, outgoing: Arc, + pending_interrupts: Arc>>>, ) { let Event { id: event_id, msg } = event; match msg { @@ -560,6 +566,22 @@ async fn apply_bespoke_event_handling( on_exec_approval_response(event_id, rx, conversation).await; }); } + // If this is a TurnAborted, reply to any pending interrupt requests. + EventMsg::TurnAborted(turn_aborted_event) => { + let pending = { + let mut map = pending_interrupts.lock().await; + map.remove(&conversation_id.0).unwrap_or_default() + }; + if !pending.is_empty() { + let response = InterruptConversationResponse { + abort_reason: turn_aborted_event.reason, + }; + for rid in pending { + outgoing.send_response(rid, response.clone()).await; + } + } + } + _ => {} } } diff --git a/codex-rs/mcp-server/src/codex_tool_runner.rs b/codex-rs/mcp-server/src/codex_tool_runner.rs index ff660167..c0d14ece 100644 --- a/codex-rs/mcp-server/src/codex_tool_runner.rs +++ b/codex-rs/mcp-server/src/codex_tool_runner.rs @@ -272,6 +272,7 @@ async fn run_codex_tool_session_inner( | EventMsg::TurnDiff(_) | EventMsg::GetHistoryEntryResponse(_) | EventMsg::PlanUpdate(_) + | EventMsg::TurnAborted(_) | EventMsg::ShutdownComplete => { // For now, we do not do anything extra for these // events. Note that diff --git a/codex-rs/mcp-server/src/conversation_loop.rs b/codex-rs/mcp-server/src/conversation_loop.rs index 61f0c95a..7a3ae3e7 100644 --- a/codex-rs/mcp-server/src/conversation_loop.rs +++ b/codex-rs/mcp-server/src/conversation_loop.rs @@ -52,7 +52,6 @@ pub async fn run_conversation_loop( call_id, ) .await; - continue; } EventMsg::Error(_) => { error!("Codex runtime error"); @@ -75,7 +74,6 @@ pub async fn run_conversation_loop( event.id.clone(), ) .await; - continue; } EventMsg::TaskComplete(_) => {} EventMsg::SessionConfigured(_) => { @@ -107,6 +105,7 @@ pub async fn run_conversation_loop( | EventMsg::PatchApplyEnd(_) | EventMsg::GetHistoryEntryResponse(_) | EventMsg::PlanUpdate(_) + | EventMsg::TurnAborted(_) | EventMsg::ShutdownComplete => { // For now, we do not do anything extra for these // events. Note that diff --git a/codex-rs/mcp-server/src/wire_format.rs b/codex-rs/mcp-server/src/wire_format.rs index f8fb53b4..786a512e 100644 --- a/codex-rs/mcp-server/src/wire_format.rs +++ b/codex-rs/mcp-server/src/wire_format.rs @@ -6,6 +6,7 @@ use codex_core::protocol::AskForApproval; use codex_core::protocol::FileChange; use codex_core::protocol::ReviewDecision; use codex_core::protocol::SandboxPolicy; +use codex_core::protocol::TurnAbortReason; use codex_core::protocol_config_types::ReasoningEffort; use codex_core::protocol_config_types::ReasoningSummary; use mcp_types::RequestId; @@ -191,9 +192,11 @@ pub struct InterruptConversationParams { pub conversation_id: ConversationId, } -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +#[derive(Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] -pub struct InterruptConversationResponse {} +pub struct InterruptConversationResponse { + pub abort_reason: TurnAbortReason, +} #[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] #[serde(rename_all = "camelCase")] diff --git a/codex-rs/mcp-server/tests/interrupt.rs b/codex-rs/mcp-server/tests/interrupt.rs index b2767f5e..365972e0 100644 --- a/codex-rs/mcp-server/tests/interrupt.rs +++ b/codex-rs/mcp-server/tests/interrupt.rs @@ -5,8 +5,6 @@ use std::path::Path; use codex_core::spawn::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR; use codex_mcp_server::CodexToolCallParam; -use mcp_types::JSONRPCResponse; -use mcp_types::RequestId; use serde_json::json; use tempfile::TempDir; use tokio::time::timeout; @@ -100,22 +98,13 @@ async fn shell_command_interruption() -> anyhow::Result<()> { ) .await?; - // Expect Codex to return an error or interruption response - let codex_response: JSONRPCResponse = timeout( + // Expect Codex to emit a TurnAborted event notification + let _turn_aborted = timeout( DEFAULT_READ_TIMEOUT, - mcp_process.read_stream_until_response_message(RequestId::Integer(codex_request_id)), + mcp_process.read_stream_until_notification_message("turn_aborted"), ) .await??; - assert!( - codex_response - .result - .as_object() - .map(|o| o.contains_key("error")) - .unwrap_or(false), - "Expected an interruption or error result, got: {codex_response:?}" - ); - let codex_reply_request_id = mcp_process .send_codex_reply_tool_call(&session_id, "Second Run: run `sleep 60`") .await?; @@ -131,21 +120,12 @@ async fn shell_command_interruption() -> anyhow::Result<()> { ) .await?; - // Expect Codex to return an error or interruption response - let codex_response: JSONRPCResponse = timeout( + // Expect Codex to emit a TurnAborted event notification + let _turn_aborted = timeout( DEFAULT_READ_TIMEOUT, - mcp_process.read_stream_until_response_message(RequestId::Integer(codex_reply_request_id)), + mcp_process.read_stream_until_notification_message("turn_aborted"), ) .await??; - - assert!( - codex_response - .result - .as_object() - .map(|o| o.contains_key("error")) - .unwrap_or(false), - "Expected an interruption or error result, got: {codex_response:?}" - ); Ok(()) } diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index c4f50b4f..4c9ba6cf 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -39,7 +39,7 @@ pub struct Submission { #[non_exhaustive] pub enum Op { /// Abort current task. - /// This server sends no corresponding Event + /// This server sends [`EventMsg::TurnAborted`] in response. Interrupt, /// Input from the user @@ -422,6 +422,8 @@ pub enum EventMsg { PlanUpdate(UpdatePlanArgs), + TurnAborted(TurnAbortedEvent), + /// Notification that the agent is shutting down. ShutdownComplete, } @@ -745,6 +747,18 @@ pub struct Chunk { pub inserted_lines: Vec, } +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct TurnAbortedEvent { + pub reason: TurnAbortReason, +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +#[serde(rename_all = "snake_case")] +pub enum TurnAbortReason { + Interrupted, + Replaced, +} + #[cfg(test)] mod tests { use super::*; diff --git a/codex-rs/tui/src/chatwidget.rs b/codex-rs/tui/src/chatwidget.rs index 84aee144..88b816d9 100644 --- a/codex-rs/tui/src/chatwidget.rs +++ b/codex-rs/tui/src/chatwidget.rs @@ -635,6 +635,7 @@ impl ChatWidget<'_> { EventMsg::TaskComplete(TaskCompleteEvent { .. }) => self.on_task_complete(), EventMsg::TokenCount(token_usage) => self.on_token_count(token_usage), EventMsg::Error(ErrorEvent { message }) => self.on_error(message), + EventMsg::TurnAborted(_) => self.on_error("Turn interrupted".to_owned()), EventMsg::PlanUpdate(update) => self.on_plan_update(update), EventMsg::ExecApprovalRequest(ev) => self.on_exec_approval_request(id, ev), EventMsg::ApplyPatchApprovalRequest(ev) => self.on_apply_patch_approval_request(id, ev),