// Poisoned mutex should fail the program #![allow(clippy::unwrap_used)] use std::collections::HashMap; use std::collections::HashSet; use std::path::Path; use std::path::PathBuf; use std::sync::Arc; use std::sync::Mutex; use std::sync::atomic::AtomicU64; use std::time::Duration; use anyhow::Context; use async_channel::Receiver; use async_channel::Sender; use codex_apply_patch::AffectedPaths; use codex_apply_patch::ApplyPatchAction; use codex_apply_patch::ApplyPatchFileChange; use codex_apply_patch::MaybeApplyPatchVerified; use codex_apply_patch::maybe_parse_apply_patch_verified; use codex_apply_patch::print_summary; use futures::prelude::*; use mcp_types::CallToolResult; use serde::Serialize; use serde_json; use tokio::sync::Notify; use tokio::sync::oneshot; use tokio::task::AbortHandle; use tracing::debug; use tracing::error; use tracing::info; use tracing::trace; use tracing::warn; use uuid::Uuid; use crate::WireApi; use crate::client::ModelClient; use crate::client_common::Prompt; use crate::client_common::ResponseEvent; use crate::config::Config; use crate::config_types::ShellEnvironmentPolicy; use crate::conversation_history::ConversationHistory; use crate::error::CodexErr; use crate::error::Result as CodexResult; use crate::error::SandboxErr; use crate::exec::ExecParams; use crate::exec::ExecToolCallOutput; use crate::exec::SandboxType; use crate::exec::process_exec_tool_call; use crate::exec_env::create_env; use crate::flags::OPENAI_STREAM_MAX_RETRIES; use crate::mcp_connection_manager::McpConnectionManager; use crate::mcp_connection_manager::try_parse_fully_qualified_tool_name; use crate::mcp_tool_call::handle_mcp_tool_call; use crate::models::ContentItem; use crate::models::FunctionCallOutputPayload; use crate::models::LocalShellAction; use crate::models::ReasoningItemReasoningSummary; use crate::models::ResponseInputItem; use crate::models::ResponseItem; use crate::models::ShellToolCallParams; use crate::project_doc::get_user_instructions; use crate::protocol::AgentMessageEvent; use crate::protocol::AgentReasoningEvent; use crate::protocol::ApplyPatchApprovalRequestEvent; use crate::protocol::AskForApproval; use crate::protocol::BackgroundEventEvent; use crate::protocol::ErrorEvent; use crate::protocol::Event; use crate::protocol::EventMsg; use crate::protocol::ExecApprovalRequestEvent; use crate::protocol::ExecCommandBeginEvent; use crate::protocol::ExecCommandEndEvent; use crate::protocol::FileChange; use crate::protocol::InputItem; use crate::protocol::Op; use crate::protocol::PatchApplyBeginEvent; use crate::protocol::PatchApplyEndEvent; use crate::protocol::ReviewDecision; use crate::protocol::SandboxPolicy; use crate::protocol::SessionConfiguredEvent; use crate::protocol::Submission; use crate::protocol::TaskCompleteEvent; use crate::rollout::RolloutRecorder; use crate::safety::SafetyCheck; use crate::safety::assess_command_safety; use crate::safety::assess_patch_safety; use crate::user_notification::UserNotification; use crate::util::backoff; /// The high-level interface to the Codex system. /// It operates as a queue pair where you send submissions and receive events. pub struct Codex { next_id: AtomicU64, tx_sub: Sender, rx_event: Receiver, } impl Codex { /// Spawn a new [`Codex`] and initialize the session. Returns the instance /// of `Codex` and the ID of the `SessionInitialized` event that was /// submitted to start the session. pub async fn spawn(config: Config, ctrl_c: Arc) -> CodexResult<(Codex, String)> { let (tx_sub, rx_sub) = async_channel::bounded(64); let (tx_event, rx_event) = async_channel::bounded(64); let instructions = get_user_instructions(&config).await; let configure_session = Op::ConfigureSession { provider: config.model_provider.clone(), model: config.model.clone(), model_reasoning_effort: config.model_reasoning_effort, model_reasoning_summary: config.model_reasoning_summary, instructions, approval_policy: config.approval_policy, sandbox_policy: config.sandbox_policy.clone(), disable_response_storage: config.disable_response_storage, notify: config.notify.clone(), cwd: config.cwd.clone(), }; let config = Arc::new(config); tokio::spawn(submission_loop(config, rx_sub, tx_event, ctrl_c)); let codex = Codex { next_id: AtomicU64::new(0), tx_sub, rx_event, }; let init_id = codex.submit(configure_session).await?; Ok((codex, init_id)) } /// Submit the `op` wrapped in a `Submission` with a unique ID. pub async fn submit(&self, op: Op) -> CodexResult { let id = self .next_id .fetch_add(1, std::sync::atomic::Ordering::SeqCst) .to_string(); let sub = Submission { id: id.clone(), op }; self.submit_with_id(sub).await?; Ok(id) } /// Use sparingly: prefer `submit()` so Codex is responsible for generating /// unique IDs for each submission. pub async fn submit_with_id(&self, sub: Submission) -> CodexResult<()> { self.tx_sub .send(sub) .await .map_err(|_| CodexErr::InternalAgentDied)?; Ok(()) } pub async fn next_event(&self) -> CodexResult { let event = self .rx_event .recv() .await .map_err(|_| CodexErr::InternalAgentDied)?; Ok(event) } } /// Context for an initialized model agent /// /// A session has at most 1 running task at a time, and can be interrupted by user input. pub(crate) struct Session { client: ModelClient, tx_event: Sender, ctrl_c: Arc, /// The session's current working directory. All relative paths provided by /// the model as well as sandbox policies are resolved against this path /// instead of `std::env::current_dir()`. cwd: PathBuf, instructions: Option, approval_policy: AskForApproval, sandbox_policy: SandboxPolicy, shell_environment_policy: ShellEnvironmentPolicy, writable_roots: Mutex>, /// Manager for external MCP servers/tools. mcp_connection_manager: McpConnectionManager, /// External notifier command (will be passed as args to exec()). When /// `None` this feature is disabled. notify: Option>, /// Optional rollout recorder for persisting the conversation transcript so /// sessions can be replayed or inspected later. rollout: Mutex>, state: Mutex, codex_linux_sandbox_exe: Option, } impl Session { fn resolve_path(&self, path: Option) -> PathBuf { path.as_ref() .map(PathBuf::from) .map_or_else(|| self.cwd.clone(), |p| self.cwd.join(p)) } } /// Mutable state of the agent #[derive(Default)] struct State { approved_commands: HashSet>, current_task: Option, previous_response_id: Option, pending_approvals: HashMap>, pending_input: Vec, zdr_transcript: Option, } impl Session { pub fn set_task(&self, task: AgentTask) { let mut state = self.state.lock().unwrap(); if let Some(current_task) = state.current_task.take() { current_task.abort(); } state.current_task = Some(task); } pub fn remove_task(&self, sub_id: &str) { let mut state = self.state.lock().unwrap(); if let Some(task) = &state.current_task { if task.sub_id == sub_id { state.current_task.take(); } } } /// Sends the given event to the client and swallows the send event, if /// any, logging it as an error. pub(crate) async fn send_event(&self, event: Event) { if let Err(e) = self.tx_event.send(event).await { error!("failed to send tool call event: {e}"); } } pub async fn request_command_approval( &self, sub_id: String, command: Vec, cwd: PathBuf, reason: Option, ) -> oneshot::Receiver { let (tx_approve, rx_approve) = oneshot::channel(); let event = Event { id: sub_id.clone(), msg: EventMsg::ExecApprovalRequest(ExecApprovalRequestEvent { command, cwd, reason, }), }; let _ = self.tx_event.send(event).await; { let mut state = self.state.lock().unwrap(); state.pending_approvals.insert(sub_id, tx_approve); } rx_approve } pub async fn request_patch_approval( &self, sub_id: String, action: &ApplyPatchAction, reason: Option, grant_root: Option, ) -> oneshot::Receiver { let (tx_approve, rx_approve) = oneshot::channel(); let event = Event { id: sub_id.clone(), msg: EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent { changes: convert_apply_patch_to_protocol(action), reason, grant_root, }), }; let _ = self.tx_event.send(event).await; { let mut state = self.state.lock().unwrap(); state.pending_approvals.insert(sub_id, tx_approve); } rx_approve } pub fn notify_approval(&self, sub_id: &str, decision: ReviewDecision) { let mut state = self.state.lock().unwrap(); if let Some(tx_approve) = state.pending_approvals.remove(sub_id) { tx_approve.send(decision).ok(); } } pub fn add_approved_command(&self, cmd: Vec) { let mut state = self.state.lock().unwrap(); state.approved_commands.insert(cmd); } /// Records items to both the rollout and the chat completions/ZDR /// transcript, if enabled. async fn record_conversation_items(&self, items: &[ResponseItem]) { debug!("Recording items for conversation: {items:?}"); self.record_rollout_items(items).await; if let Some(transcript) = self.state.lock().unwrap().zdr_transcript.as_mut() { transcript.record_items(items); } } /// Append the given items to the session's rollout transcript (if enabled) /// and persist them to disk. async fn record_rollout_items(&self, items: &[ResponseItem]) { // Clone the recorder outside of the mutex so we don’t hold the lock // across an await point (MutexGuard is not Send). let recorder = { let guard = self.rollout.lock().unwrap(); guard.as_ref().cloned() }; if let Some(rec) = recorder { if let Err(e) = rec.record_items(items).await { error!("failed to record rollout items: {e:#}"); } } } async fn notify_exec_command_begin(&self, sub_id: &str, call_id: &str, params: &ExecParams) { let event = Event { id: sub_id.to_string(), msg: EventMsg::ExecCommandBegin(ExecCommandBeginEvent { call_id: call_id.to_string(), command: params.command.clone(), cwd: params.cwd.clone(), }), }; let _ = self.tx_event.send(event).await; } async fn notify_exec_command_end( &self, sub_id: &str, call_id: &str, stdout: &str, stderr: &str, exit_code: i32, ) { const MAX_STREAM_OUTPUT: usize = 5 * 1024; // 5KiB let event = Event { id: sub_id.to_string(), // Because stdout and stderr could each be up to 100 KiB, we send // truncated versions. msg: EventMsg::ExecCommandEnd(ExecCommandEndEvent { call_id: call_id.to_string(), stdout: stdout.chars().take(MAX_STREAM_OUTPUT).collect(), stderr: stderr.chars().take(MAX_STREAM_OUTPUT).collect(), exit_code, }), }; let _ = self.tx_event.send(event).await; } /// Helper that emits a BackgroundEvent with the given message. This keeps /// the call‑sites terse so adding more diagnostics does not clutter the /// core agent logic. async fn notify_background_event(&self, sub_id: &str, message: impl Into) { let event = Event { id: sub_id.to_string(), msg: EventMsg::BackgroundEvent(BackgroundEventEvent { message: message.into(), }), }; let _ = self.tx_event.send(event).await; } /// Returns the input if there was no task running to inject into pub fn inject_input(&self, input: Vec) -> Result<(), Vec> { let mut state = self.state.lock().unwrap(); if state.current_task.is_some() { state.pending_input.push(input.into()); Ok(()) } else { Err(input) } } pub fn get_pending_input(&self) -> Vec { let mut state = self.state.lock().unwrap(); if state.pending_input.is_empty() { Vec::with_capacity(0) } else { let mut ret = Vec::new(); std::mem::swap(&mut ret, &mut state.pending_input); ret } } pub async fn call_tool( &self, server: &str, tool: &str, arguments: Option, timeout: Option, ) -> anyhow::Result { self.mcp_connection_manager .call_tool(server, tool, arguments, timeout) .await } pub fn abort(&self) { info!("Aborting existing session"); let mut state = self.state.lock().unwrap(); state.pending_approvals.clear(); state.pending_input.clear(); if let Some(task) = state.current_task.take() { task.abort(); } } /// Spawn the configured notifier (if any) with the given JSON payload as /// the last argument. Failures are logged but otherwise ignored so that /// notification issues do not interfere with the main workflow. fn maybe_notify(&self, notification: UserNotification) { let Some(notify_command) = &self.notify else { return; }; if notify_command.is_empty() { return; } let Ok(json) = serde_json::to_string(¬ification) else { tracing::error!("failed to serialise notification payload"); return; }; let mut command = std::process::Command::new(¬ify_command[0]); if notify_command.len() > 1 { command.args(¬ify_command[1..]); } command.arg(json); // Fire-and-forget – we do not wait for completion. if let Err(e) = command.spawn() { tracing::warn!("failed to spawn notifier '{}': {e}", notify_command[0]); } } } impl Drop for Session { fn drop(&mut self) { self.abort(); } } impl State { pub fn partial_clone(&self, retain_zdr_transcript: bool) -> Self { Self { approved_commands: self.approved_commands.clone(), previous_response_id: self.previous_response_id.clone(), zdr_transcript: if retain_zdr_transcript { self.zdr_transcript.clone() } else { None }, ..Default::default() } } } /// A series of Turns in response to user input. pub(crate) struct AgentTask { sess: Arc, sub_id: String, handle: AbortHandle, } impl AgentTask { fn spawn(sess: Arc, sub_id: String, input: Vec) -> Self { let handle = tokio::spawn(run_task(Arc::clone(&sess), sub_id.clone(), input)).abort_handle(); Self { sess, sub_id, handle, } } fn abort(self) { if !self.handle.is_finished() { self.handle.abort(); let event = Event { id: self.sub_id, msg: EventMsg::Error(ErrorEvent { message: "Turn interrupted".to_string(), }), }; let tx_event = self.sess.tx_event.clone(); tokio::spawn(async move { tx_event.send(event).await.ok(); }); } } } async fn submission_loop( config: Arc, rx_sub: Receiver, tx_event: Sender, ctrl_c: Arc, ) { // Generate a unique ID for the lifetime of this Codex session. let session_id = Uuid::new_v4(); let mut sess: Option> = None; // shorthand - send an event when there is no active session let send_no_session_event = |sub_id: String| async { let event = Event { id: sub_id, msg: EventMsg::Error(ErrorEvent { message: "No session initialized, expected 'ConfigureSession' as first Op" .to_string(), }), }; tx_event.send(event).await.ok(); }; loop { let interrupted = ctrl_c.notified(); let sub = tokio::select! { res = rx_sub.recv() => match res { Ok(sub) => sub, Err(_) => break, }, _ = interrupted => { if let Some(sess) = sess.as_ref(){ sess.abort(); } continue; }, }; debug!(?sub, "Submission"); match sub.op { Op::Interrupt => { let sess = match sess.as_ref() { Some(sess) => sess, None => { send_no_session_event(sub.id).await; continue; } }; sess.abort(); } Op::ConfigureSession { provider, model, model_reasoning_effort, model_reasoning_summary, instructions, approval_policy, sandbox_policy, disable_response_storage, notify, cwd, } => { info!("Configuring session: model={model}; provider={provider:?}"); if !cwd.is_absolute() { let message = format!("cwd is not absolute: {cwd:?}"); error!(message); let event = Event { id: sub.id, msg: EventMsg::Error(ErrorEvent { message }), }; if let Err(e) = tx_event.send(event).await { error!("failed to send error message: {e:?}"); } return; } let client = ModelClient::new( model.clone(), provider.clone(), model_reasoning_effort, model_reasoning_summary, ); // abort any current running session and clone its state let retain_zdr_transcript = record_conversation_history(disable_response_storage, provider.wire_api); let state = match sess.take() { Some(sess) => { sess.abort(); sess.state .lock() .unwrap() .partial_clone(retain_zdr_transcript) } None => State { zdr_transcript: if retain_zdr_transcript { Some(ConversationHistory::new()) } else { None }, ..Default::default() }, }; let writable_roots = Mutex::new(get_writable_roots(&cwd)); // Error messages to dispatch after SessionConfigured is sent. let mut mcp_connection_errors = Vec::::new(); let (mcp_connection_manager, failed_clients) = match McpConnectionManager::new(config.mcp_servers.clone()).await { Ok((mgr, failures)) => (mgr, failures), Err(e) => { let message = format!("Failed to create MCP connection manager: {e:#}"); error!("{message}"); mcp_connection_errors.push(Event { id: sub.id.clone(), msg: EventMsg::Error(ErrorEvent { message }), }); (McpConnectionManager::default(), Default::default()) } }; // Surface individual client start-up failures to the user. if !failed_clients.is_empty() { for (server_name, err) in failed_clients { let message = format!("MCP client for `{server_name}` failed to start: {err:#}"); error!("{message}"); mcp_connection_errors.push(Event { id: sub.id.clone(), msg: EventMsg::Error(ErrorEvent { message }), }); } } // Attempt to create a RolloutRecorder *before* moving the // `instructions` value into the Session struct. // TODO: if ConfigureSession is sent twice, we will create an // overlapping rollout file. Consider passing RolloutRecorder // from above. let rollout_recorder = match RolloutRecorder::new(&config, session_id, instructions.clone()).await { Ok(r) => Some(r), Err(e) => { tracing::warn!("failed to initialise rollout recorder: {e}"); None } }; sess = Some(Arc::new(Session { client, tx_event: tx_event.clone(), ctrl_c: Arc::clone(&ctrl_c), instructions, approval_policy, sandbox_policy, shell_environment_policy: config.shell_environment_policy.clone(), cwd, writable_roots, mcp_connection_manager, notify, state: Mutex::new(state), rollout: Mutex::new(rollout_recorder), codex_linux_sandbox_exe: config.codex_linux_sandbox_exe.clone(), })); // Gather history metadata for SessionConfiguredEvent. let (history_log_id, history_entry_count) = crate::message_history::history_metadata(&config).await; // ack let events = std::iter::once(Event { id: sub.id.clone(), msg: EventMsg::SessionConfigured(SessionConfiguredEvent { session_id, model, history_log_id, history_entry_count, }), }) .chain(mcp_connection_errors.into_iter()); for event in events { if let Err(e) = tx_event.send(event).await { error!("failed to send event: {e:?}"); } } } Op::UserInput { items } => { let sess = match sess.as_ref() { Some(sess) => sess, None => { send_no_session_event(sub.id).await; continue; } }; // attempt to inject input into current task if let Err(items) = sess.inject_input(items) { // no current task, spawn a new one let task = AgentTask::spawn(Arc::clone(sess), sub.id, items); sess.set_task(task); } } Op::ExecApproval { id, decision } => { let sess = match sess.as_ref() { Some(sess) => sess, None => { send_no_session_event(sub.id).await; continue; } }; match decision { ReviewDecision::Abort => { sess.abort(); } other => sess.notify_approval(&id, other), } } Op::PatchApproval { id, decision } => { let sess = match sess.as_ref() { Some(sess) => sess, None => { send_no_session_event(sub.id).await; continue; } }; match decision { ReviewDecision::Abort => { sess.abort(); } other => sess.notify_approval(&id, other), } } Op::AddToHistory { text } => { let id = session_id; let config = config.clone(); tokio::spawn(async move { if let Err(e) = crate::message_history::append_entry(&text, &id, &config).await { tracing::warn!("failed to append to message history: {e}"); } }); } Op::GetHistoryEntryRequest { offset, log_id } => { let config = config.clone(); let tx_event = tx_event.clone(); let sub_id = sub.id.clone(); tokio::spawn(async move { // Run lookup in blocking thread because it does file IO + locking. let entry_opt = tokio::task::spawn_blocking(move || { crate::message_history::lookup(log_id, offset, &config) }) .await .unwrap_or(None); let event = Event { id: sub_id, msg: EventMsg::GetHistoryEntryResponse( crate::protocol::GetHistoryEntryResponseEvent { offset, log_id, entry: entry_opt, }, ), }; if let Err(e) = tx_event.send(event).await { tracing::warn!("failed to send GetHistoryEntryResponse event: {e}"); } }); } } } debug!("Agent loop exited"); } /// Takes a user message as input and runs a loop where, at each turn, the model /// replies with either: /// /// - requested function calls /// - an assistant message /// /// While it is possible for the model to return multiple of these items in a /// single turn, in practice, we generally one item per turn: /// /// - If the model requests a function call, we execute it and send the output /// back to the model in the next turn. /// - If the model sends only an assistant message, we record it in the /// conversation history and consider the task complete. async fn run_task(sess: Arc, sub_id: String, input: Vec) { if input.is_empty() { return; } let event = Event { id: sub_id.clone(), msg: EventMsg::TaskStarted, }; if sess.tx_event.send(event).await.is_err() { return; } let initial_input_for_turn = ResponseInputItem::from(input); sess.record_conversation_items(&[initial_input_for_turn.clone().into()]) .await; let mut input_for_next_turn: Vec = vec![initial_input_for_turn]; let last_agent_message: Option; loop { let mut net_new_turn_input = input_for_next_turn .drain(..) .map(ResponseItem::from) .collect::>(); // Note that pending_input would be something like a message the user // submitted through the UI while the model was running. Though the UI // may support this, the model might not. let pending_input = sess .get_pending_input() .into_iter() .map(ResponseItem::from) .collect::>(); sess.record_conversation_items(&pending_input).await; // Construct the input that we will send to the model. When using the // Chat completions API (or ZDR clients), the model needs the full // conversation history on each turn. The rollout file, however, should // only record the new items that originated in this turn so that it // represents an append-only log without duplicates. let turn_input: Vec = if let Some(transcript) = sess.state.lock().unwrap().zdr_transcript.as_mut() { // If we are using Chat/ZDR, we need to send the transcript with // every turn. By induction, `transcript` already contains: // - The `input` that kicked off this task. // - Each `ResponseItem` that was recorded in the previous turn. // - Each response to a `ResponseItem` (in practice, the only // response type we seem to have is `FunctionCallOutput`). // // The only thing the `transcript` does not contain is the // `pending_input` that was injected while the model was // running. We need to add that to the conversation history // so that the model can see it in the next turn. [transcript.contents(), pending_input].concat() } else { // In practice, net_new_turn_input should contain only: // - User messages // - Outputs for function calls requested by the model net_new_turn_input.extend(pending_input); // Responses API path – we can just send the new items and // record the same. net_new_turn_input }; let turn_input_messages: Vec = turn_input .iter() .filter_map(|item| match item { ResponseItem::Message { content, .. } => Some(content), _ => None, }) .flat_map(|content| { content.iter().filter_map(|item| match item { ContentItem::OutputText { text } => Some(text.clone()), _ => None, }) }) .collect(); match run_turn(&sess, sub_id.clone(), turn_input).await { Ok(turn_output) => { let mut items_to_record_in_conversation_history = Vec::::new(); let mut responses = Vec::::new(); for processed_response_item in turn_output { let ProcessedResponseItem { item, response } = processed_response_item; match (&item, &response) { (ResponseItem::Message { role, .. }, None) if role == "assistant" => { // If the model returned a message, we need to record it. items_to_record_in_conversation_history.push(item); } ( ResponseItem::LocalShellCall { .. }, Some(ResponseInputItem::FunctionCallOutput { call_id, output }), ) => { items_to_record_in_conversation_history.push(item); items_to_record_in_conversation_history.push( ResponseItem::FunctionCallOutput { call_id: call_id.clone(), output: output.clone(), }, ); } ( ResponseItem::FunctionCall { .. }, Some(ResponseInputItem::FunctionCallOutput { call_id, output }), ) => { items_to_record_in_conversation_history.push(item); items_to_record_in_conversation_history.push( ResponseItem::FunctionCallOutput { call_id: call_id.clone(), output: output.clone(), }, ); } ( ResponseItem::FunctionCall { .. }, Some(ResponseInputItem::McpToolCallOutput { call_id, result }), ) => { items_to_record_in_conversation_history.push(item); let (content, success): (String, Option) = match result { Ok(CallToolResult { content, is_error }) => { match serde_json::to_string(content) { Ok(content) => (content, *is_error), Err(e) => { warn!("Failed to serialize MCP tool call output: {e}"); (e.to_string(), Some(true)) } } } Err(e) => (e.clone(), Some(true)), }; items_to_record_in_conversation_history.push( ResponseItem::FunctionCallOutput { call_id: call_id.clone(), output: FunctionCallOutputPayload { content, success }, }, ); } (ResponseItem::Reasoning { .. }, None) => { // Omit from conversation history. } _ => { warn!("Unexpected response item: {item:?} with response: {response:?}"); } }; if let Some(response) = response { responses.push(response); } } // Only attempt to take the lock if there is something to record. if !items_to_record_in_conversation_history.is_empty() { sess.record_conversation_items(&items_to_record_in_conversation_history) .await; } if responses.is_empty() { debug!("Turn completed"); last_agent_message = get_last_assistant_message_from_turn( &items_to_record_in_conversation_history, ); sess.maybe_notify(UserNotification::AgentTurnComplete { turn_id: sub_id.clone(), input_messages: turn_input_messages, last_assistant_message: last_agent_message.clone(), }); break; } input_for_next_turn = responses; } Err(e) => { info!("Turn error: {e:#}"); let event = Event { id: sub_id.clone(), msg: EventMsg::Error(ErrorEvent { message: e.to_string(), }), }; sess.tx_event.send(event).await.ok(); return; } } } sess.remove_task(&sub_id); let event = Event { id: sub_id, msg: EventMsg::TaskComplete(TaskCompleteEvent { last_agent_message }), }; sess.tx_event.send(event).await.ok(); } async fn run_turn( sess: &Session, sub_id: String, input: Vec, ) -> CodexResult> { // Decide whether to use server-side storage (previous_response_id) or disable it let (prev_id, store) = { let state = sess.state.lock().unwrap(); let store = state.zdr_transcript.is_none(); let prev_id = if store { state.previous_response_id.clone() } else { // When using ZDR, the Responses API may send previous_response_id // back, but trying to use it results in a 400. None }; (prev_id, store) }; let extra_tools = sess.mcp_connection_manager.list_all_tools(); let prompt = Prompt { input, prev_id, user_instructions: sess.instructions.clone(), store, extra_tools, }; let mut retries = 0; loop { match try_run_turn(sess, &sub_id, &prompt).await { Ok(output) => return Ok(output), Err(CodexErr::Interrupted) => return Err(CodexErr::Interrupted), Err(CodexErr::EnvVar(var)) => return Err(CodexErr::EnvVar(var)), Err(e) => { if retries < *OPENAI_STREAM_MAX_RETRIES { retries += 1; let delay = backoff(retries); warn!( "stream disconnected - retrying turn ({retries}/{} in {delay:?})...", *OPENAI_STREAM_MAX_RETRIES ); // Surface retry information to any UI/front‑end so the // user understands what is happening instead of staring // at a seemingly frozen screen. sess.notify_background_event( &sub_id, format!( "stream error: {e}; retrying {retries}/{} in {:?}…", *OPENAI_STREAM_MAX_RETRIES, delay ), ) .await; tokio::time::sleep(delay).await; } else { return Err(e); } } } } } /// When the model is prompted, it returns a stream of events. Some of these /// events map to a `ResponseItem`. A `ResponseItem` may need to be /// "handled" such that it produces a `ResponseInputItem` that needs to be /// sent back to the model on the next turn. struct ProcessedResponseItem { item: ResponseItem, response: Option, } async fn try_run_turn( sess: &Session, sub_id: &str, prompt: &Prompt, ) -> CodexResult> { let mut stream = sess.client.clone().stream(prompt).await?; // Buffer all the incoming messages from the stream first, then execute them. // If we execute a function call in the middle of handling the stream, it can time out. let mut input = Vec::new(); while let Some(event) = stream.next().await { input.push(event?); } let mut output = Vec::new(); for event in input { match event { ResponseEvent::OutputItemDone(item) => { let response = handle_response_item(sess, sub_id, item.clone()).await?; output.push(ProcessedResponseItem { item, response }); } ResponseEvent::Completed { response_id } => { let mut state = sess.state.lock().unwrap(); state.previous_response_id = Some(response_id); break; } } } Ok(output) } async fn handle_response_item( sess: &Session, sub_id: &str, item: ResponseItem, ) -> CodexResult> { debug!(?item, "Output item"); let output = match item { ResponseItem::Message { content, .. } => { for item in content { if let ContentItem::OutputText { text } = item { let event = Event { id: sub_id.to_string(), msg: EventMsg::AgentMessage(AgentMessageEvent { message: text }), }; sess.tx_event.send(event).await.ok(); } } None } ResponseItem::Reasoning { id: _, summary } => { for item in summary { let text = match item { ReasoningItemReasoningSummary::SummaryText { text } => text, }; let event = Event { id: sub_id.to_string(), msg: EventMsg::AgentReasoning(AgentReasoningEvent { text }), }; sess.tx_event.send(event).await.ok(); } None } ResponseItem::FunctionCall { name, arguments, call_id, } => { tracing::info!("FunctionCall: {arguments}"); Some(handle_function_call(sess, sub_id.to_string(), name, arguments, call_id).await) } ResponseItem::LocalShellCall { id, call_id, status: _, action, } => { let LocalShellAction::Exec(action) = action; tracing::info!("LocalShellCall: {action:?}"); let params = ShellToolCallParams { command: action.command, workdir: action.working_directory, timeout_ms: action.timeout_ms, }; let effective_call_id = match (call_id, id) { (Some(call_id), _) => call_id, (None, Some(id)) => id, (None, None) => { error!("LocalShellCall without call_id or id"); return Ok(Some(ResponseInputItem::FunctionCallOutput { call_id: "".to_string(), output: FunctionCallOutputPayload { content: "LocalShellCall without call_id or id".to_string(), success: None, }, })); } }; let exec_params = to_exec_params(params, sess); Some( handle_container_exec_with_params( exec_params, sess, sub_id.to_string(), effective_call_id, ) .await, ) } ResponseItem::FunctionCallOutput { .. } => { debug!("unexpected FunctionCallOutput from stream"); None } ResponseItem::Other => None, }; Ok(output) } async fn handle_function_call( sess: &Session, sub_id: String, name: String, arguments: String, call_id: String, ) -> ResponseInputItem { match name.as_str() { "container.exec" | "shell" => { let params = match parse_container_exec_arguments(arguments, sess, &call_id) { Ok(params) => params, Err(output) => { return output; } }; handle_container_exec_with_params(params, sess, sub_id, call_id).await } _ => { match try_parse_fully_qualified_tool_name(&name) { Some((server, tool_name)) => { // TODO(mbolin): Determine appropriate timeout for tool call. let timeout = None; handle_mcp_tool_call( sess, &sub_id, call_id, server, tool_name, arguments, timeout, ) .await } None => { // Unknown function: reply with structured failure so the model can adapt. ResponseInputItem::FunctionCallOutput { call_id, output: crate::models::FunctionCallOutputPayload { content: format!("unsupported call: {}", name), success: None, }, } } } } } } fn to_exec_params(params: ShellToolCallParams, sess: &Session) -> ExecParams { ExecParams { command: params.command, cwd: sess.resolve_path(params.workdir.clone()), timeout_ms: params.timeout_ms, env: create_env(&sess.shell_environment_policy), } } fn parse_container_exec_arguments( arguments: String, sess: &Session, call_id: &str, ) -> Result { // parse command match serde_json::from_str::(&arguments) { Ok(shell_tool_call_params) => Ok(to_exec_params(shell_tool_call_params, sess)), Err(e) => { // allow model to re-sample let output = ResponseInputItem::FunctionCallOutput { call_id: call_id.to_string(), output: crate::models::FunctionCallOutputPayload { content: format!("failed to parse function arguments: {e}"), success: None, }, }; Err(output) } } } async fn handle_container_exec_with_params( params: ExecParams, sess: &Session, sub_id: String, call_id: String, ) -> ResponseInputItem { // check if this was a patch, and apply it if so match maybe_parse_apply_patch_verified(¶ms.command, ¶ms.cwd) { MaybeApplyPatchVerified::Body(changes) => { return apply_patch(sess, sub_id, call_id, changes).await; } MaybeApplyPatchVerified::CorrectnessError(parse_error) => { // It looks like an invocation of `apply_patch`, but we // could not resolve it into a patch that would apply // cleanly. Return to model for resample. return ResponseInputItem::FunctionCallOutput { call_id, output: FunctionCallOutputPayload { content: format!("error: {parse_error:#}"), success: None, }, }; } MaybeApplyPatchVerified::ShellParseError(error) => { trace!("Failed to parse shell command, {error:?}"); } MaybeApplyPatchVerified::NotApplyPatch => (), } // safety checks let safety = { let state = sess.state.lock().unwrap(); assess_command_safety( ¶ms.command, sess.approval_policy, &sess.sandbox_policy, &state.approved_commands, ) }; let sandbox_type = match safety { SafetyCheck::AutoApprove { sandbox_type } => sandbox_type, SafetyCheck::AskUser => { let rx_approve = sess .request_command_approval( sub_id.clone(), params.command.clone(), params.cwd.clone(), None, ) .await; match rx_approve.await.unwrap_or_default() { ReviewDecision::Approved => (), ReviewDecision::ApprovedForSession => { sess.add_approved_command(params.command.clone()); } ReviewDecision::Denied | ReviewDecision::Abort => { return ResponseInputItem::FunctionCallOutput { call_id, output: crate::models::FunctionCallOutputPayload { content: "exec command rejected by user".to_string(), success: None, }, }; } } // No sandboxing is applied because the user has given // explicit approval. Often, we end up in this case because // the command cannot be run in a sandbox, such as // installing a new dependency that requires network access. SandboxType::None } SafetyCheck::Reject { reason } => { return ResponseInputItem::FunctionCallOutput { call_id, output: crate::models::FunctionCallOutputPayload { content: format!("exec command rejected: {reason}"), success: None, }, }; } }; sess.notify_exec_command_begin(&sub_id, &call_id, ¶ms) .await; let output_result = process_exec_tool_call( params.clone(), sandbox_type, sess.ctrl_c.clone(), &sess.sandbox_policy, &sess.codex_linux_sandbox_exe, ) .await; match output_result { Ok(output) => { let ExecToolCallOutput { exit_code, stdout, stderr, duration, } = output; sess.notify_exec_command_end(&sub_id, &call_id, &stdout, &stderr, exit_code) .await; let is_success = exit_code == 0; let content = format_exec_output( if is_success { &stdout } else { &stderr }, exit_code, duration, ); ResponseInputItem::FunctionCallOutput { call_id, output: FunctionCallOutputPayload { content, success: Some(is_success), }, } } Err(CodexErr::Sandbox(error)) => { handle_sanbox_error(error, sandbox_type, params, sess, sub_id, call_id).await } Err(e) => { // Handle non-sandbox errors ResponseInputItem::FunctionCallOutput { call_id, output: FunctionCallOutputPayload { content: format!("execution error: {e}"), success: None, }, } } } } async fn handle_sanbox_error( error: SandboxErr, sandbox_type: SandboxType, params: ExecParams, sess: &Session, sub_id: String, call_id: String, ) -> ResponseInputItem { // Early out if the user never wants to be asked for approval; just return to the model immediately if sess.approval_policy == AskForApproval::Never { return ResponseInputItem::FunctionCallOutput { call_id, output: FunctionCallOutputPayload { content: format!( "failed in sandbox {:?} with execution error: {error}", sandbox_type ), success: Some(false), }, }; } // Ask the user to retry without sandbox sess.notify_background_event(&sub_id, format!("Execution failed: {error}")) .await; let rx_approve = sess .request_command_approval( sub_id.clone(), params.command.clone(), params.cwd.clone(), Some("command failed; retry without sandbox?".to_string()), ) .await; match rx_approve.await.unwrap_or_default() { ReviewDecision::Approved | ReviewDecision::ApprovedForSession => { // Persist this command as pre‑approved for the // remainder of the session so future // executions skip the sandbox directly. // TODO(ragona): Isn't this a bug? It always saves the command in an | fork? sess.add_approved_command(params.command.clone()); // Inform UI we are retrying without sandbox. sess.notify_background_event(&sub_id, "retrying command without sandbox") .await; // Emit a fresh Begin event so progress bars reset. let retry_call_id = format!("{call_id}-retry"); sess.notify_exec_command_begin(&sub_id, &retry_call_id, ¶ms) .await; // This is an escalated retry; the policy will not be // examined and the sandbox has been set to `None`. let retry_output_result = process_exec_tool_call( params, SandboxType::None, sess.ctrl_c.clone(), &sess.sandbox_policy, &sess.codex_linux_sandbox_exe, ) .await; match retry_output_result { Ok(retry_output) => { let ExecToolCallOutput { exit_code, stdout, stderr, duration, } = retry_output; sess.notify_exec_command_end( &sub_id, &retry_call_id, &stdout, &stderr, exit_code, ) .await; let is_success = exit_code == 0; let content = format_exec_output( if is_success { &stdout } else { &stderr }, exit_code, duration, ); ResponseInputItem::FunctionCallOutput { call_id, output: FunctionCallOutputPayload { content, success: Some(is_success), }, } } Err(e) => { // Handle retry failure ResponseInputItem::FunctionCallOutput { call_id, output: FunctionCallOutputPayload { content: format!("retry failed: {e}"), success: None, }, } } } } ReviewDecision::Denied | ReviewDecision::Abort => { // Fall through to original failure handling. ResponseInputItem::FunctionCallOutput { call_id, output: FunctionCallOutputPayload { content: "exec command rejected by user".to_string(), success: None, }, } } } } async fn apply_patch( sess: &Session, sub_id: String, call_id: String, action: ApplyPatchAction, ) -> ResponseInputItem { let writable_roots_snapshot = { let guard = sess.writable_roots.lock().unwrap(); guard.clone() }; let auto_approved = match assess_patch_safety( &action, sess.approval_policy, &writable_roots_snapshot, &sess.cwd, ) { SafetyCheck::AutoApprove { .. } => true, SafetyCheck::AskUser => { // Compute a readable summary of path changes to include in the // approval request so the user can make an informed decision. let rx_approve = sess .request_patch_approval(sub_id.clone(), &action, None, None) .await; match rx_approve.await.unwrap_or_default() { ReviewDecision::Approved | ReviewDecision::ApprovedForSession => false, ReviewDecision::Denied | ReviewDecision::Abort => { return ResponseInputItem::FunctionCallOutput { call_id, output: FunctionCallOutputPayload { content: "patch rejected by user".to_string(), success: Some(false), }, }; } } } SafetyCheck::Reject { reason } => { return ResponseInputItem::FunctionCallOutput { call_id, output: FunctionCallOutputPayload { content: format!("patch rejected: {reason}"), success: Some(false), }, }; } }; // Verify write permissions before touching the filesystem. let writable_snapshot = { sess.writable_roots.lock().unwrap().clone() }; if let Some(offending) = first_offending_path(&action, &writable_snapshot, &sess.cwd) { let root = offending.parent().unwrap_or(&offending).to_path_buf(); let reason = Some(format!( "grant write access to {} for this session", root.display() )); let rx = sess .request_patch_approval(sub_id.clone(), &action, reason.clone(), Some(root.clone())) .await; if !matches!( rx.await.unwrap_or_default(), ReviewDecision::Approved | ReviewDecision::ApprovedForSession ) { return ResponseInputItem::FunctionCallOutput { call_id, output: FunctionCallOutputPayload { content: "patch rejected by user".to_string(), success: Some(false), }, }; } // user approved, extend writable roots for this session sess.writable_roots.lock().unwrap().push(root); } let _ = sess .tx_event .send(Event { id: sub_id.clone(), msg: EventMsg::PatchApplyBegin(PatchApplyBeginEvent { call_id: call_id.clone(), auto_approved, changes: convert_apply_patch_to_protocol(&action), }), }) .await; let mut stdout = Vec::new(); let mut stderr = Vec::new(); // Enforce writable roots. If a write is blocked, collect offending root // and prompt the user to extend permissions. let mut result = apply_changes_from_apply_patch_and_report(&action, &mut stdout, &mut stderr); if let Err(err) = &result { if err.kind() == std::io::ErrorKind::PermissionDenied { // Determine first offending path. let offending_opt = action .changes() .iter() .flat_map(|(path, change)| match change { ApplyPatchFileChange::Add { .. } => vec![path.as_ref()], ApplyPatchFileChange::Delete => vec![path.as_ref()], ApplyPatchFileChange::Update { move_path: Some(move_path), .. } => { vec![path.as_ref(), move_path.as_ref()] } ApplyPatchFileChange::Update { move_path: None, .. } => vec![path.as_ref()], }) .find_map(|path: &Path| { // ApplyPatchAction promises to guarantee absolute paths. if !path.is_absolute() { panic!("apply_patch invariant failed: path is not absolute: {path:?}"); } let writable = { let roots = sess.writable_roots.lock().unwrap(); roots.iter().any(|root| path.starts_with(root)) }; if writable { None } else { Some(path.to_path_buf()) } }); if let Some(offending) = offending_opt { let root = offending.parent().unwrap_or(&offending).to_path_buf(); let reason = Some(format!( "grant write access to {} for this session", root.display() )); let rx = sess .request_patch_approval( sub_id.clone(), &action, reason.clone(), Some(root.clone()), ) .await; if matches!( rx.await.unwrap_or_default(), ReviewDecision::Approved | ReviewDecision::ApprovedForSession ) { // Extend writable roots. sess.writable_roots.lock().unwrap().push(root); stdout.clear(); stderr.clear(); result = apply_changes_from_apply_patch_and_report( &action, &mut stdout, &mut stderr, ); } } } } // Emit PatchApplyEnd event. let success_flag = result.is_ok(); let _ = sess .tx_event .send(Event { id: sub_id.clone(), msg: EventMsg::PatchApplyEnd(PatchApplyEndEvent { call_id: call_id.clone(), stdout: String::from_utf8_lossy(&stdout).to_string(), stderr: String::from_utf8_lossy(&stderr).to_string(), success: success_flag, }), }) .await; match result { Ok(_) => ResponseInputItem::FunctionCallOutput { call_id, output: FunctionCallOutputPayload { content: String::from_utf8_lossy(&stdout).to_string(), success: None, }, }, Err(e) => ResponseInputItem::FunctionCallOutput { call_id, output: FunctionCallOutputPayload { content: format!("error: {e:#}, stderr: {}", String::from_utf8_lossy(&stderr)), success: Some(false), }, }, } } /// Return the first path in `hunks` that is NOT under any of the /// `writable_roots` (after normalising). If all paths are acceptable, /// returns None. fn first_offending_path( action: &ApplyPatchAction, writable_roots: &[PathBuf], cwd: &Path, ) -> Option { let changes = action.changes(); for (path, change) in changes { let candidate = match change { ApplyPatchFileChange::Add { .. } => path, ApplyPatchFileChange::Delete => path, ApplyPatchFileChange::Update { move_path, .. } => move_path.as_ref().unwrap_or(path), }; let abs = if candidate.is_absolute() { candidate.clone() } else { cwd.join(candidate) }; let mut allowed = false; for root in writable_roots { let root_abs = if root.is_absolute() { root.clone() } else { cwd.join(root) }; if abs.starts_with(&root_abs) { allowed = true; break; } } if !allowed { return Some(candidate.clone()); } } None } fn convert_apply_patch_to_protocol(action: &ApplyPatchAction) -> HashMap { let changes = action.changes(); let mut result = HashMap::with_capacity(changes.len()); for (path, change) in changes { let protocol_change = match change { ApplyPatchFileChange::Add { content } => FileChange::Add { content: content.clone(), }, ApplyPatchFileChange::Delete => FileChange::Delete, ApplyPatchFileChange::Update { unified_diff, move_path, new_content: _new_content, } => FileChange::Update { unified_diff: unified_diff.clone(), move_path: move_path.clone(), }, }; result.insert(path.clone(), protocol_change); } result } fn apply_changes_from_apply_patch_and_report( action: &ApplyPatchAction, stdout: &mut impl std::io::Write, stderr: &mut impl std::io::Write, ) -> std::io::Result<()> { match apply_changes_from_apply_patch(action) { Ok(affected_paths) => { print_summary(&affected_paths, stdout)?; } Err(err) => { writeln!(stderr, "{err:?}")?; } } Ok(()) } fn apply_changes_from_apply_patch(action: &ApplyPatchAction) -> anyhow::Result { let mut added: Vec = Vec::new(); let mut modified: Vec = Vec::new(); let mut deleted: Vec = Vec::new(); let changes = action.changes(); for (path, change) in changes { match change { ApplyPatchFileChange::Add { content } => { if let Some(parent) = path.parent() { if !parent.as_os_str().is_empty() { std::fs::create_dir_all(parent).with_context(|| { format!("Failed to create parent directories for {}", path.display()) })?; } } std::fs::write(path, content) .with_context(|| format!("Failed to write file {}", path.display()))?; added.push(path.clone()); } ApplyPatchFileChange::Delete => { std::fs::remove_file(path) .with_context(|| format!("Failed to delete file {}", path.display()))?; deleted.push(path.clone()); } ApplyPatchFileChange::Update { unified_diff: _unified_diff, move_path, new_content, } => { if let Some(move_path) = move_path { if let Some(parent) = move_path.parent() { if !parent.as_os_str().is_empty() { std::fs::create_dir_all(parent).with_context(|| { format!( "Failed to create parent directories for {}", move_path.display() ) })?; } } std::fs::rename(path, move_path) .with_context(|| format!("Failed to rename file {}", path.display()))?; std::fs::write(move_path, new_content)?; modified.push(move_path.clone()); deleted.push(path.clone()); } else { std::fs::write(path, new_content)?; modified.push(path.clone()); } } } } Ok(AffectedPaths { added, modified, deleted, }) } fn get_writable_roots(cwd: &Path) -> Vec { let mut writable_roots = Vec::new(); if cfg!(target_os = "macos") { // On macOS, $TMPDIR is private to the user. writable_roots.push(std::env::temp_dir()); // Allow pyenv to update its shims directory. Without this, any tool // that happens to be managed by `pyenv` will fail with an error like: // // pyenv: cannot rehash: $HOME/.pyenv/shims isn't writable // // which is emitted every time `pyenv` tries to run `rehash` (for // example, after installing a new Python package that drops an entry // point). Although the sandbox is intentionally read‑only by default, // writing to the user's local `pyenv` directory is safe because it // is already user‑writable and scoped to the current user account. if let Ok(home_dir) = std::env::var("HOME") { let pyenv_dir = PathBuf::from(home_dir).join(".pyenv"); writable_roots.push(pyenv_dir); } } writable_roots.push(cwd.to_path_buf()); writable_roots } /// Exec output is a pre-serialized JSON payload fn format_exec_output(output: &str, exit_code: i32, duration: std::time::Duration) -> String { #[derive(Serialize)] struct ExecMetadata { exit_code: i32, duration_seconds: f32, } #[derive(Serialize)] struct ExecOutput<'a> { output: &'a str, metadata: ExecMetadata, } // round to 1 decimal place let duration_seconds = ((duration.as_secs_f32()) * 10.0).round() / 10.0; let payload = ExecOutput { output, metadata: ExecMetadata { exit_code, duration_seconds, }, }; #[expect(clippy::expect_used)] serde_json::to_string(&payload).expect("serialize ExecOutput") } fn get_last_assistant_message_from_turn(responses: &[ResponseItem]) -> Option { responses.iter().rev().find_map(|item| { if let ResponseItem::Message { role, content } = item { if role == "assistant" { content.iter().rev().find_map(|ci| { if let ContentItem::OutputText { text } = ci { Some(text.clone()) } else { None } }) } else { None } } else { None } }) } /// See [`ConversationHistory`] for details. fn record_conversation_history(disable_response_storage: bool, wire_api: WireApi) -> bool { if disable_response_storage { return true; } match wire_api { WireApi::Responses => false, WireApi::Chat => true, } }