use std::borrow::Cow; use std::collections::HashMap; use std::collections::HashSet; use std::path::PathBuf; use std::sync::Arc; use std::sync::Mutex; use std::sync::MutexGuard; use std::sync::atomic::AtomicU64; use std::time::Duration; use async_channel::Receiver; use async_channel::Sender; use codex_apply_patch::ApplyPatchAction; use codex_apply_patch::MaybeApplyPatchVerified; use codex_apply_patch::maybe_parse_apply_patch_verified; use codex_login::AuthManager; use codex_protocol::protocol::ConversationHistoryResponseEvent; use codex_protocol::protocol::TaskStartedEvent; use codex_protocol::protocol::TurnAbortReason; use codex_protocol::protocol::TurnAbortedEvent; use futures::prelude::*; use mcp_types::CallToolResult; use serde::Serialize; use serde_json; use tokio::sync::oneshot; use tokio::task::AbortHandle; use tracing::debug; use tracing::error; use tracing::info; use tracing::trace; use tracing::warn; use uuid::Uuid; use crate::ModelProviderInfo; use crate::apply_patch; use crate::apply_patch::ApplyPatchExec; use crate::apply_patch::CODEX_APPLY_PATCH_ARG1; use crate::apply_patch::InternalApplyPatchInvocation; use crate::apply_patch::convert_apply_patch_to_protocol; use crate::client::ModelClient; use crate::client_common::Prompt; use crate::client_common::ResponseEvent; use crate::config::Config; use crate::config_types::ShellEnvironmentPolicy; use crate::conversation_history::ConversationHistory; use crate::environment_context::EnvironmentContext; use crate::error::CodexErr; use crate::error::Result as CodexResult; use crate::error::SandboxErr; use crate::error::get_error_message_ui; use crate::exec::ExecParams; use crate::exec::ExecToolCallOutput; use crate::exec::SandboxType; use crate::exec::StdoutStream; use crate::exec::StreamOutput; use crate::exec::process_exec_tool_call; use crate::exec_command::EXEC_COMMAND_TOOL_NAME; use crate::exec_command::ExecCommandParams; use crate::exec_command::ExecSessionManager; use crate::exec_command::WRITE_STDIN_TOOL_NAME; use crate::exec_command::WriteStdinParams; use crate::exec_env::create_env; use crate::mcp_connection_manager::McpConnectionManager; use crate::mcp_tool_call::handle_mcp_tool_call; use crate::model_family::find_family_for_model; use crate::openai_model_info::get_model_info; use crate::openai_tools::ApplyPatchToolArgs; use crate::openai_tools::ToolsConfig; use crate::openai_tools::ToolsConfigParams; use crate::openai_tools::get_openai_tools; use crate::parse_command::parse_command; use crate::plan_tool::handle_update_plan; use crate::project_doc::get_user_instructions; use crate::protocol::AgentMessageDeltaEvent; use crate::protocol::AgentMessageEvent; use crate::protocol::AgentReasoningDeltaEvent; use crate::protocol::AgentReasoningEvent; use crate::protocol::AgentReasoningRawContentDeltaEvent; use crate::protocol::AgentReasoningRawContentEvent; use crate::protocol::AgentReasoningSectionBreakEvent; use crate::protocol::ApplyPatchApprovalRequestEvent; use crate::protocol::AskForApproval; use crate::protocol::BackgroundEventEvent; use crate::protocol::ErrorEvent; use crate::protocol::Event; use crate::protocol::EventMsg; use crate::protocol::ExecApprovalRequestEvent; use crate::protocol::ExecCommandBeginEvent; use crate::protocol::ExecCommandEndEvent; use crate::protocol::FileChange; use crate::protocol::InputItem; use crate::protocol::Op; use crate::protocol::PatchApplyBeginEvent; use crate::protocol::PatchApplyEndEvent; use crate::protocol::ReviewDecision; use crate::protocol::SandboxPolicy; use 
crate::protocol::SessionConfiguredEvent;
use crate::protocol::StreamErrorEvent;
use crate::protocol::Submission;
use crate::protocol::TaskCompleteEvent;
use crate::protocol::TurnDiffEvent;
use crate::protocol::WebSearchBeginEvent;
use crate::rollout::RolloutRecorder;
use crate::safety::SafetyCheck;
use crate::safety::assess_command_safety;
use crate::safety::assess_safety_for_untrusted_command;
use crate::shell;
use crate::turn_diff_tracker::TurnDiffTracker;
use crate::user_notification::UserNotification;
use crate::util::backoff;
use codex_protocol::config_types::ReasoningEffort as ReasoningEffortConfig;
use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig;
use codex_protocol::models::ContentItem;
use codex_protocol::models::FunctionCallOutputPayload;
use codex_protocol::models::LocalShellAction;
use codex_protocol::models::ReasoningItemContent;
use codex_protocol::models::ReasoningItemReasoningSummary;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::models::ShellToolCallParams;

// A convenience extension trait for acquiring mutex locks where poisoning is
// unrecoverable and should abort the program. This avoids scattered `.unwrap()`
// calls on `lock()` while still surfacing a clear panic message when a lock is
// poisoned.
trait MutexExt<T> {
    fn lock_unchecked(&self) -> MutexGuard<'_, T>;
}

impl<T> MutexExt<T> for Mutex<T> {
    fn lock_unchecked(&self) -> MutexGuard<'_, T> {
        #[expect(clippy::expect_used)]
        self.lock().expect("poisoned lock")
    }
}

/// The high-level interface to the Codex system.
/// It operates as a queue pair where you send submissions and receive events.
pub struct Codex {
    next_id: AtomicU64,
    tx_sub: Sender<Submission>,
    rx_event: Receiver<Event>,
}

/// Wrapper returned by [`Codex::spawn`] containing the spawned [`Codex`] and
/// the unique session id.
pub struct CodexSpawnOk {
    pub codex: Codex,
    pub session_id: Uuid,
}

pub(crate) const INITIAL_SUBMIT_ID: &str = "";
pub(crate) const SUBMISSION_CHANNEL_CAPACITY: usize = 64;

// Model-formatting limits: clients get full streams; only content sent to the model is truncated.
pub(crate) const MODEL_FORMAT_MAX_BYTES: usize = 10 * 1024; // 10 KiB
pub(crate) const MODEL_FORMAT_MAX_LINES: usize = 256; // lines
pub(crate) const MODEL_FORMAT_HEAD_LINES: usize = MODEL_FORMAT_MAX_LINES / 2;
pub(crate) const MODEL_FORMAT_TAIL_LINES: usize = MODEL_FORMAT_MAX_LINES - MODEL_FORMAT_HEAD_LINES; // 128
pub(crate) const MODEL_FORMAT_HEAD_BYTES: usize = MODEL_FORMAT_MAX_BYTES / 2;

impl Codex {
    /// Spawn a new [`Codex`] and initialize the session.
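    ///
    /// A minimal usage sketch (hypothetical caller code, assuming a prepared
    /// `Config` and `Arc<AuthManager>`; not part of this module):
    ///
    /// ```ignore
    /// let CodexSpawnOk { codex, session_id } = Codex::spawn(config, auth_manager, None).await?;
    /// codex.submit(Op::UserInput { items }).await?;
    /// while let Ok(event) = codex.next_event().await {
    ///     // react to `event`; stop after EventMsg::TaskComplete
    /// }
    /// ```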
    pub async fn spawn(
        config: Config,
        auth_manager: Arc<AuthManager>,
        initial_history: Option<Vec<ResponseItem>>,
    ) -> CodexResult<CodexSpawnOk> {
        let (tx_sub, rx_sub) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY);
        let (tx_event, rx_event) = async_channel::unbounded();

        let user_instructions = get_user_instructions(&config).await;

        let config = Arc::new(config);
        let resume_path = config.experimental_resume.clone();
        let configure_session = ConfigureSession {
            provider: config.model_provider.clone(),
            model: config.model.clone(),
            model_reasoning_effort: config.model_reasoning_effort,
            model_reasoning_summary: config.model_reasoning_summary,
            user_instructions,
            base_instructions: config.base_instructions.clone(),
            approval_policy: config.approval_policy,
            sandbox_policy: config.sandbox_policy.clone(),
            disable_response_storage: config.disable_response_storage,
            notify: config.notify.clone(),
            cwd: config.cwd.clone(),
            resume_path,
        };

        // Generate a unique ID for the lifetime of this Codex session.
        let (session, turn_context) = Session::new(
            configure_session,
            config.clone(),
            auth_manager.clone(),
            tx_event.clone(),
            initial_history,
        )
        .await
        .map_err(|e| {
            error!("Failed to create session: {e:#}");
            CodexErr::InternalAgentDied
        })?;
        let session_id = session.session_id;

        // This task will run until Op::Shutdown is received.
        tokio::spawn(submission_loop(
            session.clone(),
            turn_context,
            config,
            rx_sub,
        ));
        let codex = Codex {
            next_id: AtomicU64::new(0),
            tx_sub,
            rx_event,
        };

        Ok(CodexSpawnOk { codex, session_id })
    }

    /// Submit the `op` wrapped in a `Submission` with a unique ID.
    pub async fn submit(&self, op: Op) -> CodexResult<String> {
        let id = self
            .next_id
            .fetch_add(1, std::sync::atomic::Ordering::SeqCst)
            .to_string();
        let sub = Submission { id: id.clone(), op };
        self.submit_with_id(sub).await?;
        Ok(id)
    }

    /// Use sparingly: prefer `submit()` so Codex is responsible for generating
    /// unique IDs for each submission.
    pub async fn submit_with_id(&self, sub: Submission) -> CodexResult<()> {
        self.tx_sub
            .send(sub)
            .await
            .map_err(|_| CodexErr::InternalAgentDied)?;
        Ok(())
    }

    pub async fn next_event(&self) -> CodexResult<Event> {
        let event = self
            .rx_event
            .recv()
            .await
            .map_err(|_| CodexErr::InternalAgentDied)?;
        Ok(event)
    }
}

/// Mutable state of the agent
#[derive(Default)]
struct State {
    approved_commands: HashSet<Vec<String>>,
    current_task: Option<AgentTask>,
    pending_approvals: HashMap<String, oneshot::Sender<ReviewDecision>>,
    pending_input: Vec<ResponseInputItem>,
    history: ConversationHistory,
}

/// Context for an initialized model agent
///
/// A session has at most 1 running task at a time, and can be interrupted by user input.
pub(crate) struct Session {
    session_id: Uuid,
    tx_event: Sender<Event>,

    /// Manager for external MCP servers/tools.
    mcp_connection_manager: McpConnectionManager,
    session_manager: ExecSessionManager,

    /// External notifier command (will be passed as args to exec()). When
    /// `None` this feature is disabled.
    notify: Option<Vec<String>>,

    /// Optional rollout recorder for persisting the conversation transcript so
    /// sessions can be replayed or inspected later.
    rollout: Mutex<Option<RolloutRecorder>>,
    state: Mutex<State>,
    codex_linux_sandbox_exe: Option<PathBuf>,
    user_shell: shell::Shell,
    show_raw_agent_reasoning: bool,
}

/// The context needed for a single turn of the conversation.
#[derive(Debug)]
pub(crate) struct TurnContext {
    pub(crate) client: ModelClient,

    /// The session's current working directory. All relative paths provided by
    /// the model as well as sandbox policies are resolved against this path
    /// instead of `std::env::current_dir()`.
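    ///
    /// (Illustrative: with `cwd = "/repo"`, a model-supplied `workdir` of
    /// `"src"` resolves to `"/repo/src"`; see `resolve_path` below.)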
pub(crate) cwd: PathBuf, pub(crate) base_instructions: Option, pub(crate) user_instructions: Option, pub(crate) approval_policy: AskForApproval, pub(crate) sandbox_policy: SandboxPolicy, pub(crate) shell_environment_policy: ShellEnvironmentPolicy, pub(crate) disable_response_storage: bool, pub(crate) tools_config: ToolsConfig, } impl TurnContext { fn resolve_path(&self, path: Option) -> PathBuf { path.as_ref() .map(PathBuf::from) .map_or_else(|| self.cwd.clone(), |p| self.cwd.join(p)) } } /// Configure the model session. struct ConfigureSession { /// Provider identifier ("openai", "openrouter", ...). provider: ModelProviderInfo, /// If not specified, server will use its default model. model: String, model_reasoning_effort: ReasoningEffortConfig, model_reasoning_summary: ReasoningSummaryConfig, /// Model instructions that are appended to the base instructions. user_instructions: Option, /// Base instructions override. base_instructions: Option, /// When to escalate for approval for execution approval_policy: AskForApproval, /// How to sandbox commands executed in the system sandbox_policy: SandboxPolicy, /// Disable server-side response storage (send full context each request) disable_response_storage: bool, /// Optional external notifier command tokens. Present only when the /// client wants the agent to spawn a program after each completed /// turn. notify: Option>, /// Working directory that should be treated as the *root* of the /// session. All relative paths supplied by the model as well as the /// execution sandbox are resolved against this directory **instead** /// of the process-wide current working directory. CLI front-ends are /// expected to expand this to an absolute path before sending the /// `ConfigureSession` operation so that the business-logic layer can /// operate deterministically. cwd: PathBuf, resume_path: Option, } impl Session { async fn new( configure_session: ConfigureSession, config: Arc, auth_manager: Arc, tx_event: Sender, initial_history: Option>, ) -> anyhow::Result<(Arc, TurnContext)> { let ConfigureSession { provider, model, model_reasoning_effort, model_reasoning_summary, user_instructions, base_instructions, approval_policy, sandbox_policy, disable_response_storage, notify, cwd, resume_path, } = configure_session; debug!("Configuring session: model={model}; provider={provider:?}"); if !cwd.is_absolute() { return Err(anyhow::anyhow!("cwd is not absolute: {cwd:?}")); } // Error messages to dispatch after SessionConfigured is sent. let mut post_session_configured_error_events = Vec::::new(); // Kick off independent async setup tasks in parallel to reduce startup latency. // // - initialize RolloutRecorder with new or resumed session info // - spin up MCP connection manager // - perform default shell discovery // - load history metadata let rollout_fut = async { match resume_path.as_ref() { Some(path) => RolloutRecorder::resume(path, cwd.clone()) .await .map(|(rec, saved)| (saved.session_id, Some(saved), rec)), None => { let session_id = Uuid::new_v4(); RolloutRecorder::new(&config, session_id, user_instructions.clone()) .await .map(|rec| (session_id, None, rec)) } } }; let mcp_fut = McpConnectionManager::new(config.mcp_servers.clone()); let default_shell_fut = shell::default_user_shell(); let history_meta_fut = crate::message_history::history_metadata(&config); // Join all independent futures. 
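        // (Failures from the rollout recorder or MCP startup below are surfaced
        // as error events after `SessionConfigured` rather than aborting startup,
        // except when an explicit `resume_path` fails to load.)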
let (rollout_res, mcp_res, default_shell, (history_log_id, history_entry_count)) = tokio::join!(rollout_fut, mcp_fut, default_shell_fut, history_meta_fut); // Handle rollout result, which determines the session_id. struct RolloutResult { session_id: Uuid, rollout_recorder: Option, restored_items: Option>, } let rollout_result = match rollout_res { Ok((session_id, maybe_saved, recorder)) => { let restored_items: Option> = initial_history.or_else(|| { maybe_saved.and_then(|saved_session| { if saved_session.items.is_empty() { None } else { Some(saved_session.items) } }) }); RolloutResult { session_id, rollout_recorder: Some(recorder), restored_items, } } Err(e) => { if let Some(path) = resume_path.as_ref() { return Err(anyhow::anyhow!( "failed to resume rollout from {path:?}: {e}" )); } let message = format!("failed to initialize rollout recorder: {e}"); post_session_configured_error_events.push(Event { id: INITIAL_SUBMIT_ID.to_owned(), msg: EventMsg::Error(ErrorEvent { message: message.clone(), }), }); warn!("{message}"); RolloutResult { session_id: Uuid::new_v4(), rollout_recorder: None, restored_items: None, } } }; let RolloutResult { session_id, rollout_recorder, restored_items, } = rollout_result; // Create the mutable state for the Session. let mut state = State { history: ConversationHistory::new(), ..Default::default() }; if let Some(restored_items) = restored_items { state.history.record_items(&restored_items); } // Handle MCP manager result and record any startup failures. let (mcp_connection_manager, failed_clients) = match mcp_res { Ok((mgr, failures)) => (mgr, failures), Err(e) => { let message = format!("Failed to create MCP connection manager: {e:#}"); error!("{message}"); post_session_configured_error_events.push(Event { id: INITIAL_SUBMIT_ID.to_owned(), msg: EventMsg::Error(ErrorEvent { message }), }); (McpConnectionManager::default(), Default::default()) } }; // Surface individual client start-up failures to the user. if !failed_clients.is_empty() { for (server_name, err) in failed_clients { let message = format!("MCP client for `{server_name}` failed to start: {err:#}"); error!("{message}"); post_session_configured_error_events.push(Event { id: INITIAL_SUBMIT_ID.to_owned(), msg: EventMsg::Error(ErrorEvent { message }), }); } } // Now that `session_id` is final (may have been updated by resume), // construct the model client. 
let client = ModelClient::new( config.clone(), Some(auth_manager.clone()), provider.clone(), model_reasoning_effort, model_reasoning_summary, session_id, ); let turn_context = TurnContext { client, tools_config: ToolsConfig::new(&ToolsConfigParams { model_family: &config.model_family, approval_policy, sandbox_policy: sandbox_policy.clone(), include_plan_tool: config.include_plan_tool, include_apply_patch_tool: config.include_apply_patch_tool, include_web_search_request: config.tools_web_search_request, use_streamable_shell_tool: config.use_experimental_streamable_shell_tool, }), user_instructions, base_instructions, approval_policy, sandbox_policy, shell_environment_policy: config.shell_environment_policy.clone(), cwd, disable_response_storage, }; let sess = Arc::new(Session { session_id, tx_event: tx_event.clone(), mcp_connection_manager, session_manager: ExecSessionManager::default(), notify, state: Mutex::new(state), rollout: Mutex::new(rollout_recorder), codex_linux_sandbox_exe: config.codex_linux_sandbox_exe.clone(), user_shell: default_shell, show_raw_agent_reasoning: config.show_raw_agent_reasoning, }); // record the initial user instructions and environment context, // regardless of whether we restored items. let mut conversation_items = Vec::::with_capacity(2); if let Some(user_instructions) = turn_context.user_instructions.as_deref() { conversation_items.push(Prompt::format_user_instructions_message(user_instructions)); } conversation_items.push(ResponseItem::from(EnvironmentContext::new( Some(turn_context.cwd.clone()), Some(turn_context.approval_policy), Some(turn_context.sandbox_policy.clone()), Some(sess.user_shell.clone()), ))); sess.record_conversation_items(&conversation_items).await; // Dispatch the SessionConfiguredEvent first and then report any errors. let events = std::iter::once(Event { id: INITIAL_SUBMIT_ID.to_owned(), msg: EventMsg::SessionConfigured(SessionConfiguredEvent { session_id, model, history_log_id, history_entry_count, }), }) .chain(post_session_configured_error_events.into_iter()); for event in events { if let Err(e) = tx_event.send(event).await { error!("failed to send event: {e:?}"); } } Ok((sess, turn_context)) } pub fn set_task(&self, task: AgentTask) { let mut state = self.state.lock_unchecked(); if let Some(current_task) = state.current_task.take() { current_task.abort(TurnAbortReason::Replaced); } state.current_task = Some(task); } pub fn remove_task(&self, sub_id: &str) { let mut state = self.state.lock_unchecked(); if let Some(task) = &state.current_task && task.sub_id == sub_id { state.current_task.take(); } } /// Sends the given event to the client and swallows the send event, if /// any, logging it as an error. 
pub(crate) async fn send_event(&self, event: Event) { if let Err(e) = self.tx_event.send(event).await { error!("failed to send tool call event: {e}"); } } pub async fn request_command_approval( &self, sub_id: String, call_id: String, command: Vec, cwd: PathBuf, reason: Option, ) -> oneshot::Receiver { let (tx_approve, rx_approve) = oneshot::channel(); let event = Event { id: sub_id.clone(), msg: EventMsg::ExecApprovalRequest(ExecApprovalRequestEvent { call_id, command, cwd, reason, }), }; let _ = self.tx_event.send(event).await; { let mut state = self.state.lock_unchecked(); state.pending_approvals.insert(sub_id, tx_approve); } rx_approve } pub async fn request_patch_approval( &self, sub_id: String, call_id: String, action: &ApplyPatchAction, reason: Option, grant_root: Option, ) -> oneshot::Receiver { let (tx_approve, rx_approve) = oneshot::channel(); let event = Event { id: sub_id.clone(), msg: EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent { call_id, changes: convert_apply_patch_to_protocol(action), reason, grant_root, }), }; let _ = self.tx_event.send(event).await; { let mut state = self.state.lock_unchecked(); state.pending_approvals.insert(sub_id, tx_approve); } rx_approve } pub fn notify_approval(&self, sub_id: &str, decision: ReviewDecision) { let mut state = self.state.lock_unchecked(); if let Some(tx_approve) = state.pending_approvals.remove(sub_id) { tx_approve.send(decision).ok(); } } pub fn add_approved_command(&self, cmd: Vec) { let mut state = self.state.lock_unchecked(); state.approved_commands.insert(cmd); } /// Records items to both the rollout and the chat completions/ZDR /// transcript, if enabled. async fn record_conversation_items(&self, items: &[ResponseItem]) { debug!("Recording items for conversation: {items:?}"); self.record_state_snapshot(items).await; self.state.lock_unchecked().history.record_items(items); } async fn record_state_snapshot(&self, items: &[ResponseItem]) { let snapshot = { crate::rollout::SessionStateSnapshot {} }; let recorder = { let guard = self.rollout.lock_unchecked(); guard.as_ref().cloned() }; if let Some(rec) = recorder { if let Err(e) = rec.record_state(snapshot).await { error!("failed to record rollout state: {e:#}"); } if let Err(e) = rec.record_items(items).await { error!("failed to record rollout items: {e:#}"); } } } async fn on_exec_command_begin( &self, turn_diff_tracker: &mut TurnDiffTracker, exec_command_context: ExecCommandContext, ) { let ExecCommandContext { sub_id, call_id, command_for_display, cwd, apply_patch, } = exec_command_context; let msg = match apply_patch { Some(ApplyPatchCommandContext { user_explicitly_approved_this_action, changes, }) => { turn_diff_tracker.on_patch_begin(&changes); EventMsg::PatchApplyBegin(PatchApplyBeginEvent { call_id, auto_approved: !user_explicitly_approved_this_action, changes, }) } None => EventMsg::ExecCommandBegin(ExecCommandBeginEvent { call_id, command: command_for_display.clone(), cwd, parsed_cmd: parse_command(&command_for_display) .into_iter() .map(Into::into) .collect(), }), }; let event = Event { id: sub_id.to_string(), msg, }; let _ = self.tx_event.send(event).await; } async fn on_exec_command_end( &self, turn_diff_tracker: &mut TurnDiffTracker, sub_id: &str, call_id: &str, output: &ExecToolCallOutput, is_apply_patch: bool, ) { let ExecToolCallOutput { stdout, stderr, aggregated_output, duration, exit_code, } = output; // Send full stdout/stderr to clients; do not truncate. 
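        // (Assumption: only the model-facing `formatted_output` produced by
        // `format_exec_output_str` below is subject to the MODEL_FORMAT_* limits
        // declared at the top of this file.)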
let stdout = stdout.text.clone(); let stderr = stderr.text.clone(); let formatted_output = format_exec_output_str(output); let aggregated_output: String = aggregated_output.text.clone(); let msg = if is_apply_patch { EventMsg::PatchApplyEnd(PatchApplyEndEvent { call_id: call_id.to_string(), stdout, stderr, success: *exit_code == 0, }) } else { EventMsg::ExecCommandEnd(ExecCommandEndEvent { call_id: call_id.to_string(), stdout, stderr, aggregated_output, exit_code: *exit_code, duration: *duration, formatted_output, }) }; let event = Event { id: sub_id.to_string(), msg, }; let _ = self.tx_event.send(event).await; // If this is an apply_patch, after we emit the end patch, emit a second event // with the full turn diff if there is one. if is_apply_patch { let unified_diff = turn_diff_tracker.get_unified_diff(); if let Ok(Some(unified_diff)) = unified_diff { let msg = EventMsg::TurnDiff(TurnDiffEvent { unified_diff }); let event = Event { id: sub_id.into(), msg, }; let _ = self.tx_event.send(event).await; } } } /// Runs the exec tool call and emits events for the begin and end of the /// command even on error. /// /// Returns the output of the exec tool call. async fn run_exec_with_events<'a>( &self, turn_diff_tracker: &mut TurnDiffTracker, begin_ctx: ExecCommandContext, exec_args: ExecInvokeArgs<'a>, ) -> crate::error::Result { let is_apply_patch = begin_ctx.apply_patch.is_some(); let sub_id = begin_ctx.sub_id.clone(); let call_id = begin_ctx.call_id.clone(); self.on_exec_command_begin(turn_diff_tracker, begin_ctx.clone()) .await; let result = process_exec_tool_call( exec_args.params, exec_args.sandbox_type, exec_args.sandbox_policy, exec_args.codex_linux_sandbox_exe, exec_args.stdout_stream, ) .await; let output_stderr; let borrowed: &ExecToolCallOutput = match &result { Ok(output) => output, Err(e) => { output_stderr = ExecToolCallOutput { exit_code: -1, stdout: StreamOutput::new(String::new()), stderr: StreamOutput::new(get_error_message_ui(e)), aggregated_output: StreamOutput::new(get_error_message_ui(e)), duration: Duration::default(), }; &output_stderr } }; self.on_exec_command_end( turn_diff_tracker, &sub_id, &call_id, borrowed, is_apply_patch, ) .await; result } /// Helper that emits a BackgroundEvent with the given message. This keeps /// the call‑sites terse so adding more diagnostics does not clutter the /// core agent logic. async fn notify_background_event(&self, sub_id: &str, message: impl Into) { let event = Event { id: sub_id.to_string(), msg: EventMsg::BackgroundEvent(BackgroundEventEvent { message: message.into(), }), }; let _ = self.tx_event.send(event).await; } async fn notify_stream_error(&self, sub_id: &str, message: impl Into) { let event = Event { id: sub_id.to_string(), msg: EventMsg::StreamError(StreamErrorEvent { message: message.into(), }), }; let _ = self.tx_event.send(event).await; } /// Build the full turn input by concatenating the current conversation /// history with additional items for this turn. 
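    ///
    /// A sketch of the resulting ordering (illustrative only):
    ///
    /// ```ignore
    /// // history = [h1, h2], extra = [e1]  =>  [h1, h2, e1]
    /// let turn_input = sess.turn_input_with_history(vec![e1]);
    /// ```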
pub fn turn_input_with_history(&self, extra: Vec) -> Vec { [self.state.lock_unchecked().history.contents(), extra].concat() } /// Returns the input if there was no task running to inject into pub fn inject_input(&self, input: Vec) -> Result<(), Vec> { let mut state = self.state.lock_unchecked(); if state.current_task.is_some() { state.pending_input.push(input.into()); Ok(()) } else { Err(input) } } pub fn get_pending_input(&self) -> Vec { let mut state = self.state.lock_unchecked(); if state.pending_input.is_empty() { Vec::with_capacity(0) } else { let mut ret = Vec::new(); std::mem::swap(&mut ret, &mut state.pending_input); ret } } pub async fn call_tool( &self, server: &str, tool: &str, arguments: Option, timeout: Option, ) -> anyhow::Result { self.mcp_connection_manager .call_tool(server, tool, arguments, timeout) .await } fn interrupt_task(&self) { info!("interrupt received: abort current task, if any"); let mut state = self.state.lock_unchecked(); state.pending_approvals.clear(); state.pending_input.clear(); if let Some(task) = state.current_task.take() { task.abort(TurnAbortReason::Interrupted); } } /// Spawn the configured notifier (if any) with the given JSON payload as /// the last argument. Failures are logged but otherwise ignored so that /// notification issues do not interfere with the main workflow. fn maybe_notify(&self, notification: UserNotification) { let Some(notify_command) = &self.notify else { return; }; if notify_command.is_empty() { return; } let Ok(json) = serde_json::to_string(¬ification) else { error!("failed to serialise notification payload"); return; }; let mut command = std::process::Command::new(¬ify_command[0]); if notify_command.len() > 1 { command.args(¬ify_command[1..]); } command.arg(json); // Fire-and-forget – we do not wait for completion. if let Err(e) = command.spawn() { warn!("failed to spawn notifier '{}': {e}", notify_command[0]); } } } impl Drop for Session { fn drop(&mut self) { self.interrupt_task(); } } #[derive(Clone, Debug)] pub(crate) struct ExecCommandContext { pub(crate) sub_id: String, pub(crate) call_id: String, pub(crate) command_for_display: Vec, pub(crate) cwd: PathBuf, pub(crate) apply_patch: Option, } #[derive(Clone, Debug)] pub(crate) struct ApplyPatchCommandContext { pub(crate) user_explicitly_approved_this_action: bool, pub(crate) changes: HashMap, } /// A series of Turns in response to user input. pub(crate) struct AgentTask { sess: Arc, sub_id: String, handle: AbortHandle, } impl AgentTask { fn spawn( sess: Arc, turn_context: Arc, sub_id: String, input: Vec, ) -> Self { let handle = { let sess = sess.clone(); let sub_id = sub_id.clone(); let tc = Arc::clone(&turn_context); tokio::spawn(async move { run_task(sess, tc.as_ref(), sub_id, input).await }) .abort_handle() }; Self { sess, sub_id, handle, } } fn compact( sess: Arc, turn_context: Arc, sub_id: String, input: Vec, compact_instructions: String, ) -> Self { let handle = { let sess = sess.clone(); let sub_id = sub_id.clone(); let tc = Arc::clone(&turn_context); tokio::spawn(async move { run_compact_task(sess, tc.as_ref(), sub_id, input, compact_instructions).await }) .abort_handle() }; Self { sess, sub_id, handle, } } fn abort(self, reason: TurnAbortReason) { // TOCTOU? 
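        // (Time-of-check/time-of-use: the task may finish between the
        // `is_finished()` check below and `abort()`; in that case a TurnAborted
        // event is still emitted even though the task already completed.)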
if !self.handle.is_finished() { self.handle.abort(); let event = Event { id: self.sub_id, msg: EventMsg::TurnAborted(TurnAbortedEvent { reason }), }; let tx_event = self.sess.tx_event.clone(); tokio::spawn(async move { tx_event.send(event).await.ok(); }); } } } async fn submission_loop( sess: Arc, turn_context: TurnContext, config: Arc, rx_sub: Receiver, ) { // Wrap once to avoid cloning TurnContext for each task. let mut turn_context = Arc::new(turn_context); // To break out of this loop, send Op::Shutdown. while let Ok(sub) = rx_sub.recv().await { debug!(?sub, "Submission"); match sub.op { Op::Interrupt => { sess.interrupt_task(); } Op::OverrideTurnContext { cwd, approval_policy, sandbox_policy, model, effort, summary, } => { // Recalculate the persistent turn context with provided overrides. let prev = Arc::clone(&turn_context); let provider = prev.client.get_provider(); // Effective model + family let (effective_model, effective_family) = if let Some(m) = model { let fam = find_family_for_model(&m).unwrap_or_else(|| config.model_family.clone()); (m, fam) } else { (prev.client.get_model(), prev.client.get_model_family()) }; // Effective reasoning settings let effective_effort = effort.unwrap_or(prev.client.get_reasoning_effort()); let effective_summary = summary.unwrap_or(prev.client.get_reasoning_summary()); let auth_manager = prev.client.get_auth_manager(); // Build updated config for the client let mut updated_config = (*config).clone(); updated_config.model = effective_model.clone(); updated_config.model_family = effective_family.clone(); if let Some(model_info) = get_model_info(&effective_family) { updated_config.model_context_window = Some(model_info.context_window); } let client = ModelClient::new( Arc::new(updated_config), auth_manager, provider, effective_effort, effective_summary, sess.session_id, ); let new_approval_policy = approval_policy.unwrap_or(prev.approval_policy); let new_sandbox_policy = sandbox_policy .clone() .unwrap_or(prev.sandbox_policy.clone()); let new_cwd = cwd.clone().unwrap_or_else(|| prev.cwd.clone()); let tools_config = ToolsConfig::new(&ToolsConfigParams { model_family: &effective_family, approval_policy: new_approval_policy, sandbox_policy: new_sandbox_policy.clone(), include_plan_tool: config.include_plan_tool, include_apply_patch_tool: config.include_apply_patch_tool, include_web_search_request: config.tools_web_search_request, use_streamable_shell_tool: config.use_experimental_streamable_shell_tool, }); let new_turn_context = TurnContext { client, tools_config, user_instructions: prev.user_instructions.clone(), base_instructions: prev.base_instructions.clone(), approval_policy: new_approval_policy, sandbox_policy: new_sandbox_policy.clone(), shell_environment_policy: prev.shell_environment_policy.clone(), cwd: new_cwd.clone(), disable_response_storage: prev.disable_response_storage, }; // Install the new persistent context for subsequent tasks/turns. 
turn_context = Arc::new(new_turn_context); if cwd.is_some() || approval_policy.is_some() || sandbox_policy.is_some() { sess.record_conversation_items(&[ResponseItem::from(EnvironmentContext::new( cwd, approval_policy, sandbox_policy, // Shell is not configurable from turn to turn None, ))]) .await; } } Op::UserInput { items } => { // attempt to inject input into current task if let Err(items) = sess.inject_input(items) { // no current task, spawn a new one let task = AgentTask::spawn(sess.clone(), Arc::clone(&turn_context), sub.id, items); sess.set_task(task); } } Op::UserTurn { items, cwd, approval_policy, sandbox_policy, model, effort, summary, } => { // attempt to inject input into current task if let Err(items) = sess.inject_input(items) { // Derive a fresh TurnContext for this turn using the provided overrides. let provider = turn_context.client.get_provider(); let auth_manager = turn_context.client.get_auth_manager(); // Derive a model family for the requested model; fall back to the session's. let model_family = find_family_for_model(&model) .unwrap_or_else(|| config.model_family.clone()); // Create a per‑turn Config clone with the requested model/family. let mut per_turn_config = (*config).clone(); per_turn_config.model = model.clone(); per_turn_config.model_family = model_family.clone(); if let Some(model_info) = get_model_info(&model_family) { per_turn_config.model_context_window = Some(model_info.context_window); } // Build a new client with per‑turn reasoning settings. // Reuse the same provider and session id; auth defaults to env/API key. let client = ModelClient::new( Arc::new(per_turn_config), auth_manager, provider, effort, summary, sess.session_id, ); let fresh_turn_context = TurnContext { client, tools_config: ToolsConfig::new(&ToolsConfigParams { model_family: &model_family, approval_policy, sandbox_policy: sandbox_policy.clone(), include_plan_tool: config.include_plan_tool, include_apply_patch_tool: config.include_apply_patch_tool, include_web_search_request: config.tools_web_search_request, use_streamable_shell_tool: config .use_experimental_streamable_shell_tool, }), user_instructions: turn_context.user_instructions.clone(), base_instructions: turn_context.base_instructions.clone(), approval_policy, sandbox_policy, shell_environment_policy: turn_context.shell_environment_policy.clone(), cwd, disable_response_storage: turn_context.disable_response_storage, }; // TODO: record the new environment context in the conversation history // no current task, spawn a new one with the per‑turn context let task = AgentTask::spawn(sess.clone(), Arc::new(fresh_turn_context), sub.id, items); sess.set_task(task); } } Op::ExecApproval { id, decision } => match decision { ReviewDecision::Abort => { sess.interrupt_task(); } other => sess.notify_approval(&id, other), }, Op::PatchApproval { id, decision } => match decision { ReviewDecision::Abort => { sess.interrupt_task(); } other => sess.notify_approval(&id, other), }, Op::AddToHistory { text } => { let id = sess.session_id; let config = config.clone(); tokio::spawn(async move { if let Err(e) = crate::message_history::append_entry(&text, &id, &config).await { warn!("failed to append to message history: {e}"); } }); } Op::GetHistoryEntryRequest { offset, log_id } => { let config = config.clone(); let tx_event = sess.tx_event.clone(); let sub_id = sub.id.clone(); tokio::spawn(async move { // Run lookup in blocking thread because it does file IO + locking. 
let entry_opt = tokio::task::spawn_blocking(move || { crate::message_history::lookup(log_id, offset, &config) }) .await .unwrap_or(None); let event = Event { id: sub_id, msg: EventMsg::GetHistoryEntryResponse( crate::protocol::GetHistoryEntryResponseEvent { offset, log_id, entry: entry_opt.map(|e| { codex_protocol::message_history::HistoryEntry { session_id: e.session_id, ts: e.ts, text: e.text, } }), }, ), }; if let Err(e) = tx_event.send(event).await { warn!("failed to send GetHistoryEntryResponse event: {e}"); } }); } Op::ListMcpTools => { let tx_event = sess.tx_event.clone(); let sub_id = sub.id.clone(); // This is a cheap lookup from the connection manager's cache. let tools = sess.mcp_connection_manager.list_all_tools(); let event = Event { id: sub_id, msg: EventMsg::McpListToolsResponse( crate::protocol::McpListToolsResponseEvent { tools }, ), }; if let Err(e) = tx_event.send(event).await { warn!("failed to send McpListToolsResponse event: {e}"); } } Op::Compact => { // Create a summarization request as user input const SUMMARIZATION_PROMPT: &str = include_str!("prompt_for_compact_command.md"); // Attempt to inject input into current task if let Err(items) = sess.inject_input(vec![InputItem::Text { text: "Start Summarization".to_string(), }]) { let task = AgentTask::compact( sess.clone(), Arc::clone(&turn_context), sub.id, items, SUMMARIZATION_PROMPT.to_string(), ); sess.set_task(task); } } Op::Shutdown => { info!("Shutting down Codex instance"); // Gracefully flush and shutdown rollout recorder on session end so tests // that inspect the rollout file do not race with the background writer. let recorder_opt = sess.rollout.lock_unchecked().take(); if let Some(rec) = recorder_opt && let Err(e) = rec.shutdown().await { warn!("failed to shutdown rollout recorder: {e}"); let event = Event { id: sub.id.clone(), msg: EventMsg::Error(ErrorEvent { message: "Failed to shutdown rollout recorder".to_string(), }), }; if let Err(e) = sess.tx_event.send(event).await { warn!("failed to send error message: {e:?}"); } } let event = Event { id: sub.id.clone(), msg: EventMsg::ShutdownComplete, }; if let Err(e) = sess.tx_event.send(event).await { warn!("failed to send Shutdown event: {e}"); } break; } Op::GetHistory => { let tx_event = sess.tx_event.clone(); let sub_id = sub.id.clone(); let event = Event { id: sub_id.clone(), msg: EventMsg::ConversationHistory(ConversationHistoryResponseEvent { conversation_id: sess.session_id, entries: sess.state.lock_unchecked().history.contents(), }), }; if let Err(e) = tx_event.send(event).await { warn!("failed to send ConversationHistory event: {e}"); } } _ => { // Ignore unknown ops; enum is non_exhaustive to allow extensions. } } } debug!("Agent loop exited"); } /// Takes a user message as input and runs a loop where, at each turn, the model /// replies with either: /// /// - requested function calls /// - an assistant message /// /// While it is possible for the model to return multiple of these items in a /// single turn, in practice, we generally one item per turn: /// /// - If the model requests a function call, we execute it and send the output /// back to the model in the next turn. /// - If the model sends only an assistant message, we record it in the /// conversation history and consider the task complete. 
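///
/// From a client's perspective, a task therefore produces roughly the following
/// event sequence (illustrative): `TaskStarted`, any tool/agent events emitted
/// while the loop runs, then `TaskComplete` (or `TurnAborted` on interruption).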
async fn run_task( sess: Arc, turn_context: &TurnContext, sub_id: String, input: Vec, ) { if input.is_empty() { return; } let event = Event { id: sub_id.clone(), msg: EventMsg::TaskStarted(TaskStartedEvent { model_context_window: turn_context.client.get_model_context_window(), }), }; if sess.tx_event.send(event).await.is_err() { return; } let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input); sess.record_conversation_items(&[initial_input_for_turn.clone().into()]) .await; let mut last_agent_message: Option = None; // Although from the perspective of codex.rs, TurnDiffTracker has the lifecycle of a Task which contains // many turns, from the perspective of the user, it is a single turn. let mut turn_diff_tracker = TurnDiffTracker::new(); loop { // Note that pending_input would be something like a message the user // submitted through the UI while the model was running. Though the UI // may support this, the model might not. let pending_input = sess .get_pending_input() .into_iter() .map(ResponseItem::from) .collect::>(); sess.record_conversation_items(&pending_input).await; // Construct the input that we will send to the model. When using the // Chat completions API (or ZDR clients), the model needs the full // conversation history on each turn. The rollout file, however, should // only record the new items that originated in this turn so that it // represents an append-only log without duplicates. let turn_input: Vec = sess.turn_input_with_history(pending_input); let turn_input_messages: Vec = turn_input .iter() .filter_map(|item| match item { ResponseItem::Message { content, .. } => Some(content), _ => None, }) .flat_map(|content| { content.iter().filter_map(|item| match item { ContentItem::OutputText { text } => Some(text.clone()), _ => None, }) }) .collect(); match run_turn( &sess, turn_context, &mut turn_diff_tracker, sub_id.clone(), turn_input, ) .await { Ok(turn_output) => { let mut items_to_record_in_conversation_history = Vec::::new(); let mut responses = Vec::::new(); for processed_response_item in turn_output { let ProcessedResponseItem { item, response } = processed_response_item; match (&item, &response) { (ResponseItem::Message { role, .. }, None) if role == "assistant" => { // If the model returned a message, we need to record it. items_to_record_in_conversation_history.push(item); } ( ResponseItem::LocalShellCall { .. }, Some(ResponseInputItem::FunctionCallOutput { call_id, output }), ) => { items_to_record_in_conversation_history.push(item); items_to_record_in_conversation_history.push( ResponseItem::FunctionCallOutput { call_id: call_id.clone(), output: output.clone(), }, ); } ( ResponseItem::FunctionCall { .. }, Some(ResponseInputItem::FunctionCallOutput { call_id, output }), ) => { items_to_record_in_conversation_history.push(item); items_to_record_in_conversation_history.push( ResponseItem::FunctionCallOutput { call_id: call_id.clone(), output: output.clone(), }, ); } ( ResponseItem::CustomToolCall { .. }, Some(ResponseInputItem::CustomToolCallOutput { call_id, output }), ) => { items_to_record_in_conversation_history.push(item); items_to_record_in_conversation_history.push( ResponseItem::CustomToolCallOutput { call_id: call_id.clone(), output: output.clone(), }, ); } ( ResponseItem::FunctionCall { .. 
}, Some(ResponseInputItem::McpToolCallOutput { call_id, result }), ) => { items_to_record_in_conversation_history.push(item); let output = match result { Ok(call_tool_result) => { convert_call_tool_result_to_function_call_output_payload( call_tool_result, ) } Err(err) => FunctionCallOutputPayload { content: err.clone(), success: Some(false), }, }; items_to_record_in_conversation_history.push( ResponseItem::FunctionCallOutput { call_id: call_id.clone(), output, }, ); } ( ResponseItem::Reasoning { id, summary, content, encrypted_content, }, None, ) => { items_to_record_in_conversation_history.push(ResponseItem::Reasoning { id: id.clone(), summary: summary.clone(), content: content.clone(), encrypted_content: encrypted_content.clone(), }); } _ => { warn!("Unexpected response item: {item:?} with response: {response:?}"); } }; if let Some(response) = response { responses.push(response); } } // Only attempt to take the lock if there is something to record. if !items_to_record_in_conversation_history.is_empty() { sess.record_conversation_items(&items_to_record_in_conversation_history) .await; } if responses.is_empty() { debug!("Turn completed"); last_agent_message = get_last_assistant_message_from_turn( &items_to_record_in_conversation_history, ); sess.maybe_notify(UserNotification::AgentTurnComplete { turn_id: sub_id.clone(), input_messages: turn_input_messages, last_assistant_message: last_agent_message.clone(), }); break; } } Err(e) => { info!("Turn error: {e:#}"); let event = Event { id: sub_id.clone(), msg: EventMsg::Error(ErrorEvent { message: e.to_string(), }), }; sess.tx_event.send(event).await.ok(); // let the user continue the conversation break; } } } sess.remove_task(&sub_id); let event = Event { id: sub_id, msg: EventMsg::TaskComplete(TaskCompleteEvent { last_agent_message }), }; sess.tx_event.send(event).await.ok(); } async fn run_turn( sess: &Session, turn_context: &TurnContext, turn_diff_tracker: &mut TurnDiffTracker, sub_id: String, input: Vec, ) -> CodexResult> { let tools = get_openai_tools( &turn_context.tools_config, Some(sess.mcp_connection_manager.list_all_tools()), ); let prompt = Prompt { input, store: !turn_context.disable_response_storage, tools, base_instructions_override: turn_context.base_instructions.clone(), }; let mut retries = 0; loop { match try_run_turn(sess, turn_context, turn_diff_tracker, &sub_id, &prompt).await { Ok(output) => return Ok(output), Err(CodexErr::Interrupted) => return Err(CodexErr::Interrupted), Err(CodexErr::EnvVar(var)) => return Err(CodexErr::EnvVar(var)), Err(e @ (CodexErr::UsageLimitReached(_) | CodexErr::UsageNotIncluded)) => { return Err(e); } Err(e) => { // Use the configured provider-specific stream retry budget. let max_retries = turn_context.client.get_provider().stream_max_retries(); if retries < max_retries { retries += 1; let delay = match e { CodexErr::Stream(_, Some(delay)) => delay, _ => backoff(retries), }; warn!( "stream disconnected - retrying turn ({retries}/{max_retries} in {delay:?})...", ); // Surface retry information to any UI/front‑end so the // user understands what is happening instead of staring // at a seemingly frozen screen. sess.notify_stream_error( &sub_id, format!( "stream error: {e}; retrying {retries}/{max_retries} in {delay:?}…" ), ) .await; tokio::time::sleep(delay).await; } else { return Err(e); } } } } } /// When the model is prompted, it returns a stream of events. Some of these /// events map to a `ResponseItem`. 
A `ResponseItem` may need to be /// "handled" such that it produces a `ResponseInputItem` that needs to be /// sent back to the model on the next turn. #[derive(Debug)] struct ProcessedResponseItem { item: ResponseItem, response: Option, } async fn try_run_turn( sess: &Session, turn_context: &TurnContext, turn_diff_tracker: &mut TurnDiffTracker, sub_id: &str, prompt: &Prompt, ) -> CodexResult> { // call_ids that are part of this response. let completed_call_ids = prompt .input .iter() .filter_map(|ri| match ri { ResponseItem::FunctionCallOutput { call_id, .. } => Some(call_id), ResponseItem::LocalShellCall { call_id: Some(call_id), .. } => Some(call_id), ResponseItem::CustomToolCallOutput { call_id, .. } => Some(call_id), _ => None, }) .collect::>(); // call_ids that were pending but are not part of this response. // This usually happens because the user interrupted the model before we responded to one of its tool calls // and then the user sent a follow-up message. let missing_calls = { prompt .input .iter() .filter_map(|ri| match ri { ResponseItem::FunctionCall { call_id, .. } => Some(call_id), ResponseItem::LocalShellCall { call_id: Some(call_id), .. } => Some(call_id), ResponseItem::CustomToolCall { call_id, .. } => Some(call_id), _ => None, }) .filter_map(|call_id| { if completed_call_ids.contains(&call_id) { None } else { Some(call_id.clone()) } }) .map(|call_id| ResponseItem::CustomToolCallOutput { call_id: call_id.clone(), output: "aborted".to_string(), }) .collect::>() }; let prompt: Cow = if missing_calls.is_empty() { Cow::Borrowed(prompt) } else { // Add the synthetic aborted missing calls to the beginning of the input to ensure all call ids have responses. let input = [missing_calls, prompt.input.clone()].concat(); Cow::Owned(Prompt { input, ..prompt.clone() }) }; let mut stream = turn_context.client.clone().stream(&prompt).await?; let mut output = Vec::new(); loop { // Poll the next item from the model stream. We must inspect *both* Ok and Err // cases so that transient stream failures (e.g., dropped SSE connection before // `response.completed`) bubble up and trigger the caller's retry logic. let event = stream.next().await; let Some(event) = event else { // Channel closed without yielding a final Completed event or explicit error. // Treat as a disconnected stream so the caller can retry. return Err(CodexErr::Stream( "stream closed before response.completed".into(), None, )); }; let event = match event { Ok(ev) => ev, Err(e) => { // Propagate the underlying stream error to the caller (run_turn), which // will apply the configured `stream_max_retries` policy. 
return Err(e); } }; match event { ResponseEvent::Created => {} ResponseEvent::OutputItemDone(item) => { let response = handle_response_item( sess, turn_context, turn_diff_tracker, sub_id, item.clone(), ) .await?; output.push(ProcessedResponseItem { item, response }); } ResponseEvent::WebSearchCallBegin { call_id, query } => { let q = query.unwrap_or_else(|| "Searching Web...".to_string()); let _ = sess .tx_event .send(Event { id: sub_id.to_string(), msg: EventMsg::WebSearchBegin(WebSearchBeginEvent { call_id, query: q }), }) .await; } ResponseEvent::Completed { response_id: _, token_usage, } => { if let Some(token_usage) = token_usage { sess.tx_event .send(Event { id: sub_id.to_string(), msg: EventMsg::TokenCount(token_usage), }) .await .ok(); } let unified_diff = turn_diff_tracker.get_unified_diff(); if let Ok(Some(unified_diff)) = unified_diff { let msg = EventMsg::TurnDiff(TurnDiffEvent { unified_diff }); let event = Event { id: sub_id.to_string(), msg, }; let _ = sess.tx_event.send(event).await; } return Ok(output); } ResponseEvent::OutputTextDelta(delta) => { let event = Event { id: sub_id.to_string(), msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta }), }; sess.tx_event.send(event).await.ok(); } ResponseEvent::ReasoningSummaryDelta(delta) => { let event = Event { id: sub_id.to_string(), msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta }), }; sess.tx_event.send(event).await.ok(); } ResponseEvent::ReasoningSummaryPartAdded => { let event = Event { id: sub_id.to_string(), msg: EventMsg::AgentReasoningSectionBreak(AgentReasoningSectionBreakEvent {}), }; sess.tx_event.send(event).await.ok(); } ResponseEvent::ReasoningContentDelta(delta) => { if sess.show_raw_agent_reasoning { let event = Event { id: sub_id.to_string(), msg: EventMsg::AgentReasoningRawContentDelta( AgentReasoningRawContentDeltaEvent { delta }, ), }; sess.tx_event.send(event).await.ok(); } } } } } async fn run_compact_task( sess: Arc, turn_context: &TurnContext, sub_id: String, input: Vec, compact_instructions: String, ) { let model_context_window = turn_context.client.get_model_context_window(); let start_event = Event { id: sub_id.clone(), msg: EventMsg::TaskStarted(TaskStartedEvent { model_context_window, }), }; if sess.tx_event.send(start_event).await.is_err() { return; } let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input); let turn_input: Vec = sess.turn_input_with_history(vec![initial_input_for_turn.clone().into()]); let prompt = Prompt { input: turn_input, store: !turn_context.disable_response_storage, tools: Vec::new(), base_instructions_override: Some(compact_instructions.clone()), }; let max_retries = turn_context.client.get_provider().stream_max_retries(); let mut retries = 0; loop { let attempt_result = drain_to_completed(&sess, turn_context, &sub_id, &prompt).await; match attempt_result { Ok(()) => break, Err(CodexErr::Interrupted) => return, Err(e) => { if retries < max_retries { retries += 1; let delay = backoff(retries); sess.notify_stream_error( &sub_id, format!( "stream error: {e}; retrying {retries}/{max_retries} in {delay:?}…" ), ) .await; tokio::time::sleep(delay).await; continue; } else { let event = Event { id: sub_id.clone(), msg: EventMsg::Error(ErrorEvent { message: e.to_string(), }), }; sess.send_event(event).await; return; } } } } sess.remove_task(&sub_id); let event = Event { id: sub_id.clone(), msg: EventMsg::AgentMessage(AgentMessageEvent { message: "Compact task completed".to_string(), }), }; sess.send_event(event).await; let event 
= Event { id: sub_id.clone(), msg: EventMsg::TaskComplete(TaskCompleteEvent { last_agent_message: None, }), }; sess.send_event(event).await; let mut state = sess.state.lock_unchecked(); state.history.keep_last_messages(1); } async fn handle_response_item( sess: &Session, turn_context: &TurnContext, turn_diff_tracker: &mut TurnDiffTracker, sub_id: &str, item: ResponseItem, ) -> CodexResult> { debug!(?item, "Output item"); let output = match item { ResponseItem::Message { content, .. } => { for item in content { if let ContentItem::OutputText { text } = item { let event = Event { id: sub_id.to_string(), msg: EventMsg::AgentMessage(AgentMessageEvent { message: text }), }; sess.tx_event.send(event).await.ok(); } } None } ResponseItem::Reasoning { id: _, summary, content, encrypted_content: _, } => { for item in summary { let text = match item { ReasoningItemReasoningSummary::SummaryText { text } => text, }; let event = Event { id: sub_id.to_string(), msg: EventMsg::AgentReasoning(AgentReasoningEvent { text }), }; sess.tx_event.send(event).await.ok(); } if sess.show_raw_agent_reasoning && let Some(content) = content { for item in content { let text = match item { ReasoningItemContent::ReasoningText { text } => text, ReasoningItemContent::Text { text } => text, }; let event = Event { id: sub_id.to_string(), msg: EventMsg::AgentReasoningRawContent(AgentReasoningRawContentEvent { text, }), }; sess.tx_event.send(event).await.ok(); } } None } ResponseItem::FunctionCall { name, arguments, call_id, .. } => { info!("FunctionCall: {name}({arguments})"); Some( handle_function_call( sess, turn_context, turn_diff_tracker, sub_id.to_string(), name, arguments, call_id, ) .await, ) } ResponseItem::LocalShellCall { id, call_id, status: _, action, } => { let LocalShellAction::Exec(action) = action; tracing::info!("LocalShellCall: {action:?}"); let params = ShellToolCallParams { command: action.command, workdir: action.working_directory, timeout_ms: action.timeout_ms, with_escalated_permissions: None, justification: None, }; let effective_call_id = match (call_id, id) { (Some(call_id), _) => call_id, (None, Some(id)) => id, (None, None) => { error!("LocalShellCall without call_id or id"); return Ok(Some(ResponseInputItem::FunctionCallOutput { call_id: "".to_string(), output: FunctionCallOutputPayload { content: "LocalShellCall without call_id or id".to_string(), success: None, }, })); } }; let exec_params = to_exec_params(params, turn_context); Some( handle_container_exec_with_params( exec_params, sess, turn_context, turn_diff_tracker, sub_id.to_string(), effective_call_id, ) .await, ) } ResponseItem::CustomToolCall { id: _, call_id, name, input, status: _, } => Some( handle_custom_tool_call( sess, turn_context, turn_diff_tracker, sub_id.to_string(), name, input, call_id, ) .await, ), ResponseItem::FunctionCallOutput { .. } => { debug!("unexpected FunctionCallOutput from stream"); None } ResponseItem::CustomToolCallOutput { .. 
} => { debug!("unexpected CustomToolCallOutput from stream"); None } ResponseItem::Other => None, }; Ok(output) } async fn handle_function_call( sess: &Session, turn_context: &TurnContext, turn_diff_tracker: &mut TurnDiffTracker, sub_id: String, name: String, arguments: String, call_id: String, ) -> ResponseInputItem { match name.as_str() { "container.exec" | "shell" => { let params = match parse_container_exec_arguments(arguments, turn_context, &call_id) { Ok(params) => params, Err(output) => { return *output; } }; handle_container_exec_with_params( params, sess, turn_context, turn_diff_tracker, sub_id, call_id, ) .await } "apply_patch" => { let args = match serde_json::from_str::(&arguments) { Ok(a) => a, Err(e) => { return ResponseInputItem::FunctionCallOutput { call_id, output: FunctionCallOutputPayload { content: format!("failed to parse function arguments: {e}"), success: None, }, }; } }; let exec_params = ExecParams { command: vec!["apply_patch".to_string(), args.input.clone()], cwd: turn_context.cwd.clone(), timeout_ms: None, env: HashMap::new(), with_escalated_permissions: None, justification: None, }; handle_container_exec_with_params( exec_params, sess, turn_context, turn_diff_tracker, sub_id, call_id, ) .await } "update_plan" => handle_update_plan(sess, arguments, sub_id, call_id).await, EXEC_COMMAND_TOOL_NAME => { // TODO(mbolin): Sandbox check. let exec_params = match serde_json::from_str::(&arguments) { Ok(params) => params, Err(e) => { return ResponseInputItem::FunctionCallOutput { call_id, output: FunctionCallOutputPayload { content: format!("failed to parse function arguments: {e}"), success: Some(false), }, }; } }; let result = sess .session_manager .handle_exec_command_request(exec_params) .await; let function_call_output = crate::exec_command::result_into_payload(result); ResponseInputItem::FunctionCallOutput { call_id, output: function_call_output, } } WRITE_STDIN_TOOL_NAME => { let write_stdin_params = match serde_json::from_str::(&arguments) { Ok(params) => params, Err(e) => { return ResponseInputItem::FunctionCallOutput { call_id, output: FunctionCallOutputPayload { content: format!("failed to parse function arguments: {e}"), success: Some(false), }, }; } }; let result = sess .session_manager .handle_write_stdin_request(write_stdin_params) .await; let function_call_output: FunctionCallOutputPayload = crate::exec_command::result_into_payload(result); ResponseInputItem::FunctionCallOutput { call_id, output: function_call_output, } } _ => { match sess.mcp_connection_manager.parse_tool_name(&name) { Some((server, tool_name)) => { // TODO(mbolin): Determine appropriate timeout for tool call. let timeout = None; handle_mcp_tool_call( sess, &sub_id, call_id, server, tool_name, arguments, timeout, ) .await } None => { // Unknown function: reply with structured failure so the model can adapt. 
ResponseInputItem::FunctionCallOutput { call_id, output: FunctionCallOutputPayload { content: format!("unsupported call: {name}"), success: None, }, } } } } } } async fn handle_custom_tool_call( sess: &Session, turn_context: &TurnContext, turn_diff_tracker: &mut TurnDiffTracker, sub_id: String, name: String, input: String, call_id: String, ) -> ResponseInputItem { info!("CustomToolCall: {name} {input}"); match name.as_str() { "apply_patch" => { let exec_params = ExecParams { command: vec!["apply_patch".to_string(), input.clone()], cwd: turn_context.cwd.clone(), timeout_ms: None, env: HashMap::new(), with_escalated_permissions: None, justification: None, }; let resp = handle_container_exec_with_params( exec_params, sess, turn_context, turn_diff_tracker, sub_id, call_id, ) .await; // Convert function-call style output into a custom tool call output match resp { ResponseInputItem::FunctionCallOutput { call_id, output } => { ResponseInputItem::CustomToolCallOutput { call_id, output: output.content, } } // Pass through if already a custom tool output or other variant other => other, } } _ => { debug!("unexpected CustomToolCall from stream"); ResponseInputItem::CustomToolCallOutput { call_id, output: format!("unsupported custom tool call: {name}"), } } } } fn to_exec_params(params: ShellToolCallParams, turn_context: &TurnContext) -> ExecParams { ExecParams { command: params.command, cwd: turn_context.resolve_path(params.workdir.clone()), timeout_ms: params.timeout_ms, env: create_env(&turn_context.shell_environment_policy), with_escalated_permissions: params.with_escalated_permissions, justification: params.justification, } } fn parse_container_exec_arguments( arguments: String, turn_context: &TurnContext, call_id: &str, ) -> Result> { // parse command match serde_json::from_str::(&arguments) { Ok(shell_tool_call_params) => Ok(to_exec_params(shell_tool_call_params, turn_context)), Err(e) => { // allow model to re-sample let output = ResponseInputItem::FunctionCallOutput { call_id: call_id.to_string(), output: FunctionCallOutputPayload { content: format!("failed to parse function arguments: {e}"), success: None, }, }; Err(Box::new(output)) } } } pub struct ExecInvokeArgs<'a> { pub params: ExecParams, pub sandbox_type: SandboxType, pub sandbox_policy: &'a SandboxPolicy, pub codex_linux_sandbox_exe: &'a Option, pub stdout_stream: Option, } fn maybe_translate_shell_command( params: ExecParams, sess: &Session, turn_context: &TurnContext, ) -> ExecParams { let should_translate = matches!(sess.user_shell, crate::shell::Shell::PowerShell(_)) || turn_context.shell_environment_policy.use_profile; if should_translate && let Some(command) = sess .user_shell .format_default_shell_invocation(params.command.clone()) { return ExecParams { command, ..params }; } params } async fn handle_container_exec_with_params( params: ExecParams, sess: &Session, turn_context: &TurnContext, turn_diff_tracker: &mut TurnDiffTracker, sub_id: String, call_id: String, ) -> ResponseInputItem { // check if this was a patch, and apply it if so let apply_patch_exec = match maybe_parse_apply_patch_verified(¶ms.command, ¶ms.cwd) { MaybeApplyPatchVerified::Body(changes) => { match apply_patch::apply_patch(sess, turn_context, &sub_id, &call_id, changes).await { InternalApplyPatchInvocation::Output(item) => return item, InternalApplyPatchInvocation::DelegateToExec(apply_patch_exec) => { Some(apply_patch_exec) } } } MaybeApplyPatchVerified::CorrectnessError(parse_error) => { // It looks like an invocation of `apply_patch`, but we // could 
not resolve it into a patch that would apply // cleanly. Return to model for resample. return ResponseInputItem::FunctionCallOutput { call_id, output: FunctionCallOutputPayload { content: format!("error: {parse_error:#}"), success: None, }, }; } MaybeApplyPatchVerified::ShellParseError(error) => { trace!("Failed to parse shell command, {error:?}"); None } MaybeApplyPatchVerified::NotApplyPatch => None, }; let (params, safety, command_for_display) = match &apply_patch_exec { Some(ApplyPatchExec { action: ApplyPatchAction { patch, cwd, .. }, user_explicitly_approved_this_action, }) => { let path_to_codex = std::env::current_exe() .ok() .map(|p| p.to_string_lossy().to_string()); let Some(path_to_codex) = path_to_codex else { return ResponseInputItem::FunctionCallOutput { call_id, output: FunctionCallOutputPayload { content: "failed to determine path to codex executable".to_string(), success: None, }, }; }; let params = ExecParams { command: vec![ path_to_codex, CODEX_APPLY_PATCH_ARG1.to_string(), patch.clone(), ], cwd: cwd.clone(), timeout_ms: params.timeout_ms, env: HashMap::new(), with_escalated_permissions: params.with_escalated_permissions, justification: params.justification.clone(), }; let safety = if *user_explicitly_approved_this_action { SafetyCheck::AutoApprove { sandbox_type: SandboxType::None, } } else { assess_safety_for_untrusted_command( turn_context.approval_policy, &turn_context.sandbox_policy, params.with_escalated_permissions.unwrap_or(false), ) }; ( params, safety, vec!["apply_patch".to_string(), patch.clone()], ) } None => { let safety = { let state = sess.state.lock_unchecked(); assess_command_safety( &params.command, turn_context.approval_policy, &turn_context.sandbox_policy, &state.approved_commands, params.with_escalated_permissions.unwrap_or(false), ) }; let command_for_display = params.command.clone(); (params, safety, command_for_display) } }; let sandbox_type = match safety { SafetyCheck::AutoApprove { sandbox_type } => sandbox_type, SafetyCheck::AskUser => { let rx_approve = sess .request_command_approval( sub_id.clone(), call_id.clone(), params.command.clone(), params.cwd.clone(), params.justification.clone(), ) .await; match rx_approve.await.unwrap_or_default() { ReviewDecision::Approved => (), ReviewDecision::ApprovedForSession => { sess.add_approved_command(params.command.clone()); } ReviewDecision::Denied | ReviewDecision::Abort => { return ResponseInputItem::FunctionCallOutput { call_id, output: FunctionCallOutputPayload { content: "exec command rejected by user".to_string(), success: None, }, }; } } // No sandboxing is applied because the user has given // explicit approval. Often, we end up in this case because // the command cannot be run in a sandbox, such as // installing a new dependency that requires network access.
SandboxType::None } SafetyCheck::Reject { reason } => { return ResponseInputItem::FunctionCallOutput { call_id, output: FunctionCallOutputPayload { content: format!("exec command rejected: {reason}"), success: None, }, }; } }; let exec_command_context = ExecCommandContext { sub_id: sub_id.clone(), call_id: call_id.clone(), command_for_display: command_for_display.clone(), cwd: params.cwd.clone(), apply_patch: apply_patch_exec.map( |ApplyPatchExec { action, user_explicitly_approved_this_action, }| ApplyPatchCommandContext { user_explicitly_approved_this_action, changes: convert_apply_patch_to_protocol(&action), }, ), }; let params = maybe_translate_shell_command(params, sess, turn_context); let output_result = sess .run_exec_with_events( turn_diff_tracker, exec_command_context.clone(), ExecInvokeArgs { params: params.clone(), sandbox_type, sandbox_policy: &turn_context.sandbox_policy, codex_linux_sandbox_exe: &sess.codex_linux_sandbox_exe, stdout_stream: if exec_command_context.apply_patch.is_some() { None } else { Some(StdoutStream { sub_id: sub_id.clone(), call_id: call_id.clone(), tx_event: sess.tx_event.clone(), }) }, }, ) .await; match output_result { Ok(output) => { let ExecToolCallOutput { exit_code, .. } = &output; let is_success = *exit_code == 0; let content = format_exec_output(&output); ResponseInputItem::FunctionCallOutput { call_id: call_id.clone(), output: FunctionCallOutputPayload { content, success: Some(is_success), }, } } Err(CodexErr::Sandbox(error)) => { handle_sandbox_error( turn_diff_tracker, params, exec_command_context, error, sandbox_type, sess, turn_context, ) .await } Err(e) => ResponseInputItem::FunctionCallOutput { call_id: call_id.clone(), output: FunctionCallOutputPayload { content: format!("execution error: {e}"), success: None, }, }, } } async fn handle_sandbox_error( turn_diff_tracker: &mut TurnDiffTracker, params: ExecParams, exec_command_context: ExecCommandContext, error: SandboxErr, sandbox_type: SandboxType, sess: &Session, turn_context: &TurnContext, ) -> ResponseInputItem { let call_id = exec_command_context.call_id.clone(); let sub_id = exec_command_context.sub_id.clone(); let cwd = exec_command_context.cwd.clone(); // Early out if either the user never wants to be asked for approval, or // we're letting the model manage escalation requests. Otherwise, continue match turn_context.approval_policy { AskForApproval::Never | AskForApproval::OnRequest => { return ResponseInputItem::FunctionCallOutput { call_id, output: FunctionCallOutputPayload { content: format!( "failed in sandbox {sandbox_type:?} with execution error: {error}" ), success: Some(false), }, }; } AskForApproval::UnlessTrusted | AskForApproval::OnFailure => (), } // similarly, if the command timed out, we can simply return this failure to the model if matches!(error, SandboxErr::Timeout) { return ResponseInputItem::FunctionCallOutput { call_id, output: FunctionCallOutputPayload { content: format!( "command timed out after {} milliseconds", params.timeout_duration().as_millis() ), success: Some(false), }, }; } // Note that when `error` is `SandboxErr::Denied`, it could be a false // positive. That is, it may have exited with a non-zero exit code, not // because the sandbox denied it, but because that is its expected behavior, // i.e., a grep command that did not match anything. Ideally we would // include additional metadata on the command to indicate whether non-zero // exit codes merit a retry. 
// For now, we categorically ask the user to retry without sandbox and // emit the raw error as a background event. sess.notify_background_event(&sub_id, format!("Execution failed: {error}")) .await; let rx_approve = sess .request_command_approval( sub_id.clone(), call_id.clone(), params.command.clone(), cwd.clone(), Some("command failed; retry without sandbox?".to_string()), ) .await; match rx_approve.await.unwrap_or_default() { ReviewDecision::Approved | ReviewDecision::ApprovedForSession => { // Persist this command as pre‑approved for the // remainder of the session so future // executions skip the sandbox directly. // TODO(ragona): Isn't this a bug? It always saves the command in an | fork? sess.add_approved_command(params.command.clone()); // Inform UI we are retrying without sandbox. sess.notify_background_event(&sub_id, "retrying command without sandbox") .await; // This is an escalated retry; the policy will not be // examined and the sandbox has been set to `None`. let retry_output_result = sess .run_exec_with_events( turn_diff_tracker, exec_command_context.clone(), ExecInvokeArgs { params, sandbox_type: SandboxType::None, sandbox_policy: &turn_context.sandbox_policy, codex_linux_sandbox_exe: &sess.codex_linux_sandbox_exe, stdout_stream: if exec_command_context.apply_patch.is_some() { None } else { Some(StdoutStream { sub_id: sub_id.clone(), call_id: call_id.clone(), tx_event: sess.tx_event.clone(), }) }, }, ) .await; match retry_output_result { Ok(retry_output) => { let ExecToolCallOutput { exit_code, .. } = &retry_output; let is_success = *exit_code == 0; let content = format_exec_output(&retry_output); ResponseInputItem::FunctionCallOutput { call_id: call_id.clone(), output: FunctionCallOutputPayload { content, success: Some(is_success), }, } } Err(e) => ResponseInputItem::FunctionCallOutput { call_id: call_id.clone(), output: FunctionCallOutputPayload { content: format!("retry failed: {e}"), success: None, }, }, } } ReviewDecision::Denied | ReviewDecision::Abort => { // Fall through to original failure handling. ResponseInputItem::FunctionCallOutput { call_id, output: FunctionCallOutputPayload { content: "exec command rejected by user".to_string(), success: None, }, } } } } fn format_exec_output_str(exec_output: &ExecToolCallOutput) -> String { let ExecToolCallOutput { aggregated_output, .. } = exec_output; // Head+tail truncation for the model: show the beginning and end with an elision. // Clients still receive full streams; only this formatted summary is capped. let s = aggregated_output.text.as_str(); let total_lines = s.lines().count(); if s.len() <= MODEL_FORMAT_MAX_BYTES && total_lines <= MODEL_FORMAT_MAX_LINES { return s.to_string(); } let lines: Vec<&str> = s.lines().collect(); let head_take = MODEL_FORMAT_HEAD_LINES.min(lines.len()); let tail_take = MODEL_FORMAT_TAIL_LINES.min(lines.len().saturating_sub(head_take)); let omitted = lines.len().saturating_sub(head_take + tail_take); // Join head and tail blocks (lines() strips newlines; reinsert them) let head_block = lines .iter() .take(head_take) .cloned() .collect::<Vec<_>>() .join("\n"); let tail_block = if tail_take > 0 { lines[lines.len() - tail_take..].join("\n") } else { String::new() }; let marker = format!("\n[...
omitted {omitted} of {total_lines} lines ...]\n\n"); // Byte budgets for head/tail around the marker let mut head_budget = MODEL_FORMAT_HEAD_BYTES.min(MODEL_FORMAT_MAX_BYTES); let tail_budget = MODEL_FORMAT_MAX_BYTES.saturating_sub(head_budget + marker.len()); if tail_budget == 0 && marker.len() >= MODEL_FORMAT_MAX_BYTES { // Degenerate case: marker alone exceeds budget; return a clipped marker return take_bytes_at_char_boundary(&marker, MODEL_FORMAT_MAX_BYTES).to_string(); } if tail_budget == 0 { // Make room for the marker by shrinking head head_budget = MODEL_FORMAT_MAX_BYTES.saturating_sub(marker.len()); } // Enforce line-count cap by trimming head/tail lines let head_lines_text = head_block; let tail_lines_text = tail_block; // Build final string respecting byte budgets let head_part = take_bytes_at_char_boundary(&head_lines_text, head_budget); let mut result = String::with_capacity(MODEL_FORMAT_MAX_BYTES.min(s.len())); result.push_str(head_part); result.push_str(&marker); let remaining = MODEL_FORMAT_MAX_BYTES.saturating_sub(result.len()); let tail_budget_final = remaining; let tail_part = take_last_bytes_at_char_boundary(&tail_lines_text, tail_budget_final); result.push_str(tail_part); result } // Truncate a &str to a byte budget at a char boundary (prefix) #[inline] fn take_bytes_at_char_boundary(s: &str, maxb: usize) -> &str { if s.len() <= maxb { return s; } let mut last_ok = 0; for (i, ch) in s.char_indices() { let nb = i + ch.len_utf8(); if nb > maxb { break; } last_ok = nb; } &s[..last_ok] } // Take a suffix of a &str within a byte budget at a char boundary #[inline] fn take_last_bytes_at_char_boundary(s: &str, maxb: usize) -> &str { if s.len() <= maxb { return s; } let mut start = s.len(); let mut used = 0usize; for (i, ch) in s.char_indices().rev() { let nb = ch.len_utf8(); if used + nb > maxb { break; } start = i; used += nb; if start == 0 { break; } } &s[start..] } /// Exec output is a pre-serialized JSON payload fn format_exec_output(exec_output: &ExecToolCallOutput) -> String { let ExecToolCallOutput { exit_code, duration, .. } = exec_output; #[derive(Serialize)] struct ExecMetadata { exit_code: i32, duration_seconds: f32, } #[derive(Serialize)] struct ExecOutput<'a> { output: &'a str, metadata: ExecMetadata, } // round to 1 decimal place let duration_seconds = ((duration.as_secs_f32()) * 10.0).round() / 10.0; let formatted_output = format_exec_output_str(exec_output); let payload = ExecOutput { output: &formatted_output, metadata: ExecMetadata { exit_code: *exit_code, duration_seconds, }, }; #[expect(clippy::expect_used)] serde_json::to_string(&payload).expect("serialize ExecOutput") } fn get_last_assistant_message_from_turn(responses: &[ResponseItem]) -> Option<String> { responses.iter().rev().find_map(|item| { if let ResponseItem::Message { role, content, .. } = item { if role == "assistant" { content.iter().rev().find_map(|ci| { if let ContentItem::OutputText { text } = ci { Some(text.clone()) } else { None } }) } else { None } } else { None } }) } async fn drain_to_completed( sess: &Session, turn_context: &TurnContext, sub_id: &str, prompt: &Prompt, ) -> CodexResult<()> { let mut stream = turn_context.client.clone().stream(prompt).await?; loop { let maybe_event = stream.next().await; let Some(event) = maybe_event else { return Err(CodexErr::Stream( "stream closed before response.completed".into(), None, )); }; match event { Ok(ResponseEvent::OutputItemDone(item)) => { // Record only to in-memory conversation history; avoid state snapshot.
let mut state = sess.state.lock_unchecked(); state.history.record_items(std::slice::from_ref(&item)); } Ok(ResponseEvent::Completed { response_id: _, token_usage, }) => { // some providers don't return token usage, so we default // TODO: consider approximate token usage let token_usage = token_usage.unwrap_or_default(); sess.tx_event .send(Event { id: sub_id.to_string(), msg: EventMsg::TokenCount(token_usage), }) .await .ok(); return Ok(()); } Ok(_) => continue, Err(e) => return Err(e), } } } fn convert_call_tool_result_to_function_call_output_payload( call_tool_result: &CallToolResult, ) -> FunctionCallOutputPayload { let CallToolResult { content, is_error, structured_content, } = call_tool_result; // In terms of what to send back to the model, we prefer structured_content // if available, and fall back to content otherwise. let mut is_success = is_error != &Some(true); let content = if let Some(structured_content) = structured_content && structured_content != &serde_json::Value::Null && let Ok(serialized_structured_content) = serde_json::to_string(&structured_content) { serialized_structured_content } else { match serde_json::to_string(&content) { Ok(serialized_content) => serialized_content, Err(err) => { // If we could not serialize either content or structured_content to // JSON, flag this as an error. is_success = false; err.to_string() } } }; FunctionCallOutputPayload { content, success: Some(is_success), } } #[cfg(test)] mod tests { use super::*; use mcp_types::ContentBlock; use mcp_types::TextContent; use pretty_assertions::assert_eq; use serde_json::json; use std::time::Duration as StdDuration; fn text_block(s: &str) -> ContentBlock { ContentBlock::TextContent(TextContent { annotations: None, text: s.to_string(), r#type: "text".to_string(), }) } #[test] fn prefers_structured_content_when_present() { let ctr = CallToolResult { // Content present but should be ignored because structured_content is set. content: vec![text_block("ignored")], is_error: None, structured_content: Some(json!({ "ok": true, "value": 42 })), }; let got = convert_call_tool_result_to_function_call_output_payload(&ctr); let expected = FunctionCallOutputPayload { content: serde_json::to_string(&json!({ "ok": true, "value": 42 })) .unwrap(), success: Some(true), }; assert_eq!(expected, got); } #[test] fn model_truncation_head_tail_by_lines() { // Build 400 short lines so line-count limit, not byte budget, triggers truncation let lines: Vec<String> = (1..=400).map(|i| format!("line{i}")).collect(); let full = lines.join("\n"); let exec = ExecToolCallOutput { exit_code: 0, stdout: StreamOutput::new(String::new()), stderr: StreamOutput::new(String::new()), aggregated_output: StreamOutput::new(full.clone()), duration: StdDuration::from_secs(1), }; let out = format_exec_output_str(&exec); // Expect elision marker with correct counts let omitted = 400 - MODEL_FORMAT_MAX_LINES; // 144 let marker = format!("\n[...
omitted {omitted} of 400 lines ...]\n\n"); assert!(out.contains(&marker), "missing marker: {out}"); // Validate head and tail let parts: Vec<&str> = out.split(&marker).collect(); assert_eq!(parts.len(), 2, "expected one marker split"); let head = parts[0]; let tail = parts[1]; let expected_head: String = (1..=MODEL_FORMAT_HEAD_LINES) .map(|i| format!("line{i}")) .collect::<Vec<_>>() .join("\n"); assert!(head.starts_with(&expected_head), "head mismatch"); let expected_tail: String = ((400 - MODEL_FORMAT_TAIL_LINES + 1)..=400) .map(|i| format!("line{i}")) .collect::<Vec<_>>() .join("\n"); assert!(tail.ends_with(&expected_tail), "tail mismatch"); } #[test] fn model_truncation_respects_byte_budget() { // Construct a large output (about 100kB) so byte budget dominates let big_line = "x".repeat(100); let full = std::iter::repeat_n(big_line.clone(), 1000) .collect::<Vec<_>>() .join("\n"); let exec = ExecToolCallOutput { exit_code: 0, stdout: StreamOutput::new(String::new()), stderr: StreamOutput::new(String::new()), aggregated_output: StreamOutput::new(full.clone()), duration: StdDuration::from_secs(1), }; let out = format_exec_output_str(&exec); assert!(out.len() <= MODEL_FORMAT_MAX_BYTES, "exceeds byte budget"); assert!(out.contains("omitted"), "should contain elision marker"); // Ensure head and tail are drawn from the original assert!(full.starts_with(out.chars().take(8).collect::<String>().as_str())); assert!( full.ends_with( out.chars() .rev() .take(8) .collect::<String>() .chars() .rev() .collect::<String>() .as_str() ) ); } #[test] fn falls_back_to_content_when_structured_is_null() { let ctr = CallToolResult { content: vec![text_block("hello"), text_block("world")], is_error: None, structured_content: Some(serde_json::Value::Null), }; let got = convert_call_tool_result_to_function_call_output_payload(&ctr); let expected = FunctionCallOutputPayload { content: serde_json::to_string(&vec![text_block("hello"), text_block("world")]) .unwrap(), success: Some(true), }; assert_eq!(expected, got); } #[test] fn success_flag_reflects_is_error_true() { let ctr = CallToolResult { content: vec![text_block("unused")], is_error: Some(true), structured_content: Some(json!({ "message": "bad" })), }; let got = convert_call_tool_result_to_function_call_output_payload(&ctr); let expected = FunctionCallOutputPayload { content: serde_json::to_string(&json!({ "message": "bad" })).unwrap(), success: Some(false), }; assert_eq!(expected, got); } #[test] fn success_flag_true_with_no_error_and_content_used() { let ctr = CallToolResult { content: vec![text_block("alpha")], is_error: Some(false), structured_content: None, }; let got = convert_call_tool_result_to_function_call_output_payload(&ctr); let expected = FunctionCallOutputPayload { content: serde_json::to_string(&vec![text_block("alpha")]).unwrap(), success: Some(true), }; assert_eq!(expected, got); } }
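
// A minimal illustrative sketch, not part of the original test suite: it exercises the
// char-boundary helpers above (`take_bytes_at_char_boundary` and
// `take_last_bytes_at_char_boundary`) with multi-byte UTF-8 input to show that the byte
// budgets used by `format_exec_output_str` never split a code point. The module name
// `boundary_sketch_tests` and the sample string are assumptions made for illustration.
#[cfg(test)]
mod boundary_sketch_tests {
    use super::*;

    #[test]
    fn prefix_clip_respects_utf8_boundaries() {
        // "héllo": 'é' occupies two bytes, so a 2-byte budget must stop after 'h'.
        let s = "héllo";
        assert_eq!(take_bytes_at_char_boundary(s, 2), "h");
        assert_eq!(take_bytes_at_char_boundary(s, 3), "hé");
        // A budget larger than the string returns it unchanged.
        assert_eq!(take_bytes_at_char_boundary(s, 64), s);
    }

    #[test]
    fn suffix_clip_respects_utf8_boundaries() {
        // A 2-byte suffix of "héllo" keeps "lo"; a 4-byte budget keeps "llo" (3 bytes)
        // rather than splitting the two-byte 'é' to use the remaining byte.
        let s = "héllo";
        assert_eq!(take_last_bytes_at_char_boundary(s, 2), "lo");
        assert_eq!(take_last_bytes_at_char_boundary(s, 4), "llo");
    }
}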