From a6b94715482017e32dcda1bb5c8c5c73af874e77 Mon Sep 17 00:00:00 2001 From: jif-oai Date: Thu, 23 Oct 2025 18:51:34 +0100 Subject: [PATCH] feat: end events on unified exec (#5551) --- codex-rs/core/src/tools/events.rs | 45 ++- .../core/src/tools/handlers/unified_exec.rs | 29 +- codex-rs/core/src/unified_exec/mod.rs | 42 ++- .../core/src/unified_exec/session_manager.rs | 194 ++++++++-- codex-rs/core/tests/suite/unified_exec.rs | 345 ++++++++++++++++++ 5 files changed, 602 insertions(+), 53 deletions(-) diff --git a/codex-rs/core/src/tools/events.rs b/codex-rs/core/src/tools/events.rs index d45ac7cd..55782955 100644 --- a/codex-rs/core/src/tools/events.rs +++ b/codex-rs/core/src/tools/events.rs @@ -200,10 +200,51 @@ impl ToolEmitter { ) => { emit_patch_end(ctx, String::new(), (*message).to_string(), false).await; } - (Self::UnifiedExec { command, cwd, .. }, _) => { - // TODO(jif) add end and failures. + (Self::UnifiedExec { command, cwd, .. }, ToolEventStage::Begin) => { emit_exec_command_begin(ctx, &[command.to_string()], cwd.as_path()).await; } + (Self::UnifiedExec { .. }, ToolEventStage::Success(output)) => { + emit_exec_end( + ctx, + output.stdout.text.clone(), + output.stderr.text.clone(), + output.aggregated_output.text.clone(), + output.exit_code, + output.duration, + format_exec_output_str(&output), + ) + .await; + } + ( + Self::UnifiedExec { .. }, + ToolEventStage::Failure(ToolEventFailure::Output(output)), + ) => { + emit_exec_end( + ctx, + output.stdout.text.clone(), + output.stderr.text.clone(), + output.aggregated_output.text.clone(), + output.exit_code, + output.duration, + format_exec_output_str(&output), + ) + .await; + } + ( + Self::UnifiedExec { .. }, + ToolEventStage::Failure(ToolEventFailure::Message(message)), + ) => { + emit_exec_end( + ctx, + String::new(), + (*message).to_string(), + (*message).to_string(), + -1, + Duration::ZERO, + format_exec_output(&message), + ) + .await; + } } } diff --git a/codex-rs/core/src/tools/handlers/unified_exec.rs b/codex-rs/core/src/tools/handlers/unified_exec.rs index 0ad3739c..7d110212 100644 --- a/codex-rs/core/src/tools/handlers/unified_exec.rs +++ b/codex-rs/core/src/tools/handlers/unified_exec.rs @@ -5,6 +5,9 @@ use serde::Deserialize; use serde::Serialize; use crate::function_tool::FunctionCallError; +use crate::protocol::EventMsg; +use crate::protocol::ExecCommandOutputDeltaEvent; +use crate::protocol::ExecOutputStream; use crate::tools::context::ToolInvocation; use crate::tools::context::ToolOutput; use crate::tools::context::ToolPayload; @@ -87,11 +90,7 @@ impl ToolHandler for UnifiedExecHandler { }; let manager: &UnifiedExecSessionManager = &session.services.unified_exec_manager; - let context = UnifiedExecContext { - session: &session, - turn: turn.as_ref(), - call_id: &call_id, - }; + let context = UnifiedExecContext::new(session.clone(), turn.clone(), call_id.clone()); let response = match tool_name.as_str() { "exec_command" => { @@ -101,8 +100,12 @@ impl ToolHandler for UnifiedExecHandler { )) })?; - let event_ctx = - ToolEventCtx::new(context.session, context.turn, context.call_id, None); + let event_ctx = ToolEventCtx::new( + context.session.as_ref(), + context.turn.as_ref(), + &context.call_id, + None, + ); let emitter = ToolEmitter::unified_exec(args.cmd.clone(), context.turn.cwd.clone(), true); emitter.emit(event_ctx, ToolEventStage::Begin).await; @@ -148,6 +151,18 @@ impl ToolHandler for UnifiedExecHandler { } }; + // Emit a delta event with the chunk of output we just produced, if any. + if !response.output.is_empty() { + let delta = ExecCommandOutputDeltaEvent { + call_id: response.event_call_id.clone(), + stream: ExecOutputStream::Stdout, + chunk: response.output.as_bytes().to_vec(), + }; + session + .send_event(turn.as_ref(), EventMsg::ExecCommandOutputDelta(delta)) + .await; + } + let content = serialize_response(&response).map_err(|err| { FunctionCallError::RespondToModel(format!( "failed to serialize unified exec output: {err:?}" diff --git a/codex-rs/core/src/unified_exec/mod.rs b/codex-rs/core/src/unified_exec/mod.rs index d575929d..a1e2c0c4 100644 --- a/codex-rs/core/src/unified_exec/mod.rs +++ b/codex-rs/core/src/unified_exec/mod.rs @@ -22,6 +22,8 @@ //! - `session_manager.rs`: orchestration (approvals, sandboxing, reuse) and request handling. use std::collections::HashMap; +use std::path::PathBuf; +use std::sync::Arc; use std::sync::atomic::AtomicI32; use std::time::Duration; @@ -45,10 +47,20 @@ pub(crate) const MAX_YIELD_TIME_MS: u64 = 30_000; pub(crate) const DEFAULT_MAX_OUTPUT_TOKENS: usize = 10_000; pub(crate) const UNIFIED_EXEC_OUTPUT_MAX_BYTES: usize = 1024 * 1024; // 1 MiB -pub(crate) struct UnifiedExecContext<'a> { - pub session: &'a Session, - pub turn: &'a TurnContext, - pub call_id: &'a str, +pub(crate) struct UnifiedExecContext { + pub session: Arc, + pub turn: Arc, + pub call_id: String, +} + +impl UnifiedExecContext { + pub fn new(session: Arc, turn: Arc, call_id: String) -> Self { + Self { + session, + turn, + call_id, + } + } } #[derive(Debug)] @@ -70,6 +82,7 @@ pub(crate) struct WriteStdinRequest<'a> { #[derive(Debug, Clone, PartialEq)] pub(crate) struct UnifiedExecResponse { + pub event_call_id: String, pub chunk_id: String, pub wall_time: Duration, pub output: String, @@ -78,10 +91,20 @@ pub(crate) struct UnifiedExecResponse { pub original_token_count: Option, } -#[derive(Debug, Default)] +#[derive(Default)] pub(crate) struct UnifiedExecSessionManager { next_session_id: AtomicI32, - sessions: Mutex>, + sessions: Mutex>, +} + +struct SessionEntry { + session: session::UnifiedExecSession, + session_ref: Arc, + turn_ref: Arc, + call_id: String, + command: String, + cwd: PathBuf, + started_at: tokio::time::Instant, } pub(crate) fn clamp_yield_time(yield_time_ms: Option) -> u64 { @@ -163,11 +186,8 @@ mod tests { cmd: &str, yield_time_ms: Option, ) -> Result { - let context = UnifiedExecContext { - session, - turn: turn.as_ref(), - call_id: "call", - }; + let context = + UnifiedExecContext::new(Arc::clone(session), Arc::clone(turn), "call".to_string()); session .services diff --git a/codex-rs/core/src/unified_exec/session_manager.rs b/codex-rs/core/src/unified_exec/session_manager.rs index 77124a7d..7f4727d0 100644 --- a/codex-rs/core/src/unified_exec/session_manager.rs +++ b/codex-rs/core/src/unified_exec/session_manager.rs @@ -5,8 +5,13 @@ use tokio::sync::mpsc; use tokio::time::Duration; use tokio::time::Instant; +use crate::exec::ExecToolCallOutput; +use crate::exec::StreamOutput; use crate::exec_env::create_env; use crate::sandboxing::ExecEnv; +use crate::tools::events::ToolEmitter; +use crate::tools::events::ToolEventCtx; +use crate::tools::events::ToolEventStage; use crate::tools::orchestrator::ToolOrchestrator; use crate::tools::runtimes::unified_exec::UnifiedExecRequest as UnifiedExecToolRequest; use crate::tools::runtimes::unified_exec::UnifiedExecRuntime; @@ -14,6 +19,7 @@ use crate::tools::sandboxing::ToolCtx; use super::ExecCommandRequest; use super::MIN_YIELD_TIME_MS; +use super::SessionEntry; use super::UnifiedExecContext; use super::UnifiedExecError; use super::UnifiedExecResponse; @@ -30,7 +36,7 @@ impl UnifiedExecSessionManager { pub(crate) async fn exec_command( &self, request: ExecCommandRequest<'_>, - context: &UnifiedExecContext<'_>, + context: &UnifiedExecContext, ) -> Result { let shell_flag = if request.login { "-lc" } else { "-c" }; let command = vec![ @@ -59,17 +65,36 @@ impl UnifiedExecSessionManager { let session_id = if session.has_exited() { None } else { - Some(self.store_session(session).await) + Some( + self.store_session(session, context, request.command, start) + .await, + ) }; - Ok(UnifiedExecResponse { + let response = UnifiedExecResponse { + event_call_id: context.call_id.clone(), chunk_id, wall_time, output, session_id, exit_code, original_token_count, - }) + }; + + // If the command completed during this call, emit an ExecCommandEnd via the emitter. + if response.session_id.is_none() { + let exit = response.exit_code.unwrap_or(-1); + Self::emit_exec_end_from_context( + context, + request.command.to_string(), + response.output.clone(), + exit, + response.wall_time, + ) + .await; + } + + Ok(response) } pub(crate) async fn write_stdin( @@ -98,37 +123,60 @@ impl UnifiedExecSessionManager { let (output, original_token_count) = truncate_output_to_tokens(&text, max_tokens); let chunk_id = generate_chunk_id(); - let (session_id, exit_code) = self.refresh_session_state(session_id).await; + let status = self.refresh_session_state(session_id).await; + let (session_id, exit_code, completion_entry, event_call_id) = match status { + SessionStatus::Alive { exit_code, call_id } => { + (Some(session_id), exit_code, None, call_id) + } + SessionStatus::Exited { exit_code, entry } => { + let call_id = entry.call_id.clone(); + (None, exit_code, Some(*entry), call_id) + } + SessionStatus::Unknown => { + return Err(UnifiedExecError::UnknownSessionId { session_id }); + } + }; - Ok(UnifiedExecResponse { + let response = UnifiedExecResponse { + event_call_id, chunk_id, wall_time, output, session_id, exit_code, original_token_count, - }) - } + }; - async fn refresh_session_state(&self, session_id: i32) -> (Option, Option) { - let mut sessions = self.sessions.lock().await; - if !sessions.contains_key(&session_id) { - return (None, None); + if let (Some(exit), Some(entry)) = (response.exit_code, completion_entry) { + let total_duration = Instant::now().saturating_duration_since(entry.started_at); + Self::emit_exec_end_from_entry(entry, response.output.clone(), exit, total_duration) + .await; } - let has_exited = sessions - .get(&session_id) - .map(UnifiedExecSession::has_exited) - .unwrap_or(false); - let exit_code = sessions - .get(&session_id) - .and_then(UnifiedExecSession::exit_code); + Ok(response) + } - if has_exited { - sessions.remove(&session_id); - (None, exit_code) + async fn refresh_session_state(&self, session_id: i32) -> SessionStatus { + let mut sessions = self.sessions.lock().await; + let Some(entry) = sessions.get(&session_id) else { + return SessionStatus::Unknown; + }; + + let exit_code = entry.session.exit_code(); + + if entry.session.has_exited() { + let Some(entry) = sessions.remove(&session_id) else { + return SessionStatus::Unknown; + }; + SessionStatus::Exited { + exit_code, + entry: Box::new(entry), + } } else { - (Some(session_id), exit_code) + SessionStatus::Alive { + exit_code, + call_id: entry.call_id.clone(), + } } } @@ -138,9 +186,9 @@ impl UnifiedExecSessionManager { ) -> Result<(mpsc::Sender>, OutputBuffer, Arc), UnifiedExecError> { let sessions = self.sessions.lock().await; let (output_buffer, output_notify, writer_tx) = - if let Some(session) = sessions.get(&session_id) { - let (buffer, notify) = session.output_handles(); - (buffer, notify, session.writer_sender()) + if let Some(entry) = sessions.get(&session_id) { + let (buffer, notify) = entry.session.output_handles(); + (buffer, notify, entry.session.writer_sender()) } else { return Err(UnifiedExecError::UnknownSessionId { session_id }); }; @@ -158,14 +206,82 @@ impl UnifiedExecSessionManager { .map_err(|_| UnifiedExecError::WriteToStdin) } - async fn store_session(&self, session: UnifiedExecSession) -> i32 { + async fn store_session( + &self, + session: UnifiedExecSession, + context: &UnifiedExecContext, + command: &str, + started_at: Instant, + ) -> i32 { let session_id = self .next_session_id .fetch_add(1, std::sync::atomic::Ordering::SeqCst); - self.sessions.lock().await.insert(session_id, session); + let entry = SessionEntry { + session, + session_ref: Arc::clone(&context.session), + turn_ref: Arc::clone(&context.turn), + call_id: context.call_id.clone(), + command: command.to_string(), + cwd: context.turn.cwd.clone(), + started_at, + }; + self.sessions.lock().await.insert(session_id, entry); session_id } + async fn emit_exec_end_from_entry( + entry: SessionEntry, + aggregated_output: String, + exit_code: i32, + duration: Duration, + ) { + let output = ExecToolCallOutput { + exit_code, + stdout: StreamOutput::new(aggregated_output.clone()), + stderr: StreamOutput::new(String::new()), + aggregated_output: StreamOutput::new(aggregated_output), + duration, + timed_out: false, + }; + let event_ctx = ToolEventCtx::new( + entry.session_ref.as_ref(), + entry.turn_ref.as_ref(), + &entry.call_id, + None, + ); + let emitter = ToolEmitter::unified_exec(entry.command, entry.cwd, true); + emitter + .emit(event_ctx, ToolEventStage::Success(output)) + .await; + } + + async fn emit_exec_end_from_context( + context: &UnifiedExecContext, + command: String, + aggregated_output: String, + exit_code: i32, + duration: Duration, + ) { + let output = ExecToolCallOutput { + exit_code, + stdout: StreamOutput::new(aggregated_output.clone()), + stderr: StreamOutput::new(String::new()), + aggregated_output: StreamOutput::new(aggregated_output), + duration, + timed_out: false, + }; + let event_ctx = ToolEventCtx::new( + context.session.as_ref(), + context.turn.as_ref(), + &context.call_id, + None, + ); + let emitter = ToolEmitter::unified_exec(command, context.turn.cwd.clone(), true); + emitter + .emit(event_ctx, ToolEventStage::Success(output)) + .await; + } + pub(crate) async fn open_session_with_exec_env( &self, env: &ExecEnv, @@ -184,7 +300,7 @@ impl UnifiedExecSessionManager { pub(super) async fn open_session_with_sandbox( &self, command: Vec, - context: &UnifiedExecContext<'_>, + context: &UnifiedExecContext, ) -> Result { let mut orchestrator = ToolOrchestrator::new(); let mut runtime = UnifiedExecRuntime::new(self); @@ -194,9 +310,9 @@ impl UnifiedExecSessionManager { create_env(&context.turn.shell_environment_policy), ); let tool_ctx = ToolCtx { - session: context.session, - turn: context.turn, - call_id: context.call_id.to_string(), + session: context.session.as_ref(), + turn: context.turn.as_ref(), + call_id: context.call_id.clone(), tool_name: "exec_command".to_string(), }; orchestrator @@ -204,7 +320,7 @@ impl UnifiedExecSessionManager { &mut runtime, &req, &tool_ctx, - context.turn, + context.turn.as_ref(), context.turn.approval_policy, ) .await @@ -255,3 +371,15 @@ impl UnifiedExecSessionManager { collected } } + +enum SessionStatus { + Alive { + exit_code: Option, + call_id: String, + }, + Exited { + exit_code: Option, + entry: Box, + }, + Unknown, +} diff --git a/codex-rs/core/tests/suite/unified_exec.rs b/codex-rs/core/tests/suite/unified_exec.rs index 710eb0bc..5d5c21d0 100644 --- a/codex-rs/core/tests/suite/unified_exec.rs +++ b/codex-rs/core/tests/suite/unified_exec.rs @@ -133,6 +133,247 @@ async fn unified_exec_emits_exec_command_begin_event() -> Result<()> { Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn unified_exec_emits_exec_command_end_event() -> Result<()> { + skip_if_no_network!(Ok(())); + skip_if_sandbox!(Ok(())); + + let server = start_mock_server().await; + + let mut builder = test_codex().with_config(|config| { + config.use_experimental_unified_exec_tool = true; + config.features.enable(Feature::UnifiedExec); + }); + let TestCodex { + codex, + cwd, + session_configured, + .. + } = builder.build(&server).await?; + + let call_id = "uexec-end-event"; + let args = json!({ + "cmd": "/bin/echo END-EVENT".to_string(), + "yield_time_ms": 250, + }); + + let responses = vec![ + sse(vec![ + ev_response_created("resp-1"), + ev_function_call(call_id, "exec_command", &serde_json::to_string(&args)?), + ev_completed("resp-1"), + ]), + sse(vec![ + ev_response_created("resp-2"), + ev_assistant_message("msg-1", "finished"), + ev_completed("resp-2"), + ]), + ]; + mount_sse_sequence(&server, responses).await; + + let session_model = session_configured.model.clone(); + + codex + .submit(Op::UserTurn { + items: vec![UserInput::Text { + text: "emit end event".into(), + }], + final_output_json_schema: None, + cwd: cwd.path().to_path_buf(), + approval_policy: AskForApproval::Never, + sandbox_policy: SandboxPolicy::DangerFullAccess, + model: session_model, + effort: None, + summary: ReasoningSummary::Auto, + }) + .await?; + + let end_event = wait_for_event_match(&codex, |msg| match msg { + EventMsg::ExecCommandEnd(ev) if ev.call_id == call_id => Some(ev.clone()), + _ => None, + }) + .await; + + assert_eq!(end_event.exit_code, 0); + assert!( + end_event.aggregated_output.contains("END-EVENT"), + "expected aggregated output to contain marker" + ); + + wait_for_event(&codex, |event| matches!(event, EventMsg::TaskComplete(_))).await; + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn unified_exec_emits_output_delta_for_exec_command() -> Result<()> { + skip_if_no_network!(Ok(())); + skip_if_sandbox!(Ok(())); + + let server = start_mock_server().await; + + let mut builder = test_codex().with_config(|config| { + config.use_experimental_unified_exec_tool = true; + config.features.enable(Feature::UnifiedExec); + }); + let TestCodex { + codex, + cwd, + session_configured, + .. + } = builder.build(&server).await?; + + let call_id = "uexec-delta-1"; + let args = json!({ + "cmd": "printf 'HELLO-UEXEC'", + "yield_time_ms": 250, + }); + + let responses = vec![ + sse(vec![ + ev_response_created("resp-1"), + ev_function_call(call_id, "exec_command", &serde_json::to_string(&args)?), + ev_completed("resp-1"), + ]), + sse(vec![ + ev_response_created("resp-2"), + ev_assistant_message("msg-1", "finished"), + ev_completed("resp-2"), + ]), + ]; + mount_sse_sequence(&server, responses).await; + + let session_model = session_configured.model.clone(); + + codex + .submit(Op::UserTurn { + items: vec![UserInput::Text { + text: "emit delta".into(), + }], + final_output_json_schema: None, + cwd: cwd.path().to_path_buf(), + approval_policy: AskForApproval::Never, + sandbox_policy: SandboxPolicy::DangerFullAccess, + model: session_model, + effort: None, + summary: ReasoningSummary::Auto, + }) + .await?; + + let delta = wait_for_event_match(&codex, |msg| match msg { + EventMsg::ExecCommandOutputDelta(ev) if ev.call_id == call_id => Some(ev.clone()), + _ => None, + }) + .await; + + let text = String::from_utf8_lossy(&delta.chunk).to_string(); + assert!( + text.contains("HELLO-UEXEC"), + "delta chunk missing expected text: {text:?}" + ); + + wait_for_event(&codex, |event| matches!(event, EventMsg::TaskComplete(_))).await; + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn unified_exec_emits_output_delta_for_write_stdin() -> Result<()> { + skip_if_no_network!(Ok(())); + skip_if_sandbox!(Ok(())); + + let server = start_mock_server().await; + + let mut builder = test_codex().with_config(|config| { + config.use_experimental_unified_exec_tool = true; + config.features.enable(Feature::UnifiedExec); + }); + let TestCodex { + codex, + cwd, + session_configured, + .. + } = builder.build(&server).await?; + + let open_call_id = "uexec-open"; + let open_args = json!({ + "cmd": "/bin/bash -i", + "yield_time_ms": 200, + }); + + let stdin_call_id = "uexec-stdin-delta"; + let stdin_args = json!({ + "chars": "echo WSTDIN-MARK\\n", + "session_id": 0, + "yield_time_ms": 800, + }); + + let responses = vec![ + sse(vec![ + ev_response_created("resp-1"), + ev_function_call( + open_call_id, + "exec_command", + &serde_json::to_string(&open_args)?, + ), + ev_completed("resp-1"), + ]), + sse(vec![ + ev_response_created("resp-2"), + ev_function_call( + stdin_call_id, + "write_stdin", + &serde_json::to_string(&stdin_args)?, + ), + ev_completed("resp-2"), + ]), + sse(vec![ + ev_response_created("resp-3"), + ev_assistant_message("msg-1", "done"), + ev_completed("resp-3"), + ]), + ]; + mount_sse_sequence(&server, responses).await; + + let session_model = session_configured.model.clone(); + + codex + .submit(Op::UserTurn { + items: vec![UserInput::Text { + text: "stdin delta".into(), + }], + final_output_json_schema: None, + cwd: cwd.path().to_path_buf(), + approval_policy: AskForApproval::Never, + sandbox_policy: SandboxPolicy::DangerFullAccess, + model: session_model, + effort: None, + summary: ReasoningSummary::Auto, + }) + .await?; + + // Expect a delta event corresponding to the write_stdin call. + let delta = wait_for_event_match(&codex, |msg| match msg { + EventMsg::ExecCommandOutputDelta(ev) if ev.call_id == open_call_id => { + let text = String::from_utf8_lossy(&ev.chunk); + if text.contains("WSTDIN-MARK") { + Some(ev.clone()) + } else { + None + } + } + _ => None, + }) + .await; + + let text = String::from_utf8_lossy(&delta.chunk).to_string(); + assert!( + text.contains("WSTDIN-MARK"), + "stdin delta chunk missing expected text: {text:?}" + ); + + wait_for_event(&codex, |event| matches!(event, EventMsg::TaskComplete(_))).await; + Ok(()) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn unified_exec_skips_begin_event_for_empty_input() -> Result<()> { use tokio::time::Duration; @@ -516,6 +757,110 @@ async fn write_stdin_returns_exit_metadata_and_clears_session() -> Result<()> { Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn unified_exec_emits_end_event_when_session_dies_via_stdin() -> Result<()> { + skip_if_no_network!(Ok(())); + skip_if_sandbox!(Ok(())); + + let server = start_mock_server().await; + + let mut builder = test_codex().with_config(|config| { + config.use_experimental_unified_exec_tool = true; + config.features.enable(Feature::UnifiedExec); + }); + let TestCodex { + codex, + cwd, + session_configured, + .. + } = builder.build(&server).await?; + + let start_call_id = "uexec-end-on-exit-start"; + let start_args = serde_json::json!({ + "cmd": "/bin/cat", + "yield_time_ms": 200, + }); + + let echo_call_id = "uexec-end-on-exit-echo"; + let echo_args = serde_json::json!({ + "chars": "bye-END\n", + "session_id": 0, + "yield_time_ms": 300, + }); + + let exit_call_id = "uexec-end-on-exit"; + let exit_args = serde_json::json!({ + "chars": "\u{0004}", + "session_id": 0, + "yield_time_ms": 500, + }); + + let responses = vec![ + sse(vec![ + ev_response_created("resp-1"), + ev_function_call( + start_call_id, + "exec_command", + &serde_json::to_string(&start_args)?, + ), + ev_completed("resp-1"), + ]), + sse(vec![ + ev_response_created("resp-2"), + ev_function_call( + echo_call_id, + "write_stdin", + &serde_json::to_string(&echo_args)?, + ), + ev_completed("resp-2"), + ]), + sse(vec![ + ev_response_created("resp-3"), + ev_function_call( + exit_call_id, + "write_stdin", + &serde_json::to_string(&exit_args)?, + ), + ev_completed("resp-3"), + ]), + sse(vec![ + ev_response_created("resp-4"), + ev_assistant_message("msg-1", "done"), + ev_completed("resp-4"), + ]), + ]; + mount_sse_sequence(&server, responses).await; + + let session_model = session_configured.model.clone(); + + codex + .submit(Op::UserTurn { + items: vec![UserInput::Text { + text: "end on exit".into(), + }], + final_output_json_schema: None, + cwd: cwd.path().to_path_buf(), + approval_policy: AskForApproval::Never, + sandbox_policy: SandboxPolicy::DangerFullAccess, + model: session_model, + effort: None, + summary: ReasoningSummary::Auto, + }) + .await?; + + // We expect the ExecCommandEnd event to match the initial exec_command call_id. + let end_event = wait_for_event_match(&codex, |msg| match msg { + EventMsg::ExecCommandEnd(ev) if ev.call_id == start_call_id => Some(ev.clone()), + _ => None, + }) + .await; + + assert_eq!(end_event.exit_code, 0); + + wait_for_event(&codex, |event| matches!(event, EventMsg::TaskComplete(_))).await; + Ok(()) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn unified_exec_reuses_session_via_stdin() -> Result<()> { skip_if_no_network!(Ok(()));