feat: stream exec stdout events (#1786)

## Summary
- stream command stdout as `ExecCommandStdout` events
- forward streamed stdout to clients and ignore in human output
processor
- adjust call sites for new streaming API
This commit is contained in:
aibrahim-oai
2025-08-01 13:04:34 -07:00
committed by GitHub
parent 8360c6a3ec
commit bc7beddaa2
11 changed files with 238 additions and 5 deletions

View File

@@ -10,6 +10,7 @@ use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use async_channel::Sender;
use tokio::io::AsyncRead;
use tokio::io::AsyncReadExt;
use tokio::io::BufReader;
@@ -19,10 +20,15 @@ use tokio::sync::Notify;
use crate::error::CodexErr;
use crate::error::Result;
use crate::error::SandboxErr;
use crate::protocol::Event;
use crate::protocol::EventMsg;
use crate::protocol::ExecCommandStderrDeltaEvent;
use crate::protocol::ExecCommandStdoutDeltaEvent;
use crate::protocol::SandboxPolicy;
use crate::seatbelt::spawn_command_under_seatbelt;
use crate::spawn::StdioPolicy;
use crate::spawn::spawn_child_async;
use serde_bytes::ByteBuf;
// Maximum we send for each stream, which is either:
// - 10KiB OR
@@ -56,18 +62,26 @@ pub enum SandboxType {
LinuxSeccomp,
}
#[derive(Clone)]
pub struct StdoutStream {
pub sub_id: String,
pub call_id: String,
pub tx_event: Sender<Event>,
}
pub async fn process_exec_tool_call(
params: ExecParams,
sandbox_type: SandboxType,
ctrl_c: Arc<Notify>,
sandbox_policy: &SandboxPolicy,
codex_linux_sandbox_exe: &Option<PathBuf>,
stdout_stream: Option<StdoutStream>,
) -> Result<ExecToolCallOutput> {
let start = Instant::now();
let raw_output_result: std::result::Result<RawExecToolCallOutput, CodexErr> = match sandbox_type
{
SandboxType::None => exec(params, sandbox_policy, ctrl_c).await,
SandboxType::None => exec(params, sandbox_policy, ctrl_c, stdout_stream.clone()).await,
SandboxType::MacosSeatbelt => {
let ExecParams {
command,
@@ -83,7 +97,7 @@ pub async fn process_exec_tool_call(
env,
)
.await?;
consume_truncated_output(child, ctrl_c, timeout_ms).await
consume_truncated_output(child, ctrl_c, timeout_ms, stdout_stream.clone()).await
}
SandboxType::LinuxSeccomp => {
let ExecParams {
@@ -106,7 +120,7 @@ pub async fn process_exec_tool_call(
)
.await?;
consume_truncated_output(child, ctrl_c, timeout_ms).await
consume_truncated_output(child, ctrl_c, timeout_ms, stdout_stream).await
}
};
let duration = start.elapsed();
@@ -233,6 +247,7 @@ async fn exec(
}: ExecParams,
sandbox_policy: &SandboxPolicy,
ctrl_c: Arc<Notify>,
stdout_stream: Option<StdoutStream>,
) -> Result<RawExecToolCallOutput> {
let (program, args) = command.split_first().ok_or_else(|| {
CodexErr::Io(io::Error::new(
@@ -251,7 +266,7 @@ async fn exec(
env,
)
.await?;
consume_truncated_output(child, ctrl_c, timeout_ms).await
consume_truncated_output(child, ctrl_c, timeout_ms, stdout_stream).await
}
/// Consumes the output of a child process, truncating it so it is suitable for
@@ -260,6 +275,7 @@ pub(crate) async fn consume_truncated_output(
mut child: Child,
ctrl_c: Arc<Notify>,
timeout_ms: Option<u64>,
stdout_stream: Option<StdoutStream>,
) -> Result<RawExecToolCallOutput> {
// Both stdout and stderr were configured with `Stdio::piped()`
// above, therefore `take()` should normally return `Some`. If it doesn't
@@ -280,11 +296,15 @@ pub(crate) async fn consume_truncated_output(
BufReader::new(stdout_reader),
MAX_STREAM_OUTPUT,
MAX_STREAM_OUTPUT_LINES,
stdout_stream.clone(),
false,
));
let stderr_handle = tokio::spawn(read_capped(
BufReader::new(stderr_reader),
MAX_STREAM_OUTPUT,
MAX_STREAM_OUTPUT_LINES,
stdout_stream.clone(),
true,
));
let interrupted = ctrl_c.notified();
@@ -318,10 +338,12 @@ pub(crate) async fn consume_truncated_output(
})
}
async fn read_capped<R: AsyncRead + Unpin>(
async fn read_capped<R: AsyncRead + Unpin + Send + 'static>(
mut reader: R,
max_output: usize,
max_lines: usize,
stream: Option<StdoutStream>,
is_stderr: bool,
) -> io::Result<Vec<u8>> {
let mut buf = Vec::with_capacity(max_output.min(8 * 1024));
let mut tmp = [0u8; 8192];
@@ -335,6 +357,27 @@ async fn read_capped<R: AsyncRead + Unpin>(
break;
}
if let Some(stream) = &stream {
let chunk = tmp[..n].to_vec();
let msg = if is_stderr {
EventMsg::ExecCommandStderrDelta(ExecCommandStderrDeltaEvent {
call_id: stream.call_id.clone(),
chunk: ByteBuf::from(chunk),
})
} else {
EventMsg::ExecCommandStdoutDelta(ExecCommandStdoutDeltaEvent {
call_id: stream.call_id.clone(),
chunk: ByteBuf::from(chunk),
})
};
let event = Event {
id: stream.sub_id.clone(),
msg,
};
#[allow(clippy::let_unit_value)]
let _ = stream.tx_event.send(event).await;
}
// Copy into the buffer only while we still have byte and line budget.
if remaining_bytes > 0 && remaining_lines > 0 {
let mut copy_len = 0;