# Note for reviewers

The bulk of this PR is in the new file, `parse_command.rs`. The file was written TDD-style and implemented with Codex, so don't worry about reviewing the code itself; just review the unit tests (if you want). If any cases are missing, we'll add more tests and have Codex fix them. I think the best approach will be to land and iterate.

I have some follow-ups I want to do after this lands. The next PR will let us merge (and dedupe) multiple sequential cells of the same type, such as multiple read commands. The deduping will also be important because the model often reads the same file multiple times in a row, in chunks.

---

This PR formats common commands (reading, formatting, testing, etc.) more nicely: it tries to extract things like file names and test names, and falls back to the raw command if it can't. It also only shows stdout/stderr if the command failed.

<img width="770" height="238" alt="CleanShot 2025-08-09 at 16 05 15" src="https://github.com/user-attachments/assets/0ead179a-8910-486b-aa3d-7d26264d751e" />
<img width="348" height="158" alt="CleanShot 2025-08-09 at 16 05 32" src="https://github.com/user-attachments/assets/4302681b-5e87-4ff3-85b4-0252c6c485a9" />
<img width="834" height="324" alt="CleanShot 2025-08-09 at 16 05 56 2" src="https://github.com/user-attachments/assets/09fb3517-7bd6-40f6-a126-4172106b700f" />

Part 2: https://github.com/openai/codex/pull/2097
Part 3: https://github.com/openai/codex/pull/2110
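
For context, a rough sketch of the kind of mapping `parse_command.rs` performs; the enum and variant names below are illustrative, not the actual types in this PR:

```rust
// Hypothetical sketch of the parsed-command shape; the real definitions
// live in parse_command.rs.
enum ParsedCommand {
    /// e.g. `cat src/main.rs` -> a "read" cell showing just the file name
    Read { name: String },
    /// e.g. `cargo test parse` -> a "test" cell showing the filter
    Test { filter: Option<String> },
    /// e.g. `cargo fmt`
    Format,
    /// Fallback: show the raw command verbatim
    Unknown { cmd: String },
}
```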
// Poisoned mutex should fail the program
#![allow(clippy::unwrap_used)]

use std::borrow::Cow;
use std::collections::HashMap;
use std::collections::HashSet;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::atomic::AtomicU64;
use std::time::Duration;

use async_channel::Receiver;
use async_channel::Sender;
use codex_apply_patch::ApplyPatchAction;
use codex_apply_patch::MaybeApplyPatchVerified;
use codex_apply_patch::maybe_parse_apply_patch_verified;
use codex_login::CodexAuth;
use futures::prelude::*;
use mcp_types::CallToolResult;
use serde::Serialize;
use serde_json;
use tokio::sync::Notify;
use tokio::sync::oneshot;
use tokio::task::AbortHandle;
use tracing::debug;
use tracing::error;
use tracing::info;
use tracing::trace;
use tracing::warn;
use uuid::Uuid;

use crate::apply_patch::ApplyPatchExec;
use crate::apply_patch::CODEX_APPLY_PATCH_ARG1;
use crate::apply_patch::InternalApplyPatchInvocation;
use crate::apply_patch::convert_apply_patch_to_protocol;
use crate::apply_patch::get_writable_roots;
use crate::apply_patch::{self};
use crate::client::ModelClient;
use crate::client_common::EnvironmentContext;
use crate::client_common::Prompt;
use crate::client_common::ResponseEvent;
use crate::config::Config;
use crate::config_types::ShellEnvironmentPolicy;
use crate::conversation_history::ConversationHistory;
use crate::error::CodexErr;
use crate::error::Result as CodexResult;
use crate::error::SandboxErr;
use crate::error::get_error_message_ui;
use crate::exec::ExecParams;
use crate::exec::ExecToolCallOutput;
use crate::exec::SandboxType;
use crate::exec::StdoutStream;
use crate::exec::process_exec_tool_call;
use crate::exec_env::create_env;
use crate::mcp_connection_manager::McpConnectionManager;
use crate::mcp_tool_call::handle_mcp_tool_call;
use crate::models::ContentItem;
use crate::models::FunctionCallOutputPayload;
use crate::models::LocalShellAction;
use crate::models::ReasoningItemContent;
use crate::models::ReasoningItemReasoningSummary;
use crate::models::ResponseInputItem;
use crate::models::ResponseItem;
use crate::models::ShellToolCallParams;
use crate::openai_tools::ToolsConfig;
use crate::openai_tools::get_openai_tools;
use crate::parse_command::parse_command;
use crate::plan_tool::handle_update_plan;
use crate::project_doc::get_user_instructions;
use crate::protocol::AgentMessageDeltaEvent;
use crate::protocol::AgentMessageEvent;
use crate::protocol::AgentReasoningDeltaEvent;
use crate::protocol::AgentReasoningEvent;
use crate::protocol::AgentReasoningRawContentDeltaEvent;
use crate::protocol::AgentReasoningRawContentEvent;
use crate::protocol::ApplyPatchApprovalRequestEvent;
use crate::protocol::AskForApproval;
use crate::protocol::BackgroundEventEvent;
use crate::protocol::ErrorEvent;
use crate::protocol::Event;
use crate::protocol::EventMsg;
use crate::protocol::ExecApprovalRequestEvent;
use crate::protocol::ExecCommandBeginEvent;
use crate::protocol::ExecCommandEndEvent;
use crate::protocol::FileChange;
use crate::protocol::InputItem;
use crate::protocol::Op;
use crate::protocol::PatchApplyBeginEvent;
use crate::protocol::PatchApplyEndEvent;
use crate::protocol::ReviewDecision;
use crate::protocol::SandboxPolicy;
use crate::protocol::SessionConfiguredEvent;
use crate::protocol::Submission;
use crate::protocol::TaskCompleteEvent;
use crate::protocol::TurnDiffEvent;
use crate::rollout::RolloutRecorder;
use crate::safety::SafetyCheck;
use crate::safety::assess_command_safety;
use crate::safety::assess_safety_for_untrusted_command;
use crate::shell;
use crate::turn_diff_tracker::TurnDiffTracker;
use crate::user_notification::UserNotification;
use crate::util::backoff;

/// The high-level interface to the Codex system.
/// It operates as a queue pair where you send submissions and receive events.
pub struct Codex {
    next_id: AtomicU64,
    tx_sub: Sender<Submission>,
    rx_event: Receiver<Event>,
}

/// Wrapper returned by [`Codex::spawn`] containing the spawned [`Codex`],
/// the submission id for the initial `ConfigureSession` request and the
/// unique session id.
pub struct CodexSpawnOk {
    pub codex: Codex,
    pub init_id: String,
    pub session_id: Uuid,
}

impl Codex {
    /// Spawn a new [`Codex`] and initialize the session.
    pub async fn spawn(
        config: Config,
        auth: Option<CodexAuth>,
        ctrl_c: Arc<Notify>,
    ) -> CodexResult<CodexSpawnOk> {
        // experimental resume path (undocumented)
        let resume_path = config.experimental_resume.clone();
        info!("resume_path: {resume_path:?}");
        let (tx_sub, rx_sub) = async_channel::bounded(64);
        let (tx_event, rx_event) = async_channel::unbounded();

        let user_instructions = get_user_instructions(&config).await;

        let configure_session = Op::ConfigureSession {
            provider: config.model_provider.clone(),
            model: config.model.clone(),
            model_reasoning_effort: config.model_reasoning_effort,
            model_reasoning_summary: config.model_reasoning_summary,
            user_instructions,
            base_instructions: config.base_instructions.clone(),
            approval_policy: config.approval_policy,
            sandbox_policy: config.sandbox_policy.clone(),
            disable_response_storage: config.disable_response_storage,
            notify: config.notify.clone(),
            cwd: config.cwd.clone(),
            resume_path: resume_path.clone(),
        };

        let config = Arc::new(config);

        // Generate a unique ID for the lifetime of this Codex session.
        let session_id = Uuid::new_v4();
        tokio::spawn(submission_loop(
            session_id, config, auth, rx_sub, tx_event, ctrl_c,
        ));
        let codex = Codex {
            next_id: AtomicU64::new(0),
            tx_sub,
            rx_event,
        };
        let init_id = codex.submit(configure_session).await?;

        Ok(CodexSpawnOk {
            codex,
            init_id,
            session_id,
        })
    }

    /// Submit the `op` wrapped in a `Submission` with a unique ID.
    pub async fn submit(&self, op: Op) -> CodexResult<String> {
        let id = self
            .next_id
            .fetch_add(1, std::sync::atomic::Ordering::SeqCst)
            .to_string();
        let sub = Submission { id: id.clone(), op };
        self.submit_with_id(sub).await?;
        Ok(id)
    }

    /// Use sparingly: prefer `submit()` so Codex is responsible for generating
    /// unique IDs for each submission.
    pub async fn submit_with_id(&self, sub: Submission) -> CodexResult<()> {
        self.tx_sub
            .send(sub)
            .await
            .map_err(|_| CodexErr::InternalAgentDied)?;
        Ok(())
    }

    pub async fn next_event(&self) -> CodexResult<Event> {
        let event = self
            .rx_event
            .recv()
            .await
            .map_err(|_| CodexErr::InternalAgentDied)?;
        Ok(event)
    }
}

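// Illustrative usage of the queue pair above (a sketch, not a doctest):
//
//     let CodexSpawnOk { codex, .. } = Codex::spawn(config, None, ctrl_c).await?;
//     codex.submit(Op::UserInput { items }).await?;
//     while let Ok(event) = codex.next_event().await {
//         // react to EventMsg variants (TaskStarted, AgentMessage, TaskComplete, ...)
//     }
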
/// Context for an initialized model agent
///
/// A session has at most 1 running task at a time, and can be interrupted by user input.
pub(crate) struct Session {
    client: ModelClient,
    pub(crate) tx_event: Sender<Event>,
    ctrl_c: Arc<Notify>,

    /// The session's current working directory. All relative paths provided by
    /// the model as well as sandbox policies are resolved against this path
    /// instead of `std::env::current_dir()`.
    pub(crate) cwd: PathBuf,
    base_instructions: Option<String>,
    user_instructions: Option<String>,
    pub(crate) approval_policy: AskForApproval,
    sandbox_policy: SandboxPolicy,
    shell_environment_policy: ShellEnvironmentPolicy,
    pub(crate) writable_roots: Mutex<Vec<PathBuf>>,
    disable_response_storage: bool,
    tools_config: ToolsConfig,

    /// Manager for external MCP servers/tools.
    mcp_connection_manager: McpConnectionManager,

    /// External notifier command (will be passed as args to exec()). When
    /// `None` this feature is disabled.
    notify: Option<Vec<String>>,

    /// Optional rollout recorder for persisting the conversation transcript so
    /// sessions can be replayed or inspected later.
    rollout: Mutex<Option<RolloutRecorder>>,
    state: Mutex<State>,
    codex_linux_sandbox_exe: Option<PathBuf>,
    user_shell: shell::Shell,
    show_raw_agent_reasoning: bool,
}

impl Session {
    fn resolve_path(&self, path: Option<String>) -> PathBuf {
        path.as_ref()
            .map(PathBuf::from)
            .map_or_else(|| self.cwd.clone(), |p| self.cwd.join(p))
    }
}

/// Mutable state of the agent
#[derive(Default)]
struct State {
    approved_commands: HashSet<Vec<String>>,
    current_task: Option<AgentTask>,
    pending_approvals: HashMap<String, oneshot::Sender<ReviewDecision>>,
    pending_input: Vec<ResponseInputItem>,
    history: ConversationHistory,
}

impl Session {
    pub fn set_task(&self, task: AgentTask) {
        let mut state = self.state.lock().unwrap();
        if let Some(current_task) = state.current_task.take() {
            current_task.abort();
        }
        state.current_task = Some(task);
    }

    pub fn remove_task(&self, sub_id: &str) {
        let mut state = self.state.lock().unwrap();
        if let Some(task) = &state.current_task {
            if task.sub_id == sub_id {
                state.current_task.take();
            }
        }
    }

    /// Sends the given event to the client and swallows the send error, if
    /// any, logging it as an error.
    pub(crate) async fn send_event(&self, event: Event) {
        if let Err(e) = self.tx_event.send(event).await {
            error!("failed to send tool call event: {e}");
        }
    }

    pub async fn request_command_approval(
        &self,
        sub_id: String,
        call_id: String,
        command: Vec<String>,
        cwd: PathBuf,
        reason: Option<String>,
    ) -> oneshot::Receiver<ReviewDecision> {
        let (tx_approve, rx_approve) = oneshot::channel();
        let event = Event {
            id: sub_id.clone(),
            msg: EventMsg::ExecApprovalRequest(ExecApprovalRequestEvent {
                call_id,
                command,
                cwd,
                reason,
            }),
        };
        let _ = self.tx_event.send(event).await;
        {
            let mut state = self.state.lock().unwrap();
            state.pending_approvals.insert(sub_id, tx_approve);
        }
        rx_approve
    }

    pub async fn request_patch_approval(
        &self,
        sub_id: String,
        call_id: String,
        action: &ApplyPatchAction,
        reason: Option<String>,
        grant_root: Option<PathBuf>,
    ) -> oneshot::Receiver<ReviewDecision> {
        let (tx_approve, rx_approve) = oneshot::channel();
        let event = Event {
            id: sub_id.clone(),
            msg: EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
                call_id,
                changes: convert_apply_patch_to_protocol(action),
                reason,
                grant_root,
            }),
        };
        let _ = self.tx_event.send(event).await;
        {
            let mut state = self.state.lock().unwrap();
            state.pending_approvals.insert(sub_id, tx_approve);
        }
        rx_approve
    }

    pub fn notify_approval(&self, sub_id: &str, decision: ReviewDecision) {
        let mut state = self.state.lock().unwrap();
        if let Some(tx_approve) = state.pending_approvals.remove(sub_id) {
            tx_approve.send(decision).ok();
        }
    }

    pub fn add_approved_command(&self, cmd: Vec<String>) {
        let mut state = self.state.lock().unwrap();
        state.approved_commands.insert(cmd);
    }

    /// Records items to both the rollout and the chat completions/ZDR
    /// transcript, if enabled.
    async fn record_conversation_items(&self, items: &[ResponseItem]) {
        debug!("Recording items for conversation: {items:?}");
        self.record_state_snapshot(items).await;

        self.state.lock().unwrap().history.record_items(items);
    }

    async fn record_state_snapshot(&self, items: &[ResponseItem]) {
        let snapshot = { crate::rollout::SessionStateSnapshot {} };

        let recorder = {
            let guard = self.rollout.lock().unwrap();
            guard.as_ref().cloned()
        };

        if let Some(rec) = recorder {
            if let Err(e) = rec.record_state(snapshot).await {
                error!("failed to record rollout state: {e:#}");
            }
            if let Err(e) = rec.record_items(items).await {
                error!("failed to record rollout items: {e:#}");
            }
        }
    }

    async fn on_exec_command_begin(
        &self,
        turn_diff_tracker: &mut TurnDiffTracker,
        exec_command_context: ExecCommandContext,
    ) {
        let ExecCommandContext {
            sub_id,
            call_id,
            command_for_display,
            cwd,
            apply_patch,
        } = exec_command_context;
        let msg = match apply_patch {
            Some(ApplyPatchCommandContext {
                user_explicitly_approved_this_action,
                changes,
            }) => {
                turn_diff_tracker.on_patch_begin(&changes);

                EventMsg::PatchApplyBegin(PatchApplyBeginEvent {
                    call_id,
                    auto_approved: !user_explicitly_approved_this_action,
                    changes,
                })
            }
            None => EventMsg::ExecCommandBegin(ExecCommandBeginEvent {
                call_id,
                command: command_for_display.clone(),
                cwd,
                parsed_cmd: parse_command(&command_for_display),
            }),
        };
        let event = Event {
            id: sub_id.to_string(),
            msg,
        };
        let _ = self.tx_event.send(event).await;
    }

    #[allow(clippy::too_many_arguments)]
    async fn on_exec_command_end(
        &self,
        turn_diff_tracker: &mut TurnDiffTracker,
        sub_id: &str,
        call_id: &str,
        output: &ExecToolCallOutput,
        is_apply_patch: bool,
    ) {
        let ExecToolCallOutput {
            stdout,
            stderr,
            duration,
            exit_code,
        } = output;
        // Because stdout and stderr could each be up to 100 KiB, we send
        // truncated versions.
        const MAX_STREAM_OUTPUT: usize = 5 * 1024; // 5 KiB
        let stdout = stdout.chars().take(MAX_STREAM_OUTPUT).collect();
        let stderr = stderr.chars().take(MAX_STREAM_OUTPUT).collect();

        let msg = if is_apply_patch {
            EventMsg::PatchApplyEnd(PatchApplyEndEvent {
                call_id: call_id.to_string(),
                stdout,
                stderr,
                success: *exit_code == 0,
            })
        } else {
            EventMsg::ExecCommandEnd(ExecCommandEndEvent {
                call_id: call_id.to_string(),
                stdout,
                stderr,
                duration: *duration,
                exit_code: *exit_code,
            })
        };

        let event = Event {
            id: sub_id.to_string(),
            msg,
        };
        let _ = self.tx_event.send(event).await;

        // If this is an apply_patch, after we emit the end patch, emit a second event
        // with the full turn diff if there is one.
        if is_apply_patch {
            let unified_diff = turn_diff_tracker.get_unified_diff();
            if let Ok(Some(unified_diff)) = unified_diff {
                let msg = EventMsg::TurnDiff(TurnDiffEvent { unified_diff });
                let event = Event {
                    id: sub_id.into(),
                    msg,
                };
                let _ = self.tx_event.send(event).await;
            }
        }
    }

    /// Runs the exec tool call and emits events for the begin and end of the
    /// command even on error.
    ///
    /// Returns the output of the exec tool call.
    async fn run_exec_with_events<'a>(
        &self,
        turn_diff_tracker: &mut TurnDiffTracker,
        begin_ctx: ExecCommandContext,
        exec_args: ExecInvokeArgs<'a>,
    ) -> crate::error::Result<ExecToolCallOutput> {
        let is_apply_patch = begin_ctx.apply_patch.is_some();
        let sub_id = begin_ctx.sub_id.clone();
        let call_id = begin_ctx.call_id.clone();

        self.on_exec_command_begin(turn_diff_tracker, begin_ctx.clone())
            .await;

        let result = process_exec_tool_call(
            exec_args.params,
            exec_args.sandbox_type,
            exec_args.ctrl_c,
            exec_args.sandbox_policy,
            exec_args.codex_linux_sandbox_exe,
            exec_args.stdout_stream,
        )
        .await;

        let output_stderr;
        let borrowed: &ExecToolCallOutput = match &result {
            Ok(output) => output,
            Err(e) => {
                output_stderr = ExecToolCallOutput {
                    exit_code: -1,
                    stdout: String::new(),
                    stderr: get_error_message_ui(e),
                    duration: Duration::default(),
                };
                &output_stderr
            }
        };
        self.on_exec_command_end(
            turn_diff_tracker,
            &sub_id,
            &call_id,
            borrowed,
            is_apply_patch,
        )
        .await;

        result
    }

    /// Helper that emits a BackgroundEvent with the given message. This keeps
    /// the call‑sites terse so adding more diagnostics does not clutter the
    /// core agent logic.
    async fn notify_background_event(&self, sub_id: &str, message: impl Into<String>) {
        let event = Event {
            id: sub_id.to_string(),
            msg: EventMsg::BackgroundEvent(BackgroundEventEvent {
                message: message.into(),
            }),
        };
        let _ = self.tx_event.send(event).await;
    }

    /// Build the full turn input by concatenating the current conversation
    /// history with additional items for this turn.
    pub fn turn_input_with_history(&self, extra: Vec<ResponseItem>) -> Vec<ResponseItem> {
        [self.state.lock().unwrap().history.contents(), extra].concat()
    }

    /// Returns the input in an `Err` if there was no running task to inject it into.
    pub fn inject_input(&self, input: Vec<InputItem>) -> Result<(), Vec<InputItem>> {
        let mut state = self.state.lock().unwrap();
        if state.current_task.is_some() {
            state.pending_input.push(input.into());
            Ok(())
        } else {
            Err(input)
        }
    }

    pub fn get_pending_input(&self) -> Vec<ResponseInputItem> {
        let mut state = self.state.lock().unwrap();
        if state.pending_input.is_empty() {
            Vec::with_capacity(0)
        } else {
            let mut ret = Vec::new();
            std::mem::swap(&mut ret, &mut state.pending_input);
            ret
        }
    }

    pub async fn call_tool(
        &self,
        server: &str,
        tool: &str,
        arguments: Option<serde_json::Value>,
        timeout: Option<Duration>,
    ) -> anyhow::Result<CallToolResult> {
        self.mcp_connection_manager
            .call_tool(server, tool, arguments, timeout)
            .await
    }

    pub fn abort(&self) {
        info!("Aborting existing session");
        let mut state = self.state.lock().unwrap();
        state.pending_approvals.clear();
        state.pending_input.clear();
        if let Some(task) = state.current_task.take() {
            task.abort();
        }
    }

    /// Spawn the configured notifier (if any) with the given JSON payload as
    /// the last argument. Failures are logged but otherwise ignored so that
    /// notification issues do not interfere with the main workflow.
    fn maybe_notify(&self, notification: UserNotification) {
        let Some(notify_command) = &self.notify else {
            return;
        };

        if notify_command.is_empty() {
            return;
        }

        let Ok(json) = serde_json::to_string(&notification) else {
            error!("failed to serialise notification payload");
            return;
        };

        let mut command = std::process::Command::new(&notify_command[0]);
        if notify_command.len() > 1 {
            command.args(&notify_command[1..]);
        }
        command.arg(json);

        // Fire-and-forget – we do not wait for completion.
        if let Err(e) = command.spawn() {
            warn!("failed to spawn notifier '{}': {e}", notify_command[0]);
        }
    }
}

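// Example of the notifier contract above (illustrative): with
// `notify = ["notify-send", "Codex"]` in the config, a completed turn spawns
// roughly:
//
//     notify-send Codex '{"type":"agent-turn-complete", ...}'
//
// i.e. the serialized `UserNotification` is appended as the final argument.
// The exact JSON shape shown here is an assumption, not the verified schema.
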
impl Drop for Session {
    fn drop(&mut self) {
        self.abort();
    }
}

impl State {
    pub fn partial_clone(&self) -> Self {
        Self {
            approved_commands: self.approved_commands.clone(),
            history: self.history.clone(),
            ..Default::default()
        }
    }
}

#[derive(Clone, Debug)]
pub(crate) struct ExecCommandContext {
    pub(crate) sub_id: String,
    pub(crate) call_id: String,
    pub(crate) command_for_display: Vec<String>,
    pub(crate) cwd: PathBuf,
    pub(crate) apply_patch: Option<ApplyPatchCommandContext>,
}

#[derive(Clone, Debug)]
pub(crate) struct ApplyPatchCommandContext {
    pub(crate) user_explicitly_approved_this_action: bool,
    pub(crate) changes: HashMap<PathBuf, FileChange>,
}

/// A series of Turns in response to user input.
pub(crate) struct AgentTask {
    sess: Arc<Session>,
    sub_id: String,
    handle: AbortHandle,
}

impl AgentTask {
    fn spawn(sess: Arc<Session>, sub_id: String, input: Vec<InputItem>) -> Self {
        let handle =
            tokio::spawn(run_task(Arc::clone(&sess), sub_id.clone(), input)).abort_handle();
        Self {
            sess,
            sub_id,
            handle,
        }
    }

    fn compact(
        sess: Arc<Session>,
        sub_id: String,
        input: Vec<InputItem>,
        compact_instructions: String,
    ) -> Self {
        let handle = tokio::spawn(run_compact_task(
            Arc::clone(&sess),
            sub_id.clone(),
            input,
            compact_instructions,
        ))
        .abort_handle();
        Self {
            sess,
            sub_id,
            handle,
        }
    }

    fn abort(self) {
        if !self.handle.is_finished() {
            self.handle.abort();
            let event = Event {
                id: self.sub_id,
                msg: EventMsg::Error(ErrorEvent {
                    message: "Turn interrupted".to_string(),
                }),
            };
            let tx_event = self.sess.tx_event.clone();
            tokio::spawn(async move {
                tx_event.send(event).await.ok();
            });
        }
    }
}

async fn submission_loop(
    mut session_id: Uuid,
    config: Arc<Config>,
    auth: Option<CodexAuth>,
    rx_sub: Receiver<Submission>,
    tx_event: Sender<Event>,
    ctrl_c: Arc<Notify>,
) {
    let mut sess: Option<Arc<Session>> = None;
    // shorthand - send an event when there is no active session
    let send_no_session_event = |sub_id: String| async {
        let event = Event {
            id: sub_id,
            msg: EventMsg::Error(ErrorEvent {
                message: "No session initialized, expected 'ConfigureSession' as first Op"
                    .to_string(),
            }),
        };
        tx_event.send(event).await.ok();
    };

    loop {
        let interrupted = ctrl_c.notified();
        let sub = tokio::select! {
            res = rx_sub.recv() => match res {
                Ok(sub) => sub,
                Err(_) => break,
            },
            _ = interrupted => {
                if let Some(sess) = sess.as_ref() {
                    sess.abort();
                }
                continue;
            },
        };

        debug!(?sub, "Submission");
        match sub.op {
            Op::Interrupt => {
                let sess = match sess.as_ref() {
                    Some(sess) => sess,
                    None => {
                        send_no_session_event(sub.id).await;
                        continue;
                    }
                };
                sess.abort();
            }
            Op::ConfigureSession {
                provider,
                model,
                model_reasoning_effort,
                model_reasoning_summary,
                user_instructions,
                base_instructions,
                approval_policy,
                sandbox_policy,
                disable_response_storage,
                notify,
                cwd,
                resume_path,
            } => {
                debug!(
                    "Configuring session: model={model}; provider={provider:?}; resume={resume_path:?}"
                );
                if !cwd.is_absolute() {
                    let message = format!("cwd is not absolute: {cwd:?}");
                    error!(message);
                    let event = Event {
                        id: sub.id,
                        msg: EventMsg::Error(ErrorEvent { message }),
                    };
                    if let Err(e) = tx_event.send(event).await {
                        error!("failed to send error message: {e:?}");
                    }
                    return;
                }
                // Optionally resume an existing rollout.
                let mut restored_items: Option<Vec<ResponseItem>> = None;
                let rollout_recorder: Option<RolloutRecorder> =
                    if let Some(path) = resume_path.as_ref() {
                        match RolloutRecorder::resume(path, cwd.clone()).await {
                            Ok((rec, saved)) => {
                                session_id = saved.session_id;
                                if !saved.items.is_empty() {
                                    restored_items = Some(saved.items);
                                }
                                Some(rec)
                            }
                            Err(e) => {
                                warn!("failed to resume rollout from {path:?}: {e}");
                                None
                            }
                        }
                    } else {
                        None
                    };

                let rollout_recorder = match rollout_recorder {
                    Some(rec) => Some(rec),
                    None => {
                        match RolloutRecorder::new(&config, session_id, user_instructions.clone())
                            .await
                        {
                            Ok(r) => Some(r),
                            Err(e) => {
                                warn!("failed to initialise rollout recorder: {e}");
                                None
                            }
                        }
                    }
                };

                let client = ModelClient::new(
                    config.clone(),
                    auth.clone(),
                    provider.clone(),
                    model_reasoning_effort,
                    model_reasoning_summary,
                    session_id,
                );

                // abort any current running session and clone its state
                let state = match sess.take() {
                    Some(sess) => {
                        sess.abort();
                        sess.state.lock().unwrap().partial_clone()
                    }
                    None => State {
                        history: ConversationHistory::new(),
                        ..Default::default()
                    },
                };

                let writable_roots = Mutex::new(get_writable_roots(&cwd));

                // Error messages to dispatch after SessionConfigured is sent.
                let mut mcp_connection_errors = Vec::<Event>::new();
                let (mcp_connection_manager, failed_clients) =
                    match McpConnectionManager::new(config.mcp_servers.clone()).await {
                        Ok((mgr, failures)) => (mgr, failures),
                        Err(e) => {
                            let message = format!("Failed to create MCP connection manager: {e:#}");
                            error!("{message}");
                            mcp_connection_errors.push(Event {
                                id: sub.id.clone(),
                                msg: EventMsg::Error(ErrorEvent { message }),
                            });
                            (McpConnectionManager::default(), Default::default())
                        }
                    };

                // Surface individual client start-up failures to the user.
                if !failed_clients.is_empty() {
                    for (server_name, err) in failed_clients {
                        let message =
                            format!("MCP client for `{server_name}` failed to start: {err:#}");
                        error!("{message}");
                        mcp_connection_errors.push(Event {
                            id: sub.id.clone(),
                            msg: EventMsg::Error(ErrorEvent { message }),
                        });
                    }
                }
                let default_shell = shell::default_user_shell().await;
                sess = Some(Arc::new(Session {
                    client,
                    tools_config: ToolsConfig::new(
                        &config.model_family,
                        approval_policy,
                        sandbox_policy.clone(),
                        config.include_plan_tool,
                    ),
                    tx_event: tx_event.clone(),
                    ctrl_c: Arc::clone(&ctrl_c),
                    user_instructions,
                    base_instructions,
                    approval_policy,
                    sandbox_policy,
                    shell_environment_policy: config.shell_environment_policy.clone(),
                    cwd,
                    writable_roots,
                    mcp_connection_manager,
                    notify,
                    state: Mutex::new(state),
                    rollout: Mutex::new(rollout_recorder),
                    codex_linux_sandbox_exe: config.codex_linux_sandbox_exe.clone(),
                    disable_response_storage,
                    user_shell: default_shell,
                    show_raw_agent_reasoning: config.show_raw_agent_reasoning,
                }));

                // Patch restored state into the newly created session.
                if let Some(sess_arc) = &sess {
                    if restored_items.is_some() {
                        let mut st = sess_arc.state.lock().unwrap();
                        st.history.record_items(restored_items.unwrap().iter());
                    }
                }

                // Gather history metadata for SessionConfiguredEvent.
                let (history_log_id, history_entry_count) =
                    crate::message_history::history_metadata(&config).await;

                // ack
                let events = std::iter::once(Event {
                    id: sub.id.clone(),
                    msg: EventMsg::SessionConfigured(SessionConfiguredEvent {
                        session_id,
                        model,
                        history_log_id,
                        history_entry_count,
                    }),
                })
                .chain(mcp_connection_errors.into_iter());
                for event in events {
                    if let Err(e) = tx_event.send(event).await {
                        error!("failed to send event: {e:?}");
                    }
                }
            }
            Op::UserInput { items } => {
                let sess = match sess.as_ref() {
                    Some(sess) => sess,
                    None => {
                        send_no_session_event(sub.id).await;
                        continue;
                    }
                };

                // attempt to inject input into current task
                if let Err(items) = sess.inject_input(items) {
                    // no current task, spawn a new one
                    let task = AgentTask::spawn(Arc::clone(sess), sub.id, items);
                    sess.set_task(task);
                }
            }
            Op::ExecApproval { id, decision } => {
                let sess = match sess.as_ref() {
                    Some(sess) => sess,
                    None => {
                        send_no_session_event(sub.id).await;
                        continue;
                    }
                };
                match decision {
                    ReviewDecision::Abort => {
                        sess.abort();
                    }
                    other => sess.notify_approval(&id, other),
                }
            }
            Op::PatchApproval { id, decision } => {
                let sess = match sess.as_ref() {
                    Some(sess) => sess,
                    None => {
                        send_no_session_event(sub.id).await;
                        continue;
                    }
                };
                match decision {
                    ReviewDecision::Abort => {
                        sess.abort();
                    }
                    other => sess.notify_approval(&id, other),
                }
            }
            Op::AddToHistory { text } => {
                // TODO: What should we do if we got AddToHistory before ConfigureSession?
                // Currently, if ConfigureSession has a resume path, this history will be ignored.
                let id = session_id;
                let config = config.clone();
                tokio::spawn(async move {
                    if let Err(e) = crate::message_history::append_entry(&text, &id, &config).await
                    {
                        warn!("failed to append to message history: {e}");
                    }
                });
            }

            Op::GetHistoryEntryRequest { offset, log_id } => {
                let config = config.clone();
                let tx_event = tx_event.clone();
                let sub_id = sub.id.clone();

                tokio::spawn(async move {
                    // Run lookup in blocking thread because it does file IO + locking.
                    let entry_opt = tokio::task::spawn_blocking(move || {
                        crate::message_history::lookup(log_id, offset, &config)
                    })
                    .await
                    .unwrap_or(None);

                    let event = Event {
                        id: sub_id,
                        msg: EventMsg::GetHistoryEntryResponse(
                            crate::protocol::GetHistoryEntryResponseEvent {
                                offset,
                                log_id,
                                entry: entry_opt,
                            },
                        ),
                    };

                    if let Err(e) = tx_event.send(event).await {
                        warn!("failed to send GetHistoryEntryResponse event: {e}");
                    }
                });
            }
            Op::Compact => {
                let sess = match sess.as_ref() {
                    Some(sess) => sess,
                    None => {
                        send_no_session_event(sub.id).await;
                        continue;
                    }
                };

                // Create a summarization request as user input
                const SUMMARIZATION_PROMPT: &str = include_str!("prompt_for_compact_command.md");

                // Attempt to inject input into current task
                if let Err(items) = sess.inject_input(vec![InputItem::Text {
                    text: "Start Summarization".to_string(),
                }]) {
                    let task = AgentTask::compact(
                        sess.clone(),
                        sub.id,
                        items,
                        SUMMARIZATION_PROMPT.to_string(),
                    );
                    sess.set_task(task);
                }
            }
            Op::Shutdown => {
                info!("Shutting down Codex instance");

                // Gracefully flush and shutdown rollout recorder on session end so tests
                // that inspect the rollout file do not race with the background writer.
                if let Some(sess_arc) = sess {
                    let recorder_opt = sess_arc.rollout.lock().unwrap().take();
                    if let Some(rec) = recorder_opt {
                        if let Err(e) = rec.shutdown().await {
                            warn!("failed to shutdown rollout recorder: {e}");
                            let event = Event {
                                id: sub.id.clone(),
                                msg: EventMsg::Error(ErrorEvent {
                                    message: "Failed to shutdown rollout recorder".to_string(),
                                }),
                            };
                            if let Err(e) = tx_event.send(event).await {
                                warn!("failed to send error message: {e:?}");
                            }
                        }
                    }
                }
                let event = Event {
                    id: sub.id.clone(),
                    msg: EventMsg::ShutdownComplete,
                };
                if let Err(e) = tx_event.send(event).await {
                    warn!("failed to send Shutdown event: {e}");
                }
                break;
            }
        }
    }
    debug!("Agent loop exited");
}

/// Takes a user message as input and runs a loop where, at each turn, the model
/// replies with either:
///
/// - requested function calls
/// - an assistant message
///
/// While it is possible for the model to return multiple of these items in a
/// single turn, in practice, we generally see one item per turn:
///
/// - If the model requests a function call, we execute it and send the output
///   back to the model in the next turn.
/// - If the model sends only an assistant message, we record it in the
///   conversation history and consider the task complete.
async fn run_task(sess: Arc<Session>, sub_id: String, input: Vec<InputItem>) {
    if input.is_empty() {
        return;
    }
    let event = Event {
        id: sub_id.clone(),
        msg: EventMsg::TaskStarted,
    };
    if sess.tx_event.send(event).await.is_err() {
        return;
    }

    let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input);
    sess.record_conversation_items(&[initial_input_for_turn.clone().into()])
        .await;

    let mut last_agent_message: Option<String> = None;
    // Although from the perspective of codex.rs, TurnDiffTracker has the lifecycle of a Task which contains
    // many turns, from the perspective of the user, it is a single turn.
    let mut turn_diff_tracker = TurnDiffTracker::new();

    loop {
        // Note that pending_input would be something like a message the user
        // submitted through the UI while the model was running. Though the UI
        // may support this, the model might not.
        let pending_input = sess
            .get_pending_input()
            .into_iter()
            .map(ResponseItem::from)
            .collect::<Vec<ResponseItem>>();
        sess.record_conversation_items(&pending_input).await;

        // Construct the input that we will send to the model. When using the
        // Chat completions API (or ZDR clients), the model needs the full
        // conversation history on each turn. The rollout file, however, should
        // only record the new items that originated in this turn so that it
        // represents an append-only log without duplicates.
        let turn_input: Vec<ResponseItem> = sess.turn_input_with_history(pending_input);

        let turn_input_messages: Vec<String> = turn_input
            .iter()
            .filter_map(|item| match item {
                ResponseItem::Message { content, .. } => Some(content),
                _ => None,
            })
            .flat_map(|content| {
                content.iter().filter_map(|item| match item {
                    ContentItem::OutputText { text } => Some(text.clone()),
                    _ => None,
                })
            })
            .collect();
        match run_turn(&sess, &mut turn_diff_tracker, sub_id.clone(), turn_input).await {
            Ok(turn_output) => {
                let mut items_to_record_in_conversation_history = Vec::<ResponseItem>::new();
                let mut responses = Vec::<ResponseInputItem>::new();
                for processed_response_item in turn_output {
                    let ProcessedResponseItem { item, response } = processed_response_item;
                    match (&item, &response) {
                        (ResponseItem::Message { role, .. }, None) if role == "assistant" => {
                            // If the model returned a message, we need to record it.
                            items_to_record_in_conversation_history.push(item);
                        }
                        (
                            ResponseItem::LocalShellCall { .. },
                            Some(ResponseInputItem::FunctionCallOutput { call_id, output }),
                        ) => {
                            items_to_record_in_conversation_history.push(item);
                            items_to_record_in_conversation_history.push(
                                ResponseItem::FunctionCallOutput {
                                    call_id: call_id.clone(),
                                    output: output.clone(),
                                },
                            );
                        }
                        (
                            ResponseItem::FunctionCall { .. },
                            Some(ResponseInputItem::FunctionCallOutput { call_id, output }),
                        ) => {
                            items_to_record_in_conversation_history.push(item);
                            items_to_record_in_conversation_history.push(
                                ResponseItem::FunctionCallOutput {
                                    call_id: call_id.clone(),
                                    output: output.clone(),
                                },
                            );
                        }
                        (
                            ResponseItem::FunctionCall { .. },
                            Some(ResponseInputItem::McpToolCallOutput { call_id, result }),
                        ) => {
                            items_to_record_in_conversation_history.push(item);
                            let (content, success): (String, Option<bool>) = match result {
                                Ok(CallToolResult {
                                    content,
                                    is_error,
                                    structured_content: _,
                                }) => match serde_json::to_string(content) {
                                    Ok(content) => (content, *is_error),
                                    Err(e) => {
                                        warn!("Failed to serialize MCP tool call output: {e}");
                                        (e.to_string(), Some(true))
                                    }
                                },
                                Err(e) => (e.clone(), Some(true)),
                            };
                            items_to_record_in_conversation_history.push(
                                ResponseItem::FunctionCallOutput {
                                    call_id: call_id.clone(),
                                    output: FunctionCallOutputPayload { content, success },
                                },
                            );
                        }
                        (
                            ResponseItem::Reasoning {
                                id,
                                summary,
                                content,
                                encrypted_content,
                            },
                            None,
                        ) => {
                            items_to_record_in_conversation_history.push(ResponseItem::Reasoning {
                                id: id.clone(),
                                summary: summary.clone(),
                                content: content.clone(),
                                encrypted_content: encrypted_content.clone(),
                            });
                        }
                        _ => {
                            warn!("Unexpected response item: {item:?} with response: {response:?}");
                        }
                    };
                    if let Some(response) = response {
                        responses.push(response);
                    }
                }

                // Only attempt to take the lock if there is something to record.
                if !items_to_record_in_conversation_history.is_empty() {
                    sess.record_conversation_items(&items_to_record_in_conversation_history)
                        .await;
                }

                if responses.is_empty() {
                    debug!("Turn completed");
                    last_agent_message = get_last_assistant_message_from_turn(
                        &items_to_record_in_conversation_history,
                    );
                    sess.maybe_notify(UserNotification::AgentTurnComplete {
                        turn_id: sub_id.clone(),
                        input_messages: turn_input_messages,
                        last_assistant_message: last_agent_message.clone(),
                    });
                    break;
                }
            }
            Err(e) => {
                info!("Turn error: {e:#}");
                let event = Event {
                    id: sub_id.clone(),
                    msg: EventMsg::Error(ErrorEvent {
                        message: e.to_string(),
                    }),
                };
                sess.tx_event.send(event).await.ok();
                // let the user continue the conversation
                break;
            }
        }
    }
    sess.remove_task(&sub_id);
    let event = Event {
        id: sub_id,
        msg: EventMsg::TaskComplete(TaskCompleteEvent { last_agent_message }),
    };
    sess.tx_event.send(event).await.ok();
}

async fn run_turn(
    sess: &Session,
    turn_diff_tracker: &mut TurnDiffTracker,
    sub_id: String,
    input: Vec<ResponseItem>,
) -> CodexResult<Vec<ProcessedResponseItem>> {
    let tools = get_openai_tools(
        &sess.tools_config,
        Some(sess.mcp_connection_manager.list_all_tools()),
    );

    let prompt = Prompt {
        input,
        user_instructions: sess.user_instructions.clone(),
        store: !sess.disable_response_storage,
        tools,
        base_instructions_override: sess.base_instructions.clone(),
        environment_context: Some(EnvironmentContext {
            cwd: sess.cwd.clone(),
            approval_policy: sess.approval_policy,
            sandbox_policy: sess.sandbox_policy.clone(),
        }),
    };

    let mut retries = 0;
    loop {
        match try_run_turn(sess, turn_diff_tracker, &sub_id, &prompt).await {
            Ok(output) => return Ok(output),
            Err(CodexErr::Interrupted) => return Err(CodexErr::Interrupted),
            Err(CodexErr::EnvVar(var)) => return Err(CodexErr::EnvVar(var)),
            Err(e @ (CodexErr::UsageLimitReached(_) | CodexErr::UsageNotIncluded)) => {
                return Err(e);
            }
            Err(e) => {
                // Use the configured provider-specific stream retry budget.
                let max_retries = sess.client.get_provider().stream_max_retries();
                if retries < max_retries {
                    retries += 1;
                    let delay = backoff(retries);
                    warn!(
                        "stream disconnected - retrying turn ({retries}/{max_retries} in {delay:?})...",
                    );

                    // Surface retry information to any UI/front‑end so the
                    // user understands what is happening instead of staring
                    // at a seemingly frozen screen.
                    sess.notify_background_event(
                        &sub_id,
                        format!(
                            "stream error: {e}; retrying {retries}/{max_retries} in {delay:?}…"
                        ),
                    )
                    .await;

                    tokio::time::sleep(delay).await;
                } else {
                    return Err(e);
                }
            }
        }
    }
}

/// When the model is prompted, it returns a stream of events. Some of these
/// events map to a `ResponseItem`. A `ResponseItem` may need to be
/// "handled" such that it produces a `ResponseInputItem` that needs to be
/// sent back to the model on the next turn.
#[derive(Debug)]
struct ProcessedResponseItem {
    item: ResponseItem,
    response: Option<ResponseInputItem>,
}

async fn try_run_turn(
    sess: &Session,
    turn_diff_tracker: &mut TurnDiffTracker,
    sub_id: &str,
    prompt: &Prompt,
) -> CodexResult<Vec<ProcessedResponseItem>> {
    // call_ids that are part of this response.
    let completed_call_ids = prompt
        .input
        .iter()
        .filter_map(|ri| match ri {
            ResponseItem::FunctionCallOutput { call_id, .. } => Some(call_id),
            ResponseItem::LocalShellCall {
                call_id: Some(call_id),
                ..
            } => Some(call_id),
            _ => None,
        })
        .collect::<Vec<_>>();

    // call_ids that were pending but are not part of this response.
    // This usually happens because the user interrupted the model before we responded to one of its tool calls
    // and then the user sent a follow-up message.
    let missing_calls = {
        prompt
            .input
            .iter()
            .filter_map(|ri| match ri {
                ResponseItem::FunctionCall { call_id, .. } => Some(call_id),
                ResponseItem::LocalShellCall {
                    call_id: Some(call_id),
                    ..
                } => Some(call_id),
                _ => None,
            })
            .filter_map(|call_id| {
                if completed_call_ids.contains(&call_id) {
                    None
                } else {
                    Some(call_id.clone())
                }
            })
            .map(|call_id| ResponseItem::FunctionCallOutput {
                call_id: call_id.clone(),
                output: FunctionCallOutputPayload {
                    content: "aborted".to_string(),
                    success: Some(false),
                },
            })
            .collect::<Vec<_>>()
    };

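    // Illustrative example: if the input still contains
    // `FunctionCall { call_id: "c1", .. }` with no matching output (the user
    // interrupted before we replied), `missing_calls` now holds a synthetic
    // `FunctionCallOutput { call_id: "c1", content: "aborted", success: Some(false) }`
    // which is prepended below so every call id has a response.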
    let prompt: Cow<Prompt> = if missing_calls.is_empty() {
        Cow::Borrowed(prompt)
    } else {
        // Add the synthetic aborted missing calls to the beginning of the input to ensure all call ids have responses.
        let input = [missing_calls, prompt.input.clone()].concat();
        Cow::Owned(Prompt {
            input,
            ..prompt.clone()
        })
    };

    let mut stream = sess.client.clone().stream(&prompt).await?;

    let mut output = Vec::new();
    loop {
        // Poll the next item from the model stream. We must inspect *both* Ok and Err
        // cases so that transient stream failures (e.g., dropped SSE connection before
        // `response.completed`) bubble up and trigger the caller's retry logic.
        let event = stream.next().await;
        let Some(event) = event else {
            // Channel closed without yielding a final Completed event or explicit error.
            // Treat as a disconnected stream so the caller can retry.
            return Err(CodexErr::Stream(
                "stream closed before response.completed".into(),
            ));
        };

        let event = match event {
            Ok(ev) => ev,
            Err(e) => {
                // Propagate the underlying stream error to the caller (run_turn), which
                // will apply the configured `stream_max_retries` policy.
                return Err(e);
            }
        };

        match event {
            ResponseEvent::Created => {}
            ResponseEvent::OutputItemDone(item) => {
                let response =
                    handle_response_item(sess, turn_diff_tracker, sub_id, item.clone()).await?;

                output.push(ProcessedResponseItem { item, response });
            }
            ResponseEvent::Completed {
                response_id: _,
                token_usage,
            } => {
                if let Some(token_usage) = token_usage {
                    sess.tx_event
                        .send(Event {
                            id: sub_id.to_string(),
                            msg: EventMsg::TokenCount(token_usage),
                        })
                        .await
                        .ok();
                }

                let unified_diff = turn_diff_tracker.get_unified_diff();
                if let Ok(Some(unified_diff)) = unified_diff {
                    let msg = EventMsg::TurnDiff(TurnDiffEvent { unified_diff });
                    let event = Event {
                        id: sub_id.to_string(),
                        msg,
                    };
                    let _ = sess.tx_event.send(event).await;
                }

                return Ok(output);
            }
            ResponseEvent::OutputTextDelta(delta) => {
                {
                    let mut st = sess.state.lock().unwrap();
                    st.history.append_assistant_text(&delta);
                }

                let event = Event {
                    id: sub_id.to_string(),
                    msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta }),
                };
                sess.tx_event.send(event).await.ok();
            }
            ResponseEvent::ReasoningSummaryDelta(delta) => {
                let event = Event {
                    id: sub_id.to_string(),
                    msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta }),
                };
                sess.tx_event.send(event).await.ok();
            }
            ResponseEvent::ReasoningContentDelta(delta) => {
                if sess.show_raw_agent_reasoning {
                    let event = Event {
                        id: sub_id.to_string(),
                        msg: EventMsg::AgentReasoningRawContentDelta(
                            AgentReasoningRawContentDeltaEvent { delta },
                        ),
                    };
                    sess.tx_event.send(event).await.ok();
                }
            }
        }
    }
}

async fn run_compact_task(
    sess: Arc<Session>,
    sub_id: String,
    input: Vec<InputItem>,
    compact_instructions: String,
) {
    let start_event = Event {
        id: sub_id.clone(),
        msg: EventMsg::TaskStarted,
    };
    if sess.tx_event.send(start_event).await.is_err() {
        return;
    }

    let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input);
    let turn_input: Vec<ResponseItem> =
        sess.turn_input_with_history(vec![initial_input_for_turn.clone().into()]);

    let prompt = Prompt {
        input: turn_input,
        user_instructions: None,
        store: !sess.disable_response_storage,
        environment_context: None,
        tools: Vec::new(),
        base_instructions_override: Some(compact_instructions.clone()),
    };

    let max_retries = sess.client.get_provider().stream_max_retries();
    let mut retries = 0;

    loop {
        let attempt_result = drain_to_completed(&sess, &sub_id, &prompt).await;

        match attempt_result {
            Ok(()) => break,
            Err(CodexErr::Interrupted) => return,
            Err(e) => {
                if retries < max_retries {
                    retries += 1;
                    let delay = backoff(retries);
                    sess.notify_background_event(
                        &sub_id,
                        format!(
                            "stream error: {e}; retrying {retries}/{max_retries} in {delay:?}…"
                        ),
                    )
                    .await;
                    tokio::time::sleep(delay).await;
                    continue;
                } else {
                    let event = Event {
                        id: sub_id.clone(),
                        msg: EventMsg::Error(ErrorEvent {
                            message: e.to_string(),
                        }),
                    };
                    sess.send_event(event).await;
                    return;
                }
            }
        }
    }

    sess.remove_task(&sub_id);
    let event = Event {
        id: sub_id.clone(),
        msg: EventMsg::AgentMessage(AgentMessageEvent {
            message: "Compact task completed".to_string(),
        }),
    };
    sess.send_event(event).await;
    let event = Event {
        id: sub_id.clone(),
        msg: EventMsg::TaskComplete(TaskCompleteEvent {
            last_agent_message: None,
        }),
    };
    sess.send_event(event).await;

    let mut state = sess.state.lock().unwrap();
    state.history.keep_last_messages(1);
}

async fn handle_response_item(
    sess: &Session,
    turn_diff_tracker: &mut TurnDiffTracker,
    sub_id: &str,
    item: ResponseItem,
) -> CodexResult<Option<ResponseInputItem>> {
    debug!(?item, "Output item");
    let output = match item {
        ResponseItem::Message { content, .. } => {
            for item in content {
                if let ContentItem::OutputText { text } = item {
                    let event = Event {
                        id: sub_id.to_string(),
                        msg: EventMsg::AgentMessage(AgentMessageEvent { message: text }),
                    };
                    sess.tx_event.send(event).await.ok();
                }
            }
            None
        }
        ResponseItem::Reasoning {
            id: _,
            summary,
            content,
            encrypted_content: _,
        } => {
            for item in summary {
                let text = match item {
                    ReasoningItemReasoningSummary::SummaryText { text } => text,
                };
                let event = Event {
                    id: sub_id.to_string(),
                    msg: EventMsg::AgentReasoning(AgentReasoningEvent { text }),
                };
                sess.tx_event.send(event).await.ok();
            }
            if sess.show_raw_agent_reasoning && content.is_some() {
                let content = content.unwrap();
                for item in content {
                    let text = match item {
                        ReasoningItemContent::ReasoningText { text } => text,
                    };
                    let event = Event {
                        id: sub_id.to_string(),
                        msg: EventMsg::AgentReasoningRawContent(AgentReasoningRawContentEvent {
                            text,
                        }),
                    };
                    sess.tx_event.send(event).await.ok();
                }
            }
            None
        }
        ResponseItem::FunctionCall {
            name,
            arguments,
            call_id,
            ..
        } => {
            info!("FunctionCall: {arguments}");
            Some(
                handle_function_call(
                    sess,
                    turn_diff_tracker,
                    sub_id.to_string(),
                    name,
                    arguments,
                    call_id,
                )
                .await,
            )
        }
        ResponseItem::LocalShellCall {
            id,
            call_id,
            status: _,
            action,
        } => {
            let LocalShellAction::Exec(action) = action;
            tracing::info!("LocalShellCall: {action:?}");
            let params = ShellToolCallParams {
                command: action.command,
                workdir: action.working_directory,
                timeout_ms: action.timeout_ms,
                with_escalated_permissions: None,
                justification: None,
            };
            let effective_call_id = match (call_id, id) {
                (Some(call_id), _) => call_id,
                (None, Some(id)) => id,
                (None, None) => {
                    error!("LocalShellCall without call_id or id");
                    return Ok(Some(ResponseInputItem::FunctionCallOutput {
                        call_id: "".to_string(),
                        output: FunctionCallOutputPayload {
                            content: "LocalShellCall without call_id or id".to_string(),
                            success: None,
                        },
                    }));
                }
            };

            let exec_params = to_exec_params(params, sess);
            Some(
                handle_container_exec_with_params(
                    exec_params,
                    sess,
                    turn_diff_tracker,
                    sub_id.to_string(),
                    effective_call_id,
                )
                .await,
            )
        }
        ResponseItem::FunctionCallOutput { .. } => {
            debug!("unexpected FunctionCallOutput from stream");
            None
        }
        ResponseItem::Other => None,
    };
    Ok(output)
}

async fn handle_function_call(
    sess: &Session,
    turn_diff_tracker: &mut TurnDiffTracker,
    sub_id: String,
    name: String,
    arguments: String,
    call_id: String,
) -> ResponseInputItem {
    match name.as_str() {
        "container.exec" | "shell" => {
            let params = match parse_container_exec_arguments(arguments, sess, &call_id) {
                Ok(params) => params,
                Err(output) => {
                    return *output;
                }
            };
            handle_container_exec_with_params(params, sess, turn_diff_tracker, sub_id, call_id)
                .await
        }
        "update_plan" => handle_update_plan(sess, arguments, sub_id, call_id).await,
        _ => {
            match sess.mcp_connection_manager.parse_tool_name(&name) {
                Some((server, tool_name)) => {
                    // TODO(mbolin): Determine appropriate timeout for tool call.
                    let timeout = None;
                    handle_mcp_tool_call(
                        sess, &sub_id, call_id, server, tool_name, arguments, timeout,
                    )
                    .await
                }
                None => {
                    // Unknown function: reply with structured failure so the model can adapt.
                    ResponseInputItem::FunctionCallOutput {
                        call_id,
                        output: FunctionCallOutputPayload {
                            content: format!("unsupported call: {name}"),
                            success: None,
                        },
                    }
                }
            }
        }
    }
}

fn to_exec_params(params: ShellToolCallParams, sess: &Session) -> ExecParams {
    ExecParams {
        command: params.command,
        cwd: sess.resolve_path(params.workdir.clone()),
        timeout_ms: params.timeout_ms,
        env: create_env(&sess.shell_environment_policy),
        with_escalated_permissions: params.with_escalated_permissions,
        justification: params.justification,
    }
}

fn parse_container_exec_arguments(
    arguments: String,
    sess: &Session,
    call_id: &str,
) -> Result<ExecParams, Box<ResponseInputItem>> {
    // parse command
    match serde_json::from_str::<ShellToolCallParams>(&arguments) {
        Ok(shell_tool_call_params) => Ok(to_exec_params(shell_tool_call_params, sess)),
        Err(e) => {
            // allow model to re-sample
            let output = ResponseInputItem::FunctionCallOutput {
                call_id: call_id.to_string(),
                output: FunctionCallOutputPayload {
                    content: format!("failed to parse function arguments: {e}"),
                    success: None,
                },
            };
            Err(Box::new(output))
        }
    }
}

pub struct ExecInvokeArgs<'a> {
    pub params: ExecParams,
    pub sandbox_type: SandboxType,
    pub ctrl_c: Arc<Notify>,
    pub sandbox_policy: &'a SandboxPolicy,
    pub codex_linux_sandbox_exe: &'a Option<PathBuf>,
    pub stdout_stream: Option<StdoutStream>,
}

fn maybe_run_with_user_profile(params: ExecParams, sess: &Session) -> ExecParams {
    if sess.shell_environment_policy.use_profile {
        let command = sess
            .user_shell
            .format_default_shell_invocation(params.command.clone());
        if let Some(command) = command {
            return ExecParams { command, ..params };
        }
    }
    params
}

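// Illustrative example: with `use_profile = true` and a zsh user shell,
// `maybe_run_with_user_profile` above might rewrite `["ls", "-la"]` into
// something like `["/bin/zsh", "-lc", "ls -la"]` so the user's profile is
// sourced first; the exact invocation is up to
// `shell::Shell::format_default_shell_invocation`.
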
async fn handle_container_exec_with_params(
    params: ExecParams,
    sess: &Session,
    turn_diff_tracker: &mut TurnDiffTracker,
    sub_id: String,
    call_id: String,
) -> ResponseInputItem {
    // check if this was a patch, and apply it if so
    let apply_patch_exec = match maybe_parse_apply_patch_verified(&params.command, &params.cwd) {
        MaybeApplyPatchVerified::Body(changes) => {
            match apply_patch::apply_patch(sess, &sub_id, &call_id, changes).await {
                InternalApplyPatchInvocation::Output(item) => return item,
                InternalApplyPatchInvocation::DelegateToExec(apply_patch_exec) => {
                    Some(apply_patch_exec)
                }
            }
        }
        MaybeApplyPatchVerified::CorrectnessError(parse_error) => {
            // It looks like an invocation of `apply_patch`, but we
            // could not resolve it into a patch that would apply
            // cleanly. Return to model for resample.
            return ResponseInputItem::FunctionCallOutput {
                call_id,
                output: FunctionCallOutputPayload {
                    content: format!("error: {parse_error:#}"),
                    success: None,
                },
            };
        }
        MaybeApplyPatchVerified::ShellParseError(error) => {
            trace!("Failed to parse shell command, {error:?}");
            None
        }
        MaybeApplyPatchVerified::NotApplyPatch => None,
    };

    let (params, safety, command_for_display) = match &apply_patch_exec {
        Some(ApplyPatchExec {
            action: ApplyPatchAction { patch, cwd, .. },
            user_explicitly_approved_this_action,
        }) => {
            let path_to_codex = std::env::current_exe()
                .ok()
                .map(|p| p.to_string_lossy().to_string());
            let Some(path_to_codex) = path_to_codex else {
                return ResponseInputItem::FunctionCallOutput {
                    call_id,
                    output: FunctionCallOutputPayload {
                        content: "failed to determine path to codex executable".to_string(),
                        success: None,
                    },
                };
            };

            let params = ExecParams {
                command: vec![
                    path_to_codex,
                    CODEX_APPLY_PATCH_ARG1.to_string(),
                    patch.clone(),
                ],
                cwd: cwd.clone(),
                timeout_ms: params.timeout_ms,
                env: HashMap::new(),
                with_escalated_permissions: params.with_escalated_permissions,
                justification: params.justification.clone(),
            };
            let safety = if *user_explicitly_approved_this_action {
                SafetyCheck::AutoApprove {
                    sandbox_type: SandboxType::None,
                }
            } else {
                assess_safety_for_untrusted_command(
                    sess.approval_policy,
                    &sess.sandbox_policy,
                    params.with_escalated_permissions.unwrap_or(false),
                )
            };
            (
                params,
                safety,
                vec!["apply_patch".to_string(), patch.clone()],
            )
        }
        None => {
            let safety = {
                let state = sess.state.lock().unwrap();
                assess_command_safety(
                    &params.command,
                    sess.approval_policy,
                    &sess.sandbox_policy,
                    &state.approved_commands,
                    params.with_escalated_permissions.unwrap_or(false),
                )
            };
            let command_for_display = params.command.clone();
            (params, safety, command_for_display)
        }
    };

    let sandbox_type = match safety {
        SafetyCheck::AutoApprove { sandbox_type } => sandbox_type,
        SafetyCheck::AskUser => {
            let rx_approve = sess
                .request_command_approval(
                    sub_id.clone(),
                    call_id.clone(),
                    params.command.clone(),
                    params.cwd.clone(),
                    params.justification.clone(),
                )
                .await;
            match rx_approve.await.unwrap_or_default() {
                ReviewDecision::Approved => (),
                ReviewDecision::ApprovedForSession => {
                    sess.add_approved_command(params.command.clone());
                }
                ReviewDecision::Denied | ReviewDecision::Abort => {
                    return ResponseInputItem::FunctionCallOutput {
                        call_id,
                        output: FunctionCallOutputPayload {
                            content: "exec command rejected by user".to_string(),
                            success: None,
                        },
                    };
                }
            }
            // No sandboxing is applied because the user has given
            // explicit approval. Often, we end up in this case because
            // the command cannot be run in a sandbox, such as
            // installing a new dependency that requires network access.
            SandboxType::None
        }
        SafetyCheck::Reject { reason } => {
            return ResponseInputItem::FunctionCallOutput {
                call_id,
                output: FunctionCallOutputPayload {
                    content: format!("exec command rejected: {reason}"),
                    success: None,
                },
            };
        }
    };

    let exec_command_context = ExecCommandContext {
        sub_id: sub_id.clone(),
        call_id: call_id.clone(),
        command_for_display: command_for_display.clone(),
        cwd: params.cwd.clone(),
        apply_patch: apply_patch_exec.map(
            |ApplyPatchExec {
                 action,
                 user_explicitly_approved_this_action,
             }| ApplyPatchCommandContext {
                user_explicitly_approved_this_action,
                changes: convert_apply_patch_to_protocol(&action),
            },
        ),
    };

    let params = maybe_run_with_user_profile(params, sess);
    let output_result = sess
        .run_exec_with_events(
            turn_diff_tracker,
            exec_command_context.clone(),
            ExecInvokeArgs {
                params: params.clone(),
                sandbox_type,
                ctrl_c: sess.ctrl_c.clone(),
                sandbox_policy: &sess.sandbox_policy,
                codex_linux_sandbox_exe: &sess.codex_linux_sandbox_exe,
                stdout_stream: Some(StdoutStream {
                    sub_id: sub_id.clone(),
                    call_id: call_id.clone(),
                    tx_event: sess.tx_event.clone(),
                }),
            },
        )
        .await;

    match output_result {
        Ok(output) => {
            let ExecToolCallOutput {
                exit_code,
                stdout,
                stderr,
                duration,
            } = &output;

            let is_success = *exit_code == 0;
            let content = format_exec_output(
                if is_success { stdout } else { stderr },
                *exit_code,
                *duration,
            );
            ResponseInputItem::FunctionCallOutput {
                call_id: call_id.clone(),
                output: FunctionCallOutputPayload {
                    content,
                    success: Some(is_success),
                },
            }
        }
        Err(CodexErr::Sandbox(error)) => {
            handle_sandbox_error(
                turn_diff_tracker,
                params,
                exec_command_context,
                error,
                sandbox_type,
                sess,
            )
            .await
        }
        Err(e) => ResponseInputItem::FunctionCallOutput {
            call_id: call_id.clone(),
            output: FunctionCallOutputPayload {
                content: format!("execution error: {e}"),
                success: None,
            },
        },
    }
}

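/// Called when a sandboxed execution fails. Depending on the approval
/// policy, either report the failure straight back to the model or ask the
/// user whether to retry the command without a sandbox.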
async fn handle_sandbox_error(
    turn_diff_tracker: &mut TurnDiffTracker,
    params: ExecParams,
    exec_command_context: ExecCommandContext,
    error: SandboxErr,
    sandbox_type: SandboxType,
    sess: &Session,
) -> ResponseInputItem {
    let call_id = exec_command_context.call_id.clone();
    let sub_id = exec_command_context.sub_id.clone();
    let cwd = exec_command_context.cwd.clone();

    // Early out if either the user never wants to be asked for approval, or
    // we're letting the model manage escalation requests. Otherwise, continue.
    match sess.approval_policy {
        AskForApproval::Never | AskForApproval::OnRequest => {
            return ResponseInputItem::FunctionCallOutput {
                call_id,
                output: FunctionCallOutputPayload {
                    content: format!(
                        "failed in sandbox {sandbox_type:?} with execution error: {error}"
                    ),
                    success: Some(false),
                },
            };
        }
        AskForApproval::UnlessTrusted | AskForApproval::OnFailure => (),
    }

    // similarly, if the command timed out, we can simply return this failure to the model
    if matches!(error, SandboxErr::Timeout) {
        return ResponseInputItem::FunctionCallOutput {
            call_id,
            output: FunctionCallOutputPayload {
                content: format!(
                    "command timed out after {} milliseconds",
                    params.timeout_duration().as_millis()
                ),
                success: Some(false),
            },
        };
    }

    // Note that when `error` is `SandboxErr::Denied`, it could be a false
    // positive. That is, it may have exited with a non-zero exit code, not
    // because the sandbox denied it, but because that is its expected behavior,
    // i.e., a grep command that did not match anything. Ideally we would
    // include additional metadata on the command to indicate whether non-zero
    // exit codes merit a retry.

    // For now, we categorically ask the user to retry without sandbox and
    // emit the raw error as a background event.
    sess.notify_background_event(&sub_id, format!("Execution failed: {error}"))
        .await;

    let rx_approve = sess
        .request_command_approval(
            sub_id.clone(),
            call_id.clone(),
            params.command.clone(),
            cwd.clone(),
            Some("command failed; retry without sandbox?".to_string()),
        )
        .await;

    match rx_approve.await.unwrap_or_default() {
        ReviewDecision::Approved | ReviewDecision::ApprovedForSession => {
            // Persist this command as pre-approved for the
            // remainder of the session so future
            // executions skip the sandbox directly.
            // TODO(ragona): Isn't this a bug? It always saves the command in an | fork?
            sess.add_approved_command(params.command.clone());
            // Inform UI we are retrying without sandbox.
            sess.notify_background_event(&sub_id, "retrying command without sandbox")
                .await;

            // This is an escalated retry; the policy will not be
            // examined and the sandbox has been set to `None`.
            let retry_output_result = sess
                .run_exec_with_events(
                    turn_diff_tracker,
                    exec_command_context.clone(),
                    ExecInvokeArgs {
                        params,
                        sandbox_type: SandboxType::None,
                        ctrl_c: sess.ctrl_c.clone(),
                        sandbox_policy: &sess.sandbox_policy,
                        codex_linux_sandbox_exe: &sess.codex_linux_sandbox_exe,
                        stdout_stream: Some(StdoutStream {
                            sub_id: sub_id.clone(),
                            call_id: call_id.clone(),
                            tx_event: sess.tx_event.clone(),
                        }),
                    },
                )
                .await;

            match retry_output_result {
                Ok(retry_output) => {
                    let ExecToolCallOutput {
                        exit_code,
                        stdout,
                        stderr,
                        duration,
                    } = &retry_output;

                    let is_success = *exit_code == 0;
                    let content = format_exec_output(
                        if is_success { stdout } else { stderr },
                        *exit_code,
                        *duration,
                    );

                    ResponseInputItem::FunctionCallOutput {
                        call_id: call_id.clone(),
                        output: FunctionCallOutputPayload {
                            content,
                            success: Some(is_success),
                        },
                    }
                }
                Err(e) => ResponseInputItem::FunctionCallOutput {
                    call_id: call_id.clone(),
                    output: FunctionCallOutputPayload {
                        content: format!("retry failed: {e}"),
                        success: None,
                    },
                },
            }
        }
        ReviewDecision::Denied | ReviewDecision::Abort => {
            // Fall through to original failure handling.
            ResponseInputItem::FunctionCallOutput {
                call_id,
                output: FunctionCallOutputPayload {
                    content: "exec command rejected by user".to_string(),
                    success: None,
                },
            }
        }
    }
}

/// Exec output is a pre-serialized JSON payload.
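/// A sketch of the serialized shape, inferred from the `ExecOutput` struct
/// below: `{"output":"...","metadata":{"exit_code":0,"duration_seconds":1.5}}`.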
fn format_exec_output(output: &str, exit_code: i32, duration: Duration) -> String {
    #[derive(Serialize)]
    struct ExecMetadata {
        exit_code: i32,
        duration_seconds: f32,
    }

    #[derive(Serialize)]
    struct ExecOutput<'a> {
        output: &'a str,
        metadata: ExecMetadata,
    }

    // round to 1 decimal place
    let duration_seconds = ((duration.as_secs_f32()) * 10.0).round() / 10.0;

    let payload = ExecOutput {
        output,
        metadata: ExecMetadata {
            exit_code,
            duration_seconds,
        },
    };

    #[expect(clippy::expect_used)]
    serde_json::to_string(&payload).expect("serialize ExecOutput")
}

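/// Scan a turn's response items in reverse and return the text of the most
/// recent assistant message, if any.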
fn get_last_assistant_message_from_turn(responses: &[ResponseItem]) -> Option<String> {
    responses.iter().rev().find_map(|item| {
        if let ResponseItem::Message { role, content, .. } = item {
            if role == "assistant" {
                content.iter().rev().find_map(|ci| {
                    if let ContentItem::OutputText { text } = ci {
                        Some(text.clone())
                    } else {
                        None
                    }
                })
            } else {
                None
            }
        } else {
            None
        }
    })
}

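/// Consume a response stream to completion, recording output items into the
/// in-memory conversation history and emitting a final `TokenCount` event.
/// Errors if the stream closes before `response.completed` arrives.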
async fn drain_to_completed(sess: &Session, sub_id: &str, prompt: &Prompt) -> CodexResult<()> {
    let mut stream = sess.client.clone().stream(prompt).await?;
    loop {
        let maybe_event = stream.next().await;
        let Some(event) = maybe_event else {
            return Err(CodexErr::Stream(
                "stream closed before response.completed".into(),
            ));
        };
        match event {
            Ok(ResponseEvent::OutputItemDone(item)) => {
                // Record only to in-memory conversation history; avoid state snapshot.
                let mut state = sess.state.lock().unwrap();
                state.history.record_items(std::slice::from_ref(&item));
            }
            Ok(ResponseEvent::Completed {
                response_id: _,
                token_usage,
            }) => {
                let token_usage = match token_usage {
                    Some(usage) => usage,
                    None => {
                        return Err(CodexErr::Stream(
                            "token_usage was None in ResponseEvent::Completed".into(),
                        ));
                    }
                };
                sess.tx_event
                    .send(Event {
                        id: sub_id.to_string(),
                        msg: EventMsg::TokenCount(token_usage),
                    })
                    .await
                    .ok();
                return Ok(());
            }
            Ok(_) => continue,
            Err(e) => return Err(e),
        }
    }
}