Phase 1: Repository & Infrastructure Setup

- Renamed directories: codex-rs -> llmx-rs, codex-cli -> llmx-cli
- Updated package.json files:
  - Root: llmx-monorepo
  - CLI: @llmx/llmx
  - SDK: @llmx/llmx-sdk
- Updated pnpm workspace configuration
- Renamed binary: codex.js -> llmx.js
- Updated environment variables: CODEX_* -> LLMX_*
- Changed repository URLs to valknar/llmx

🤖 Generated with Claude Code
This commit is contained in:
Sebastian Krüger
2025-11-11 14:01:52 +01:00
parent 052b052832
commit f237fe560d
1151 changed files with 41 additions and 35 deletions

View File

@@ -0,0 +1,29 @@
use crate::exec::ExecToolCallOutput;
use thiserror::Error;
#[derive(Debug, Error)]
pub(crate) enum UnifiedExecError {
#[error("Failed to create unified exec session: {message}")]
CreateSession { message: String },
#[error("Unknown session id {session_id}")]
UnknownSessionId { session_id: i32 },
#[error("failed to write to stdin")]
WriteToStdin,
#[error("missing command line for unified exec request")]
MissingCommandLine,
#[error("Command denied by sandbox: {message}")]
SandboxDenied {
message: String,
output: ExecToolCallOutput,
},
}
impl UnifiedExecError {
pub(crate) fn create_session(message: String) -> Self {
Self::CreateSession { message }
}
pub(crate) fn sandbox_denied(message: String, output: ExecToolCallOutput) -> Self {
Self::SandboxDenied { message, output }
}
}

View File

