Files
llmx/codex-rs/core/src/codex.rs
Ahmed Ibrahim 431a10fc50 chore: unify history loading (#2736)
We have two ways of loading conversation with a previous history. Fork
conversation and the experimental resume that we had before. In this PR,
I am unifying their code path. The path is getting the history items and
recording them in a brand new conversation. This PR also constraint the
rollout recorder responsibilities to be only recording to the disk and
loading from the disk.

The PR also fixes a current bug when we have two forking in a row:
History 1:
<Environment Context>
UserMessage_1
UserMessage_2
UserMessage_3

**Fork with n = 1 (only remove one element)**
History 2:
<Environment Context>
UserMessage_1
UserMessage_2
<Environment Context>

**Fork with n = 1 (only remove one element)**
History 2:
<Environment Context>
UserMessage_1
UserMessage_2
**<Environment Context>**

This shouldn't happen but because we were appending the `<Environment
Context>` after each spawning and it's considered as _user message_.
Now, we don't add this message if restoring and old conversation.
2025-09-02 22:44:29 +00:00

3095 lines
112 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
use std::borrow::Cow;
use std::collections::HashMap;
use std::collections::HashSet;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::MutexGuard;
use std::sync::atomic::AtomicU64;
use std::time::Duration;
use async_channel::Receiver;
use async_channel::Sender;
use codex_apply_patch::ApplyPatchAction;
use codex_apply_patch::MaybeApplyPatchVerified;
use codex_apply_patch::maybe_parse_apply_patch_verified;
use codex_login::AuthManager;
use codex_protocol::protocol::ConversationHistoryResponseEvent;
use codex_protocol::protocol::TaskStartedEvent;
use codex_protocol::protocol::TurnAbortReason;
use codex_protocol::protocol::TurnAbortedEvent;
use futures::prelude::*;
use mcp_types::CallToolResult;
use serde::Serialize;
use serde_json;
use tokio::sync::oneshot;
use tokio::task::AbortHandle;
use tracing::debug;
use tracing::error;
use tracing::info;
use tracing::trace;
use tracing::warn;
use uuid::Uuid;
use crate::ModelProviderInfo;
use crate::apply_patch;
use crate::apply_patch::ApplyPatchExec;
use crate::apply_patch::CODEX_APPLY_PATCH_ARG1;
use crate::apply_patch::InternalApplyPatchInvocation;
use crate::apply_patch::convert_apply_patch_to_protocol;
use crate::client::ModelClient;
use crate::client_common::Prompt;
use crate::client_common::ResponseEvent;
use crate::config::Config;
use crate::config_types::ShellEnvironmentPolicy;
use crate::conversation_history::ConversationHistory;
use crate::conversation_manager::InitialHistory;
use crate::environment_context::EnvironmentContext;
use crate::error::CodexErr;
use crate::error::Result as CodexResult;
use crate::error::SandboxErr;
use crate::error::get_error_message_ui;
use crate::exec::ExecParams;
use crate::exec::ExecToolCallOutput;
use crate::exec::SandboxType;
use crate::exec::StdoutStream;
use crate::exec::StreamOutput;
use crate::exec::process_exec_tool_call;
use crate::exec_command::EXEC_COMMAND_TOOL_NAME;
use crate::exec_command::ExecCommandParams;
use crate::exec_command::ExecSessionManager;
use crate::exec_command::WRITE_STDIN_TOOL_NAME;
use crate::exec_command::WriteStdinParams;
use crate::exec_env::create_env;
use crate::mcp_connection_manager::McpConnectionManager;
use crate::mcp_tool_call::handle_mcp_tool_call;
use crate::model_family::find_family_for_model;
use crate::openai_model_info::get_model_info;
use crate::openai_tools::ApplyPatchToolArgs;
use crate::openai_tools::ToolsConfig;
use crate::openai_tools::ToolsConfigParams;
use crate::openai_tools::get_openai_tools;
use crate::parse_command::parse_command;
use crate::plan_tool::handle_update_plan;
use crate::project_doc::get_user_instructions;
use crate::protocol::AgentMessageDeltaEvent;
use crate::protocol::AgentMessageEvent;
use crate::protocol::AgentReasoningDeltaEvent;
use crate::protocol::AgentReasoningEvent;
use crate::protocol::AgentReasoningRawContentDeltaEvent;
use crate::protocol::AgentReasoningRawContentEvent;
use crate::protocol::AgentReasoningSectionBreakEvent;
use crate::protocol::ApplyPatchApprovalRequestEvent;
use crate::protocol::AskForApproval;
use crate::protocol::BackgroundEventEvent;
use crate::protocol::ErrorEvent;
use crate::protocol::Event;
use crate::protocol::EventMsg;
use crate::protocol::ExecApprovalRequestEvent;
use crate::protocol::ExecCommandBeginEvent;
use crate::protocol::ExecCommandEndEvent;
use crate::protocol::FileChange;
use crate::protocol::InputItem;
use crate::protocol::ListCustomPromptsResponseEvent;
use crate::protocol::Op;
use crate::protocol::PatchApplyBeginEvent;
use crate::protocol::PatchApplyEndEvent;
use crate::protocol::ReviewDecision;
use crate::protocol::SandboxPolicy;
use crate::protocol::SessionConfiguredEvent;
use crate::protocol::StreamErrorEvent;
use crate::protocol::Submission;
use crate::protocol::TaskCompleteEvent;
use crate::protocol::TurnDiffEvent;
use crate::protocol::WebSearchBeginEvent;
use crate::protocol::WebSearchEndEvent;
use crate::rollout::RolloutRecorder;
use crate::safety::SafetyCheck;
use crate::safety::assess_command_safety;
use crate::safety::assess_safety_for_untrusted_command;
use crate::shell;
use crate::turn_diff_tracker::TurnDiffTracker;
use crate::user_notification::UserNotification;
use crate::util::backoff;
use codex_protocol::config_types::ReasoningEffort as ReasoningEffortConfig;
use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig;
use codex_protocol::custom_prompts::CustomPrompt;
use codex_protocol::models::ContentItem;
use codex_protocol::models::FunctionCallOutputPayload;
use codex_protocol::models::LocalShellAction;
use codex_protocol::models::ReasoningItemContent;
use codex_protocol::models::ReasoningItemReasoningSummary;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::models::ShellToolCallParams;
use codex_protocol::models::WebSearchAction;
// A convenience extension trait for acquiring mutex locks where poisoning is
// unrecoverable and should abort the program. This avoids scattered `.unwrap()`
// calls on `lock()` while still surfacing a clear panic message when a lock is
// poisoned.
trait MutexExt<T> {
fn lock_unchecked(&self) -> MutexGuard<'_, T>;
}
impl<T> MutexExt<T> for Mutex<T> {
fn lock_unchecked(&self) -> MutexGuard<'_, T> {
#[expect(clippy::expect_used)]
self.lock().expect("poisoned lock")
}
}
/// The high-level interface to the Codex system.
/// It operates as a queue pair where you send submissions and receive events.
pub struct Codex {
next_id: AtomicU64,
tx_sub: Sender<Submission>,
rx_event: Receiver<Event>,
}
/// Wrapper returned by [`Codex::spawn`] containing the spawned [`Codex`],
/// the submission id for the initial `ConfigureSession` request and the
/// unique session id.
pub struct CodexSpawnOk {
pub codex: Codex,
pub session_id: Uuid,
}
pub(crate) const INITIAL_SUBMIT_ID: &str = "";
pub(crate) const SUBMISSION_CHANNEL_CAPACITY: usize = 64;
// Model-formatting limits: clients get full streams; oonly content sent to the model is truncated.
pub(crate) const MODEL_FORMAT_MAX_BYTES: usize = 10 * 1024; // 10 KiB
pub(crate) const MODEL_FORMAT_MAX_LINES: usize = 256; // lines
pub(crate) const MODEL_FORMAT_HEAD_LINES: usize = MODEL_FORMAT_MAX_LINES / 2;
pub(crate) const MODEL_FORMAT_TAIL_LINES: usize = MODEL_FORMAT_MAX_LINES - MODEL_FORMAT_HEAD_LINES; // 128
pub(crate) const MODEL_FORMAT_HEAD_BYTES: usize = MODEL_FORMAT_MAX_BYTES / 2;
impl Codex {
/// Spawn a new [`Codex`] and initialize the session.
pub async fn spawn(
config: Config,
auth_manager: Arc<AuthManager>,
conversation_history: InitialHistory,
) -> CodexResult<CodexSpawnOk> {
let (tx_sub, rx_sub) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY);
let (tx_event, rx_event) = async_channel::unbounded();
let user_instructions = get_user_instructions(&config).await;
let config = Arc::new(config);
let configure_session = ConfigureSession {
provider: config.model_provider.clone(),
model: config.model.clone(),
model_reasoning_effort: config.model_reasoning_effort,
model_reasoning_summary: config.model_reasoning_summary,
user_instructions,
base_instructions: config.base_instructions.clone(),
approval_policy: config.approval_policy,
sandbox_policy: config.sandbox_policy.clone(),
disable_response_storage: config.disable_response_storage,
notify: config.notify.clone(),
cwd: config.cwd.clone(),
};
// Generate a unique ID for the lifetime of this Codex session.
let (session, turn_context) = Session::new(
configure_session,
config.clone(),
auth_manager.clone(),
tx_event.clone(),
)
.await
.map_err(|e| {
error!("Failed to create session: {e:#}");
CodexErr::InternalAgentDied
})?;
session
.record_initial_history(&turn_context, conversation_history)
.await;
let session_id = session.session_id;
// This task will run until Op::Shutdown is received.
tokio::spawn(submission_loop(
session.clone(),
turn_context,
config,
rx_sub,
));
let codex = Codex {
next_id: AtomicU64::new(0),
tx_sub,
rx_event,
};
Ok(CodexSpawnOk { codex, session_id })
}
/// Submit the `op` wrapped in a `Submission` with a unique ID.
pub async fn submit(&self, op: Op) -> CodexResult<String> {
let id = self
.next_id
.fetch_add(1, std::sync::atomic::Ordering::SeqCst)
.to_string();
let sub = Submission { id: id.clone(), op };
self.submit_with_id(sub).await?;
Ok(id)
}
/// Use sparingly: prefer `submit()` so Codex is responsible for generating
/// unique IDs for each submission.
pub async fn submit_with_id(&self, sub: Submission) -> CodexResult<()> {
self.tx_sub
.send(sub)
.await
.map_err(|_| CodexErr::InternalAgentDied)?;
Ok(())
}
pub async fn next_event(&self) -> CodexResult<Event> {
let event = self
.rx_event
.recv()
.await
.map_err(|_| CodexErr::InternalAgentDied)?;
Ok(event)
}
}
/// Mutable state of the agent
#[derive(Default)]
struct State {
approved_commands: HashSet<Vec<String>>,
current_task: Option<AgentTask>,
pending_approvals: HashMap<String, oneshot::Sender<ReviewDecision>>,
pending_input: Vec<ResponseInputItem>,
history: ConversationHistory,
}
/// Context for an initialized model agent
///
/// A session has at most 1 running task at a time, and can be interrupted by user input.
pub(crate) struct Session {
session_id: Uuid,
tx_event: Sender<Event>,
/// Manager for external MCP servers/tools.
mcp_connection_manager: McpConnectionManager,
session_manager: ExecSessionManager,
/// External notifier command (will be passed as args to exec()). When
/// `None` this feature is disabled.
notify: Option<Vec<String>>,
/// Optional rollout recorder for persisting the conversation transcript so
/// sessions can be replayed or inspected later.
rollout: Mutex<Option<RolloutRecorder>>,
state: Mutex<State>,
codex_linux_sandbox_exe: Option<PathBuf>,
user_shell: shell::Shell,
show_raw_agent_reasoning: bool,
}
/// The context needed for a single turn of the conversation.
#[derive(Debug)]
pub(crate) struct TurnContext {
pub(crate) client: ModelClient,
/// The session's current working directory. All relative paths provided by
/// the model as well as sandbox policies are resolved against this path
/// instead of `std::env::current_dir()`.
pub(crate) cwd: PathBuf,
pub(crate) base_instructions: Option<String>,
pub(crate) user_instructions: Option<String>,
pub(crate) approval_policy: AskForApproval,
pub(crate) sandbox_policy: SandboxPolicy,
pub(crate) shell_environment_policy: ShellEnvironmentPolicy,
pub(crate) disable_response_storage: bool,
pub(crate) tools_config: ToolsConfig,
}
impl TurnContext {
fn resolve_path(&self, path: Option<String>) -> PathBuf {
path.as_ref()
.map(PathBuf::from)
.map_or_else(|| self.cwd.clone(), |p| self.cwd.join(p))
}
}
/// Configure the model session.
struct ConfigureSession {
/// Provider identifier ("openai", "openrouter", ...).
provider: ModelProviderInfo,
/// If not specified, server will use its default model.
model: String,
model_reasoning_effort: ReasoningEffortConfig,
model_reasoning_summary: ReasoningSummaryConfig,
/// Model instructions that are appended to the base instructions.
user_instructions: Option<String>,
/// Base instructions override.
base_instructions: Option<String>,
/// When to escalate for approval for execution
approval_policy: AskForApproval,
/// How to sandbox commands executed in the system
sandbox_policy: SandboxPolicy,
/// Disable server-side response storage (send full context each request)
disable_response_storage: bool,
/// Optional external notifier command tokens. Present only when the
/// client wants the agent to spawn a program after each completed
/// turn.
notify: Option<Vec<String>>,
/// Working directory that should be treated as the *root* of the
/// session. All relative paths supplied by the model as well as the
/// execution sandbox are resolved against this directory **instead**
/// of the process-wide current working directory. CLI front-ends are
/// expected to expand this to an absolute path before sending the
/// `ConfigureSession` operation so that the business-logic layer can
/// operate deterministically.
cwd: PathBuf,
}
impl Session {
async fn new(
configure_session: ConfigureSession,
config: Arc<Config>,
auth_manager: Arc<AuthManager>,
tx_event: Sender<Event>,
) -> anyhow::Result<(Arc<Self>, TurnContext)> {
let session_id = Uuid::new_v4();
let ConfigureSession {
provider,
model,
model_reasoning_effort,
model_reasoning_summary,
user_instructions,
base_instructions,
approval_policy,
sandbox_policy,
disable_response_storage,
notify,
cwd,
} = configure_session;
debug!("Configuring session: model={model}; provider={provider:?}");
if !cwd.is_absolute() {
return Err(anyhow::anyhow!("cwd is not absolute: {cwd:?}"));
}
// Error messages to dispatch after SessionConfigured is sent.
let mut post_session_configured_error_events = Vec::<Event>::new();
// Kick off independent async setup tasks in parallel to reduce startup latency.
//
// - initialize RolloutRecorder with new or resumed session info
// - spin up MCP connection manager
// - perform default shell discovery
// - load history metadata
let rollout_fut = RolloutRecorder::new(&config, session_id, user_instructions.clone());
let mcp_fut = McpConnectionManager::new(config.mcp_servers.clone());
let default_shell_fut = shell::default_user_shell();
let history_meta_fut = crate::message_history::history_metadata(&config);
// Join all independent futures.
let (rollout_recorder, mcp_res, default_shell, (history_log_id, history_entry_count)) =
tokio::join!(rollout_fut, mcp_fut, default_shell_fut, history_meta_fut);
let rollout_recorder = rollout_recorder.map_err(|e| {
error!("failed to initialize rollout recorder: {e:#}");
anyhow::anyhow!("failed to initialize rollout recorder: {e:#}")
})?;
// Create the mutable state for the Session.
let state = State {
history: ConversationHistory::new(),
..Default::default()
};
// Handle MCP manager result and record any startup failures.
let (mcp_connection_manager, failed_clients) = match mcp_res {
Ok((mgr, failures)) => (mgr, failures),
Err(e) => {
let message = format!("Failed to create MCP connection manager: {e:#}");
error!("{message}");
post_session_configured_error_events.push(Event {
id: INITIAL_SUBMIT_ID.to_owned(),
msg: EventMsg::Error(ErrorEvent { message }),
});
(McpConnectionManager::default(), Default::default())
}
};
// Surface individual client start-up failures to the user.
if !failed_clients.is_empty() {
for (server_name, err) in failed_clients {
let message = format!("MCP client for `{server_name}` failed to start: {err:#}");
error!("{message}");
post_session_configured_error_events.push(Event {
id: INITIAL_SUBMIT_ID.to_owned(),
msg: EventMsg::Error(ErrorEvent { message }),
});
}
}
// Now that `session_id` is final (may have been updated by resume),
// construct the model client.
let client = ModelClient::new(
config.clone(),
Some(auth_manager.clone()),
provider.clone(),
model_reasoning_effort,
model_reasoning_summary,
session_id,
);
let turn_context = TurnContext {
client,
tools_config: ToolsConfig::new(&ToolsConfigParams {
model_family: &config.model_family,
approval_policy,
sandbox_policy: sandbox_policy.clone(),
include_plan_tool: config.include_plan_tool,
include_apply_patch_tool: config.include_apply_patch_tool,
include_web_search_request: config.tools_web_search_request,
use_streamable_shell_tool: config.use_experimental_streamable_shell_tool,
include_view_image_tool: config.include_view_image_tool,
}),
user_instructions,
base_instructions,
approval_policy,
sandbox_policy,
shell_environment_policy: config.shell_environment_policy.clone(),
cwd,
disable_response_storage,
};
let sess = Arc::new(Session {
session_id,
tx_event: tx_event.clone(),
mcp_connection_manager,
session_manager: ExecSessionManager::default(),
notify,
state: Mutex::new(state),
rollout: Mutex::new(Some(rollout_recorder)),
codex_linux_sandbox_exe: config.codex_linux_sandbox_exe.clone(),
user_shell: default_shell,
show_raw_agent_reasoning: config.show_raw_agent_reasoning,
});
// Dispatch the SessionConfiguredEvent first and then report any errors.
let events = std::iter::once(Event {
id: INITIAL_SUBMIT_ID.to_owned(),
msg: EventMsg::SessionConfigured(SessionConfiguredEvent {
session_id,
model,
history_log_id,
history_entry_count,
}),
})
.chain(post_session_configured_error_events.into_iter());
for event in events {
if let Err(e) = tx_event.send(event).await {
error!("failed to send event: {e:?}");
}
}
Ok((sess, turn_context))
}
pub fn set_task(&self, task: AgentTask) {
let mut state = self.state.lock_unchecked();
if let Some(current_task) = state.current_task.take() {
current_task.abort(TurnAbortReason::Replaced);
}
state.current_task = Some(task);
}
pub fn remove_task(&self, sub_id: &str) {
let mut state = self.state.lock_unchecked();
if let Some(task) = &state.current_task
&& task.sub_id == sub_id
{
state.current_task.take();
}
}
async fn record_initial_history(
&self,
turn_context: &TurnContext,
conversation_history: InitialHistory,
) {
match conversation_history {
InitialHistory::New => {
self.record_initial_history_new(turn_context).await;
}
InitialHistory::Resumed(items) => {
self.record_initial_history_resumed(items).await;
}
}
}
async fn record_initial_history_new(&self, turn_context: &TurnContext) {
// record the initial user instructions and environment context,
// regardless of whether we restored items.
// TODO: Those items shouldn't be "user messages" IMO. Maybe developer messages.
let mut conversation_items = Vec::<ResponseItem>::with_capacity(2);
if let Some(user_instructions) = turn_context.user_instructions.as_deref() {
conversation_items.push(Prompt::format_user_instructions_message(user_instructions));
}
conversation_items.push(ResponseItem::from(EnvironmentContext::new(
Some(turn_context.cwd.clone()),
Some(turn_context.approval_policy),
Some(turn_context.sandbox_policy.clone()),
Some(self.user_shell.clone()),
)));
self.record_conversation_items(&conversation_items).await;
}
async fn record_initial_history_resumed(&self, items: Vec<ResponseItem>) {
self.record_conversation_items(&items).await;
}
/// Sends the given event to the client and swallows the send event, if
/// any, logging it as an error.
pub(crate) async fn send_event(&self, event: Event) {
if let Err(e) = self.tx_event.send(event).await {
error!("failed to send tool call event: {e}");
}
}
pub async fn request_command_approval(
&self,
sub_id: String,
call_id: String,
command: Vec<String>,
cwd: PathBuf,
reason: Option<String>,
) -> oneshot::Receiver<ReviewDecision> {
let (tx_approve, rx_approve) = oneshot::channel();
let event = Event {
id: sub_id.clone(),
msg: EventMsg::ExecApprovalRequest(ExecApprovalRequestEvent {
call_id,
command,
cwd,
reason,
}),
};
let _ = self.tx_event.send(event).await;
{
let mut state = self.state.lock_unchecked();
state.pending_approvals.insert(sub_id, tx_approve);
}
rx_approve
}
pub async fn request_patch_approval(
&self,
sub_id: String,
call_id: String,
action: &ApplyPatchAction,
reason: Option<String>,
grant_root: Option<PathBuf>,
) -> oneshot::Receiver<ReviewDecision> {
let (tx_approve, rx_approve) = oneshot::channel();
let event = Event {
id: sub_id.clone(),
msg: EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
call_id,
changes: convert_apply_patch_to_protocol(action),
reason,
grant_root,
}),
};
let _ = self.tx_event.send(event).await;
{
let mut state = self.state.lock_unchecked();
state.pending_approvals.insert(sub_id, tx_approve);
}
rx_approve
}
pub fn notify_approval(&self, sub_id: &str, decision: ReviewDecision) {
let entry = {
let mut state = self.state.lock_unchecked();
state.pending_approvals.remove(sub_id)
};
match entry {
Some(tx_approve) => {
tx_approve.send(decision).ok();
}
None => {
warn!("No pending approval found for sub_id: {sub_id}");
}
}
}
pub fn add_approved_command(&self, cmd: Vec<String>) {
let mut state = self.state.lock_unchecked();
state.approved_commands.insert(cmd);
}
/// Records items to both the rollout and the chat completions/ZDR
/// transcript, if enabled.
async fn record_conversation_items(&self, items: &[ResponseItem]) {
debug!("Recording items for conversation: {items:?}");
self.record_state_snapshot(items).await;
self.state.lock_unchecked().history.record_items(items);
}
async fn record_state_snapshot(&self, items: &[ResponseItem]) {
let snapshot = { crate::rollout::SessionStateSnapshot {} };
let recorder = {
let guard = self.rollout.lock_unchecked();
guard.as_ref().cloned()
};
if let Some(rec) = recorder {
if let Err(e) = rec.record_state(snapshot).await {
error!("failed to record rollout state: {e:#}");
}
if let Err(e) = rec.record_items(items).await {
error!("failed to record rollout items: {e:#}");
}
}
}
async fn on_exec_command_begin(
&self,
turn_diff_tracker: &mut TurnDiffTracker,
exec_command_context: ExecCommandContext,
) {
let ExecCommandContext {
sub_id,
call_id,
command_for_display,
cwd,
apply_patch,
} = exec_command_context;
let msg = match apply_patch {
Some(ApplyPatchCommandContext {
user_explicitly_approved_this_action,
changes,
}) => {
turn_diff_tracker.on_patch_begin(&changes);
EventMsg::PatchApplyBegin(PatchApplyBeginEvent {
call_id,
auto_approved: !user_explicitly_approved_this_action,
changes,
})
}
None => EventMsg::ExecCommandBegin(ExecCommandBeginEvent {
call_id,
command: command_for_display.clone(),
cwd,
parsed_cmd: parse_command(&command_for_display)
.into_iter()
.map(Into::into)
.collect(),
}),
};
let event = Event {
id: sub_id.to_string(),
msg,
};
let _ = self.tx_event.send(event).await;
}
async fn on_exec_command_end(
&self,
turn_diff_tracker: &mut TurnDiffTracker,
sub_id: &str,
call_id: &str,
output: &ExecToolCallOutput,
is_apply_patch: bool,
) {
let ExecToolCallOutput {
stdout,
stderr,
aggregated_output,
duration,
exit_code,
} = output;
// Send full stdout/stderr to clients; do not truncate.
let stdout = stdout.text.clone();
let stderr = stderr.text.clone();
let formatted_output = format_exec_output_str(output);
let aggregated_output: String = aggregated_output.text.clone();
let msg = if is_apply_patch {
EventMsg::PatchApplyEnd(PatchApplyEndEvent {
call_id: call_id.to_string(),
stdout,
stderr,
success: *exit_code == 0,
})
} else {
EventMsg::ExecCommandEnd(ExecCommandEndEvent {
call_id: call_id.to_string(),
stdout,
stderr,
aggregated_output,
exit_code: *exit_code,
duration: *duration,
formatted_output,
})
};
let event = Event {
id: sub_id.to_string(),
msg,
};
let _ = self.tx_event.send(event).await;
// If this is an apply_patch, after we emit the end patch, emit a second event
// with the full turn diff if there is one.
if is_apply_patch {
let unified_diff = turn_diff_tracker.get_unified_diff();
if let Ok(Some(unified_diff)) = unified_diff {
let msg = EventMsg::TurnDiff(TurnDiffEvent { unified_diff });
let event = Event {
id: sub_id.into(),
msg,
};
let _ = self.tx_event.send(event).await;
}
}
}
/// Runs the exec tool call and emits events for the begin and end of the
/// command even on error.
///
/// Returns the output of the exec tool call.
async fn run_exec_with_events<'a>(
&self,
turn_diff_tracker: &mut TurnDiffTracker,
begin_ctx: ExecCommandContext,
exec_args: ExecInvokeArgs<'a>,
) -> crate::error::Result<ExecToolCallOutput> {
let is_apply_patch = begin_ctx.apply_patch.is_some();
let sub_id = begin_ctx.sub_id.clone();
let call_id = begin_ctx.call_id.clone();
self.on_exec_command_begin(turn_diff_tracker, begin_ctx.clone())
.await;
let result = process_exec_tool_call(
exec_args.params,
exec_args.sandbox_type,
exec_args.sandbox_policy,
exec_args.codex_linux_sandbox_exe,
exec_args.stdout_stream,
)
.await;
let output_stderr;
let borrowed: &ExecToolCallOutput = match &result {
Ok(output) => output,
Err(e) => {
output_stderr = ExecToolCallOutput {
exit_code: -1,
stdout: StreamOutput::new(String::new()),
stderr: StreamOutput::new(get_error_message_ui(e)),
aggregated_output: StreamOutput::new(get_error_message_ui(e)),
duration: Duration::default(),
};
&output_stderr
}
};
self.on_exec_command_end(
turn_diff_tracker,
&sub_id,
&call_id,
borrowed,
is_apply_patch,
)
.await;
result
}
/// Helper that emits a BackgroundEvent with the given message. This keeps
/// the callsites terse so adding more diagnostics does not clutter the
/// core agent logic.
async fn notify_background_event(&self, sub_id: &str, message: impl Into<String>) {
let event = Event {
id: sub_id.to_string(),
msg: EventMsg::BackgroundEvent(BackgroundEventEvent {
message: message.into(),
}),
};
let _ = self.tx_event.send(event).await;
}
async fn notify_stream_error(&self, sub_id: &str, message: impl Into<String>) {
let event = Event {
id: sub_id.to_string(),
msg: EventMsg::StreamError(StreamErrorEvent {
message: message.into(),
}),
};
let _ = self.tx_event.send(event).await;
}
/// Build the full turn input by concatenating the current conversation
/// history with additional items for this turn.
pub fn turn_input_with_history(&self, extra: Vec<ResponseItem>) -> Vec<ResponseItem> {
[self.state.lock_unchecked().history.contents(), extra].concat()
}
/// Returns the input if there was no task running to inject into
pub fn inject_input(&self, input: Vec<InputItem>) -> Result<(), Vec<InputItem>> {
let mut state = self.state.lock_unchecked();
if state.current_task.is_some() {
state.pending_input.push(input.into());
Ok(())
} else {
Err(input)
}
}
pub fn get_pending_input(&self) -> Vec<ResponseInputItem> {
let mut state = self.state.lock_unchecked();
if state.pending_input.is_empty() {
Vec::with_capacity(0)
} else {
let mut ret = Vec::new();
std::mem::swap(&mut ret, &mut state.pending_input);
ret
}
}
pub async fn call_tool(
&self,
server: &str,
tool: &str,
arguments: Option<serde_json::Value>,
timeout: Option<Duration>,
) -> anyhow::Result<CallToolResult> {
self.mcp_connection_manager
.call_tool(server, tool, arguments, timeout)
.await
}
fn interrupt_task(&self) {
info!("interrupt received: abort current task, if any");
let mut state = self.state.lock_unchecked();
state.pending_approvals.clear();
state.pending_input.clear();
if let Some(task) = state.current_task.take() {
task.abort(TurnAbortReason::Interrupted);
}
}
/// Spawn the configured notifier (if any) with the given JSON payload as
/// the last argument. Failures are logged but otherwise ignored so that
/// notification issues do not interfere with the main workflow.
fn maybe_notify(&self, notification: UserNotification) {
let Some(notify_command) = &self.notify else {
return;
};
if notify_command.is_empty() {
return;
}
let Ok(json) = serde_json::to_string(&notification) else {
error!("failed to serialise notification payload");
return;
};
let mut command = std::process::Command::new(&notify_command[0]);
if notify_command.len() > 1 {
command.args(&notify_command[1..]);
}
command.arg(json);
// Fire-and-forget we do not wait for completion.
if let Err(e) = command.spawn() {
warn!("failed to spawn notifier '{}': {e}", notify_command[0]);
}
}
}
impl Drop for Session {
fn drop(&mut self) {
self.interrupt_task();
}
}
#[derive(Clone, Debug)]
pub(crate) struct ExecCommandContext {
pub(crate) sub_id: String,
pub(crate) call_id: String,
pub(crate) command_for_display: Vec<String>,
pub(crate) cwd: PathBuf,
pub(crate) apply_patch: Option<ApplyPatchCommandContext>,
}
#[derive(Clone, Debug)]
pub(crate) struct ApplyPatchCommandContext {
pub(crate) user_explicitly_approved_this_action: bool,
pub(crate) changes: HashMap<PathBuf, FileChange>,
}
/// A series of Turns in response to user input.
pub(crate) struct AgentTask {
sess: Arc<Session>,
sub_id: String,
handle: AbortHandle,
}
impl AgentTask {
fn spawn(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
sub_id: String,
input: Vec<InputItem>,
) -> Self {
let handle = {
let sess = sess.clone();
let sub_id = sub_id.clone();
let tc = Arc::clone(&turn_context);
tokio::spawn(async move { run_task(sess, tc.as_ref(), sub_id, input).await })
.abort_handle()
};
Self {
sess,
sub_id,
handle,
}
}
fn compact(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
sub_id: String,
input: Vec<InputItem>,
compact_instructions: String,
) -> Self {
let handle = {
let sess = sess.clone();
let sub_id = sub_id.clone();
let tc = Arc::clone(&turn_context);
tokio::spawn(async move {
run_compact_task(sess, tc.as_ref(), sub_id, input, compact_instructions).await
})
.abort_handle()
};
Self {
sess,
sub_id,
handle,
}
}
fn abort(self, reason: TurnAbortReason) {
// TOCTOU?
if !self.handle.is_finished() {
self.handle.abort();
let event = Event {
id: self.sub_id,
msg: EventMsg::TurnAborted(TurnAbortedEvent { reason }),
};
let tx_event = self.sess.tx_event.clone();
tokio::spawn(async move {
tx_event.send(event).await.ok();
});
}
}
}
async fn submission_loop(
sess: Arc<Session>,
turn_context: TurnContext,
config: Arc<Config>,
rx_sub: Receiver<Submission>,
) {
// Wrap once to avoid cloning TurnContext for each task.
let mut turn_context = Arc::new(turn_context);
// To break out of this loop, send Op::Shutdown.
while let Ok(sub) = rx_sub.recv().await {
debug!(?sub, "Submission");
match sub.op {
Op::Interrupt => {
sess.interrupt_task();
}
Op::OverrideTurnContext {
cwd,
approval_policy,
sandbox_policy,
model,
effort,
summary,
} => {
// Recalculate the persistent turn context with provided overrides.
let prev = Arc::clone(&turn_context);
let provider = prev.client.get_provider();
// Effective model + family
let (effective_model, effective_family) = if let Some(m) = model {
let fam =
find_family_for_model(&m).unwrap_or_else(|| config.model_family.clone());
(m, fam)
} else {
(prev.client.get_model(), prev.client.get_model_family())
};
// Effective reasoning settings
let effective_effort = effort.unwrap_or(prev.client.get_reasoning_effort());
let effective_summary = summary.unwrap_or(prev.client.get_reasoning_summary());
let auth_manager = prev.client.get_auth_manager();
// Build updated config for the client
let mut updated_config = (*config).clone();
updated_config.model = effective_model.clone();
updated_config.model_family = effective_family.clone();
if let Some(model_info) = get_model_info(&effective_family) {
updated_config.model_context_window = Some(model_info.context_window);
}
let client = ModelClient::new(
Arc::new(updated_config),
auth_manager,
provider,
effective_effort,
effective_summary,
sess.session_id,
);
let new_approval_policy = approval_policy.unwrap_or(prev.approval_policy);
let new_sandbox_policy = sandbox_policy
.clone()
.unwrap_or(prev.sandbox_policy.clone());
let new_cwd = cwd.clone().unwrap_or_else(|| prev.cwd.clone());
let tools_config = ToolsConfig::new(&ToolsConfigParams {
model_family: &effective_family,
approval_policy: new_approval_policy,
sandbox_policy: new_sandbox_policy.clone(),
include_plan_tool: config.include_plan_tool,
include_apply_patch_tool: config.include_apply_patch_tool,
include_web_search_request: config.tools_web_search_request,
use_streamable_shell_tool: config.use_experimental_streamable_shell_tool,
include_view_image_tool: config.include_view_image_tool,
});
let new_turn_context = TurnContext {
client,
tools_config,
user_instructions: prev.user_instructions.clone(),
base_instructions: prev.base_instructions.clone(),
approval_policy: new_approval_policy,
sandbox_policy: new_sandbox_policy.clone(),
shell_environment_policy: prev.shell_environment_policy.clone(),
cwd: new_cwd.clone(),
disable_response_storage: prev.disable_response_storage,
};
// Install the new persistent context for subsequent tasks/turns.
turn_context = Arc::new(new_turn_context);
if cwd.is_some() || approval_policy.is_some() || sandbox_policy.is_some() {
sess.record_conversation_items(&[ResponseItem::from(EnvironmentContext::new(
cwd,
approval_policy,
sandbox_policy,
// Shell is not configurable from turn to turn
None,
))])
.await;
}
}
Op::UserInput { items } => {
// attempt to inject input into current task
if let Err(items) = sess.inject_input(items) {
// no current task, spawn a new one
let task =
AgentTask::spawn(sess.clone(), Arc::clone(&turn_context), sub.id, items);
sess.set_task(task);
}
}
Op::UserTurn {
items,
cwd,
approval_policy,
sandbox_policy,
model,
effort,
summary,
} => {
// attempt to inject input into current task
if let Err(items) = sess.inject_input(items) {
// Derive a fresh TurnContext for this turn using the provided overrides.
let provider = turn_context.client.get_provider();
let auth_manager = turn_context.client.get_auth_manager();
// Derive a model family for the requested model; fall back to the session's.
let model_family = find_family_for_model(&model)
.unwrap_or_else(|| config.model_family.clone());
// Create a perturn Config clone with the requested model/family.
let mut per_turn_config = (*config).clone();
per_turn_config.model = model.clone();
per_turn_config.model_family = model_family.clone();
if let Some(model_info) = get_model_info(&model_family) {
per_turn_config.model_context_window = Some(model_info.context_window);
}
// Build a new client with perturn reasoning settings.
// Reuse the same provider and session id; auth defaults to env/API key.
let client = ModelClient::new(
Arc::new(per_turn_config),
auth_manager,
provider,
effort,
summary,
sess.session_id,
);
let fresh_turn_context = TurnContext {
client,
tools_config: ToolsConfig::new(&ToolsConfigParams {
model_family: &model_family,
approval_policy,
sandbox_policy: sandbox_policy.clone(),
include_plan_tool: config.include_plan_tool,
include_apply_patch_tool: config.include_apply_patch_tool,
include_web_search_request: config.tools_web_search_request,
use_streamable_shell_tool: config
.use_experimental_streamable_shell_tool,
include_view_image_tool: config.include_view_image_tool,
}),
user_instructions: turn_context.user_instructions.clone(),
base_instructions: turn_context.base_instructions.clone(),
approval_policy,
sandbox_policy,
shell_environment_policy: turn_context.shell_environment_policy.clone(),
cwd,
disable_response_storage: turn_context.disable_response_storage,
};
// TODO: record the new environment context in the conversation history
// no current task, spawn a new one with the perturn context
let task =
AgentTask::spawn(sess.clone(), Arc::new(fresh_turn_context), sub.id, items);
sess.set_task(task);
}
}
Op::ExecApproval { id, decision } => match decision {
ReviewDecision::Abort => {
sess.interrupt_task();
}
other => sess.notify_approval(&id, other),
},
Op::PatchApproval { id, decision } => match decision {
ReviewDecision::Abort => {
sess.interrupt_task();
}
other => sess.notify_approval(&id, other),
},
Op::AddToHistory { text } => {
let id = sess.session_id;
let config = config.clone();
tokio::spawn(async move {
if let Err(e) = crate::message_history::append_entry(&text, &id, &config).await
{
warn!("failed to append to message history: {e}");
}
});
}
Op::GetHistoryEntryRequest { offset, log_id } => {
let config = config.clone();
let tx_event = sess.tx_event.clone();
let sub_id = sub.id.clone();
tokio::spawn(async move {
// Run lookup in blocking thread because it does file IO + locking.
let entry_opt = tokio::task::spawn_blocking(move || {
crate::message_history::lookup(log_id, offset, &config)
})
.await
.unwrap_or(None);
let event = Event {
id: sub_id,
msg: EventMsg::GetHistoryEntryResponse(
crate::protocol::GetHistoryEntryResponseEvent {
offset,
log_id,
entry: entry_opt.map(|e| {
codex_protocol::message_history::HistoryEntry {
session_id: e.session_id,
ts: e.ts,
text: e.text,
}
}),
},
),
};
if let Err(e) = tx_event.send(event).await {
warn!("failed to send GetHistoryEntryResponse event: {e}");
}
});
}
Op::ListMcpTools => {
let tx_event = sess.tx_event.clone();
let sub_id = sub.id.clone();
// This is a cheap lookup from the connection manager's cache.
let tools = sess.mcp_connection_manager.list_all_tools();
let event = Event {
id: sub_id,
msg: EventMsg::McpListToolsResponse(
crate::protocol::McpListToolsResponseEvent { tools },
),
};
if let Err(e) = tx_event.send(event).await {
warn!("failed to send McpListToolsResponse event: {e}");
}
}
Op::ListCustomPrompts => {
let tx_event = sess.tx_event.clone();
let sub_id = sub.id.clone();
let custom_prompts: Vec<CustomPrompt> =
if let Some(dir) = crate::custom_prompts::default_prompts_dir() {
crate::custom_prompts::discover_prompts_in(&dir).await
} else {
Vec::new()
};
let event = Event {
id: sub_id,
msg: EventMsg::ListCustomPromptsResponse(ListCustomPromptsResponseEvent {
custom_prompts,
}),
};
if let Err(e) = tx_event.send(event).await {
warn!("failed to send ListCustomPromptsResponse event: {e}");
}
}
Op::Compact => {
// Create a summarization request as user input
const SUMMARIZATION_PROMPT: &str = include_str!("prompt_for_compact_command.md");
// Attempt to inject input into current task
if let Err(items) = sess.inject_input(vec![InputItem::Text {
text: "Start Summarization".to_string(),
}]) {
let task = AgentTask::compact(
sess.clone(),
Arc::clone(&turn_context),
sub.id,
items,
SUMMARIZATION_PROMPT.to_string(),
);
sess.set_task(task);
}
}
Op::Shutdown => {
info!("Shutting down Codex instance");
// Gracefully flush and shutdown rollout recorder on session end so tests
// that inspect the rollout file do not race with the background writer.
let recorder_opt = sess.rollout.lock_unchecked().take();
if let Some(rec) = recorder_opt
&& let Err(e) = rec.shutdown().await
{
warn!("failed to shutdown rollout recorder: {e}");
let event = Event {
id: sub.id.clone(),
msg: EventMsg::Error(ErrorEvent {
message: "Failed to shutdown rollout recorder".to_string(),
}),
};
if let Err(e) = sess.tx_event.send(event).await {
warn!("failed to send error message: {e:?}");
}
}
let event = Event {
id: sub.id.clone(),
msg: EventMsg::ShutdownComplete,
};
if let Err(e) = sess.tx_event.send(event).await {
warn!("failed to send Shutdown event: {e}");
}
break;
}
Op::GetHistory => {
let tx_event = sess.tx_event.clone();
let sub_id = sub.id.clone();
let event = Event {
id: sub_id.clone(),
msg: EventMsg::ConversationHistory(ConversationHistoryResponseEvent {
conversation_id: sess.session_id,
entries: sess.state.lock_unchecked().history.contents(),
}),
};
if let Err(e) = tx_event.send(event).await {
warn!("failed to send ConversationHistory event: {e}");
}
}
_ => {
// Ignore unknown ops; enum is non_exhaustive to allow extensions.
}
}
}
debug!("Agent loop exited");
}
/// Takes a user message as input and runs a loop where, at each turn, the model
/// replies with either:
///
/// - requested function calls
/// - an assistant message
///
/// While it is possible for the model to return multiple of these items in a
/// single turn, in practice, we generally one item per turn:
///
/// - If the model requests a function call, we execute it and send the output
/// back to the model in the next turn.
/// - If the model sends only an assistant message, we record it in the
/// conversation history and consider the task complete.
async fn run_task(
sess: Arc<Session>,
turn_context: &TurnContext,
sub_id: String,
input: Vec<InputItem>,
) {
if input.is_empty() {
return;
}
let event = Event {
id: sub_id.clone(),
msg: EventMsg::TaskStarted(TaskStartedEvent {
model_context_window: turn_context.client.get_model_context_window(),
}),
};
if sess.tx_event.send(event).await.is_err() {
return;
}
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input);
sess.record_conversation_items(&[initial_input_for_turn.clone().into()])
.await;
let mut last_agent_message: Option<String> = None;
// Although from the perspective of codex.rs, TurnDiffTracker has the lifecycle of a Task which contains
// many turns, from the perspective of the user, it is a single turn.
let mut turn_diff_tracker = TurnDiffTracker::new();
loop {
// Note that pending_input would be something like a message the user
// submitted through the UI while the model was running. Though the UI
// may support this, the model might not.
let pending_input = sess
.get_pending_input()
.into_iter()
.map(ResponseItem::from)
.collect::<Vec<ResponseItem>>();
sess.record_conversation_items(&pending_input).await;
// Construct the input that we will send to the model. When using the
// Chat completions API (or ZDR clients), the model needs the full
// conversation history on each turn. The rollout file, however, should
// only record the new items that originated in this turn so that it
// represents an append-only log without duplicates.
let turn_input: Vec<ResponseItem> = sess.turn_input_with_history(pending_input);
let turn_input_messages: Vec<String> = turn_input
.iter()
.filter_map(|item| match item {
ResponseItem::Message { content, .. } => Some(content),
_ => None,
})
.flat_map(|content| {
content.iter().filter_map(|item| match item {
ContentItem::OutputText { text } => Some(text.clone()),
_ => None,
})
})
.collect();
match run_turn(
&sess,
turn_context,
&mut turn_diff_tracker,
sub_id.clone(),
turn_input,
)
.await
{
Ok(turn_output) => {
let mut items_to_record_in_conversation_history = Vec::<ResponseItem>::new();
let mut responses = Vec::<ResponseInputItem>::new();
for processed_response_item in turn_output {
let ProcessedResponseItem { item, response } = processed_response_item;
match (&item, &response) {
(ResponseItem::Message { role, .. }, None) if role == "assistant" => {
// If the model returned a message, we need to record it.
items_to_record_in_conversation_history.push(item);
}
(
ResponseItem::LocalShellCall { .. },
Some(ResponseInputItem::FunctionCallOutput { call_id, output }),
) => {
items_to_record_in_conversation_history.push(item);
items_to_record_in_conversation_history.push(
ResponseItem::FunctionCallOutput {
call_id: call_id.clone(),
output: output.clone(),
},
);
}
(
ResponseItem::FunctionCall { .. },
Some(ResponseInputItem::FunctionCallOutput { call_id, output }),
) => {
items_to_record_in_conversation_history.push(item);
items_to_record_in_conversation_history.push(
ResponseItem::FunctionCallOutput {
call_id: call_id.clone(),
output: output.clone(),
},
);
}
(
ResponseItem::CustomToolCall { .. },
Some(ResponseInputItem::CustomToolCallOutput { call_id, output }),
) => {
items_to_record_in_conversation_history.push(item);
items_to_record_in_conversation_history.push(
ResponseItem::CustomToolCallOutput {
call_id: call_id.clone(),
output: output.clone(),
},
);
}
(
ResponseItem::FunctionCall { .. },
Some(ResponseInputItem::McpToolCallOutput { call_id, result }),
) => {
items_to_record_in_conversation_history.push(item);
let output = match result {
Ok(call_tool_result) => {
convert_call_tool_result_to_function_call_output_payload(
call_tool_result,
)
}
Err(err) => FunctionCallOutputPayload {
content: err.clone(),
success: Some(false),
},
};
items_to_record_in_conversation_history.push(
ResponseItem::FunctionCallOutput {
call_id: call_id.clone(),
output,
},
);
}
(
ResponseItem::Reasoning {
id,
summary,
content,
encrypted_content,
},
None,
) => {
items_to_record_in_conversation_history.push(ResponseItem::Reasoning {
id: id.clone(),
summary: summary.clone(),
content: content.clone(),
encrypted_content: encrypted_content.clone(),
});
}
_ => {
warn!("Unexpected response item: {item:?} with response: {response:?}");
}
};
if let Some(response) = response {
responses.push(response);
}
}
// Only attempt to take the lock if there is something to record.
if !items_to_record_in_conversation_history.is_empty() {
sess.record_conversation_items(&items_to_record_in_conversation_history)
.await;
}
if responses.is_empty() {
debug!("Turn completed");
last_agent_message = get_last_assistant_message_from_turn(
&items_to_record_in_conversation_history,
);
sess.maybe_notify(UserNotification::AgentTurnComplete {
turn_id: sub_id.clone(),
input_messages: turn_input_messages,
last_assistant_message: last_agent_message.clone(),
});
break;
}
}
Err(e) => {
info!("Turn error: {e:#}");
let event = Event {
id: sub_id.clone(),
msg: EventMsg::Error(ErrorEvent {
message: e.to_string(),
}),
};
sess.tx_event.send(event).await.ok();
// let the user continue the conversation
break;
}
}
}
sess.remove_task(&sub_id);
let event = Event {
id: sub_id,
msg: EventMsg::TaskComplete(TaskCompleteEvent { last_agent_message }),
};
sess.tx_event.send(event).await.ok();
}
async fn run_turn(
sess: &Session,
turn_context: &TurnContext,
turn_diff_tracker: &mut TurnDiffTracker,
sub_id: String,
input: Vec<ResponseItem>,
) -> CodexResult<Vec<ProcessedResponseItem>> {
let tools = get_openai_tools(
&turn_context.tools_config,
Some(sess.mcp_connection_manager.list_all_tools()),
);
let prompt = Prompt {
input,
store: !turn_context.disable_response_storage,
tools,
base_instructions_override: turn_context.base_instructions.clone(),
};
let mut retries = 0;
loop {
match try_run_turn(sess, turn_context, turn_diff_tracker, &sub_id, &prompt).await {
Ok(output) => return Ok(output),
Err(CodexErr::Interrupted) => return Err(CodexErr::Interrupted),
Err(CodexErr::EnvVar(var)) => return Err(CodexErr::EnvVar(var)),
Err(e @ (CodexErr::UsageLimitReached(_) | CodexErr::UsageNotIncluded)) => {
return Err(e);
}
Err(e) => {
// Use the configured provider-specific stream retry budget.
let max_retries = turn_context.client.get_provider().stream_max_retries();
if retries < max_retries {
retries += 1;
let delay = match e {
CodexErr::Stream(_, Some(delay)) => delay,
_ => backoff(retries),
};
warn!(
"stream disconnected - retrying turn ({retries}/{max_retries} in {delay:?})...",
);
// Surface retry information to any UI/frontend so the
// user understands what is happening instead of staring
// at a seemingly frozen screen.
sess.notify_stream_error(
&sub_id,
format!(
"stream error: {e}; retrying {retries}/{max_retries} in {delay:?}"
),
)
.await;
tokio::time::sleep(delay).await;
} else {
return Err(e);
}
}
}
}
}
/// When the model is prompted, it returns a stream of events. Some of these
/// events map to a `ResponseItem`. A `ResponseItem` may need to be
/// "handled" such that it produces a `ResponseInputItem` that needs to be
/// sent back to the model on the next turn.
#[derive(Debug)]
struct ProcessedResponseItem {
item: ResponseItem,
response: Option<ResponseInputItem>,
}
async fn try_run_turn(
sess: &Session,
turn_context: &TurnContext,
turn_diff_tracker: &mut TurnDiffTracker,
sub_id: &str,
prompt: &Prompt,
) -> CodexResult<Vec<ProcessedResponseItem>> {
// call_ids that are part of this response.
let completed_call_ids = prompt
.input
.iter()
.filter_map(|ri| match ri {
ResponseItem::FunctionCallOutput { call_id, .. } => Some(call_id),
ResponseItem::LocalShellCall {
call_id: Some(call_id),
..
} => Some(call_id),
ResponseItem::CustomToolCallOutput { call_id, .. } => Some(call_id),
_ => None,
})
.collect::<Vec<_>>();
// call_ids that were pending but are not part of this response.
// This usually happens because the user interrupted the model before we responded to one of its tool calls
// and then the user sent a follow-up message.
let missing_calls = {
prompt
.input
.iter()
.filter_map(|ri| match ri {
ResponseItem::FunctionCall { call_id, .. } => Some(call_id),
ResponseItem::LocalShellCall {
call_id: Some(call_id),
..
} => Some(call_id),
ResponseItem::CustomToolCall { call_id, .. } => Some(call_id),
_ => None,
})
.filter_map(|call_id| {
if completed_call_ids.contains(&call_id) {
None
} else {
Some(call_id.clone())
}
})
.map(|call_id| ResponseItem::CustomToolCallOutput {
call_id: call_id.clone(),
output: "aborted".to_string(),
})
.collect::<Vec<_>>()
};
let prompt: Cow<Prompt> = if missing_calls.is_empty() {
Cow::Borrowed(prompt)
} else {
// Add the synthetic aborted missing calls to the beginning of the input to ensure all call ids have responses.
let input = [missing_calls, prompt.input.clone()].concat();
Cow::Owned(Prompt {
input,
..prompt.clone()
})
};
let mut stream = turn_context.client.clone().stream(&prompt).await?;
let mut output = Vec::new();
loop {
// Poll the next item from the model stream. We must inspect *both* Ok and Err
// cases so that transient stream failures (e.g., dropped SSE connection before
// `response.completed`) bubble up and trigger the caller's retry logic.
let event = stream.next().await;
let Some(event) = event else {
// Channel closed without yielding a final Completed event or explicit error.
// Treat as a disconnected stream so the caller can retry.
return Err(CodexErr::Stream(
"stream closed before response.completed".into(),
None,
));
};
let event = match event {
Ok(ev) => ev,
Err(e) => {
// Propagate the underlying stream error to the caller (run_turn), which
// will apply the configured `stream_max_retries` policy.
return Err(e);
}
};
match event {
ResponseEvent::Created => {}
ResponseEvent::OutputItemDone(item) => {
let response = handle_response_item(
sess,
turn_context,
turn_diff_tracker,
sub_id,
item.clone(),
)
.await?;
output.push(ProcessedResponseItem { item, response });
}
ResponseEvent::WebSearchCallBegin { call_id } => {
let _ = sess
.tx_event
.send(Event {
id: sub_id.to_string(),
msg: EventMsg::WebSearchBegin(WebSearchBeginEvent { call_id }),
})
.await;
}
ResponseEvent::Completed {
response_id: _,
token_usage,
} => {
if let Some(token_usage) = token_usage {
sess.tx_event
.send(Event {
id: sub_id.to_string(),
msg: EventMsg::TokenCount(token_usage),
})
.await
.ok();
}
let unified_diff = turn_diff_tracker.get_unified_diff();
if let Ok(Some(unified_diff)) = unified_diff {
let msg = EventMsg::TurnDiff(TurnDiffEvent { unified_diff });
let event = Event {
id: sub_id.to_string(),
msg,
};
let _ = sess.tx_event.send(event).await;
}
return Ok(output);
}
ResponseEvent::OutputTextDelta(delta) => {
let event = Event {
id: sub_id.to_string(),
msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta }),
};
sess.tx_event.send(event).await.ok();
}
ResponseEvent::ReasoningSummaryDelta(delta) => {
let event = Event {
id: sub_id.to_string(),
msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta }),
};
sess.tx_event.send(event).await.ok();
}
ResponseEvent::ReasoningSummaryPartAdded => {
let event = Event {
id: sub_id.to_string(),
msg: EventMsg::AgentReasoningSectionBreak(AgentReasoningSectionBreakEvent {}),
};
sess.tx_event.send(event).await.ok();
}
ResponseEvent::ReasoningContentDelta(delta) => {
if sess.show_raw_agent_reasoning {
let event = Event {
id: sub_id.to_string(),
msg: EventMsg::AgentReasoningRawContentDelta(
AgentReasoningRawContentDeltaEvent { delta },
),
};
sess.tx_event.send(event).await.ok();
}
}
}
}
}
async fn run_compact_task(
sess: Arc<Session>,
turn_context: &TurnContext,
sub_id: String,
input: Vec<InputItem>,
compact_instructions: String,
) {
let model_context_window = turn_context.client.get_model_context_window();
let start_event = Event {
id: sub_id.clone(),
msg: EventMsg::TaskStarted(TaskStartedEvent {
model_context_window,
}),
};
if sess.tx_event.send(start_event).await.is_err() {
return;
}
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input);
let turn_input: Vec<ResponseItem> =
sess.turn_input_with_history(vec![initial_input_for_turn.clone().into()]);
let prompt = Prompt {
input: turn_input,
store: !turn_context.disable_response_storage,
tools: Vec::new(),
base_instructions_override: Some(compact_instructions.clone()),
};
let max_retries = turn_context.client.get_provider().stream_max_retries();
let mut retries = 0;
loop {
let attempt_result = drain_to_completed(&sess, turn_context, &sub_id, &prompt).await;
match attempt_result {
Ok(()) => break,
Err(CodexErr::Interrupted) => return,
Err(e) => {
if retries < max_retries {
retries += 1;
let delay = backoff(retries);
sess.notify_stream_error(
&sub_id,
format!(
"stream error: {e}; retrying {retries}/{max_retries} in {delay:?}"
),
)
.await;
tokio::time::sleep(delay).await;
continue;
} else {
let event = Event {
id: sub_id.clone(),
msg: EventMsg::Error(ErrorEvent {
message: e.to_string(),
}),
};
sess.send_event(event).await;
return;
}
}
}
}
sess.remove_task(&sub_id);
{
let mut state = sess.state.lock_unchecked();
state.history.keep_last_messages(1);
}
let event = Event {
id: sub_id.clone(),
msg: EventMsg::AgentMessage(AgentMessageEvent {
message: "Compact task completed".to_string(),
}),
};
sess.send_event(event).await;
let event = Event {
id: sub_id.clone(),
msg: EventMsg::TaskComplete(TaskCompleteEvent {
last_agent_message: None,
}),
};
sess.send_event(event).await;
}
async fn handle_response_item(
sess: &Session,
turn_context: &TurnContext,
turn_diff_tracker: &mut TurnDiffTracker,
sub_id: &str,
item: ResponseItem,
) -> CodexResult<Option<ResponseInputItem>> {
debug!(?item, "Output item");
let output = match item {
ResponseItem::Message { content, .. } => {
for item in content {
if let ContentItem::OutputText { text } = item {
let event = Event {
id: sub_id.to_string(),
msg: EventMsg::AgentMessage(AgentMessageEvent { message: text }),
};
sess.tx_event.send(event).await.ok();
}
}
None
}
ResponseItem::Reasoning {
id: _,
summary,
content,
encrypted_content: _,
} => {
for item in summary {
let text = match item {
ReasoningItemReasoningSummary::SummaryText { text } => text,
};
let event = Event {
id: sub_id.to_string(),
msg: EventMsg::AgentReasoning(AgentReasoningEvent { text }),
};
sess.tx_event.send(event).await.ok();
}
if sess.show_raw_agent_reasoning
&& let Some(content) = content
{
for item in content {
let text = match item {
ReasoningItemContent::ReasoningText { text } => text,
ReasoningItemContent::Text { text } => text,
};
let event = Event {
id: sub_id.to_string(),
msg: EventMsg::AgentReasoningRawContent(AgentReasoningRawContentEvent {
text,
}),
};
sess.tx_event.send(event).await.ok();
}
}
None
}
ResponseItem::FunctionCall {
name,
arguments,
call_id,
..
} => {
info!("FunctionCall: {name}({arguments})");
Some(
handle_function_call(
sess,
turn_context,
turn_diff_tracker,
sub_id.to_string(),
name,
arguments,
call_id,
)
.await,
)
}
ResponseItem::LocalShellCall {
id,
call_id,
status: _,
action,
} => {
let LocalShellAction::Exec(action) = action;
tracing::info!("LocalShellCall: {action:?}");
let params = ShellToolCallParams {
command: action.command,
workdir: action.working_directory,
timeout_ms: action.timeout_ms,
with_escalated_permissions: None,
justification: None,
};
let effective_call_id = match (call_id, id) {
(Some(call_id), _) => call_id,
(None, Some(id)) => id,
(None, None) => {
error!("LocalShellCall without call_id or id");
return Ok(Some(ResponseInputItem::FunctionCallOutput {
call_id: "".to_string(),
output: FunctionCallOutputPayload {
content: "LocalShellCall without call_id or id".to_string(),
success: None,
},
}));
}
};
let exec_params = to_exec_params(params, turn_context);
Some(
handle_container_exec_with_params(
exec_params,
sess,
turn_context,
turn_diff_tracker,
sub_id.to_string(),
effective_call_id,
)
.await,
)
}
ResponseItem::CustomToolCall {
id: _,
call_id,
name,
input,
status: _,
} => Some(
handle_custom_tool_call(
sess,
turn_context,
turn_diff_tracker,
sub_id.to_string(),
name,
input,
call_id,
)
.await,
),
ResponseItem::FunctionCallOutput { .. } => {
debug!("unexpected FunctionCallOutput from stream");
None
}
ResponseItem::CustomToolCallOutput { .. } => {
debug!("unexpected CustomToolCallOutput from stream");
None
}
ResponseItem::WebSearchCall { id, action, .. } => {
if let WebSearchAction::Search { query } = action {
let call_id = id.unwrap_or_else(|| "".to_string());
let event = Event {
id: sub_id.to_string(),
msg: EventMsg::WebSearchEnd(WebSearchEndEvent { call_id, query }),
};
sess.tx_event.send(event).await.ok();
}
None
}
ResponseItem::Other => None,
};
Ok(output)
}
async fn handle_function_call(
sess: &Session,
turn_context: &TurnContext,
turn_diff_tracker: &mut TurnDiffTracker,
sub_id: String,
name: String,
arguments: String,
call_id: String,
) -> ResponseInputItem {
match name.as_str() {
"container.exec" | "shell" => {
let params = match parse_container_exec_arguments(arguments, turn_context, &call_id) {
Ok(params) => params,
Err(output) => {
return *output;
}
};
handle_container_exec_with_params(
params,
sess,
turn_context,
turn_diff_tracker,
sub_id,
call_id,
)
.await
}
"view_image" => {
#[derive(serde::Deserialize)]
struct SeeImageArgs {
path: String,
}
let args = match serde_json::from_str::<SeeImageArgs>(&arguments) {
Ok(a) => a,
Err(e) => {
return ResponseInputItem::FunctionCallOutput {
call_id,
output: FunctionCallOutputPayload {
content: format!("failed to parse function arguments: {e}"),
success: Some(false),
},
};
}
};
let abs = turn_context.resolve_path(Some(args.path));
let output = match sess.inject_input(vec![InputItem::LocalImage { path: abs }]) {
Ok(()) => FunctionCallOutputPayload {
content: "attached local image path".to_string(),
success: Some(true),
},
Err(_) => FunctionCallOutputPayload {
content: "unable to attach image (no active task)".to_string(),
success: Some(false),
},
};
ResponseInputItem::FunctionCallOutput { call_id, output }
}
"apply_patch" => {
let args = match serde_json::from_str::<ApplyPatchToolArgs>(&arguments) {
Ok(a) => a,
Err(e) => {
return ResponseInputItem::FunctionCallOutput {
call_id,
output: FunctionCallOutputPayload {
content: format!("failed to parse function arguments: {e}"),
success: None,
},
};
}
};
let exec_params = ExecParams {
command: vec!["apply_patch".to_string(), args.input.clone()],
cwd: turn_context.cwd.clone(),
timeout_ms: None,
env: HashMap::new(),
with_escalated_permissions: None,
justification: None,
};
handle_container_exec_with_params(
exec_params,
sess,
turn_context,
turn_diff_tracker,
sub_id,
call_id,
)
.await
}
"update_plan" => handle_update_plan(sess, arguments, sub_id, call_id).await,
EXEC_COMMAND_TOOL_NAME => {
// TODO(mbolin): Sandbox check.
let exec_params = match serde_json::from_str::<ExecCommandParams>(&arguments) {
Ok(params) => params,
Err(e) => {
return ResponseInputItem::FunctionCallOutput {
call_id,
output: FunctionCallOutputPayload {
content: format!("failed to parse function arguments: {e}"),
success: Some(false),
},
};
}
};
let result = sess
.session_manager
.handle_exec_command_request(exec_params)
.await;
let function_call_output = crate::exec_command::result_into_payload(result);
ResponseInputItem::FunctionCallOutput {
call_id,
output: function_call_output,
}
}
WRITE_STDIN_TOOL_NAME => {
let write_stdin_params = match serde_json::from_str::<WriteStdinParams>(&arguments) {
Ok(params) => params,
Err(e) => {
return ResponseInputItem::FunctionCallOutput {
call_id,
output: FunctionCallOutputPayload {
content: format!("failed to parse function arguments: {e}"),
success: Some(false),
},
};
}
};
let result = sess
.session_manager
.handle_write_stdin_request(write_stdin_params)
.await;
let function_call_output: FunctionCallOutputPayload =
crate::exec_command::result_into_payload(result);
ResponseInputItem::FunctionCallOutput {
call_id,
output: function_call_output,
}
}
_ => {
match sess.mcp_connection_manager.parse_tool_name(&name) {
Some((server, tool_name)) => {
// TODO(mbolin): Determine appropriate timeout for tool call.
let timeout = None;
handle_mcp_tool_call(
sess, &sub_id, call_id, server, tool_name, arguments, timeout,
)
.await
}
None => {
// Unknown function: reply with structured failure so the model can adapt.
ResponseInputItem::FunctionCallOutput {
call_id,
output: FunctionCallOutputPayload {
content: format!("unsupported call: {name}"),
success: None,
},
}
}
}
}
}
}
async fn handle_custom_tool_call(
sess: &Session,
turn_context: &TurnContext,
turn_diff_tracker: &mut TurnDiffTracker,
sub_id: String,
name: String,
input: String,
call_id: String,
) -> ResponseInputItem {
info!("CustomToolCall: {name} {input}");
match name.as_str() {
"apply_patch" => {
let exec_params = ExecParams {
command: vec!["apply_patch".to_string(), input.clone()],
cwd: turn_context.cwd.clone(),
timeout_ms: None,
env: HashMap::new(),
with_escalated_permissions: None,
justification: None,
};
let resp = handle_container_exec_with_params(
exec_params,
sess,
turn_context,
turn_diff_tracker,
sub_id,
call_id,
)
.await;
// Convert function-call style output into a custom tool call output
match resp {
ResponseInputItem::FunctionCallOutput { call_id, output } => {
ResponseInputItem::CustomToolCallOutput {
call_id,
output: output.content,
}
}
// Pass through if already a custom tool output or other variant
other => other,
}
}
_ => {
debug!("unexpected CustomToolCall from stream");
ResponseInputItem::CustomToolCallOutput {
call_id,
output: format!("unsupported custom tool call: {name}"),
}
}
}
}
fn to_exec_params(params: ShellToolCallParams, turn_context: &TurnContext) -> ExecParams {
ExecParams {
command: params.command,
cwd: turn_context.resolve_path(params.workdir.clone()),
timeout_ms: params.timeout_ms,
env: create_env(&turn_context.shell_environment_policy),
with_escalated_permissions: params.with_escalated_permissions,
justification: params.justification,
}
}
fn parse_container_exec_arguments(
arguments: String,
turn_context: &TurnContext,
call_id: &str,
) -> Result<ExecParams, Box<ResponseInputItem>> {
// parse command
match serde_json::from_str::<ShellToolCallParams>(&arguments) {
Ok(shell_tool_call_params) => Ok(to_exec_params(shell_tool_call_params, turn_context)),
Err(e) => {
// allow model to re-sample
let output = ResponseInputItem::FunctionCallOutput {
call_id: call_id.to_string(),
output: FunctionCallOutputPayload {
content: format!("failed to parse function arguments: {e}"),
success: None,
},
};
Err(Box::new(output))
}
}
}
pub struct ExecInvokeArgs<'a> {
pub params: ExecParams,
pub sandbox_type: SandboxType,
pub sandbox_policy: &'a SandboxPolicy,
pub codex_linux_sandbox_exe: &'a Option<PathBuf>,
pub stdout_stream: Option<StdoutStream>,
}
fn maybe_translate_shell_command(
params: ExecParams,
sess: &Session,
turn_context: &TurnContext,
) -> ExecParams {
let should_translate = matches!(sess.user_shell, crate::shell::Shell::PowerShell(_))
|| turn_context.shell_environment_policy.use_profile;
if should_translate
&& let Some(command) = sess
.user_shell
.format_default_shell_invocation(params.command.clone())
{
return ExecParams { command, ..params };
}
params
}
async fn handle_container_exec_with_params(
params: ExecParams,
sess: &Session,
turn_context: &TurnContext,
turn_diff_tracker: &mut TurnDiffTracker,
sub_id: String,
call_id: String,
) -> ResponseInputItem {
// check if this was a patch, and apply it if so
let apply_patch_exec = match maybe_parse_apply_patch_verified(&params.command, &params.cwd) {
MaybeApplyPatchVerified::Body(changes) => {
match apply_patch::apply_patch(sess, turn_context, &sub_id, &call_id, changes).await {
InternalApplyPatchInvocation::Output(item) => return item,
InternalApplyPatchInvocation::DelegateToExec(apply_patch_exec) => {
Some(apply_patch_exec)
}
}
}
MaybeApplyPatchVerified::CorrectnessError(parse_error) => {
// It looks like an invocation of `apply_patch`, but we
// could not resolve it into a patch that would apply
// cleanly. Return to model for resample.
return ResponseInputItem::FunctionCallOutput {
call_id,
output: FunctionCallOutputPayload {
content: format!("error: {parse_error:#}"),
success: None,
},
};
}
MaybeApplyPatchVerified::ShellParseError(error) => {
trace!("Failed to parse shell command, {error:?}");
None
}
MaybeApplyPatchVerified::NotApplyPatch => None,
};
let (params, safety, command_for_display) = match &apply_patch_exec {
Some(ApplyPatchExec {
action: ApplyPatchAction { patch, cwd, .. },
user_explicitly_approved_this_action,
}) => {
let path_to_codex = std::env::current_exe()
.ok()
.map(|p| p.to_string_lossy().to_string());
let Some(path_to_codex) = path_to_codex else {
return ResponseInputItem::FunctionCallOutput {
call_id,
output: FunctionCallOutputPayload {
content: "failed to determine path to codex executable".to_string(),
success: None,
},
};
};
let params = ExecParams {
command: vec![
path_to_codex,
CODEX_APPLY_PATCH_ARG1.to_string(),
patch.clone(),
],
cwd: cwd.clone(),
timeout_ms: params.timeout_ms,
env: HashMap::new(),
with_escalated_permissions: params.with_escalated_permissions,
justification: params.justification.clone(),
};
let safety = if *user_explicitly_approved_this_action {
SafetyCheck::AutoApprove {
sandbox_type: SandboxType::None,
}
} else {
assess_safety_for_untrusted_command(
turn_context.approval_policy,
&turn_context.sandbox_policy,
params.with_escalated_permissions.unwrap_or(false),
)
};
(
params,
safety,
vec!["apply_patch".to_string(), patch.clone()],
)
}
None => {
let safety = {
let state = sess.state.lock_unchecked();
assess_command_safety(
&params.command,
turn_context.approval_policy,
&turn_context.sandbox_policy,
&state.approved_commands,
params.with_escalated_permissions.unwrap_or(false),
)
};
let command_for_display = params.command.clone();
(params, safety, command_for_display)
}
};
let sandbox_type = match safety {
SafetyCheck::AutoApprove { sandbox_type } => sandbox_type,
SafetyCheck::AskUser => {
let rx_approve = sess
.request_command_approval(
sub_id.clone(),
call_id.clone(),
params.command.clone(),
params.cwd.clone(),
params.justification.clone(),
)
.await;
match rx_approve.await.unwrap_or_default() {
ReviewDecision::Approved => (),
ReviewDecision::ApprovedForSession => {
sess.add_approved_command(params.command.clone());
}
ReviewDecision::Denied | ReviewDecision::Abort => {
return ResponseInputItem::FunctionCallOutput {
call_id,
output: FunctionCallOutputPayload {
content: "exec command rejected by user".to_string(),
success: None,
},
};
}
}
// No sandboxing is applied because the user has given
// explicit approval. Often, we end up in this case because
// the command cannot be run in a sandbox, such as
// installing a new dependency that requires network access.
SandboxType::None
}
SafetyCheck::Reject { reason } => {
return ResponseInputItem::FunctionCallOutput {
call_id,
output: FunctionCallOutputPayload {
content: format!("exec command rejected: {reason}"),
success: None,
},
};
}
};
let exec_command_context = ExecCommandContext {
sub_id: sub_id.clone(),
call_id: call_id.clone(),
command_for_display: command_for_display.clone(),
cwd: params.cwd.clone(),
apply_patch: apply_patch_exec.map(
|ApplyPatchExec {
action,
user_explicitly_approved_this_action,
}| ApplyPatchCommandContext {
user_explicitly_approved_this_action,
changes: convert_apply_patch_to_protocol(&action),
},
),
};
let params = maybe_translate_shell_command(params, sess, turn_context);
let output_result = sess
.run_exec_with_events(
turn_diff_tracker,
exec_command_context.clone(),
ExecInvokeArgs {
params: params.clone(),
sandbox_type,
sandbox_policy: &turn_context.sandbox_policy,
codex_linux_sandbox_exe: &sess.codex_linux_sandbox_exe,
stdout_stream: if exec_command_context.apply_patch.is_some() {
None
} else {
Some(StdoutStream {
sub_id: sub_id.clone(),
call_id: call_id.clone(),
tx_event: sess.tx_event.clone(),
})
},
},
)
.await;
match output_result {
Ok(output) => {
let ExecToolCallOutput { exit_code, .. } = &output;
let is_success = *exit_code == 0;
let content = format_exec_output(&output);
ResponseInputItem::FunctionCallOutput {
call_id: call_id.clone(),
output: FunctionCallOutputPayload {
content,
success: Some(is_success),
},
}
}
Err(CodexErr::Sandbox(error)) => {
handle_sandbox_error(
turn_diff_tracker,
params,
exec_command_context,
error,
sandbox_type,
sess,
turn_context,
)
.await
}
Err(e) => ResponseInputItem::FunctionCallOutput {
call_id: call_id.clone(),
output: FunctionCallOutputPayload {
content: format!("execution error: {e}"),
success: None,
},
},
}
}
async fn handle_sandbox_error(
turn_diff_tracker: &mut TurnDiffTracker,
params: ExecParams,
exec_command_context: ExecCommandContext,
error: SandboxErr,
sandbox_type: SandboxType,
sess: &Session,
turn_context: &TurnContext,
) -> ResponseInputItem {
let call_id = exec_command_context.call_id.clone();
let sub_id = exec_command_context.sub_id.clone();
let cwd = exec_command_context.cwd.clone();
// Early out if either the user never wants to be asked for approval, or
// we're letting the model manage escalation requests. Otherwise, continue
match turn_context.approval_policy {
AskForApproval::Never | AskForApproval::OnRequest => {
return ResponseInputItem::FunctionCallOutput {
call_id,
output: FunctionCallOutputPayload {
content: format!(
"failed in sandbox {sandbox_type:?} with execution error: {error}"
),
success: Some(false),
},
};
}
AskForApproval::UnlessTrusted | AskForApproval::OnFailure => (),
}
// similarly, if the command timed out, we can simply return this failure to the model
if matches!(error, SandboxErr::Timeout) {
return ResponseInputItem::FunctionCallOutput {
call_id,
output: FunctionCallOutputPayload {
content: format!(
"command timed out after {} milliseconds",
params.timeout_duration().as_millis()
),
success: Some(false),
},
};
}
// Note that when `error` is `SandboxErr::Denied`, it could be a false
// positive. That is, it may have exited with a non-zero exit code, not
// because the sandbox denied it, but because that is its expected behavior,
// i.e., a grep command that did not match anything. Ideally we would
// include additional metadata on the command to indicate whether non-zero
// exit codes merit a retry.
// For now, we categorically ask the user to retry without sandbox and
// emit the raw error as a background event.
sess.notify_background_event(&sub_id, format!("Execution failed: {error}"))
.await;
let rx_approve = sess
.request_command_approval(
sub_id.clone(),
call_id.clone(),
params.command.clone(),
cwd.clone(),
Some("command failed; retry without sandbox?".to_string()),
)
.await;
match rx_approve.await.unwrap_or_default() {
ReviewDecision::Approved | ReviewDecision::ApprovedForSession => {
// Persist this command as preapproved for the
// remainder of the session so future
// executions skip the sandbox directly.
// TODO(ragona): Isn't this a bug? It always saves the command in an | fork?
sess.add_approved_command(params.command.clone());
// Inform UI we are retrying without sandbox.
sess.notify_background_event(&sub_id, "retrying command without sandbox")
.await;
// This is an escalated retry; the policy will not be
// examined and the sandbox has been set to `None`.
let retry_output_result = sess
.run_exec_with_events(
turn_diff_tracker,
exec_command_context.clone(),
ExecInvokeArgs {
params,
sandbox_type: SandboxType::None,
sandbox_policy: &turn_context.sandbox_policy,
codex_linux_sandbox_exe: &sess.codex_linux_sandbox_exe,
stdout_stream: if exec_command_context.apply_patch.is_some() {
None
} else {
Some(StdoutStream {
sub_id: sub_id.clone(),
call_id: call_id.clone(),
tx_event: sess.tx_event.clone(),
})
},
},
)
.await;
match retry_output_result {
Ok(retry_output) => {
let ExecToolCallOutput { exit_code, .. } = &retry_output;
let is_success = *exit_code == 0;
let content = format_exec_output(&retry_output);
ResponseInputItem::FunctionCallOutput {
call_id: call_id.clone(),
output: FunctionCallOutputPayload {
content,
success: Some(is_success),
},
}
}
Err(e) => ResponseInputItem::FunctionCallOutput {
call_id: call_id.clone(),
output: FunctionCallOutputPayload {
content: format!("retry failed: {e}"),
success: None,
},
},
}
}
ReviewDecision::Denied | ReviewDecision::Abort => {
// Fall through to original failure handling.
ResponseInputItem::FunctionCallOutput {
call_id,
output: FunctionCallOutputPayload {
content: "exec command rejected by user".to_string(),
success: None,
},
}
}
}
}
fn format_exec_output_str(exec_output: &ExecToolCallOutput) -> String {
let ExecToolCallOutput {
aggregated_output, ..
} = exec_output;
// Head+tail truncation for the model: show the beginning and end with an elision.
// Clients still receive full streams; only this formatted summary is capped.
let s = aggregated_output.text.as_str();
let total_lines = s.lines().count();
if s.len() <= MODEL_FORMAT_MAX_BYTES && total_lines <= MODEL_FORMAT_MAX_LINES {
return s.to_string();
}
let lines: Vec<&str> = s.lines().collect();
let head_take = MODEL_FORMAT_HEAD_LINES.min(lines.len());
let tail_take = MODEL_FORMAT_TAIL_LINES.min(lines.len().saturating_sub(head_take));
let omitted = lines.len().saturating_sub(head_take + tail_take);
// Join head and tail blocks (lines() strips newlines; reinsert them)
let head_block = lines
.iter()
.take(head_take)
.cloned()
.collect::<Vec<_>>()
.join("\n");
let tail_block = if tail_take > 0 {
lines[lines.len() - tail_take..].join("\n")
} else {
String::new()
};
let marker = format!("\n[... omitted {omitted} of {total_lines} lines ...]\n\n");
// Byte budgets for head/tail around the marker
let mut head_budget = MODEL_FORMAT_HEAD_BYTES.min(MODEL_FORMAT_MAX_BYTES);
let tail_budget = MODEL_FORMAT_MAX_BYTES.saturating_sub(head_budget + marker.len());
if tail_budget == 0 && marker.len() >= MODEL_FORMAT_MAX_BYTES {
// Degenerate case: marker alone exceeds budget; return a clipped marker
return take_bytes_at_char_boundary(&marker, MODEL_FORMAT_MAX_BYTES).to_string();
}
if tail_budget == 0 {
// Make room for the marker by shrinking head
head_budget = MODEL_FORMAT_MAX_BYTES.saturating_sub(marker.len());
}
// Enforce line-count cap by trimming head/tail lines
let head_lines_text = head_block;
let tail_lines_text = tail_block;
// Build final string respecting byte budgets
let head_part = take_bytes_at_char_boundary(&head_lines_text, head_budget);
let mut result = String::with_capacity(MODEL_FORMAT_MAX_BYTES.min(s.len()));
result.push_str(head_part);
result.push_str(&marker);
let remaining = MODEL_FORMAT_MAX_BYTES.saturating_sub(result.len());
let tail_budget_final = remaining;
let tail_part = take_last_bytes_at_char_boundary(&tail_lines_text, tail_budget_final);
result.push_str(tail_part);
result
}
// Truncate a &str to a byte budget at a char boundary (prefix)
#[inline]
fn take_bytes_at_char_boundary(s: &str, maxb: usize) -> &str {
if s.len() <= maxb {
return s;
}
let mut last_ok = 0;
for (i, ch) in s.char_indices() {
let nb = i + ch.len_utf8();
if nb > maxb {
break;
}
last_ok = nb;
}
&s[..last_ok]
}
// Take a suffix of a &str within a byte budget at a char boundary
#[inline]
fn take_last_bytes_at_char_boundary(s: &str, maxb: usize) -> &str {
if s.len() <= maxb {
return s;
}
let mut start = s.len();
let mut used = 0usize;
for (i, ch) in s.char_indices().rev() {
let nb = ch.len_utf8();
if used + nb > maxb {
break;
}
start = i;
used += nb;
if start == 0 {
break;
}
}
&s[start..]
}
/// Exec output is a pre-serialized JSON payload
fn format_exec_output(exec_output: &ExecToolCallOutput) -> String {
let ExecToolCallOutput {
exit_code,
duration,
..
} = exec_output;
#[derive(Serialize)]
struct ExecMetadata {
exit_code: i32,
duration_seconds: f32,
}
#[derive(Serialize)]
struct ExecOutput<'a> {
output: &'a str,
metadata: ExecMetadata,
}
// round to 1 decimal place
let duration_seconds = ((duration.as_secs_f32()) * 10.0).round() / 10.0;
let formatted_output = format_exec_output_str(exec_output);
let payload = ExecOutput {
output: &formatted_output,
metadata: ExecMetadata {
exit_code: *exit_code,
duration_seconds,
},
};
#[expect(clippy::expect_used)]
serde_json::to_string(&payload).expect("serialize ExecOutput")
}
fn get_last_assistant_message_from_turn(responses: &[ResponseItem]) -> Option<String> {
responses.iter().rev().find_map(|item| {
if let ResponseItem::Message { role, content, .. } = item {
if role == "assistant" {
content.iter().rev().find_map(|ci| {
if let ContentItem::OutputText { text } = ci {
Some(text.clone())
} else {
None
}
})
} else {
None
}
} else {
None
}
})
}
async fn drain_to_completed(
sess: &Session,
turn_context: &TurnContext,
sub_id: &str,
prompt: &Prompt,
) -> CodexResult<()> {
let mut stream = turn_context.client.clone().stream(prompt).await?;
loop {
let maybe_event = stream.next().await;
let Some(event) = maybe_event else {
return Err(CodexErr::Stream(
"stream closed before response.completed".into(),
None,
));
};
match event {
Ok(ResponseEvent::OutputItemDone(item)) => {
// Record only to in-memory conversation history; avoid state snapshot.
let mut state = sess.state.lock_unchecked();
state.history.record_items(std::slice::from_ref(&item));
}
Ok(ResponseEvent::Completed {
response_id: _,
token_usage,
}) => {
// some providers don't return token usage, so we default
// TODO: consider approximate token usage
let token_usage = token_usage.unwrap_or_default();
sess.tx_event
.send(Event {
id: sub_id.to_string(),
msg: EventMsg::TokenCount(token_usage),
})
.await
.ok();
return Ok(());
}
Ok(_) => continue,
Err(e) => return Err(e),
}
}
}
fn convert_call_tool_result_to_function_call_output_payload(
call_tool_result: &CallToolResult,
) -> FunctionCallOutputPayload {
let CallToolResult {
content,
is_error,
structured_content,
} = call_tool_result;
// In terms of what to send back to the model, we prefer structured_content,
// if available, and fallback to content, otherwise.
let mut is_success = is_error != &Some(true);
let content = if let Some(structured_content) = structured_content
&& structured_content != &serde_json::Value::Null
&& let Ok(serialized_structured_content) = serde_json::to_string(&structured_content)
{
serialized_structured_content
} else {
match serde_json::to_string(&content) {
Ok(serialized_content) => serialized_content,
Err(err) => {
// If we could not serialize either content or structured_content to
// JSON, flag this as an error.
is_success = false;
err.to_string()
}
}
};
FunctionCallOutputPayload {
content,
success: Some(is_success),
}
}
#[cfg(test)]
mod tests {
use super::*;
use mcp_types::ContentBlock;
use mcp_types::TextContent;
use pretty_assertions::assert_eq;
use serde_json::json;
use std::time::Duration as StdDuration;
fn text_block(s: &str) -> ContentBlock {
ContentBlock::TextContent(TextContent {
annotations: None,
text: s.to_string(),
r#type: "text".to_string(),
})
}
#[test]
fn prefers_structured_content_when_present() {
let ctr = CallToolResult {
// Content present but should be ignored because structured_content is set.
content: vec![text_block("ignored")],
is_error: None,
structured_content: Some(json!({
"ok": true,
"value": 42
})),
};
let got = convert_call_tool_result_to_function_call_output_payload(&ctr);
let expected = FunctionCallOutputPayload {
content: serde_json::to_string(&json!({
"ok": true,
"value": 42
}))
.unwrap(),
success: Some(true),
};
assert_eq!(expected, got);
}
#[test]
fn model_truncation_head_tail_by_lines() {
// Build 400 short lines so line-count limit, not byte budget, triggers truncation
let lines: Vec<String> = (1..=400).map(|i| format!("line{i}")).collect();
let full = lines.join("\n");
let exec = ExecToolCallOutput {
exit_code: 0,
stdout: StreamOutput::new(String::new()),
stderr: StreamOutput::new(String::new()),
aggregated_output: StreamOutput::new(full.clone()),
duration: StdDuration::from_secs(1),
};
let out = format_exec_output_str(&exec);
// Expect elision marker with correct counts
let omitted = 400 - MODEL_FORMAT_MAX_LINES; // 144
let marker = format!("\n[... omitted {omitted} of 400 lines ...]\n\n");
assert!(out.contains(&marker), "missing marker: {out}");
// Validate head and tail
let parts: Vec<&str> = out.split(&marker).collect();
assert_eq!(parts.len(), 2, "expected one marker split");
let head = parts[0];
let tail = parts[1];
let expected_head: String = (1..=MODEL_FORMAT_HEAD_LINES)
.map(|i| format!("line{i}"))
.collect::<Vec<_>>()
.join("\n");
assert!(head.starts_with(&expected_head), "head mismatch");
let expected_tail: String = ((400 - MODEL_FORMAT_TAIL_LINES + 1)..=400)
.map(|i| format!("line{i}"))
.collect::<Vec<_>>()
.join("\n");
assert!(tail.ends_with(&expected_tail), "tail mismatch");
}
#[test]
fn model_truncation_respects_byte_budget() {
// Construct a large output (about 100kB) so byte budget dominates
let big_line = "x".repeat(100);
let full = std::iter::repeat_n(big_line.clone(), 1000)
.collect::<Vec<_>>()
.join("\n");
let exec = ExecToolCallOutput {
exit_code: 0,
stdout: StreamOutput::new(String::new()),
stderr: StreamOutput::new(String::new()),
aggregated_output: StreamOutput::new(full.clone()),
duration: StdDuration::from_secs(1),
};
let out = format_exec_output_str(&exec);
assert!(out.len() <= MODEL_FORMAT_MAX_BYTES, "exceeds byte budget");
assert!(out.contains("omitted"), "should contain elision marker");
// Ensure head and tail are drawn from the original
assert!(full.starts_with(out.chars().take(8).collect::<String>().as_str()));
assert!(
full.ends_with(
out.chars()
.rev()
.take(8)
.collect::<String>()
.chars()
.rev()
.collect::<String>()
.as_str()
)
);
}
#[test]
fn falls_back_to_content_when_structured_is_null() {
let ctr = CallToolResult {
content: vec![text_block("hello"), text_block("world")],
is_error: None,
structured_content: Some(serde_json::Value::Null),
};
let got = convert_call_tool_result_to_function_call_output_payload(&ctr);
let expected = FunctionCallOutputPayload {
content: serde_json::to_string(&vec![text_block("hello"), text_block("world")])
.unwrap(),
success: Some(true),
};
assert_eq!(expected, got);
}
#[test]
fn success_flag_reflects_is_error_true() {
let ctr = CallToolResult {
content: vec![text_block("unused")],
is_error: Some(true),
structured_content: Some(json!({ "message": "bad" })),
};
let got = convert_call_tool_result_to_function_call_output_payload(&ctr);
let expected = FunctionCallOutputPayload {
content: serde_json::to_string(&json!({ "message": "bad" })).unwrap(),
success: Some(false),
};
assert_eq!(expected, got);
}
#[test]
fn success_flag_true_with_no_error_and_content_used() {
let ctr = CallToolResult {
content: vec![text_block("alpha")],
is_error: Some(false),
structured_content: None,
};
let got = convert_call_tool_result_to_function_call_output_payload(&ctr);
let expected = FunctionCallOutputPayload {
content: serde_json::to_string(&vec![text_block("alpha")]).unwrap(),
success: Some(true),
};
assert_eq!(expected, got);
}
}