diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 73bb7149..4cc888b6 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -812,6 +812,37 @@ async fn submission_loop( } }); } + Op::Shutdown => { + info!("Shutting down Codex instance"); + + // Gracefully flush and shutdown rollout recorder on session end so tests + // that inspect the rollout file do not race with the background writer. + if let Some(sess_arc) = sess { + let recorder_opt = sess_arc.rollout.lock().unwrap().take(); + if let Some(rec) = recorder_opt { + if let Err(e) = rec.shutdown().await { + warn!("failed to shutdown rollout recorder: {e}"); + let event = Event { + id: sub.id.clone(), + msg: EventMsg::Error(ErrorEvent { + message: "Failed to shutdown rollout recorder".to_string(), + }), + }; + if let Err(e) = tx_event.send(event).await { + warn!("failed to send error message: {e:?}"); + } + } + } + } + let event = Event { + id: sub.id.clone(), + msg: EventMsg::ShutdownComplete, + }; + if let Err(e) = tx_event.send(event).await { + warn!("failed to send Shutdown event: {e}"); + } + break; + } } } debug!("Agent loop exited"); diff --git a/codex-rs/core/src/protocol.rs b/codex-rs/core/src/protocol.rs index cc201bc7..0c375e45 100644 --- a/codex-rs/core/src/protocol.rs +++ b/codex-rs/core/src/protocol.rs @@ -116,6 +116,9 @@ pub enum Op { /// Request a single history entry identified by `log_id` + `offset`. GetHistoryEntryRequest { offset: usize, log_id: u64 }, + + /// Request to shut down codex instance. + Shutdown, } /// Determines the conditions under which the user is consulted to approve @@ -326,6 +329,9 @@ pub enum EventMsg { /// Response to GetHistoryEntryRequest. GetHistoryEntryResponse(GetHistoryEntryResponseEvent), + + /// Notification that the agent is shutting down. + ShutdownComplete, } // Individual event payload types matching each `EventMsg` variant. diff --git a/codex-rs/core/src/rollout.rs b/codex-rs/core/src/rollout.rs index 0b19d133..7f0f61b9 100644 --- a/codex-rs/core/src/rollout.rs +++ b/codex-rs/core/src/rollout.rs @@ -14,6 +14,7 @@ use time::macros::format_description; use tokio::io::AsyncWriteExt; use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::{self}; +use tokio::sync::oneshot; use tracing::info; use tracing::warn; use uuid::Uuid; @@ -57,10 +58,10 @@ pub(crate) struct RolloutRecorder { tx: Sender, } -#[derive(Clone)] enum RolloutCmd { AddItems(Vec), UpdateState(SessionStateSnapshot), + Shutdown { ack: oneshot::Sender<()> }, } impl RolloutRecorder { @@ -204,6 +205,21 @@ impl RolloutRecorder { info!("Resumed rollout successfully from {path:?}"); Ok((Self { tx }, saved)) } + + pub async fn shutdown(&self) -> std::io::Result<()> { + let (tx_done, rx_done) = oneshot::channel(); + match self.tx.send(RolloutCmd::Shutdown { ack: tx_done }).await { + Ok(_) => rx_done + .await + .map_err(|e| IoError::other(format!("failed waiting for rollout shutdown: {e}"))), + Err(e) => { + warn!("failed to send rollout shutdown command: {e}"); + Err(IoError::other(format!( + "failed to send rollout shutdown command: {e}" + ))) + } + } + } } struct LogFileInfo { @@ -299,6 +315,9 @@ async fn rollout_writer( let _ = file.flush().await; } } + RolloutCmd::Shutdown { ack } => { + let _ = ack.send(()); + } } } } diff --git a/codex-rs/exec/src/event_processor.rs b/codex-rs/exec/src/event_processor.rs index 56db651a..a7edb96a 100644 --- a/codex-rs/exec/src/event_processor.rs +++ b/codex-rs/exec/src/event_processor.rs @@ -1,15 +1,23 @@ +use std::path::Path; + use codex_common::summarize_sandbox_policy; use codex_core::WireApi; use codex_core::config::Config; use codex_core::model_supports_reasoning_summaries; use codex_core::protocol::Event; +pub(crate) enum CodexStatus { + Running, + InitiateShutdown, + Shutdown, +} + pub(crate) trait EventProcessor { /// Print summary of effective configuration and user prompt. fn print_config_summary(&mut self, config: &Config, prompt: &str); /// Handle a single event emitted by the agent. - fn process_event(&mut self, event: Event); + fn process_event(&mut self, event: Event) -> CodexStatus; } pub(crate) fn create_config_summary_entries(config: &Config) -> Vec<(&'static str, String)> { @@ -35,3 +43,28 @@ pub(crate) fn create_config_summary_entries(config: &Config) -> Vec<(&'static st entries } + +pub(crate) fn handle_last_message( + last_agent_message: Option<&str>, + last_message_path: Option<&Path>, +) { + match (last_message_path, last_agent_message) { + (Some(path), Some(msg)) => write_last_message_file(msg, Some(path)), + (Some(path), None) => { + write_last_message_file("", Some(path)); + eprintln!( + "Warning: no last agent message; wrote empty content to {}", + path.display() + ); + } + (None, _) => eprintln!("Warning: no file to write last message to."), + } +} + +fn write_last_message_file(contents: &str, last_message_path: Option<&Path>) { + if let Some(path) = last_message_path { + if let Err(e) = std::fs::write(path, contents) { + eprintln!("Failed to write last message file {path:?}: {e}"); + } + } +} diff --git a/codex-rs/exec/src/event_processor_with_human_output.rs b/codex-rs/exec/src/event_processor_with_human_output.rs index 7b390711..bc647c68 100644 --- a/codex-rs/exec/src/event_processor_with_human_output.rs +++ b/codex-rs/exec/src/event_processor_with_human_output.rs @@ -15,16 +15,20 @@ use codex_core::protocol::McpToolCallEndEvent; use codex_core::protocol::PatchApplyBeginEvent; use codex_core::protocol::PatchApplyEndEvent; use codex_core::protocol::SessionConfiguredEvent; +use codex_core::protocol::TaskCompleteEvent; use codex_core::protocol::TokenUsage; use owo_colors::OwoColorize; use owo_colors::Style; use shlex::try_join; use std::collections::HashMap; use std::io::Write; +use std::path::PathBuf; use std::time::Instant; +use crate::event_processor::CodexStatus; use crate::event_processor::EventProcessor; use crate::event_processor::create_config_summary_entries; +use crate::event_processor::handle_last_message; /// This should be configurable. When used in CI, users may not want to impose /// a limit so they can see the full transcript. @@ -54,10 +58,15 @@ pub(crate) struct EventProcessorWithHumanOutput { show_agent_reasoning: bool, answer_started: bool, reasoning_started: bool, + last_message_path: Option, } impl EventProcessorWithHumanOutput { - pub(crate) fn create_with_ansi(with_ansi: bool, config: &Config) -> Self { + pub(crate) fn create_with_ansi( + with_ansi: bool, + config: &Config, + last_message_path: Option, + ) -> Self { let call_id_to_command = HashMap::new(); let call_id_to_patch = HashMap::new(); let call_id_to_tool_call = HashMap::new(); @@ -77,6 +86,7 @@ impl EventProcessorWithHumanOutput { show_agent_reasoning: !config.hide_agent_reasoning, answer_started: false, reasoning_started: false, + last_message_path, } } else { Self { @@ -93,6 +103,7 @@ impl EventProcessorWithHumanOutput { show_agent_reasoning: !config.hide_agent_reasoning, answer_started: false, reasoning_started: false, + last_message_path, } } } @@ -158,7 +169,7 @@ impl EventProcessor for EventProcessorWithHumanOutput { ); } - fn process_event(&mut self, event: Event) { + fn process_event(&mut self, event: Event) -> CodexStatus { let Event { id: _, msg } = event; match msg { EventMsg::Error(ErrorEvent { message }) => { @@ -168,9 +179,16 @@ impl EventProcessor for EventProcessorWithHumanOutput { EventMsg::BackgroundEvent(BackgroundEventEvent { message }) => { ts_println!(self, "{}", message.style(self.dimmed)); } - EventMsg::TaskStarted | EventMsg::TaskComplete(_) => { + EventMsg::TaskStarted => { // Ignore. } + EventMsg::TaskComplete(TaskCompleteEvent { last_agent_message }) => { + handle_last_message( + last_agent_message.as_deref(), + self.last_message_path.as_deref(), + ); + return CodexStatus::InitiateShutdown; + } EventMsg::TokenCount(TokenUsage { total_tokens, .. }) => { ts_println!(self, "tokens used: {total_tokens}"); } @@ -185,7 +203,7 @@ impl EventProcessor for EventProcessorWithHumanOutput { } EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta }) => { if !self.show_agent_reasoning { - return; + return CodexStatus::Running; } if !self.reasoning_started { ts_println!( @@ -498,7 +516,9 @@ impl EventProcessor for EventProcessorWithHumanOutput { EventMsg::GetHistoryEntryResponse(_) => { // Currently ignored in exec output. } + EventMsg::ShutdownComplete => return CodexStatus::Shutdown, } + CodexStatus::Running } } diff --git a/codex-rs/exec/src/event_processor_with_json_output.rs b/codex-rs/exec/src/event_processor_with_json_output.rs index 699460bb..e7a658b7 100644 --- a/codex-rs/exec/src/event_processor_with_json_output.rs +++ b/codex-rs/exec/src/event_processor_with_json_output.rs @@ -1,18 +1,24 @@ use std::collections::HashMap; +use std::path::PathBuf; use codex_core::config::Config; use codex_core::protocol::Event; use codex_core::protocol::EventMsg; +use codex_core::protocol::TaskCompleteEvent; use serde_json::json; +use crate::event_processor::CodexStatus; use crate::event_processor::EventProcessor; use crate::event_processor::create_config_summary_entries; +use crate::event_processor::handle_last_message; -pub(crate) struct EventProcessorWithJsonOutput; +pub(crate) struct EventProcessorWithJsonOutput { + last_message_path: Option, +} impl EventProcessorWithJsonOutput { - pub fn new() -> Self { - Self {} + pub fn new(last_message_path: Option) -> Self { + Self { last_message_path } } } @@ -33,15 +39,25 @@ impl EventProcessor for EventProcessorWithJsonOutput { println!("{prompt_json}"); } - fn process_event(&mut self, event: Event) { + fn process_event(&mut self, event: Event) -> CodexStatus { match event.msg { EventMsg::AgentMessageDelta(_) | EventMsg::AgentReasoningDelta(_) => { // Suppress streaming events in JSON mode. + CodexStatus::Running } + EventMsg::TaskComplete(TaskCompleteEvent { last_agent_message }) => { + handle_last_message( + last_agent_message.as_deref(), + self.last_message_path.as_deref(), + ); + CodexStatus::InitiateShutdown + } + EventMsg::ShutdownComplete => CodexStatus::Shutdown, _ => { if let Ok(line) = serde_json::to_string(&event) { println!("{line}"); } + CodexStatus::Running } } } diff --git a/codex-rs/exec/src/lib.rs b/codex-rs/exec/src/lib.rs index 620ab823..126e92f5 100644 --- a/codex-rs/exec/src/lib.rs +++ b/codex-rs/exec/src/lib.rs @@ -5,7 +5,6 @@ mod event_processor_with_json_output; use std::io::IsTerminal; use std::io::Read; -use std::path::Path; use std::path::PathBuf; use std::sync::Arc; @@ -28,6 +27,7 @@ use tracing::error; use tracing::info; use tracing_subscriber::EnvFilter; +use crate::event_processor::CodexStatus; use crate::event_processor::EventProcessor; pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option) -> anyhow::Result<()> { @@ -123,11 +123,12 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option) -> any let config = Config::load_with_cli_overrides(cli_kv_overrides, overrides)?; let mut event_processor: Box = if json_mode { - Box::new(EventProcessorWithJsonOutput::new()) + Box::new(EventProcessorWithJsonOutput::new(last_message_file.clone())) } else { Box::new(EventProcessorWithHumanOutput::create_with_ansi( stdout_with_ansi, &config, + last_message_file.clone(), )) }; @@ -224,40 +225,17 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option) -> any // Run the loop until the task is complete. while let Some(event) = rx.recv().await { - let (is_last_event, last_assistant_message) = match &event.msg { - EventMsg::TaskComplete(TaskCompleteEvent { last_agent_message }) => { - (true, last_agent_message.clone()) + let shutdown: CodexStatus = event_processor.process_event(event); + match shutdown { + CodexStatus::Running => continue, + CodexStatus::InitiateShutdown => { + codex.submit(Op::Shutdown).await?; + } + CodexStatus::Shutdown => { + break; } - _ => (false, None), - }; - event_processor.process_event(event); - if is_last_event { - handle_last_message(last_assistant_message, last_message_file.as_deref())?; - break; } } Ok(()) } - -fn handle_last_message( - last_agent_message: Option, - last_message_file: Option<&Path>, -) -> std::io::Result<()> { - match (last_agent_message, last_message_file) { - (Some(last_agent_message), Some(last_message_file)) => { - // Last message and a file to write to. - std::fs::write(last_message_file, last_agent_message)?; - } - (None, Some(last_message_file)) => { - eprintln!( - "Warning: No last message to write to file: {}", - last_message_file.to_string_lossy() - ); - } - (_, None) => { - // No last message and no file to write to. - } - } - Ok(()) -} diff --git a/codex-rs/mcp-server/src/codex_tool_runner.rs b/codex-rs/mcp-server/src/codex_tool_runner.rs index 4e10d158..f2cacf6c 100644 --- a/codex-rs/mcp-server/src/codex_tool_runner.rs +++ b/codex-rs/mcp-server/src/codex_tool_runner.rs @@ -246,7 +246,8 @@ async fn run_codex_tool_session_inner( | EventMsg::BackgroundEvent(_) | EventMsg::PatchApplyBegin(_) | EventMsg::PatchApplyEnd(_) - | EventMsg::GetHistoryEntryResponse(_) => { + | EventMsg::GetHistoryEntryResponse(_) + | EventMsg::ShutdownComplete => { // For now, we do not do anything extra for these // events. Note that // send(codex_event_to_notification(&event)) above has diff --git a/codex-rs/tui/src/app.rs b/codex-rs/tui/src/app.rs index 37c2616d..377b5d6f 100644 --- a/codex-rs/tui/src/app.rs +++ b/codex-rs/tui/src/app.rs @@ -223,9 +223,7 @@ impl App<'_> { } => { match &mut self.app_state { AppState::Chat { widget } => { - if widget.on_ctrl_c() { - self.app_event_tx.send(AppEvent::ExitRequest); - } + widget.on_ctrl_c(); } AppState::Login { .. } | AppState::GitWarning { .. } => { // No-op. diff --git a/codex-rs/tui/src/chatwidget.rs b/codex-rs/tui/src/chatwidget.rs index 38565870..081a406f 100644 --- a/codex-rs/tui/src/chatwidget.rs +++ b/codex-rs/tui/src/chatwidget.rs @@ -419,6 +419,9 @@ impl ChatWidget<'_> { self.bottom_pane .on_history_entry_response(log_id, offset, entry.map(|e| e.text)); } + EventMsg::ShutdownComplete => { + self.app_event_tx.send(AppEvent::ExitRequest); + } event => { self.conversation_history .add_background_event(format!("{event:?}")); @@ -471,6 +474,7 @@ impl ChatWidget<'_> { self.reasoning_buffer.clear(); false } else if self.bottom_pane.ctrl_c_quit_hint_visible() { + self.submit_op(Op::Shutdown); true } else { self.bottom_pane.show_ctrl_c_quit_hint();