chore: align unified_exec (#5442)

Align `unified_exec` with b implementation
This commit is contained in:
jif-oai
2025-10-22 11:50:18 +01:00
committed by GitHub
parent 53cadb4df6
commit 00b1e130b3
8 changed files with 1060 additions and 596 deletions

View File

@@ -23,7 +23,10 @@
use std::collections::HashMap;
use std::sync::atomic::AtomicI32;
use std::time::Duration;
use rand::Rng;
use rand::rng;
use tokio::sync::Mutex;
use crate::codex::Session;
@@ -36,27 +39,43 @@ mod session_manager;
pub(crate) use errors::UnifiedExecError;
pub(crate) use session::UnifiedExecSession;
const DEFAULT_TIMEOUT_MS: u64 = 1_000;
const MAX_TIMEOUT_MS: u64 = 60_000;
const UNIFIED_EXEC_OUTPUT_MAX_BYTES: usize = 128 * 1024; // 128 KiB
pub(crate) const DEFAULT_YIELD_TIME_MS: u64 = 10_000;
pub(crate) const MIN_YIELD_TIME_MS: u64 = 250;
pub(crate) const MAX_YIELD_TIME_MS: u64 = 30_000;
pub(crate) const DEFAULT_MAX_OUTPUT_TOKENS: usize = 10_000;
pub(crate) const UNIFIED_EXEC_OUTPUT_MAX_BYTES: usize = 1024 * 1024; // 1 MiB
pub(crate) struct UnifiedExecContext<'a> {
pub session: &'a Session,
pub turn: &'a TurnContext,
pub call_id: &'a str,
pub session_id: Option<i32>,
}
#[derive(Debug)]
pub(crate) struct UnifiedExecRequest<'a> {
pub input_chunks: &'a [String],
pub timeout_ms: Option<u64>,
pub(crate) struct ExecCommandRequest<'a> {
pub command: &'a str,
pub shell: &'a str,
pub login: bool,
pub yield_time_ms: Option<u64>,
pub max_output_tokens: Option<usize>,
}
#[derive(Debug)]
pub(crate) struct WriteStdinRequest<'a> {
pub session_id: i32,
pub input: &'a str,
pub yield_time_ms: Option<u64>,
pub max_output_tokens: Option<usize>,
}
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct UnifiedExecResult {
pub session_id: Option<i32>,
pub(crate) struct UnifiedExecResponse {
pub chunk_id: String,
pub wall_time: Duration,
pub output: String,
pub session_id: Option<i32>,
pub exit_code: Option<i32>,
pub original_token_count: Option<usize>,
}
#[derive(Debug, Default)]
@@ -65,16 +84,66 @@ pub(crate) struct UnifiedExecSessionManager {
sessions: Mutex<HashMap<i32, session::UnifiedExecSession>>,
}
pub(crate) fn clamp_yield_time(yield_time_ms: Option<u64>) -> u64 {
match yield_time_ms {
Some(value) => value.clamp(MIN_YIELD_TIME_MS, MAX_YIELD_TIME_MS),
None => DEFAULT_YIELD_TIME_MS,
}
}
pub(crate) fn resolve_max_tokens(max_tokens: Option<usize>) -> usize {
max_tokens.unwrap_or(DEFAULT_MAX_OUTPUT_TOKENS)
}
pub(crate) fn generate_chunk_id() -> String {
let mut rng = rng();
(0..6)
.map(|_| format!("{:x}", rng.random_range(0..16)))
.collect()
}
pub(crate) fn truncate_output_to_tokens(
output: &str,
max_tokens: usize,
) -> (String, Option<usize>) {
if max_tokens == 0 {
let total_tokens = output.chars().count();
let message = format!("{total_tokens} tokens truncated…");
return (message, Some(total_tokens));
}
let tokens: Vec<char> = output.chars().collect();
let total_tokens = tokens.len();
if total_tokens <= max_tokens {
return (output.to_string(), None);
}
let half = max_tokens / 2;
if half == 0 {
let truncated = total_tokens.saturating_sub(max_tokens);
let message = format!("{truncated} tokens truncated…");
return (message, Some(total_tokens));
}
let truncated = total_tokens.saturating_sub(half * 2);
let mut truncated_output = String::new();
truncated_output.extend(&tokens[..half]);
truncated_output.push_str(&format!("{truncated} tokens truncated…"));
truncated_output.extend(&tokens[total_tokens - half..]);
(truncated_output, Some(total_tokens))
}
#[cfg(test)]
#[cfg(unix)]
mod tests {
use super::*;
use crate::codex::Session;
use crate::codex::TurnContext;
use crate::codex::make_session_and_context;
use crate::protocol::AskForApproval;
use crate::protocol::SandboxPolicy;
use crate::unified_exec::ExecCommandRequest;
use crate::unified_exec::WriteStdinRequest;
use core_test_support::skip_if_sandbox;
use std::sync::Arc;
use tokio::time::Duration;
@@ -88,34 +157,52 @@ mod tests {
(Arc::new(session), Arc::new(turn))
}
async fn run_unified_exec_request(
async fn exec_command(
session: &Arc<Session>,
turn: &Arc<TurnContext>,
session_id: Option<i32>,
input: Vec<String>,
timeout_ms: Option<u64>,
) -> Result<UnifiedExecResult, UnifiedExecError> {
let request_input = input;
let request = UnifiedExecRequest {
input_chunks: &request_input,
timeout_ms,
cmd: &str,
yield_time_ms: Option<u64>,
) -> Result<UnifiedExecResponse, UnifiedExecError> {
let context = UnifiedExecContext {
session,
turn: turn.as_ref(),
call_id: "call",
};
session
.services
.unified_exec_manager
.handle_request(
request,
UnifiedExecContext {
session,
turn: turn.as_ref(),
call_id: "call",
session_id,
.exec_command(
ExecCommandRequest {
command: cmd,
shell: "/bin/bash",
login: true,
yield_time_ms,
max_output_tokens: None,
},
&context,
)
.await
}
async fn write_stdin(
session: &Arc<Session>,
session_id: i32,
input: &str,
yield_time_ms: Option<u64>,
) -> Result<UnifiedExecResponse, UnifiedExecError> {
session
.services
.unified_exec_manager
.write_stdin(WriteStdinRequest {
session_id,
input,
yield_time_ms,
max_output_tokens: None,
})
.await
}
#[test]
fn push_chunk_trims_only_excess_bytes() {
let mut buffer = OutputBufferState::default();
@@ -140,37 +227,28 @@ mod tests {
let (session, turn) = test_session_and_turn();
let open_shell = run_unified_exec_request(
&session,
&turn,
None,
vec!["bash".to_string(), "-i".to_string()],
Some(2_500),
)
.await?;
let open_shell = exec_command(&session, &turn, "bash -i", Some(2_500)).await?;
let session_id = open_shell.session_id.expect("expected session_id");
run_unified_exec_request(
write_stdin(
&session,
&turn,
Some(session_id),
vec![
"export".to_string(),
"CODEX_INTERACTIVE_SHELL_VAR=codex\n".to_string(),
],
session_id,
"export CODEX_INTERACTIVE_SHELL_VAR=codex\n",
Some(2_500),
)
.await?;
let out_2 = run_unified_exec_request(
let out_2 = write_stdin(
&session,
&turn,
Some(session_id),
vec!["echo $CODEX_INTERACTIVE_SHELL_VAR\n".to_string()],
session_id,
"echo $CODEX_INTERACTIVE_SHELL_VAR\n",
Some(2_500),
)
.await?;
assert!(out_2.output.contains("codex"));
assert!(
out_2.output.contains("codex"),
"expected environment variable output"
);
Ok(())
}
@@ -181,47 +259,44 @@ mod tests {
let (session, turn) = test_session_and_turn();
let shell_a = run_unified_exec_request(
&session,
&turn,
None,
vec!["/bin/bash".to_string(), "-i".to_string()],
Some(2_500),
)
.await?;
let shell_a = exec_command(&session, &turn, "bash -i", Some(2_500)).await?;
let session_a = shell_a.session_id.expect("expected session id");
run_unified_exec_request(
write_stdin(
&session,
&turn,
Some(session_a),
vec!["export CODEX_INTERACTIVE_SHELL_VAR=codex\n".to_string()],
session_a,
"export CODEX_INTERACTIVE_SHELL_VAR=codex\n",
Some(2_500),
)
.await?;
let out_2 = run_unified_exec_request(
let out_2 = exec_command(
&session,
&turn,
None,
vec![
"echo".to_string(),
"$CODEX_INTERACTIVE_SHELL_VAR\n".to_string(),
],
"echo $CODEX_INTERACTIVE_SHELL_VAR",
Some(2_500),
)
.await?;
assert!(!out_2.output.contains("codex"));
assert!(
out_2.session_id.is_none(),
"short command should not retain a session"
);
assert!(
!out_2.output.contains("codex"),
"short command should run in a fresh shell"
);
let out_3 = run_unified_exec_request(
let out_3 = write_stdin(
&session,
&turn,
Some(session_a),
vec!["echo $CODEX_INTERACTIVE_SHELL_VAR\n".to_string()],
session_a,
"echo $CODEX_INTERACTIVE_SHELL_VAR\n",
Some(2_500),
)
.await?;
assert!(out_3.output.contains("codex"));
assert!(
out_3.output.contains("codex"),
"session should preserve state"
);
Ok(())
}
@@ -232,45 +307,37 @@ mod tests {
let (session, turn) = test_session_and_turn();
let open_shell = run_unified_exec_request(
&session,
&turn,
None,
vec!["bash".to_string(), "-i".to_string()],
Some(2_500),
)
.await?;
let open_shell = exec_command(&session, &turn, "bash -i", Some(2_500)).await?;
let session_id = open_shell.session_id.expect("expected session id");
run_unified_exec_request(
write_stdin(
&session,
&turn,
Some(session_id),
vec![
"export".to_string(),
"CODEX_INTERACTIVE_SHELL_VAR=codex\n".to_string(),
],
session_id,
"export CODEX_INTERACTIVE_SHELL_VAR=codex\n",
Some(2_500),
)
.await?;
let out_2 = run_unified_exec_request(
let out_2 = write_stdin(
&session,
&turn,
Some(session_id),
vec!["sleep 5 && echo $CODEX_INTERACTIVE_SHELL_VAR\n".to_string()],
session_id,
"sleep 5 && echo $CODEX_INTERACTIVE_SHELL_VAR\n",
Some(10),
)
.await?;
assert!(!out_2.output.contains("codex"));
assert!(
!out_2.output.contains("codex"),
"timeout too short should yield incomplete output"
);
tokio::time::sleep(Duration::from_secs(7)).await;
let out_3 =
run_unified_exec_request(&session, &turn, Some(session_id), Vec::new(), Some(100))
.await?;
let out_3 = write_stdin(&session, session_id, "", Some(100)).await?;
assert!(out_3.output.contains("codex"));
assert!(
out_3.output.contains("codex"),
"subsequent poll should retrieve output"
);
Ok(())
}
@@ -280,18 +347,9 @@ mod tests {
async fn requests_with_large_timeout_are_capped() -> anyhow::Result<()> {
let (session, turn) = test_session_and_turn();
let result = run_unified_exec_request(
&session,
&turn,
None,
vec!["echo".to_string(), "codex".to_string()],
Some(120_000),
)
.await?;
let result = exec_command(&session, &turn, "echo codex", Some(120_000)).await?;
assert!(result.output.starts_with(
"Warning: requested timeout 120000ms exceeds maximum of 60000ms; clamping to 60000ms.\n"
));
assert!(result.session_id.is_none());
assert!(result.output.contains("codex"));
Ok(())
@@ -301,16 +359,12 @@ mod tests {
#[ignore] // Ignored while we have a better way to test this.
async fn completed_commands_do_not_persist_sessions() -> anyhow::Result<()> {
let (session, turn) = test_session_and_turn();
let result = run_unified_exec_request(
&session,
&turn,
None,
vec!["/bin/echo".to_string(), "codex".to_string()],
Some(2_500),
)
.await?;
let result = exec_command(&session, &turn, "echo codex", Some(2_500)).await?;
assert!(result.session_id.is_none());
assert!(
result.session_id.is_none(),
"completed command should not retain session"
);
assert!(result.output.contains("codex"));
assert!(
@@ -332,31 +386,16 @@ mod tests {
let (session, turn) = test_session_and_turn();
let open_shell = run_unified_exec_request(
&session,
&turn,
None,
vec!["/bin/bash".to_string(), "-i".to_string()],
Some(2_500),
)
.await?;
let open_shell = exec_command(&session, &turn, "bash -i", Some(2_500)).await?;
let session_id = open_shell.session_id.expect("expected session id");
run_unified_exec_request(
&session,
&turn,
Some(session_id),
vec!["exit\n".to_string()],
Some(2_500),
)
.await?;
write_stdin(&session, session_id, "exit\n", Some(2_500)).await?;
tokio::time::sleep(Duration::from_millis(200)).await;
let err =
run_unified_exec_request(&session, &turn, Some(session_id), Vec::new(), Some(100))
.await
.expect_err("expected unknown session error");
let err = write_stdin(&session, session_id, "", Some(100))
.await
.expect_err("expected unknown session error");
match err {
UnifiedExecError::UnknownSessionId { session_id: err_id } => {

View File

@@ -7,84 +7,163 @@ use tokio::time::Instant;
use crate::exec_env::create_env;
use crate::sandboxing::ExecEnv;
use crate::tools::events::ToolEmitter;
use crate::tools::events::ToolEventCtx;
use crate::tools::events::ToolEventStage;
use crate::tools::orchestrator::ToolOrchestrator;
use crate::tools::runtimes::unified_exec::UnifiedExecRequest as UnifiedExecToolRequest;
use crate::tools::runtimes::unified_exec::UnifiedExecRuntime;
use crate::tools::sandboxing::ToolCtx;
use crate::truncate::truncate_middle;
use super::DEFAULT_TIMEOUT_MS;
use super::MAX_TIMEOUT_MS;
use super::UNIFIED_EXEC_OUTPUT_MAX_BYTES;
use super::ExecCommandRequest;
use super::MIN_YIELD_TIME_MS;
use super::UnifiedExecContext;
use super::UnifiedExecError;
use super::UnifiedExecRequest;
use super::UnifiedExecResult;
use super::UnifiedExecResponse;
use super::UnifiedExecSessionManager;
use super::WriteStdinRequest;
use super::clamp_yield_time;
use super::generate_chunk_id;
use super::resolve_max_tokens;
use super::session::OutputBuffer;
use super::session::UnifiedExecSession;
pub(super) struct SessionAcquisition {
pub(super) session_id: i32,
pub(super) writer_tx: mpsc::Sender<Vec<u8>>,
pub(super) output_buffer: OutputBuffer,
pub(super) output_notify: Arc<Notify>,
pub(super) new_session: Option<UnifiedExecSession>,
pub(super) reuse_requested: bool,
}
use super::truncate_output_to_tokens;
impl UnifiedExecSessionManager {
pub(super) async fn acquire_session(
pub(crate) async fn exec_command(
&self,
request: &UnifiedExecRequest<'_>,
request: ExecCommandRequest<'_>,
context: &UnifiedExecContext<'_>,
) -> Result<SessionAcquisition, UnifiedExecError> {
if let Some(existing_id) = context.session_id {
let mut sessions = self.sessions.lock().await;
match sessions.get(&existing_id) {
Some(session) => {
if session.has_exited() {
sessions.remove(&existing_id);
return Err(UnifiedExecError::UnknownSessionId {
session_id: existing_id,
});
}
let (buffer, notify) = session.output_handles();
let writer_tx = session.writer_sender();
Ok(SessionAcquisition {
session_id: existing_id,
writer_tx,
output_buffer: buffer,
output_notify: notify,
new_session: None,
reuse_requested: true,
})
}
None => Err(UnifiedExecError::UnknownSessionId {
session_id: existing_id,
}),
}
) -> Result<UnifiedExecResponse, UnifiedExecError> {
let shell_flag = if request.login { "-lc" } else { "-c" };
let command = vec![
request.shell.to_string(),
shell_flag.to_string(),
request.command.to_string(),
];
let session = self.open_session_with_sandbox(command, context).await?;
let max_tokens = resolve_max_tokens(request.max_output_tokens);
let yield_time_ms =
clamp_yield_time(Some(request.yield_time_ms.unwrap_or(MIN_YIELD_TIME_MS)));
let start = Instant::now();
let (output_buffer, output_notify) = session.output_handles();
let deadline = start + Duration::from_millis(yield_time_ms);
let collected =
Self::collect_output_until_deadline(&output_buffer, &output_notify, deadline).await;
let wall_time = Instant::now().saturating_duration_since(start);
let text = String::from_utf8_lossy(&collected).to_string();
let (output, original_token_count) = truncate_output_to_tokens(&text, max_tokens);
let chunk_id = generate_chunk_id();
let exit_code = session.exit_code();
let session_id = if session.has_exited() {
None
} else {
let new_id = self
.next_session_id
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
let managed_session = self
.open_session_with_sandbox(request.input_chunks.to_vec(), context)
.await?;
let (buffer, notify) = managed_session.output_handles();
let writer_tx = managed_session.writer_sender();
Ok(SessionAcquisition {
session_id: new_id,
writer_tx,
output_buffer: buffer,
output_notify: notify,
new_session: Some(managed_session),
reuse_requested: false,
})
Some(self.store_session(session).await)
};
Ok(UnifiedExecResponse {
chunk_id,
wall_time,
output,
session_id,
exit_code,
original_token_count,
})
}
pub(crate) async fn write_stdin(
&self,
request: WriteStdinRequest<'_>,
) -> Result<UnifiedExecResponse, UnifiedExecError> {
let session_id = request.session_id;
let (writer_tx, output_buffer, output_notify) =
self.prepare_session_handles(session_id).await?;
if !request.input.is_empty() {
Self::send_input(&writer_tx, request.input.as_bytes()).await?;
tokio::time::sleep(Duration::from_millis(100)).await;
}
let max_tokens = resolve_max_tokens(request.max_output_tokens);
let yield_time_ms = clamp_yield_time(request.yield_time_ms);
let start = Instant::now();
let deadline = start + Duration::from_millis(yield_time_ms);
let collected =
Self::collect_output_until_deadline(&output_buffer, &output_notify, deadline).await;
let wall_time = Instant::now().saturating_duration_since(start);
let text = String::from_utf8_lossy(&collected).to_string();
let (output, original_token_count) = truncate_output_to_tokens(&text, max_tokens);
let chunk_id = generate_chunk_id();
let (session_id, exit_code) = self.refresh_session_state(session_id).await;
Ok(UnifiedExecResponse {
chunk_id,
wall_time,
output,
session_id,
exit_code,
original_token_count,
})
}
async fn refresh_session_state(&self, session_id: i32) -> (Option<i32>, Option<i32>) {
let mut sessions = self.sessions.lock().await;
if !sessions.contains_key(&session_id) {
return (None, None);
}
let has_exited = sessions
.get(&session_id)
.map(UnifiedExecSession::has_exited)
.unwrap_or(false);
let exit_code = sessions
.get(&session_id)
.and_then(UnifiedExecSession::exit_code);
if has_exited {
sessions.remove(&session_id);
(None, exit_code)
} else {
(Some(session_id), exit_code)
}
}
async fn prepare_session_handles(
&self,
session_id: i32,
) -> Result<(mpsc::Sender<Vec<u8>>, OutputBuffer, Arc<Notify>), UnifiedExecError> {
let sessions = self.sessions.lock().await;
let (output_buffer, output_notify, writer_tx) =
if let Some(session) = sessions.get(&session_id) {
let (buffer, notify) = session.output_handles();
(buffer, notify, session.writer_sender())
} else {
return Err(UnifiedExecError::UnknownSessionId { session_id });
};
Ok((writer_tx, output_buffer, output_notify))
}
async fn send_input(
writer_tx: &mpsc::Sender<Vec<u8>>,
data: &[u8],
) -> Result<(), UnifiedExecError> {
writer_tx
.send(data.to_vec())
.await
.map_err(|_| UnifiedExecError::WriteToStdin)
}
async fn store_session(&self, session: UnifiedExecSession) -> i32 {
let session_id = self
.next_session_id
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
self.sessions.lock().await.insert(session_id, session);
session_id
}
pub(crate) async fn open_session_with_exec_env(
@@ -118,7 +197,7 @@ impl UnifiedExecSessionManager {
session: context.session,
turn: context.turn,
call_id: context.call_id.to_string(),
tool_name: "unified_exec".to_string(),
tool_name: "exec_command".to_string(),
};
orchestrator
.run(
@@ -175,124 +254,4 @@ impl UnifiedExecSessionManager {
collected
}
pub(super) async fn should_store_session(&self, acquisition: &SessionAcquisition) -> bool {
if let Some(session) = acquisition.new_session.as_ref() {
!session.has_exited()
} else if acquisition.reuse_requested {
let mut sessions = self.sessions.lock().await;
if let Some(existing) = sessions.get(&acquisition.session_id) {
if existing.has_exited() {
sessions.remove(&acquisition.session_id);
false
} else {
true
}
} else {
false
}
} else {
true
}
}
pub(super) async fn send_input_chunks(
writer_tx: &mpsc::Sender<Vec<u8>>,
chunks: &[String],
) -> Result<(), UnifiedExecError> {
let mut trailing_whitespace = true;
for chunk in chunks {
if chunk.is_empty() {
continue;
}
let leading_whitespace = chunk
.chars()
.next()
.map(char::is_whitespace)
.unwrap_or(true);
if !trailing_whitespace
&& !leading_whitespace
&& writer_tx.send(vec![b' ']).await.is_err()
{
return Err(UnifiedExecError::WriteToStdin);
}
if writer_tx.send(chunk.as_bytes().to_vec()).await.is_err() {
return Err(UnifiedExecError::WriteToStdin);
}
trailing_whitespace = chunk
.chars()
.next_back()
.map(char::is_whitespace)
.unwrap_or(trailing_whitespace);
}
Ok(())
}
pub async fn handle_request(
&self,
request: UnifiedExecRequest<'_>,
context: UnifiedExecContext<'_>,
) -> Result<UnifiedExecResult, UnifiedExecError> {
let (timeout_ms, timeout_warning) = match request.timeout_ms {
Some(requested) if requested > MAX_TIMEOUT_MS => (
MAX_TIMEOUT_MS,
Some(format!(
"Warning: requested timeout {requested}ms exceeds maximum of {MAX_TIMEOUT_MS}ms; clamping to {MAX_TIMEOUT_MS}ms.\n"
)),
),
Some(requested) => (requested, None),
None => (DEFAULT_TIMEOUT_MS, None),
};
if !request.input_chunks.is_empty() {
let event_ctx = ToolEventCtx::new(context.session, context.turn, context.call_id, None);
let emitter =
ToolEmitter::shell(request.input_chunks.to_vec(), context.turn.cwd.clone());
emitter.emit(event_ctx, ToolEventStage::Begin).await;
}
let mut acquisition = self.acquire_session(&request, &context).await?;
if acquisition.reuse_requested {
Self::send_input_chunks(&acquisition.writer_tx, request.input_chunks).await?;
}
let deadline = Instant::now() + Duration::from_millis(timeout_ms);
let collected = Self::collect_output_until_deadline(
&acquisition.output_buffer,
&acquisition.output_notify,
deadline,
)
.await;
let (output, _maybe_tokens) = truncate_middle(
&String::from_utf8_lossy(&collected),
UNIFIED_EXEC_OUTPUT_MAX_BYTES,
);
let output = if let Some(warning) = timeout_warning {
format!("{warning}{output}")
} else {
output
};
let should_store_session = self.should_store_session(&acquisition).await;
let session_id = if should_store_session {
if let Some(session) = acquisition.new_session.take() {
self.sessions
.lock()
.await
.insert(acquisition.session_id, session);
}
Some(acquisition.session_id)
} else {
None
};
Ok(UnifiedExecResult { session_id, output })
}
}