feat: end events on unified exec (#5551)
This commit is contained in:
@@ -200,10 +200,51 @@ impl ToolEmitter {
|
||||
) => {
|
||||
emit_patch_end(ctx, String::new(), (*message).to_string(), false).await;
|
||||
}
|
||||
(Self::UnifiedExec { command, cwd, .. }, _) => {
|
||||
// TODO(jif) add end and failures.
|
||||
(Self::UnifiedExec { command, cwd, .. }, ToolEventStage::Begin) => {
|
||||
emit_exec_command_begin(ctx, &[command.to_string()], cwd.as_path()).await;
|
||||
}
|
||||
(Self::UnifiedExec { .. }, ToolEventStage::Success(output)) => {
|
||||
emit_exec_end(
|
||||
ctx,
|
||||
output.stdout.text.clone(),
|
||||
output.stderr.text.clone(),
|
||||
output.aggregated_output.text.clone(),
|
||||
output.exit_code,
|
||||
output.duration,
|
||||
format_exec_output_str(&output),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
(
|
||||
Self::UnifiedExec { .. },
|
||||
ToolEventStage::Failure(ToolEventFailure::Output(output)),
|
||||
) => {
|
||||
emit_exec_end(
|
||||
ctx,
|
||||
output.stdout.text.clone(),
|
||||
output.stderr.text.clone(),
|
||||
output.aggregated_output.text.clone(),
|
||||
output.exit_code,
|
||||
output.duration,
|
||||
format_exec_output_str(&output),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
(
|
||||
Self::UnifiedExec { .. },
|
||||
ToolEventStage::Failure(ToolEventFailure::Message(message)),
|
||||
) => {
|
||||
emit_exec_end(
|
||||
ctx,
|
||||
String::new(),
|
||||
(*message).to_string(),
|
||||
(*message).to_string(),
|
||||
-1,
|
||||
Duration::ZERO,
|
||||
format_exec_output(&message),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -5,6 +5,9 @@ use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
|
||||
use crate::function_tool::FunctionCallError;
|
||||
use crate::protocol::EventMsg;
|
||||
use crate::protocol::ExecCommandOutputDeltaEvent;
|
||||
use crate::protocol::ExecOutputStream;
|
||||
use crate::tools::context::ToolInvocation;
|
||||
use crate::tools::context::ToolOutput;
|
||||
use crate::tools::context::ToolPayload;
|
||||
@@ -87,11 +90,7 @@ impl ToolHandler for UnifiedExecHandler {
|
||||
};
|
||||
|
||||
let manager: &UnifiedExecSessionManager = &session.services.unified_exec_manager;
|
||||
let context = UnifiedExecContext {
|
||||
session: &session,
|
||||
turn: turn.as_ref(),
|
||||
call_id: &call_id,
|
||||
};
|
||||
let context = UnifiedExecContext::new(session.clone(), turn.clone(), call_id.clone());
|
||||
|
||||
let response = match tool_name.as_str() {
|
||||
"exec_command" => {
|
||||
@@ -101,8 +100,12 @@ impl ToolHandler for UnifiedExecHandler {
|
||||
))
|
||||
})?;
|
||||
|
||||
let event_ctx =
|
||||
ToolEventCtx::new(context.session, context.turn, context.call_id, None);
|
||||
let event_ctx = ToolEventCtx::new(
|
||||
context.session.as_ref(),
|
||||
context.turn.as_ref(),
|
||||
&context.call_id,
|
||||
None,
|
||||
);
|
||||
let emitter =
|
||||
ToolEmitter::unified_exec(args.cmd.clone(), context.turn.cwd.clone(), true);
|
||||
emitter.emit(event_ctx, ToolEventStage::Begin).await;
|
||||
@@ -148,6 +151,18 @@ impl ToolHandler for UnifiedExecHandler {
|
||||
}
|
||||
};
|
||||
|
||||
// Emit a delta event with the chunk of output we just produced, if any.
|
||||
if !response.output.is_empty() {
|
||||
let delta = ExecCommandOutputDeltaEvent {
|
||||
call_id: response.event_call_id.clone(),
|
||||
stream: ExecOutputStream::Stdout,
|
||||
chunk: response.output.as_bytes().to_vec(),
|
||||
};
|
||||
session
|
||||
.send_event(turn.as_ref(), EventMsg::ExecCommandOutputDelta(delta))
|
||||
.await;
|
||||
}
|
||||
|
||||
let content = serialize_response(&response).map_err(|err| {
|
||||
FunctionCallError::RespondToModel(format!(
|
||||
"failed to serialize unified exec output: {err:?}"
|
||||
|
||||
@@ -22,6 +22,8 @@
|
||||
//! - `session_manager.rs`: orchestration (approvals, sandboxing, reuse) and request handling.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicI32;
|
||||
use std::time::Duration;
|
||||
|
||||
@@ -45,10 +47,20 @@ pub(crate) const MAX_YIELD_TIME_MS: u64 = 30_000;
|
||||
pub(crate) const DEFAULT_MAX_OUTPUT_TOKENS: usize = 10_000;
|
||||
pub(crate) const UNIFIED_EXEC_OUTPUT_MAX_BYTES: usize = 1024 * 1024; // 1 MiB
|
||||
|
||||
pub(crate) struct UnifiedExecContext<'a> {
|
||||
pub session: &'a Session,
|
||||
pub turn: &'a TurnContext,
|
||||
pub call_id: &'a str,
|
||||
pub(crate) struct UnifiedExecContext {
|
||||
pub session: Arc<Session>,
|
||||
pub turn: Arc<TurnContext>,
|
||||
pub call_id: String,
|
||||
}
|
||||
|
||||
impl UnifiedExecContext {
|
||||
pub fn new(session: Arc<Session>, turn: Arc<TurnContext>, call_id: String) -> Self {
|
||||
Self {
|
||||
session,
|
||||
turn,
|
||||
call_id,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -70,6 +82,7 @@ pub(crate) struct WriteStdinRequest<'a> {
|
||||
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
pub(crate) struct UnifiedExecResponse {
|
||||
pub event_call_id: String,
|
||||
pub chunk_id: String,
|
||||
pub wall_time: Duration,
|
||||
pub output: String,
|
||||
@@ -78,10 +91,20 @@ pub(crate) struct UnifiedExecResponse {
|
||||
pub original_token_count: Option<usize>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
#[derive(Default)]
|
||||
pub(crate) struct UnifiedExecSessionManager {
|
||||
next_session_id: AtomicI32,
|
||||
sessions: Mutex<HashMap<i32, session::UnifiedExecSession>>,
|
||||
sessions: Mutex<HashMap<i32, SessionEntry>>,
|
||||
}
|
||||
|
||||
struct SessionEntry {
|
||||
session: session::UnifiedExecSession,
|
||||
session_ref: Arc<Session>,
|
||||
turn_ref: Arc<TurnContext>,
|
||||
call_id: String,
|
||||
command: String,
|
||||
cwd: PathBuf,
|
||||
started_at: tokio::time::Instant,
|
||||
}
|
||||
|
||||
pub(crate) fn clamp_yield_time(yield_time_ms: Option<u64>) -> u64 {
|
||||
@@ -163,11 +186,8 @@ mod tests {
|
||||
cmd: &str,
|
||||
yield_time_ms: Option<u64>,
|
||||
) -> Result<UnifiedExecResponse, UnifiedExecError> {
|
||||
let context = UnifiedExecContext {
|
||||
session,
|
||||
turn: turn.as_ref(),
|
||||
call_id: "call",
|
||||
};
|
||||
let context =
|
||||
UnifiedExecContext::new(Arc::clone(session), Arc::clone(turn), "call".to_string());
|
||||
|
||||
session
|
||||
.services
|
||||
|
||||
@@ -5,8 +5,13 @@ use tokio::sync::mpsc;
|
||||
use tokio::time::Duration;
|
||||
use tokio::time::Instant;
|
||||
|
||||
use crate::exec::ExecToolCallOutput;
|
||||
use crate::exec::StreamOutput;
|
||||
use crate::exec_env::create_env;
|
||||
use crate::sandboxing::ExecEnv;
|
||||
use crate::tools::events::ToolEmitter;
|
||||
use crate::tools::events::ToolEventCtx;
|
||||
use crate::tools::events::ToolEventStage;
|
||||
use crate::tools::orchestrator::ToolOrchestrator;
|
||||
use crate::tools::runtimes::unified_exec::UnifiedExecRequest as UnifiedExecToolRequest;
|
||||
use crate::tools::runtimes::unified_exec::UnifiedExecRuntime;
|
||||
@@ -14,6 +19,7 @@ use crate::tools::sandboxing::ToolCtx;
|
||||
|
||||
use super::ExecCommandRequest;
|
||||
use super::MIN_YIELD_TIME_MS;
|
||||
use super::SessionEntry;
|
||||
use super::UnifiedExecContext;
|
||||
use super::UnifiedExecError;
|
||||
use super::UnifiedExecResponse;
|
||||
@@ -30,7 +36,7 @@ impl UnifiedExecSessionManager {
|
||||
pub(crate) async fn exec_command(
|
||||
&self,
|
||||
request: ExecCommandRequest<'_>,
|
||||
context: &UnifiedExecContext<'_>,
|
||||
context: &UnifiedExecContext,
|
||||
) -> Result<UnifiedExecResponse, UnifiedExecError> {
|
||||
let shell_flag = if request.login { "-lc" } else { "-c" };
|
||||
let command = vec![
|
||||
@@ -59,17 +65,36 @@ impl UnifiedExecSessionManager {
|
||||
let session_id = if session.has_exited() {
|
||||
None
|
||||
} else {
|
||||
Some(self.store_session(session).await)
|
||||
Some(
|
||||
self.store_session(session, context, request.command, start)
|
||||
.await,
|
||||
)
|
||||
};
|
||||
|
||||
Ok(UnifiedExecResponse {
|
||||
let response = UnifiedExecResponse {
|
||||
event_call_id: context.call_id.clone(),
|
||||
chunk_id,
|
||||
wall_time,
|
||||
output,
|
||||
session_id,
|
||||
exit_code,
|
||||
original_token_count,
|
||||
})
|
||||
};
|
||||
|
||||
// If the command completed during this call, emit an ExecCommandEnd via the emitter.
|
||||
if response.session_id.is_none() {
|
||||
let exit = response.exit_code.unwrap_or(-1);
|
||||
Self::emit_exec_end_from_context(
|
||||
context,
|
||||
request.command.to_string(),
|
||||
response.output.clone(),
|
||||
exit,
|
||||
response.wall_time,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
pub(crate) async fn write_stdin(
|
||||
@@ -98,37 +123,60 @@ impl UnifiedExecSessionManager {
|
||||
let (output, original_token_count) = truncate_output_to_tokens(&text, max_tokens);
|
||||
let chunk_id = generate_chunk_id();
|
||||
|
||||
let (session_id, exit_code) = self.refresh_session_state(session_id).await;
|
||||
let status = self.refresh_session_state(session_id).await;
|
||||
let (session_id, exit_code, completion_entry, event_call_id) = match status {
|
||||
SessionStatus::Alive { exit_code, call_id } => {
|
||||
(Some(session_id), exit_code, None, call_id)
|
||||
}
|
||||
SessionStatus::Exited { exit_code, entry } => {
|
||||
let call_id = entry.call_id.clone();
|
||||
(None, exit_code, Some(*entry), call_id)
|
||||
}
|
||||
SessionStatus::Unknown => {
|
||||
return Err(UnifiedExecError::UnknownSessionId { session_id });
|
||||
}
|
||||
};
|
||||
|
||||
Ok(UnifiedExecResponse {
|
||||
let response = UnifiedExecResponse {
|
||||
event_call_id,
|
||||
chunk_id,
|
||||
wall_time,
|
||||
output,
|
||||
session_id,
|
||||
exit_code,
|
||||
original_token_count,
|
||||
})
|
||||
}
|
||||
};
|
||||
|
||||
async fn refresh_session_state(&self, session_id: i32) -> (Option<i32>, Option<i32>) {
|
||||
let mut sessions = self.sessions.lock().await;
|
||||
if !sessions.contains_key(&session_id) {
|
||||
return (None, None);
|
||||
if let (Some(exit), Some(entry)) = (response.exit_code, completion_entry) {
|
||||
let total_duration = Instant::now().saturating_duration_since(entry.started_at);
|
||||
Self::emit_exec_end_from_entry(entry, response.output.clone(), exit, total_duration)
|
||||
.await;
|
||||
}
|
||||
|
||||
let has_exited = sessions
|
||||
.get(&session_id)
|
||||
.map(UnifiedExecSession::has_exited)
|
||||
.unwrap_or(false);
|
||||
let exit_code = sessions
|
||||
.get(&session_id)
|
||||
.and_then(UnifiedExecSession::exit_code);
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
if has_exited {
|
||||
sessions.remove(&session_id);
|
||||
(None, exit_code)
|
||||
async fn refresh_session_state(&self, session_id: i32) -> SessionStatus {
|
||||
let mut sessions = self.sessions.lock().await;
|
||||
let Some(entry) = sessions.get(&session_id) else {
|
||||
return SessionStatus::Unknown;
|
||||
};
|
||||
|
||||
let exit_code = entry.session.exit_code();
|
||||
|
||||
if entry.session.has_exited() {
|
||||
let Some(entry) = sessions.remove(&session_id) else {
|
||||
return SessionStatus::Unknown;
|
||||
};
|
||||
SessionStatus::Exited {
|
||||
exit_code,
|
||||
entry: Box::new(entry),
|
||||
}
|
||||
} else {
|
||||
(Some(session_id), exit_code)
|
||||
SessionStatus::Alive {
|
||||
exit_code,
|
||||
call_id: entry.call_id.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -138,9 +186,9 @@ impl UnifiedExecSessionManager {
|
||||
) -> Result<(mpsc::Sender<Vec<u8>>, OutputBuffer, Arc<Notify>), UnifiedExecError> {
|
||||
let sessions = self.sessions.lock().await;
|
||||
let (output_buffer, output_notify, writer_tx) =
|
||||
if let Some(session) = sessions.get(&session_id) {
|
||||
let (buffer, notify) = session.output_handles();
|
||||
(buffer, notify, session.writer_sender())
|
||||
if let Some(entry) = sessions.get(&session_id) {
|
||||
let (buffer, notify) = entry.session.output_handles();
|
||||
(buffer, notify, entry.session.writer_sender())
|
||||
} else {
|
||||
return Err(UnifiedExecError::UnknownSessionId { session_id });
|
||||
};
|
||||
@@ -158,14 +206,82 @@ impl UnifiedExecSessionManager {
|
||||
.map_err(|_| UnifiedExecError::WriteToStdin)
|
||||
}
|
||||
|
||||
async fn store_session(&self, session: UnifiedExecSession) -> i32 {
|
||||
async fn store_session(
|
||||
&self,
|
||||
session: UnifiedExecSession,
|
||||
context: &UnifiedExecContext,
|
||||
command: &str,
|
||||
started_at: Instant,
|
||||
) -> i32 {
|
||||
let session_id = self
|
||||
.next_session_id
|
||||
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
|
||||
self.sessions.lock().await.insert(session_id, session);
|
||||
let entry = SessionEntry {
|
||||
session,
|
||||
session_ref: Arc::clone(&context.session),
|
||||
turn_ref: Arc::clone(&context.turn),
|
||||
call_id: context.call_id.clone(),
|
||||
command: command.to_string(),
|
||||
cwd: context.turn.cwd.clone(),
|
||||
started_at,
|
||||
};
|
||||
self.sessions.lock().await.insert(session_id, entry);
|
||||
session_id
|
||||
}
|
||||
|
||||
async fn emit_exec_end_from_entry(
|
||||
entry: SessionEntry,
|
||||
aggregated_output: String,
|
||||
exit_code: i32,
|
||||
duration: Duration,
|
||||
) {
|
||||
let output = ExecToolCallOutput {
|
||||
exit_code,
|
||||
stdout: StreamOutput::new(aggregated_output.clone()),
|
||||
stderr: StreamOutput::new(String::new()),
|
||||
aggregated_output: StreamOutput::new(aggregated_output),
|
||||
duration,
|
||||
timed_out: false,
|
||||
};
|
||||
let event_ctx = ToolEventCtx::new(
|
||||
entry.session_ref.as_ref(),
|
||||
entry.turn_ref.as_ref(),
|
||||
&entry.call_id,
|
||||
None,
|
||||
);
|
||||
let emitter = ToolEmitter::unified_exec(entry.command, entry.cwd, true);
|
||||
emitter
|
||||
.emit(event_ctx, ToolEventStage::Success(output))
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn emit_exec_end_from_context(
|
||||
context: &UnifiedExecContext,
|
||||
command: String,
|
||||
aggregated_output: String,
|
||||
exit_code: i32,
|
||||
duration: Duration,
|
||||
) {
|
||||
let output = ExecToolCallOutput {
|
||||
exit_code,
|
||||
stdout: StreamOutput::new(aggregated_output.clone()),
|
||||
stderr: StreamOutput::new(String::new()),
|
||||
aggregated_output: StreamOutput::new(aggregated_output),
|
||||
duration,
|
||||
timed_out: false,
|
||||
};
|
||||
let event_ctx = ToolEventCtx::new(
|
||||
context.session.as_ref(),
|
||||
context.turn.as_ref(),
|
||||
&context.call_id,
|
||||
None,
|
||||
);
|
||||
let emitter = ToolEmitter::unified_exec(command, context.turn.cwd.clone(), true);
|
||||
emitter
|
||||
.emit(event_ctx, ToolEventStage::Success(output))
|
||||
.await;
|
||||
}
|
||||
|
||||
pub(crate) async fn open_session_with_exec_env(
|
||||
&self,
|
||||
env: &ExecEnv,
|
||||
@@ -184,7 +300,7 @@ impl UnifiedExecSessionManager {
|
||||
pub(super) async fn open_session_with_sandbox(
|
||||
&self,
|
||||
command: Vec<String>,
|
||||
context: &UnifiedExecContext<'_>,
|
||||
context: &UnifiedExecContext,
|
||||
) -> Result<UnifiedExecSession, UnifiedExecError> {
|
||||
let mut orchestrator = ToolOrchestrator::new();
|
||||
let mut runtime = UnifiedExecRuntime::new(self);
|
||||
@@ -194,9 +310,9 @@ impl UnifiedExecSessionManager {
|
||||
create_env(&context.turn.shell_environment_policy),
|
||||
);
|
||||
let tool_ctx = ToolCtx {
|
||||
session: context.session,
|
||||
turn: context.turn,
|
||||
call_id: context.call_id.to_string(),
|
||||
session: context.session.as_ref(),
|
||||
turn: context.turn.as_ref(),
|
||||
call_id: context.call_id.clone(),
|
||||
tool_name: "exec_command".to_string(),
|
||||
};
|
||||
orchestrator
|
||||
@@ -204,7 +320,7 @@ impl UnifiedExecSessionManager {
|
||||
&mut runtime,
|
||||
&req,
|
||||
&tool_ctx,
|
||||
context.turn,
|
||||
context.turn.as_ref(),
|
||||
context.turn.approval_policy,
|
||||
)
|
||||
.await
|
||||
@@ -255,3 +371,15 @@ impl UnifiedExecSessionManager {
|
||||
collected
|
||||
}
|
||||
}
|
||||
|
||||
enum SessionStatus {
|
||||
Alive {
|
||||
exit_code: Option<i32>,
|
||||
call_id: String,
|
||||
},
|
||||
Exited {
|
||||
exit_code: Option<i32>,
|
||||
entry: Box<SessionEntry>,
|
||||
},
|
||||
Unknown,
|
||||
}
|
||||
|
||||
@@ -133,6 +133,247 @@ async fn unified_exec_emits_exec_command_begin_event() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn unified_exec_emits_exec_command_end_event() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
skip_if_sandbox!(Ok(()));
|
||||
|
||||
let server = start_mock_server().await;
|
||||
|
||||
let mut builder = test_codex().with_config(|config| {
|
||||
config.use_experimental_unified_exec_tool = true;
|
||||
config.features.enable(Feature::UnifiedExec);
|
||||
});
|
||||
let TestCodex {
|
||||
codex,
|
||||
cwd,
|
||||
session_configured,
|
||||
..
|
||||
} = builder.build(&server).await?;
|
||||
|
||||
let call_id = "uexec-end-event";
|
||||
let args = json!({
|
||||
"cmd": "/bin/echo END-EVENT".to_string(),
|
||||
"yield_time_ms": 250,
|
||||
});
|
||||
|
||||
let responses = vec![
|
||||
sse(vec![
|
||||
ev_response_created("resp-1"),
|
||||
ev_function_call(call_id, "exec_command", &serde_json::to_string(&args)?),
|
||||
ev_completed("resp-1"),
|
||||
]),
|
||||
sse(vec![
|
||||
ev_response_created("resp-2"),
|
||||
ev_assistant_message("msg-1", "finished"),
|
||||
ev_completed("resp-2"),
|
||||
]),
|
||||
];
|
||||
mount_sse_sequence(&server, responses).await;
|
||||
|
||||
let session_model = session_configured.model.clone();
|
||||
|
||||
codex
|
||||
.submit(Op::UserTurn {
|
||||
items: vec![UserInput::Text {
|
||||
text: "emit end event".into(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
cwd: cwd.path().to_path_buf(),
|
||||
approval_policy: AskForApproval::Never,
|
||||
sandbox_policy: SandboxPolicy::DangerFullAccess,
|
||||
model: session_model,
|
||||
effort: None,
|
||||
summary: ReasoningSummary::Auto,
|
||||
})
|
||||
.await?;
|
||||
|
||||
let end_event = wait_for_event_match(&codex, |msg| match msg {
|
||||
EventMsg::ExecCommandEnd(ev) if ev.call_id == call_id => Some(ev.clone()),
|
||||
_ => None,
|
||||
})
|
||||
.await;
|
||||
|
||||
assert_eq!(end_event.exit_code, 0);
|
||||
assert!(
|
||||
end_event.aggregated_output.contains("END-EVENT"),
|
||||
"expected aggregated output to contain marker"
|
||||
);
|
||||
|
||||
wait_for_event(&codex, |event| matches!(event, EventMsg::TaskComplete(_))).await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn unified_exec_emits_output_delta_for_exec_command() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
skip_if_sandbox!(Ok(()));
|
||||
|
||||
let server = start_mock_server().await;
|
||||
|
||||
let mut builder = test_codex().with_config(|config| {
|
||||
config.use_experimental_unified_exec_tool = true;
|
||||
config.features.enable(Feature::UnifiedExec);
|
||||
});
|
||||
let TestCodex {
|
||||
codex,
|
||||
cwd,
|
||||
session_configured,
|
||||
..
|
||||
} = builder.build(&server).await?;
|
||||
|
||||
let call_id = "uexec-delta-1";
|
||||
let args = json!({
|
||||
"cmd": "printf 'HELLO-UEXEC'",
|
||||
"yield_time_ms": 250,
|
||||
});
|
||||
|
||||
let responses = vec![
|
||||
sse(vec![
|
||||
ev_response_created("resp-1"),
|
||||
ev_function_call(call_id, "exec_command", &serde_json::to_string(&args)?),
|
||||
ev_completed("resp-1"),
|
||||
]),
|
||||
sse(vec![
|
||||
ev_response_created("resp-2"),
|
||||
ev_assistant_message("msg-1", "finished"),
|
||||
ev_completed("resp-2"),
|
||||
]),
|
||||
];
|
||||
mount_sse_sequence(&server, responses).await;
|
||||
|
||||
let session_model = session_configured.model.clone();
|
||||
|
||||
codex
|
||||
.submit(Op::UserTurn {
|
||||
items: vec![UserInput::Text {
|
||||
text: "emit delta".into(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
cwd: cwd.path().to_path_buf(),
|
||||
approval_policy: AskForApproval::Never,
|
||||
sandbox_policy: SandboxPolicy::DangerFullAccess,
|
||||
model: session_model,
|
||||
effort: None,
|
||||
summary: ReasoningSummary::Auto,
|
||||
})
|
||||
.await?;
|
||||
|
||||
let delta = wait_for_event_match(&codex, |msg| match msg {
|
||||
EventMsg::ExecCommandOutputDelta(ev) if ev.call_id == call_id => Some(ev.clone()),
|
||||
_ => None,
|
||||
})
|
||||
.await;
|
||||
|
||||
let text = String::from_utf8_lossy(&delta.chunk).to_string();
|
||||
assert!(
|
||||
text.contains("HELLO-UEXEC"),
|
||||
"delta chunk missing expected text: {text:?}"
|
||||
);
|
||||
|
||||
wait_for_event(&codex, |event| matches!(event, EventMsg::TaskComplete(_))).await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn unified_exec_emits_output_delta_for_write_stdin() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
skip_if_sandbox!(Ok(()));
|
||||
|
||||
let server = start_mock_server().await;
|
||||
|
||||
let mut builder = test_codex().with_config(|config| {
|
||||
config.use_experimental_unified_exec_tool = true;
|
||||
config.features.enable(Feature::UnifiedExec);
|
||||
});
|
||||
let TestCodex {
|
||||
codex,
|
||||
cwd,
|
||||
session_configured,
|
||||
..
|
||||
} = builder.build(&server).await?;
|
||||
|
||||
let open_call_id = "uexec-open";
|
||||
let open_args = json!({
|
||||
"cmd": "/bin/bash -i",
|
||||
"yield_time_ms": 200,
|
||||
});
|
||||
|
||||
let stdin_call_id = "uexec-stdin-delta";
|
||||
let stdin_args = json!({
|
||||
"chars": "echo WSTDIN-MARK\\n",
|
||||
"session_id": 0,
|
||||
"yield_time_ms": 800,
|
||||
});
|
||||
|
||||
let responses = vec![
|
||||
sse(vec![
|
||||
ev_response_created("resp-1"),
|
||||
ev_function_call(
|
||||
open_call_id,
|
||||
"exec_command",
|
||||
&serde_json::to_string(&open_args)?,
|
||||
),
|
||||
ev_completed("resp-1"),
|
||||
]),
|
||||
sse(vec![
|
||||
ev_response_created("resp-2"),
|
||||
ev_function_call(
|
||||
stdin_call_id,
|
||||
"write_stdin",
|
||||
&serde_json::to_string(&stdin_args)?,
|
||||
),
|
||||
ev_completed("resp-2"),
|
||||
]),
|
||||
sse(vec![
|
||||
ev_response_created("resp-3"),
|
||||
ev_assistant_message("msg-1", "done"),
|
||||
ev_completed("resp-3"),
|
||||
]),
|
||||
];
|
||||
mount_sse_sequence(&server, responses).await;
|
||||
|
||||
let session_model = session_configured.model.clone();
|
||||
|
||||
codex
|
||||
.submit(Op::UserTurn {
|
||||
items: vec![UserInput::Text {
|
||||
text: "stdin delta".into(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
cwd: cwd.path().to_path_buf(),
|
||||
approval_policy: AskForApproval::Never,
|
||||
sandbox_policy: SandboxPolicy::DangerFullAccess,
|
||||
model: session_model,
|
||||
effort: None,
|
||||
summary: ReasoningSummary::Auto,
|
||||
})
|
||||
.await?;
|
||||
|
||||
// Expect a delta event corresponding to the write_stdin call.
|
||||
let delta = wait_for_event_match(&codex, |msg| match msg {
|
||||
EventMsg::ExecCommandOutputDelta(ev) if ev.call_id == open_call_id => {
|
||||
let text = String::from_utf8_lossy(&ev.chunk);
|
||||
if text.contains("WSTDIN-MARK") {
|
||||
Some(ev.clone())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
_ => None,
|
||||
})
|
||||
.await;
|
||||
|
||||
let text = String::from_utf8_lossy(&delta.chunk).to_string();
|
||||
assert!(
|
||||
text.contains("WSTDIN-MARK"),
|
||||
"stdin delta chunk missing expected text: {text:?}"
|
||||
);
|
||||
|
||||
wait_for_event(&codex, |event| matches!(event, EventMsg::TaskComplete(_))).await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn unified_exec_skips_begin_event_for_empty_input() -> Result<()> {
|
||||
use tokio::time::Duration;
|
||||
@@ -516,6 +757,110 @@ async fn write_stdin_returns_exit_metadata_and_clears_session() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn unified_exec_emits_end_event_when_session_dies_via_stdin() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
skip_if_sandbox!(Ok(()));
|
||||
|
||||
let server = start_mock_server().await;
|
||||
|
||||
let mut builder = test_codex().with_config(|config| {
|
||||
config.use_experimental_unified_exec_tool = true;
|
||||
config.features.enable(Feature::UnifiedExec);
|
||||
});
|
||||
let TestCodex {
|
||||
codex,
|
||||
cwd,
|
||||
session_configured,
|
||||
..
|
||||
} = builder.build(&server).await?;
|
||||
|
||||
let start_call_id = "uexec-end-on-exit-start";
|
||||
let start_args = serde_json::json!({
|
||||
"cmd": "/bin/cat",
|
||||
"yield_time_ms": 200,
|
||||
});
|
||||
|
||||
let echo_call_id = "uexec-end-on-exit-echo";
|
||||
let echo_args = serde_json::json!({
|
||||
"chars": "bye-END\n",
|
||||
"session_id": 0,
|
||||
"yield_time_ms": 300,
|
||||
});
|
||||
|
||||
let exit_call_id = "uexec-end-on-exit";
|
||||
let exit_args = serde_json::json!({
|
||||
"chars": "\u{0004}",
|
||||
"session_id": 0,
|
||||
"yield_time_ms": 500,
|
||||
});
|
||||
|
||||
let responses = vec![
|
||||
sse(vec![
|
||||
ev_response_created("resp-1"),
|
||||
ev_function_call(
|
||||
start_call_id,
|
||||
"exec_command",
|
||||
&serde_json::to_string(&start_args)?,
|
||||
),
|
||||
ev_completed("resp-1"),
|
||||
]),
|
||||
sse(vec![
|
||||
ev_response_created("resp-2"),
|
||||
ev_function_call(
|
||||
echo_call_id,
|
||||
"write_stdin",
|
||||
&serde_json::to_string(&echo_args)?,
|
||||
),
|
||||
ev_completed("resp-2"),
|
||||
]),
|
||||
sse(vec![
|
||||
ev_response_created("resp-3"),
|
||||
ev_function_call(
|
||||
exit_call_id,
|
||||
"write_stdin",
|
||||
&serde_json::to_string(&exit_args)?,
|
||||
),
|
||||
ev_completed("resp-3"),
|
||||
]),
|
||||
sse(vec![
|
||||
ev_response_created("resp-4"),
|
||||
ev_assistant_message("msg-1", "done"),
|
||||
ev_completed("resp-4"),
|
||||
]),
|
||||
];
|
||||
mount_sse_sequence(&server, responses).await;
|
||||
|
||||
let session_model = session_configured.model.clone();
|
||||
|
||||
codex
|
||||
.submit(Op::UserTurn {
|
||||
items: vec![UserInput::Text {
|
||||
text: "end on exit".into(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
cwd: cwd.path().to_path_buf(),
|
||||
approval_policy: AskForApproval::Never,
|
||||
sandbox_policy: SandboxPolicy::DangerFullAccess,
|
||||
model: session_model,
|
||||
effort: None,
|
||||
summary: ReasoningSummary::Auto,
|
||||
})
|
||||
.await?;
|
||||
|
||||
// We expect the ExecCommandEnd event to match the initial exec_command call_id.
|
||||
let end_event = wait_for_event_match(&codex, |msg| match msg {
|
||||
EventMsg::ExecCommandEnd(ev) if ev.call_id == start_call_id => Some(ev.clone()),
|
||||
_ => None,
|
||||
})
|
||||
.await;
|
||||
|
||||
assert_eq!(end_event.exit_code, 0);
|
||||
|
||||
wait_for_event(&codex, |event| matches!(event, EventMsg::TaskComplete(_))).await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn unified_exec_reuses_session_via_stdin() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
Reference in New Issue
Block a user