collabse stdout and stderr delta events into one (#1787)

This commit is contained in:
aibrahim-oai
2025-08-01 14:00:19 -07:00
committed by GitHub
parent bc7beddaa2
commit f20de21cb6
6 changed files with 36 additions and 35 deletions

View File

@@ -22,8 +22,8 @@ use crate::error::Result;
use crate::error::SandboxErr; use crate::error::SandboxErr;
use crate::protocol::Event; use crate::protocol::Event;
use crate::protocol::EventMsg; use crate::protocol::EventMsg;
use crate::protocol::ExecCommandStderrDeltaEvent; use crate::protocol::ExecCommandOutputDeltaEvent;
use crate::protocol::ExecCommandStdoutDeltaEvent; use crate::protocol::ExecOutputStream;
use crate::protocol::SandboxPolicy; use crate::protocol::SandboxPolicy;
use crate::seatbelt::spawn_command_under_seatbelt; use crate::seatbelt::spawn_command_under_seatbelt;
use crate::spawn::StdioPolicy; use crate::spawn::StdioPolicy;
@@ -359,17 +359,15 @@ async fn read_capped<R: AsyncRead + Unpin + Send + 'static>(
if let Some(stream) = &stream { if let Some(stream) = &stream {
let chunk = tmp[..n].to_vec(); let chunk = tmp[..n].to_vec();
let msg = if is_stderr { let msg = EventMsg::ExecCommandOutputDelta(ExecCommandOutputDeltaEvent {
EventMsg::ExecCommandStderrDelta(ExecCommandStderrDeltaEvent { call_id: stream.call_id.clone(),
call_id: stream.call_id.clone(), stream: if is_stderr {
chunk: ByteBuf::from(chunk), ExecOutputStream::Stderr
}) } else {
} else { ExecOutputStream::Stdout
EventMsg::ExecCommandStdoutDelta(ExecCommandStdoutDeltaEvent { },
call_id: stream.call_id.clone(), chunk: ByteBuf::from(chunk),
chunk: ByteBuf::from(chunk), });
})
};
let event = Event { let event = Event {
id: stream.sub_id.clone(), id: stream.sub_id.clone(),
msg, msg,

View File

@@ -324,11 +324,8 @@ pub enum EventMsg {
/// Notification that the server is about to execute a command. /// Notification that the server is about to execute a command.
ExecCommandBegin(ExecCommandBeginEvent), ExecCommandBegin(ExecCommandBeginEvent),
/// Incremental chunk of stdout from a running command. /// Incremental chunk of output from a running command.
ExecCommandStdoutDelta(ExecCommandStdoutDeltaEvent), ExecCommandOutputDelta(ExecCommandOutputDeltaEvent),
/// Incremental chunk of stderr from a running command.
ExecCommandStderrDelta(ExecCommandStderrDeltaEvent),
ExecCommandEnd(ExecCommandEndEvent), ExecCommandEnd(ExecCommandEndEvent),
@@ -484,19 +481,19 @@ pub struct ExecCommandEndEvent {
} }
#[derive(Debug, Clone, Deserialize, Serialize)] #[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ExecCommandStdoutDeltaEvent { #[serde(rename_all = "snake_case")]
/// Identifier for the ExecCommandBegin that produced this chunk. pub enum ExecOutputStream {
pub call_id: String, Stdout,
/// Raw stdout bytes (may not be valid UTF-8). Stderr,
#[serde(with = "serde_bytes")]
pub chunk: ByteBuf,
} }
#[derive(Debug, Clone, Deserialize, Serialize)] #[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ExecCommandStderrDeltaEvent { pub struct ExecCommandOutputDeltaEvent {
/// Identifier for the ExecCommandBegin that produced this chunk. /// Identifier for the ExecCommandBegin that produced this chunk.
pub call_id: String, pub call_id: String,
/// Raw stderr bytes (may not be valid UTF-8). /// Which stream produced this chunk.
pub stream: ExecOutputStream,
/// Raw bytes from the stream (may not be valid UTF-8).
#[serde(with = "serde_bytes")] #[serde(with = "serde_bytes")]
pub chunk: ByteBuf, pub chunk: ByteBuf,
} }

View File

@@ -11,15 +11,19 @@ use codex_core::exec::StdoutStream;
use codex_core::exec::process_exec_tool_call; use codex_core::exec::process_exec_tool_call;
use codex_core::protocol::Event; use codex_core::protocol::Event;
use codex_core::protocol::EventMsg; use codex_core::protocol::EventMsg;
use codex_core::protocol::ExecCommandStderrDeltaEvent; use codex_core::protocol::ExecCommandOutputDeltaEvent;
use codex_core::protocol::ExecCommandStdoutDeltaEvent; use codex_core::protocol::ExecOutputStream;
use codex_core::protocol::SandboxPolicy; use codex_core::protocol::SandboxPolicy;
use tokio::sync::Notify; use tokio::sync::Notify;
fn collect_stdout_events(rx: Receiver<Event>) -> Vec<u8> { fn collect_stdout_events(rx: Receiver<Event>) -> Vec<u8> {
let mut out = Vec::new(); let mut out = Vec::new();
while let Ok(ev) = rx.try_recv() { while let Ok(ev) = rx.try_recv() {
if let EventMsg::ExecCommandStdoutDelta(ExecCommandStdoutDeltaEvent { chunk, .. }) = ev.msg if let EventMsg::ExecCommandOutputDelta(ExecCommandOutputDeltaEvent {
stream: ExecOutputStream::Stdout,
chunk,
..
}) = ev.msg
{ {
out.extend_from_slice(&chunk); out.extend_from_slice(&chunk);
} }
@@ -126,7 +130,11 @@ async fn test_exec_stderr_stream_events_echo() {
// Collect only stderr delta events // Collect only stderr delta events
let mut err = Vec::new(); let mut err = Vec::new();
while let Ok(ev) = rx.try_recv() { while let Ok(ev) = rx.try_recv() {
if let EventMsg::ExecCommandStderrDelta(ExecCommandStderrDeltaEvent { chunk, .. }) = ev.msg if let EventMsg::ExecCommandOutputDelta(ExecCommandOutputDeltaEvent {
stream: ExecOutputStream::Stderr,
chunk,
..
}) = ev.msg
{ {
err.extend_from_slice(&chunk); err.extend_from_slice(&chunk);
} }

View File

@@ -239,8 +239,7 @@ impl EventProcessor for EventProcessorWithHumanOutput {
cwd.to_string_lossy(), cwd.to_string_lossy(),
); );
} }
EventMsg::ExecCommandStdoutDelta(_) => {} EventMsg::ExecCommandOutputDelta(_) => {}
EventMsg::ExecCommandStderrDelta(_) => {}
EventMsg::ExecCommandEnd(ExecCommandEndEvent { EventMsg::ExecCommandEnd(ExecCommandEndEvent {
call_id, call_id,
stdout, stdout,

View File

@@ -258,8 +258,7 @@ async fn run_codex_tool_session_inner(
| EventMsg::McpToolCallBegin(_) | EventMsg::McpToolCallBegin(_)
| EventMsg::McpToolCallEnd(_) | EventMsg::McpToolCallEnd(_)
| EventMsg::ExecCommandBegin(_) | EventMsg::ExecCommandBegin(_)
| EventMsg::ExecCommandStdoutDelta(_) | EventMsg::ExecCommandOutputDelta(_)
| EventMsg::ExecCommandStderrDelta(_)
| EventMsg::ExecCommandEnd(_) | EventMsg::ExecCommandEnd(_)
| EventMsg::BackgroundEvent(_) | EventMsg::BackgroundEvent(_)
| EventMsg::PatchApplyBegin(_) | EventMsg::PatchApplyBegin(_)

View File

@@ -374,7 +374,7 @@ impl ChatWidget<'_> {
); );
self.add_to_history(HistoryCell::new_active_exec_command(command)); self.add_to_history(HistoryCell::new_active_exec_command(command));
} }
EventMsg::ExecCommandStdoutDelta(_) => {} EventMsg::ExecCommandOutputDelta(_) => {}
EventMsg::PatchApplyBegin(PatchApplyBeginEvent { EventMsg::PatchApplyBegin(PatchApplyBeginEvent {
call_id: _, call_id: _,
auto_approved, auto_approved,