fix: introduce EventMsg::TurnAborted (#2365)
Introduces `EventMsg::TurnAborted` that should be sent in response to `Op::Interrupt`. In the MCP server, updates the handling of a `ClientRequest::InterruptConversation` request such that it sends the `Op::Interrupt` but does not respond to the request until it sees an `EventMsg::TurnAborted`.
This commit is contained in:
@@ -14,6 +14,8 @@ use codex_apply_patch::ApplyPatchAction;
|
|||||||
use codex_apply_patch::MaybeApplyPatchVerified;
|
use codex_apply_patch::MaybeApplyPatchVerified;
|
||||||
use codex_apply_patch::maybe_parse_apply_patch_verified;
|
use codex_apply_patch::maybe_parse_apply_patch_verified;
|
||||||
use codex_login::CodexAuth;
|
use codex_login::CodexAuth;
|
||||||
|
use codex_protocol::protocol::TurnAbortReason;
|
||||||
|
use codex_protocol::protocol::TurnAbortedEvent;
|
||||||
use futures::prelude::*;
|
use futures::prelude::*;
|
||||||
use mcp_types::CallToolResult;
|
use mcp_types::CallToolResult;
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
@@ -535,7 +537,7 @@ impl Session {
|
|||||||
pub fn set_task(&self, task: AgentTask) {
|
pub fn set_task(&self, task: AgentTask) {
|
||||||
let mut state = self.state.lock_unchecked();
|
let mut state = self.state.lock_unchecked();
|
||||||
if let Some(current_task) = state.current_task.take() {
|
if let Some(current_task) = state.current_task.take() {
|
||||||
current_task.abort();
|
current_task.abort(TurnAbortReason::Replaced);
|
||||||
}
|
}
|
||||||
state.current_task = Some(task);
|
state.current_task = Some(task);
|
||||||
}
|
}
|
||||||
@@ -852,13 +854,13 @@ impl Session {
|
|||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
fn abort(&self) {
|
fn interrupt_task(&self) {
|
||||||
info!("Aborting existing session");
|
info!("interrupt received: abort current task, if any");
|
||||||
let mut state = self.state.lock_unchecked();
|
let mut state = self.state.lock_unchecked();
|
||||||
state.pending_approvals.clear();
|
state.pending_approvals.clear();
|
||||||
state.pending_input.clear();
|
state.pending_input.clear();
|
||||||
if let Some(task) = state.current_task.take() {
|
if let Some(task) = state.current_task.take() {
|
||||||
task.abort();
|
task.abort(TurnAbortReason::Interrupted);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -894,7 +896,7 @@ impl Session {
|
|||||||
|
|
||||||
impl Drop for Session {
|
impl Drop for Session {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
self.abort();
|
self.interrupt_task();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -964,14 +966,13 @@ impl AgentTask {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn abort(self) {
|
fn abort(self, reason: TurnAbortReason) {
|
||||||
|
// TOCTOU?
|
||||||
if !self.handle.is_finished() {
|
if !self.handle.is_finished() {
|
||||||
self.handle.abort();
|
self.handle.abort();
|
||||||
let event = Event {
|
let event = Event {
|
||||||
id: self.sub_id,
|
id: self.sub_id,
|
||||||
msg: EventMsg::Error(ErrorEvent {
|
msg: EventMsg::TurnAborted(TurnAbortedEvent { reason }),
|
||||||
message: " Turn interrupted".to_string(),
|
|
||||||
}),
|
|
||||||
};
|
};
|
||||||
let tx_event = self.sess.tx_event.clone();
|
let tx_event = self.sess.tx_event.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
@@ -994,7 +995,7 @@ async fn submission_loop(
|
|||||||
debug!(?sub, "Submission");
|
debug!(?sub, "Submission");
|
||||||
match sub.op {
|
match sub.op {
|
||||||
Op::Interrupt => {
|
Op::Interrupt => {
|
||||||
sess.abort();
|
sess.interrupt_task();
|
||||||
}
|
}
|
||||||
Op::UserInput { items } => {
|
Op::UserInput { items } => {
|
||||||
// attempt to inject input into current task
|
// attempt to inject input into current task
|
||||||
@@ -1065,13 +1066,13 @@ async fn submission_loop(
|
|||||||
}
|
}
|
||||||
Op::ExecApproval { id, decision } => match decision {
|
Op::ExecApproval { id, decision } => match decision {
|
||||||
ReviewDecision::Abort => {
|
ReviewDecision::Abort => {
|
||||||
sess.abort();
|
sess.interrupt_task();
|
||||||
}
|
}
|
||||||
other => sess.notify_approval(&id, other),
|
other => sess.notify_approval(&id, other),
|
||||||
},
|
},
|
||||||
Op::PatchApproval { id, decision } => match decision {
|
Op::PatchApproval { id, decision } => match decision {
|
||||||
ReviewDecision::Abort => {
|
ReviewDecision::Abort => {
|
||||||
sess.abort();
|
sess.interrupt_task();
|
||||||
}
|
}
|
||||||
other => sess.notify_approval(&id, other),
|
other => sess.notify_approval(&id, other),
|
||||||
},
|
},
|
||||||
|
|||||||
@@ -21,6 +21,7 @@ use codex_core::protocol::PatchApplyBeginEvent;
|
|||||||
use codex_core::protocol::PatchApplyEndEvent;
|
use codex_core::protocol::PatchApplyEndEvent;
|
||||||
use codex_core::protocol::SessionConfiguredEvent;
|
use codex_core::protocol::SessionConfiguredEvent;
|
||||||
use codex_core::protocol::TaskCompleteEvent;
|
use codex_core::protocol::TaskCompleteEvent;
|
||||||
|
use codex_core::protocol::TurnAbortReason;
|
||||||
use codex_core::protocol::TurnDiffEvent;
|
use codex_core::protocol::TurnDiffEvent;
|
||||||
use owo_colors::OwoColorize;
|
use owo_colors::OwoColorize;
|
||||||
use owo_colors::Style;
|
use owo_colors::Style;
|
||||||
@@ -522,6 +523,14 @@ impl EventProcessor for EventProcessorWithHumanOutput {
|
|||||||
EventMsg::GetHistoryEntryResponse(_) => {
|
EventMsg::GetHistoryEntryResponse(_) => {
|
||||||
// Currently ignored in exec output.
|
// Currently ignored in exec output.
|
||||||
}
|
}
|
||||||
|
EventMsg::TurnAborted(abort_reason) => match abort_reason.reason {
|
||||||
|
TurnAbortReason::Interrupted => {
|
||||||
|
ts_println!(self, "task interrupted");
|
||||||
|
}
|
||||||
|
TurnAbortReason::Replaced => {
|
||||||
|
ts_println!(self, "task aborted: replaced by a new task");
|
||||||
|
}
|
||||||
|
},
|
||||||
EventMsg::ShutdownComplete => return CodexStatus::Shutdown,
|
EventMsg::ShutdownComplete => return CodexStatus::Shutdown,
|
||||||
}
|
}
|
||||||
CodexStatus::Running
|
CodexStatus::Running
|
||||||
|
|||||||
@@ -77,6 +77,8 @@ pub(crate) struct CodexMessageProcessor {
|
|||||||
codex_linux_sandbox_exe: Option<PathBuf>,
|
codex_linux_sandbox_exe: Option<PathBuf>,
|
||||||
conversation_listeners: HashMap<Uuid, oneshot::Sender<()>>,
|
conversation_listeners: HashMap<Uuid, oneshot::Sender<()>>,
|
||||||
active_login: Arc<Mutex<Option<ActiveLogin>>>,
|
active_login: Arc<Mutex<Option<ActiveLogin>>>,
|
||||||
|
// Queue of pending interrupt requests per conversation. We reply when TurnAborted arrives.
|
||||||
|
pending_interrupts: Arc<Mutex<HashMap<Uuid, Vec<RequestId>>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl CodexMessageProcessor {
|
impl CodexMessageProcessor {
|
||||||
@@ -91,6 +93,7 @@ impl CodexMessageProcessor {
|
|||||||
codex_linux_sandbox_exe,
|
codex_linux_sandbox_exe,
|
||||||
conversation_listeners: HashMap::new(),
|
conversation_listeners: HashMap::new(),
|
||||||
active_login: Arc::new(Mutex::new(None)),
|
active_login: Arc::new(Mutex::new(None)),
|
||||||
|
pending_interrupts: Arc::new(Mutex::new(HashMap::new())),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -399,13 +402,14 @@ impl CodexMessageProcessor {
|
|||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = conversation.submit(Op::Interrupt).await;
|
// Record the pending interrupt so we can reply when TurnAborted arrives.
|
||||||
|
{
|
||||||
|
let mut map = self.pending_interrupts.lock().await;
|
||||||
|
map.entry(conversation_id.0).or_default().push(request_id);
|
||||||
|
}
|
||||||
|
|
||||||
// Apparently CodexConversation does not send an ack for Op::Interrupt,
|
// Submit the interrupt; we'll respond upon TurnAborted.
|
||||||
// so we can reply to the request right away.
|
let _ = conversation.submit(Op::Interrupt).await;
|
||||||
self.outgoing
|
|
||||||
.send_response(request_id, InterruptConversationResponse {})
|
|
||||||
.await;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn add_conversation_listener(
|
async fn add_conversation_listener(
|
||||||
@@ -433,6 +437,7 @@ impl CodexMessageProcessor {
|
|||||||
self.conversation_listeners
|
self.conversation_listeners
|
||||||
.insert(subscription_id, cancel_tx);
|
.insert(subscription_id, cancel_tx);
|
||||||
let outgoing_for_task = self.outgoing.clone();
|
let outgoing_for_task = self.outgoing.clone();
|
||||||
|
let pending_interrupts = self.pending_interrupts.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
@@ -473,7 +478,7 @@ impl CodexMessageProcessor {
|
|||||||
})
|
})
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
apply_bespoke_event_handling(event, conversation_id, conversation.clone(), outgoing_for_task.clone()).await;
|
apply_bespoke_event_handling(event.clone(), conversation_id, conversation.clone(), outgoing_for_task.clone(), pending_interrupts.clone()).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -512,6 +517,7 @@ async fn apply_bespoke_event_handling(
|
|||||||
conversation_id: ConversationId,
|
conversation_id: ConversationId,
|
||||||
conversation: Arc<CodexConversation>,
|
conversation: Arc<CodexConversation>,
|
||||||
outgoing: Arc<OutgoingMessageSender>,
|
outgoing: Arc<OutgoingMessageSender>,
|
||||||
|
pending_interrupts: Arc<Mutex<HashMap<Uuid, Vec<RequestId>>>>,
|
||||||
) {
|
) {
|
||||||
let Event { id: event_id, msg } = event;
|
let Event { id: event_id, msg } = event;
|
||||||
match msg {
|
match msg {
|
||||||
@@ -560,6 +566,22 @@ async fn apply_bespoke_event_handling(
|
|||||||
on_exec_approval_response(event_id, rx, conversation).await;
|
on_exec_approval_response(event_id, rx, conversation).await;
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
// If this is a TurnAborted, reply to any pending interrupt requests.
|
||||||
|
EventMsg::TurnAborted(turn_aborted_event) => {
|
||||||
|
let pending = {
|
||||||
|
let mut map = pending_interrupts.lock().await;
|
||||||
|
map.remove(&conversation_id.0).unwrap_or_default()
|
||||||
|
};
|
||||||
|
if !pending.is_empty() {
|
||||||
|
let response = InterruptConversationResponse {
|
||||||
|
abort_reason: turn_aborted_event.reason,
|
||||||
|
};
|
||||||
|
for rid in pending {
|
||||||
|
outgoing.send_response(rid, response.clone()).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
_ => {}
|
_ => {}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -272,6 +272,7 @@ async fn run_codex_tool_session_inner(
|
|||||||
| EventMsg::TurnDiff(_)
|
| EventMsg::TurnDiff(_)
|
||||||
| EventMsg::GetHistoryEntryResponse(_)
|
| EventMsg::GetHistoryEntryResponse(_)
|
||||||
| EventMsg::PlanUpdate(_)
|
| EventMsg::PlanUpdate(_)
|
||||||
|
| EventMsg::TurnAborted(_)
|
||||||
| EventMsg::ShutdownComplete => {
|
| EventMsg::ShutdownComplete => {
|
||||||
// For now, we do not do anything extra for these
|
// For now, we do not do anything extra for these
|
||||||
// events. Note that
|
// events. Note that
|
||||||
|
|||||||
@@ -52,7 +52,6 @@ pub async fn run_conversation_loop(
|
|||||||
call_id,
|
call_id,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
continue;
|
|
||||||
}
|
}
|
||||||
EventMsg::Error(_) => {
|
EventMsg::Error(_) => {
|
||||||
error!("Codex runtime error");
|
error!("Codex runtime error");
|
||||||
@@ -75,7 +74,6 @@ pub async fn run_conversation_loop(
|
|||||||
event.id.clone(),
|
event.id.clone(),
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
continue;
|
|
||||||
}
|
}
|
||||||
EventMsg::TaskComplete(_) => {}
|
EventMsg::TaskComplete(_) => {}
|
||||||
EventMsg::SessionConfigured(_) => {
|
EventMsg::SessionConfigured(_) => {
|
||||||
@@ -107,6 +105,7 @@ pub async fn run_conversation_loop(
|
|||||||
| EventMsg::PatchApplyEnd(_)
|
| EventMsg::PatchApplyEnd(_)
|
||||||
| EventMsg::GetHistoryEntryResponse(_)
|
| EventMsg::GetHistoryEntryResponse(_)
|
||||||
| EventMsg::PlanUpdate(_)
|
| EventMsg::PlanUpdate(_)
|
||||||
|
| EventMsg::TurnAborted(_)
|
||||||
| EventMsg::ShutdownComplete => {
|
| EventMsg::ShutdownComplete => {
|
||||||
// For now, we do not do anything extra for these
|
// For now, we do not do anything extra for these
|
||||||
// events. Note that
|
// events. Note that
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ use codex_core::protocol::AskForApproval;
|
|||||||
use codex_core::protocol::FileChange;
|
use codex_core::protocol::FileChange;
|
||||||
use codex_core::protocol::ReviewDecision;
|
use codex_core::protocol::ReviewDecision;
|
||||||
use codex_core::protocol::SandboxPolicy;
|
use codex_core::protocol::SandboxPolicy;
|
||||||
|
use codex_core::protocol::TurnAbortReason;
|
||||||
use codex_core::protocol_config_types::ReasoningEffort;
|
use codex_core::protocol_config_types::ReasoningEffort;
|
||||||
use codex_core::protocol_config_types::ReasoningSummary;
|
use codex_core::protocol_config_types::ReasoningSummary;
|
||||||
use mcp_types::RequestId;
|
use mcp_types::RequestId;
|
||||||
@@ -191,9 +192,11 @@ pub struct InterruptConversationParams {
|
|||||||
pub conversation_id: ConversationId,
|
pub conversation_id: ConversationId,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
|
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
pub struct InterruptConversationResponse {}
|
pub struct InterruptConversationResponse {
|
||||||
|
pub abort_reason: TurnAbortReason,
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
|
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
|
||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
|
|||||||
@@ -5,8 +5,6 @@ use std::path::Path;
|
|||||||
|
|
||||||
use codex_core::spawn::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR;
|
use codex_core::spawn::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR;
|
||||||
use codex_mcp_server::CodexToolCallParam;
|
use codex_mcp_server::CodexToolCallParam;
|
||||||
use mcp_types::JSONRPCResponse;
|
|
||||||
use mcp_types::RequestId;
|
|
||||||
use serde_json::json;
|
use serde_json::json;
|
||||||
use tempfile::TempDir;
|
use tempfile::TempDir;
|
||||||
use tokio::time::timeout;
|
use tokio::time::timeout;
|
||||||
@@ -100,22 +98,13 @@ async fn shell_command_interruption() -> anyhow::Result<()> {
|
|||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
// Expect Codex to return an error or interruption response
|
// Expect Codex to emit a TurnAborted event notification
|
||||||
let codex_response: JSONRPCResponse = timeout(
|
let _turn_aborted = timeout(
|
||||||
DEFAULT_READ_TIMEOUT,
|
DEFAULT_READ_TIMEOUT,
|
||||||
mcp_process.read_stream_until_response_message(RequestId::Integer(codex_request_id)),
|
mcp_process.read_stream_until_notification_message("turn_aborted"),
|
||||||
)
|
)
|
||||||
.await??;
|
.await??;
|
||||||
|
|
||||||
assert!(
|
|
||||||
codex_response
|
|
||||||
.result
|
|
||||||
.as_object()
|
|
||||||
.map(|o| o.contains_key("error"))
|
|
||||||
.unwrap_or(false),
|
|
||||||
"Expected an interruption or error result, got: {codex_response:?}"
|
|
||||||
);
|
|
||||||
|
|
||||||
let codex_reply_request_id = mcp_process
|
let codex_reply_request_id = mcp_process
|
||||||
.send_codex_reply_tool_call(&session_id, "Second Run: run `sleep 60`")
|
.send_codex_reply_tool_call(&session_id, "Second Run: run `sleep 60`")
|
||||||
.await?;
|
.await?;
|
||||||
@@ -131,21 +120,12 @@ async fn shell_command_interruption() -> anyhow::Result<()> {
|
|||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
// Expect Codex to return an error or interruption response
|
// Expect Codex to emit a TurnAborted event notification
|
||||||
let codex_response: JSONRPCResponse = timeout(
|
let _turn_aborted = timeout(
|
||||||
DEFAULT_READ_TIMEOUT,
|
DEFAULT_READ_TIMEOUT,
|
||||||
mcp_process.read_stream_until_response_message(RequestId::Integer(codex_reply_request_id)),
|
mcp_process.read_stream_until_notification_message("turn_aborted"),
|
||||||
)
|
)
|
||||||
.await??;
|
.await??;
|
||||||
|
|
||||||
assert!(
|
|
||||||
codex_response
|
|
||||||
.result
|
|
||||||
.as_object()
|
|
||||||
.map(|o| o.contains_key("error"))
|
|
||||||
.unwrap_or(false),
|
|
||||||
"Expected an interruption or error result, got: {codex_response:?}"
|
|
||||||
);
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -39,7 +39,7 @@ pub struct Submission {
|
|||||||
#[non_exhaustive]
|
#[non_exhaustive]
|
||||||
pub enum Op {
|
pub enum Op {
|
||||||
/// Abort current task.
|
/// Abort current task.
|
||||||
/// This server sends no corresponding Event
|
/// This server sends [`EventMsg::TurnAborted`] in response.
|
||||||
Interrupt,
|
Interrupt,
|
||||||
|
|
||||||
/// Input from the user
|
/// Input from the user
|
||||||
@@ -422,6 +422,8 @@ pub enum EventMsg {
|
|||||||
|
|
||||||
PlanUpdate(UpdatePlanArgs),
|
PlanUpdate(UpdatePlanArgs),
|
||||||
|
|
||||||
|
TurnAborted(TurnAbortedEvent),
|
||||||
|
|
||||||
/// Notification that the agent is shutting down.
|
/// Notification that the agent is shutting down.
|
||||||
ShutdownComplete,
|
ShutdownComplete,
|
||||||
}
|
}
|
||||||
@@ -745,6 +747,18 @@ pub struct Chunk {
|
|||||||
pub inserted_lines: Vec<String>,
|
pub inserted_lines: Vec<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||||
|
pub struct TurnAbortedEvent {
|
||||||
|
pub reason: TurnAbortReason,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||||
|
#[serde(rename_all = "snake_case")]
|
||||||
|
pub enum TurnAbortReason {
|
||||||
|
Interrupted,
|
||||||
|
Replaced,
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|||||||
@@ -635,6 +635,7 @@ impl ChatWidget<'_> {
|
|||||||
EventMsg::TaskComplete(TaskCompleteEvent { .. }) => self.on_task_complete(),
|
EventMsg::TaskComplete(TaskCompleteEvent { .. }) => self.on_task_complete(),
|
||||||
EventMsg::TokenCount(token_usage) => self.on_token_count(token_usage),
|
EventMsg::TokenCount(token_usage) => self.on_token_count(token_usage),
|
||||||
EventMsg::Error(ErrorEvent { message }) => self.on_error(message),
|
EventMsg::Error(ErrorEvent { message }) => self.on_error(message),
|
||||||
|
EventMsg::TurnAborted(_) => self.on_error("Turn interrupted".to_owned()),
|
||||||
EventMsg::PlanUpdate(update) => self.on_plan_update(update),
|
EventMsg::PlanUpdate(update) => self.on_plan_update(update),
|
||||||
EventMsg::ExecApprovalRequest(ev) => self.on_exec_approval_request(id, ev),
|
EventMsg::ExecApprovalRequest(ev) => self.on_exec_approval_request(id, ev),
|
||||||
EventMsg::ApplyPatchApprovalRequest(ev) => self.on_apply_patch_approval_request(id, ev),
|
EventMsg::ApplyPatchApprovalRequest(ev) => self.on_apply_patch_approval_request(id, ev),
|
||||||
|
|||||||
Reference in New Issue
Block a user