@@ -0,0 +1,441 @@
//! Unified Exec: interactive PTY execution orchestrated with approvals + sandboxing.
//!
//! Responsibilities
//! - Manages interactive PTY sessions (create, reuse, buffer output with caps).
//! - Uses the shared ToolOrchestrator to handle approval, sandbox selection, and
//! retry semantics in a single, descriptive flow.
//! - Spawns the PTY from a sandboxtransformed `ExecEnv`; on sandbox denial,
//! retries without sandbox when policy allows (no reprompt thanks to caching).
//! - Uses the shared `is_likely_sandbox_denied` heuristic to keep denial messages
//! consistent with other exec paths.
//!
//! Flow at a glance (open session)
//! 1) Build a small request `{ command, cwd }`.
//! 2) Orchestrator: approval (bypass/cache/prompt) → select sandbox → run.
//! 3) Runtime: transform `CommandSpec` → `ExecEnv` → spawn PTY.
//! 4) If denial, orchestrator retries with `SandboxType::None`.
//! 5) Session is returned with streaming output + metadata.
//!
//! This keeps policy logic and user interaction centralized while the PTY/session
//! concerns remain isolated here. The implementation is split between:
//! - `session.rs`: PTY session lifecycle + output buffering.
//! - `session_manager.rs`: orchestration (approvals, sandboxing, reuse) and request handling.
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::AtomicI32;
use std::time::Duration;
use rand::Rng;
use rand::rng;
use tokio::sync::Mutex;
use crate::codex::Session;
use crate::codex::TurnContext;
mod errors;
mod session;
mod session_manager;
pub(crate) use errors::UnifiedExecError;
pub(crate) use session::UnifiedExecSession;
pub(crate) const DEFAULT_YIELD_TIME_MS: u64 = 10_000;
pub(crate) const MIN_YIELD_TIME_MS: u64 = 250;
pub(crate) const MAX_YIELD_TIME_MS: u64 = 30_000;
pub(crate) const DEFAULT_MAX_OUTPUT_TOKENS: usize = 10_000;
pub(crate) const UNIFIED_EXEC_OUTPUT_MAX_BYTES: usize = 1024 * 1024; // 1 MiB
pub(crate) struct UnifiedExecContext {
pub session: Arc<Session>,
pub turn: Arc<TurnContext>,
pub call_id: String,
}
impl UnifiedExecContext {
pub fn new(session: Arc<Session>, turn: Arc<TurnContext>, call_id: String) -> Self {
Self {
session,
turn,
call_id,
}
}
}
#[derive(Debug)]
pub(crate) struct ExecCommandRequest<'a> {
pub command: &'a str,
pub shell: &'a str,
pub login: bool,
pub yield_time_ms: Option<u64>,
pub max_output_tokens: Option<usize>,
pub workdir: Option<PathBuf>,
}
#[derive(Debug)]
pub(crate) struct WriteStdinRequest<'a> {
pub session_id: i32,
pub input: &'a str,
pub yield_time_ms: Option<u64>,
pub max_output_tokens: Option<usize>,
}
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct UnifiedExecResponse {
pub event_call_id: String,
pub chunk_id: String,
pub wall_time: Duration,
pub output: String,
pub session_id: Option<i32>,
pub exit_code: Option<i32>,
pub original_token_count: Option<usize>,
}
#[derive(Default)]
pub(crate) struct UnifiedExecSessionManager {
next_session_id: AtomicI32,
sessions: Mutex<HashMap<i32, SessionEntry>>,
}
struct SessionEntry {
session: session::UnifiedExecSession,
session_ref: Arc<Session>,
turn_ref: Arc<TurnContext>,
call_id: String,
command: String,
cwd: PathBuf,
started_at: tokio::time::Instant,
}
pub(crate) fn clamp_yield_time(yield_time_ms: Option<u64>) -> u64 {
match yield_time_ms {
Some(value) => value.clamp(MIN_YIELD_TIME_MS, MAX_YIELD_TIME_MS),
None => DEFAULT_YIELD_TIME_MS,
}
}
pub(crate) fn resolve_max_tokens(max_tokens: Option<usize>) -> usize {
max_tokens.unwrap_or(DEFAULT_MAX_OUTPUT_TOKENS)
}
pub(crate) fn generate_chunk_id() -> String {
let mut rng = rng();
(0..6)
.map(|_| format!("{:x}", rng.random_range(0..16)))
.collect()
}
pub(crate) fn truncate_output_to_tokens(
output: &str,
max_tokens: usize,
) -> (String, Option<usize>) {
if max_tokens == 0 {
let total_tokens = output.chars().count();
let message = format!("{total_tokens} tokens truncated…");
return (message, Some(total_tokens));
}
let tokens: Vec<char> = output.chars().collect();
let total_tokens = tokens.len();
if total_tokens <= max_tokens {
return (output.to_string(), None);
}
let half = max_tokens / 2;
if half == 0 {
let truncated = total_tokens.saturating_sub(max_tokens);
let message = format!("{truncated} tokens truncated…");
return (message, Some(total_tokens));
}
let truncated = total_tokens.saturating_sub(half * 2);
let mut truncated_output = String::new();
truncated_output.extend(&tokens[..half]);
truncated_output.push_str(&format!("{truncated} tokens truncated…"));
truncated_output.extend(&tokens[total_tokens - half..]);
(truncated_output, Some(total_tokens))
}
#[cfg(test)]
#[cfg(unix)]
mod tests {
use super::*;
use crate::codex::Session;
use crate::codex::TurnContext;
use crate::codex::make_session_and_context;
use crate::protocol::AskForApproval;
use crate::protocol::SandboxPolicy;
use crate::unified_exec::ExecCommandRequest;
use crate::unified_exec::WriteStdinRequest;
use core_test_support::skip_if_sandbox;
use std::sync::Arc;
use tokio::time::Duration;
use super::session::OutputBufferState;
fn test_session_and_turn() -> (Arc<Session>, Arc<TurnContext>) {
let (session, mut turn) = make_session_and_context();
turn.approval_policy = AskForApproval::Never;
turn.sandbox_policy = SandboxPolicy::DangerFullAccess;
(Arc::new(session), Arc::new(turn))
}
async fn exec_command(
session: &Arc<Session>,
turn: &Arc<TurnContext>,
cmd: &str,
yield_time_ms: Option<u64>,
) -> Result<UnifiedExecResponse, UnifiedExecError> {
let context =
UnifiedExecContext::new(Arc::clone(session), Arc::clone(turn), "call".to_string());
session
.services
.unified_exec_manager
.exec_command(
ExecCommandRequest {
command: cmd,
shell: "/bin/bash",
login: true,
yield_time_ms,
max_output_tokens: None,
workdir: None,
},
&context,
)
.await
}
async fn write_stdin(
session: &Arc<Session>,
session_id: i32,
input: &str,
yield_time_ms: Option<u64>,
) -> Result<UnifiedExecResponse, UnifiedExecError> {
session
.services
.unified_exec_manager
.write_stdin(WriteStdinRequest {
session_id,
input,
yield_time_ms,
max_output_tokens: None,
})
.await
}
#[test]
fn push_chunk_trims_only_excess_bytes() {
let mut buffer = OutputBufferState::default();
buffer.push_chunk(vec![b'a'; UNIFIED_EXEC_OUTPUT_MAX_BYTES]);
buffer.push_chunk(vec![b'b']);
buffer.push_chunk(vec![b'c']);
assert_eq!(buffer.total_bytes, UNIFIED_EXEC_OUTPUT_MAX_BYTES);
let snapshot = buffer.snapshot();
assert_eq!(snapshot.len(), 3);
assert_eq!(
snapshot.first().unwrap().len(),
UNIFIED_EXEC_OUTPUT_MAX_BYTES - 2
);
assert_eq!(snapshot.get(2).unwrap(), &vec![b'c']);
assert_eq!(snapshot.get(1).unwrap(), &vec![b'b']);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn unified_exec_persists_across_requests() -> anyhow::Result<()> {
skip_if_sandbox!(Ok(()));
let (session, turn) = test_session_and_turn();
let open_shell = exec_command(&session, &turn, "bash -i", Some(2_500)).await?;
let session_id = open_shell.session_id.expect("expected session_id");
write_stdin(
&session,
session_id,
"export CODEX_INTERACTIVE_SHELL_VAR=codex\n",
Some(2_500),
)
.await?;
let out_2 = write_stdin(
&session,
session_id,
"echo $CODEX_INTERACTIVE_SHELL_VAR\n",
Some(2_500),
)
.await?;
assert!(
out_2.output.contains("codex"),
"expected environment variable output"
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn multi_unified_exec_sessions() -> anyhow::Result<()> {
skip_if_sandbox!(Ok(()));
let (session, turn) = test_session_and_turn();
let shell_a = exec_command(&session, &turn, "bash -i", Some(2_500)).await?;
let session_a = shell_a.session_id.expect("expected session id");
write_stdin(
&session,
session_a,
"export CODEX_INTERACTIVE_SHELL_VAR=codex\n",
Some(2_500),
)
.await?;
let out_2 = exec_command(
&session,
&turn,
"echo $CODEX_INTERACTIVE_SHELL_VAR",
Some(2_500),
)
.await?;
assert!(
out_2.session_id.is_none(),
"short command should not retain a session"
);
assert!(
!out_2.output.contains("codex"),
"short command should run in a fresh shell"
);
let out_3 = write_stdin(
&session,
session_a,
"echo $CODEX_INTERACTIVE_SHELL_VAR\n",
Some(2_500),
)
.await?;
assert!(
out_3.output.contains("codex"),
"session should preserve state"
);
Ok(())
}
#[tokio::test]
async fn unified_exec_timeouts() -> anyhow::Result<()> {
skip_if_sandbox!(Ok(()));
let (session, turn) = test_session_and_turn();
let open_shell = exec_command(&session, &turn, "bash -i", Some(2_500)).await?;
let session_id = open_shell.session_id.expect("expected session id");
write_stdin(
&session,
session_id,
"export CODEX_INTERACTIVE_SHELL_VAR=codex\n",
Some(2_500),
)
.await?;
let out_2 = write_stdin(
&session,
session_id,
"sleep 5 && echo $CODEX_INTERACTIVE_SHELL_VAR\n",
Some(10),
)
.await?;
assert!(
!out_2.output.contains("codex"),
"timeout too short should yield incomplete output"
);
tokio::time::sleep(Duration::from_secs(7)).await;
let out_3 = write_stdin(&session, session_id, "", Some(100)).await?;
assert!(
out_3.output.contains("codex"),
"subsequent poll should retrieve output"
);
Ok(())
}
#[tokio::test]
#[ignore] // Ignored while we have a better way to test this.
async fn requests_with_large_timeout_are_capped() -> anyhow::Result<()> {
let (session, turn) = test_session_and_turn();
let result = exec_command(&session, &turn, "echo codex", Some(120_000)).await?;
assert!(result.session_id.is_none());
assert!(result.output.contains("codex"));
Ok(())
}
#[tokio::test]
#[ignore] // Ignored while we have a better way to test this.
async fn completed_commands_do_not_persist_sessions() -> anyhow::Result<()> {
let (session, turn) = test_session_and_turn();
let result = exec_command(&session, &turn, "echo codex", Some(2_500)).await?;
assert!(
result.session_id.is_none(),
"completed command should not retain session"
);
assert!(result.output.contains("codex"));
assert!(
session
.services
.unified_exec_manager
.sessions
.lock()
.await
.is_empty()
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn reusing_completed_session_returns_unknown_session() -> anyhow::Result<()> {
skip_if_sandbox!(Ok(()));
let (session, turn) = test_session_and_turn();
let open_shell = exec_command(&session, &turn, "bash -i", Some(2_500)).await?;
let session_id = open_shell.session_id.expect("expected session id");
write_stdin(&session, session_id, "exit\n", Some(2_500)).await?;
tokio::time::sleep(Duration::from_millis(200)).await;
let err = write_stdin(&session, session_id, "", Some(100))
.await
.expect_err("expected unknown session error");
match err {
UnifiedExecError::UnknownSessionId { session_id: err_id } => {
assert_eq!(err_id, session_id);
}
other => panic!("expected UnknownSessionId, got {other:?}"),
}
assert!(
!session
.services
.unified_exec_manager
.sessions
.lock()
.await
.contains_key(&session_id)
);
Ok(())
}
}

View File

@@ -0,0 +1,217 @@
#![allow(clippy::module_inception)]
use std::collections::VecDeque;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::sync::Notify;
use tokio::sync::mpsc;
use tokio::sync::oneshot::error::TryRecvError;
use tokio::task::JoinHandle;
use tokio::time::Duration;
use crate::exec::ExecToolCallOutput;
use crate::exec::SandboxType;
use crate::exec::StreamOutput;
use crate::exec::is_likely_sandbox_denied;
use crate::truncate::truncate_middle;
use codex_utils_pty::ExecCommandSession;
use codex_utils_pty::SpawnedPty;
use super::UNIFIED_EXEC_OUTPUT_MAX_BYTES;
use super::UnifiedExecError;
#[derive(Debug, Default)]
pub(crate) struct OutputBufferState {
chunks: VecDeque<Vec<u8>>,
pub(crate) total_bytes: usize,
}
impl OutputBufferState {
pub(super) fn push_chunk(&mut self, chunk: Vec<u8>) {
self.total_bytes = self.total_bytes.saturating_add(chunk.len());
self.chunks.push_back(chunk);
let mut excess = self
.total_bytes
.saturating_sub(UNIFIED_EXEC_OUTPUT_MAX_BYTES);
while excess > 0 {
match self.chunks.front_mut() {
Some(front) if excess >= front.len() => {
excess -= front.len();
self.total_bytes = self.total_bytes.saturating_sub(front.len());
self.chunks.pop_front();
}
Some(front) => {
front.drain(..excess);
self.total_bytes = self.total_bytes.saturating_sub(excess);
break;
}
None => break,
}
}
}
pub(super) fn drain(&mut self) -> Vec<Vec<u8>> {
let drained: Vec<Vec<u8>> = self.chunks.drain(..).collect();
self.total_bytes = 0;
drained
}
pub(super) fn snapshot(&self) -> Vec<Vec<u8>> {
self.chunks.iter().cloned().collect()
}
}
pub(crate) type OutputBuffer = Arc<Mutex<OutputBufferState>>;
pub(crate) type OutputHandles = (OutputBuffer, Arc<Notify>);
#[derive(Debug)]
pub(crate) struct UnifiedExecSession {
session: ExecCommandSession,
output_buffer: OutputBuffer,
output_notify: Arc<Notify>,
output_task: JoinHandle<()>,
sandbox_type: SandboxType,
}
impl UnifiedExecSession {
pub(super) fn new(
session: ExecCommandSession,
initial_output_rx: tokio::sync::broadcast::Receiver<Vec<u8>>,
sandbox_type: SandboxType,
) -> Self {
let output_buffer = Arc::new(Mutex::new(OutputBufferState::default()));
let output_notify = Arc::new(Notify::new());
let mut receiver = initial_output_rx;
let buffer_clone = Arc::clone(&output_buffer);
let notify_clone = Arc::clone(&output_notify);
let output_task = tokio::spawn(async move {
loop {
match receiver.recv().await {
Ok(chunk) => {
let mut guard = buffer_clone.lock().await;
guard.push_chunk(chunk);
drop(guard);
notify_clone.notify_waiters();
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
}
}
});
Self {
session,
output_buffer,
output_notify,
output_task,
sandbox_type,
}
}
pub(super) fn writer_sender(&self) -> mpsc::Sender<Vec<u8>> {
self.session.writer_sender()
}
pub(super) fn output_handles(&self) -> OutputHandles {
(
Arc::clone(&self.output_buffer),
Arc::clone(&self.output_notify),
)
}
pub(super) fn has_exited(&self) -> bool {
self.session.has_exited()
}
pub(super) fn exit_code(&self) -> Option<i32> {
self.session.exit_code()
}
async fn snapshot_output(&self) -> Vec<Vec<u8>> {
let guard = self.output_buffer.lock().await;
guard.snapshot()
}
fn sandbox_type(&self) -> SandboxType {
self.sandbox_type
}
pub(super) async fn check_for_sandbox_denial(&self) -> Result<(), UnifiedExecError> {
if self.sandbox_type() == SandboxType::None || !self.has_exited() {
return Ok(());
}
let _ =
tokio::time::timeout(Duration::from_millis(20), self.output_notify.notified()).await;
let collected_chunks = self.snapshot_output().await;
let mut aggregated: Vec<u8> = Vec::new();
for chunk in collected_chunks {
aggregated.extend_from_slice(&chunk);
}
let aggregated_text = String::from_utf8_lossy(&aggregated).to_string();
let exit_code = self.exit_code().unwrap_or(-1);
let exec_output = ExecToolCallOutput {
exit_code,
stdout: StreamOutput::new(aggregated_text.clone()),
stderr: StreamOutput::new(String::new()),
aggregated_output: StreamOutput::new(aggregated_text.clone()),
duration: Duration::ZERO,
timed_out: false,
};
if is_likely_sandbox_denied(self.sandbox_type(), &exec_output) {
let (snippet, _) = truncate_middle(&aggregated_text, UNIFIED_EXEC_OUTPUT_MAX_BYTES);
let message = if snippet.is_empty() {
format!("exit code {exit_code}")
} else {
snippet
};
return Err(UnifiedExecError::sandbox_denied(message, exec_output));
}
Ok(())
}
pub(super) async fn from_spawned(
spawned: SpawnedPty,
sandbox_type: SandboxType,
) -> Result<Self, UnifiedExecError> {
let SpawnedPty {
session,
output_rx,
mut exit_rx,
} = spawned;
let managed = Self::new(session, output_rx, sandbox_type);
let exit_ready = match exit_rx.try_recv() {
Ok(_) | Err(TryRecvError::Closed) => true,
Err(TryRecvError::Empty) => false,
};
if exit_ready {
managed.check_for_sandbox_denial().await?;
return Ok(managed);
}
tokio::pin!(exit_rx);
if tokio::time::timeout(Duration::from_millis(50), &mut exit_rx)
.await
.is_ok()
{
managed.check_for_sandbox_denial().await?;
}
Ok(managed)
}
}
impl Drop for UnifiedExecSession {
fn drop(&mut self) {
self.output_task.abort();
}
}

View File

@@ -0,0 +1,402 @@
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::Notify;
use tokio::sync::mpsc;
use tokio::time::Duration;
use tokio::time::Instant;
use crate::exec::ExecToolCallOutput;
use crate::exec::StreamOutput;
use crate::exec_env::create_env;
use crate::sandboxing::ExecEnv;
use crate::tools::events::ToolEmitter;
use crate::tools::events::ToolEventCtx;
use crate::tools::events::ToolEventStage;
use crate::tools::orchestrator::ToolOrchestrator;
use crate::tools::runtimes::unified_exec::UnifiedExecRequest as UnifiedExecToolRequest;
use crate::tools::runtimes::unified_exec::UnifiedExecRuntime;
use crate::tools::sandboxing::ToolCtx;
use super::ExecCommandRequest;
use super::MIN_YIELD_TIME_MS;
use super::SessionEntry;
use super::UnifiedExecContext;
use super::UnifiedExecError;
use super::UnifiedExecResponse;
use super::UnifiedExecSessionManager;
use super::WriteStdinRequest;
use super::clamp_yield_time;
use super::generate_chunk_id;
use super::resolve_max_tokens;
use super::session::OutputBuffer;
use super::session::UnifiedExecSession;
use super::truncate_output_to_tokens;
impl UnifiedExecSessionManager {
pub(crate) async fn exec_command(
&self,
request: ExecCommandRequest<'_>,
context: &UnifiedExecContext,
) -> Result<UnifiedExecResponse, UnifiedExecError> {
let cwd = request
.workdir
.clone()
.unwrap_or_else(|| context.turn.cwd.clone());
let shell_flag = if request.login { "-lc" } else { "-c" };
let command = vec![
request.shell.to_string(),
shell_flag.to_string(),
request.command.to_string(),
];
let session = self
.open_session_with_sandbox(command, cwd.clone(), context)
.await?;
let max_tokens = resolve_max_tokens(request.max_output_tokens);
let yield_time_ms =
clamp_yield_time(Some(request.yield_time_ms.unwrap_or(MIN_YIELD_TIME_MS)));
let start = Instant::now();
let (output_buffer, output_notify) = session.output_handles();
let deadline = start + Duration::from_millis(yield_time_ms);
let collected =
Self::collect_output_until_deadline(&output_buffer, &output_notify, deadline).await;
let wall_time = Instant::now().saturating_duration_since(start);
let text = String::from_utf8_lossy(&collected).to_string();
let (output, original_token_count) = truncate_output_to_tokens(&text, max_tokens);
let chunk_id = generate_chunk_id();
let exit_code = session.exit_code();
let session_id = if session.has_exited() {
None
} else {
Some(
self.store_session(session, context, request.command, cwd.clone(), start)
.await,
)
};
let response = UnifiedExecResponse {
event_call_id: context.call_id.clone(),
chunk_id,
wall_time,
output,
session_id,
exit_code,
original_token_count,
};
// If the command completed during this call, emit an ExecCommandEnd via the emitter.
if response.session_id.is_none() {
let exit = response.exit_code.unwrap_or(-1);
Self::emit_exec_end_from_context(
context,
request.command.to_string(),
cwd,
response.output.clone(),
exit,
response.wall_time,
)
.await;
}
Ok(response)
}
pub(crate) async fn write_stdin(
&self,
request: WriteStdinRequest<'_>,
) -> Result<UnifiedExecResponse, UnifiedExecError> {
let session_id = request.session_id;
let (writer_tx, output_buffer, output_notify) =
self.prepare_session_handles(session_id).await?;
if !request.input.is_empty() {
Self::send_input(&writer_tx, request.input.as_bytes()).await?;
tokio::time::sleep(Duration::from_millis(100)).await;
}
let max_tokens = resolve_max_tokens(request.max_output_tokens);
let yield_time_ms = clamp_yield_time(request.yield_time_ms);
let start = Instant::now();
let deadline = start + Duration::from_millis(yield_time_ms);
let collected =
Self::collect_output_until_deadline(&output_buffer, &output_notify, deadline).await;
let wall_time = Instant::now().saturating_duration_since(start);
let text = String::from_utf8_lossy(&collected).to_string();
let (output, original_token_count) = truncate_output_to_tokens(&text, max_tokens);
let chunk_id = generate_chunk_id();
let status = self.refresh_session_state(session_id).await;
let (session_id, exit_code, completion_entry, event_call_id) = match status {
SessionStatus::Alive { exit_code, call_id } => {
(Some(session_id), exit_code, None, call_id)
}
SessionStatus::Exited { exit_code, entry } => {
let call_id = entry.call_id.clone();
(None, exit_code, Some(*entry), call_id)
}
SessionStatus::Unknown => {
return Err(UnifiedExecError::UnknownSessionId { session_id });
}
};
let response = UnifiedExecResponse {
event_call_id,
chunk_id,
wall_time,
output,
session_id,
exit_code,
original_token_count,
};
if let (Some(exit), Some(entry)) = (response.exit_code, completion_entry) {
let total_duration = Instant::now().saturating_duration_since(entry.started_at);
Self::emit_exec_end_from_entry(entry, response.output.clone(), exit, total_duration)
.await;
}
Ok(response)
}
async fn refresh_session_state(&self, session_id: i32) -> SessionStatus {
let mut sessions = self.sessions.lock().await;
let Some(entry) = sessions.get(&session_id) else {
return SessionStatus::Unknown;
};
let exit_code = entry.session.exit_code();
if entry.session.has_exited() {
let Some(entry) = sessions.remove(&session_id) else {
return SessionStatus::Unknown;
};
SessionStatus::Exited {
exit_code,
entry: Box::new(entry),
}
} else {
SessionStatus::Alive {
exit_code,
call_id: entry.call_id.clone(),
}
}
}
async fn prepare_session_handles(
&self,
session_id: i32,
) -> Result<(mpsc::Sender<Vec<u8>>, OutputBuffer, Arc<Notify>), UnifiedExecError> {
let sessions = self.sessions.lock().await;
let (output_buffer, output_notify, writer_tx) =
if let Some(entry) = sessions.get(&session_id) {
let (buffer, notify) = entry.session.output_handles();
(buffer, notify, entry.session.writer_sender())
} else {
return Err(UnifiedExecError::UnknownSessionId { session_id });
};
Ok((writer_tx, output_buffer, output_notify))
}
async fn send_input(
writer_tx: &mpsc::Sender<Vec<u8>>,
data: &[u8],
) -> Result<(), UnifiedExecError> {
writer_tx
.send(data.to_vec())
.await
.map_err(|_| UnifiedExecError::WriteToStdin)
}
async fn store_session(
&self,
session: UnifiedExecSession,
context: &UnifiedExecContext,
command: &str,
cwd: PathBuf,
started_at: Instant,
) -> i32 {
let session_id = self
.next_session_id
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
let entry = SessionEntry {
session,
session_ref: Arc::clone(&context.session),
turn_ref: Arc::clone(&context.turn),
call_id: context.call_id.clone(),
command: command.to_string(),
cwd,
started_at,
};
self.sessions.lock().await.insert(session_id, entry);
session_id
}
async fn emit_exec_end_from_entry(
entry: SessionEntry,
aggregated_output: String,
exit_code: i32,
duration: Duration,
) {
let output = ExecToolCallOutput {
exit_code,
stdout: StreamOutput::new(aggregated_output.clone()),
stderr: StreamOutput::new(String::new()),
aggregated_output: StreamOutput::new(aggregated_output),
duration,
timed_out: false,
};
let event_ctx = ToolEventCtx::new(
entry.session_ref.as_ref(),
entry.turn_ref.as_ref(),
&entry.call_id,
None,
);
let emitter = ToolEmitter::unified_exec(entry.command, entry.cwd, true);
emitter
.emit(event_ctx, ToolEventStage::Success(output))
.await;
}
async fn emit_exec_end_from_context(
context: &UnifiedExecContext,
command: String,
cwd: PathBuf,
aggregated_output: String,
exit_code: i32,
duration: Duration,
) {
let output = ExecToolCallOutput {
exit_code,
stdout: StreamOutput::new(aggregated_output.clone()),
stderr: StreamOutput::new(String::new()),
aggregated_output: StreamOutput::new(aggregated_output),
duration,
timed_out: false,
};
let event_ctx = ToolEventCtx::new(
context.session.as_ref(),
context.turn.as_ref(),
&context.call_id,
None,
);
let emitter = ToolEmitter::unified_exec(command, cwd, true);
emitter
.emit(event_ctx, ToolEventStage::Success(output))
.await;
}
pub(crate) async fn open_session_with_exec_env(
&self,
env: &ExecEnv,
) -> Result<UnifiedExecSession, UnifiedExecError> {
let (program, args) = env
.command
.split_first()
.ok_or(UnifiedExecError::MissingCommandLine)?;
let spawned = codex_utils_pty::spawn_pty_process(
program,
args,
env.cwd.as_path(),
&env.env,
&env.arg0,
)
.await
.map_err(|err| UnifiedExecError::create_session(err.to_string()))?;
UnifiedExecSession::from_spawned(spawned, env.sandbox).await
}
pub(super) async fn open_session_with_sandbox(
&self,
command: Vec<String>,
cwd: PathBuf,
context: &UnifiedExecContext,
) -> Result<UnifiedExecSession, UnifiedExecError> {
let mut orchestrator = ToolOrchestrator::new();
let mut runtime = UnifiedExecRuntime::new(self);
let req = UnifiedExecToolRequest::new(
command,
cwd,
create_env(&context.turn.shell_environment_policy),
);
let tool_ctx = ToolCtx {
session: context.session.as_ref(),
turn: context.turn.as_ref(),
call_id: context.call_id.clone(),
tool_name: "exec_command".to_string(),
};
orchestrator
.run(
&mut runtime,
&req,
&tool_ctx,
context.turn.as_ref(),
context.turn.approval_policy,
)
.await
.map_err(|e| UnifiedExecError::create_session(format!("{e:?}")))
}
pub(super) async fn collect_output_until_deadline(
output_buffer: &OutputBuffer,
output_notify: &Arc<Notify>,
deadline: Instant,
) -> Vec<u8> {
let mut collected: Vec<u8> = Vec::with_capacity(4096);
loop {
let drained_chunks;
let mut wait_for_output = None;
{
let mut guard = output_buffer.lock().await;
drained_chunks = guard.drain();
if drained_chunks.is_empty() {
wait_for_output = Some(output_notify.notified());
}
}
if drained_chunks.is_empty() {
let remaining = deadline.saturating_duration_since(Instant::now());
if remaining == Duration::ZERO {
break;
}
let notified = wait_for_output.unwrap_or_else(|| output_notify.notified());
tokio::pin!(notified);
tokio::select! {
_ = &mut notified => {}
_ = tokio::time::sleep(remaining) => break,
}
continue;
}
for chunk in drained_chunks {
collected.extend_from_slice(&chunk);
}
if Instant::now() >= deadline {
break;
}
}
collected
}
}
enum SessionStatus {
Alive {
exit_code: Option<i32>,
call_id: String,
},
Exited {
exit_code: Option<i32>,
entry: Box<SessionEntry>,
},
Unknown,
}