diff --git a/codex-rs/core/tests/suite/review.rs b/codex-rs/core/tests/suite/review.rs index d54fc0b4..66b67ae9 100644 --- a/codex-rs/core/tests/suite/review.rs +++ b/codex-rs/core/tests/suite/review.rs @@ -24,6 +24,7 @@ use core_test_support::load_default_config_for_test; use core_test_support::load_sse_fixture_with_id_from_str; use core_test_support::skip_if_no_network; use core_test_support::wait_for_event; +use core_test_support::wait_for_event_with_timeout; use pretty_assertions::assert_eq; use std::path::PathBuf; use std::sync::Arc; @@ -260,25 +261,28 @@ async fn review_does_not_emit_agent_message_on_structured_output() { .unwrap(); // Drain events until TaskComplete; ensure none are AgentMessage. - use tokio::time::Duration; - use tokio::time::timeout; let mut saw_entered = false; let mut saw_exited = false; - loop { - let ev = timeout(Duration::from_secs(5), codex.next_event()) - .await - .expect("timeout waiting for event") - .expect("stream ended unexpectedly"); - match ev.msg { - EventMsg::TaskComplete(_) => break, + wait_for_event_with_timeout( + &codex, + |event| match event { + EventMsg::TaskComplete(_) => true, EventMsg::AgentMessage(_) => { panic!("unexpected AgentMessage during review with structured output") } - EventMsg::EnteredReviewMode(_) => saw_entered = true, - EventMsg::ExitedReviewMode(_) => saw_exited = true, - _ => {} - } - } + EventMsg::EnteredReviewMode(_) => { + saw_entered = true; + false + } + EventMsg::ExitedReviewMode(_) => { + saw_exited = true; + false + } + _ => false, + }, + tokio::time::Duration::from_secs(5), + ) + .await; assert!(saw_entered && saw_exited, "missing review lifecycle events"); server.verify().await; diff --git a/codex-rs/core/tests/suite/shell_serialization.rs b/codex-rs/core/tests/suite/shell_serialization.rs index f8f9960b..599a9203 100644 --- a/codex-rs/core/tests/suite/shell_serialization.rs +++ b/codex-rs/core/tests/suite/shell_serialization.rs @@ -17,6 +17,7 @@ use core_test_support::responses::start_mock_server; use core_test_support::skip_if_no_network; use core_test_support::test_codex::TestCodex; use core_test_support::test_codex::test_codex; +use core_test_support::wait_for_event; use serde_json::Value; use serde_json::json; @@ -38,12 +39,10 @@ async fn submit_turn(test: &TestCodex, prompt: &str, sandbox_policy: SandboxPoli }) .await?; - loop { - let event = test.codex.next_event().await?; - if matches!(event.msg, EventMsg::TaskComplete(_)) { - break; - } - } + wait_for_event(&test.codex, |event| { + matches!(event, EventMsg::TaskComplete(_)) + }) + .await; Ok(()) } diff --git a/codex-rs/core/tests/suite/stream_no_completed.rs b/codex-rs/core/tests/suite/stream_no_completed.rs index ce33d807..4ef2b04f 100644 --- a/codex-rs/core/tests/suite/stream_no_completed.rs +++ b/codex-rs/core/tests/suite/stream_no_completed.rs @@ -13,7 +13,7 @@ use core_test_support::load_sse_fixture_with_id; use core_test_support::skip_if_no_network; use core_test_support::test_codex::TestCodex; use core_test_support::test_codex::test_codex; -use tokio::time::timeout; +use core_test_support::wait_for_event_with_timeout; use wiremock::Mock; use wiremock::MockServer; use wiremock::Request; @@ -102,13 +102,10 @@ async fn retries_on_early_close() { .unwrap(); // Wait until TaskComplete (should succeed after retry). - loop { - let ev = timeout(Duration::from_secs(10), codex.next_event()) - .await - .unwrap() - .unwrap(); - if matches!(ev.msg, EventMsg::TaskComplete(_)) { - break; - } - } + wait_for_event_with_timeout( + &codex, + |event| matches!(event, EventMsg::TaskComplete(_)), + Duration::from_secs(10), + ) + .await; } diff --git a/codex-rs/core/tests/suite/tool_harness.rs b/codex-rs/core/tests/suite/tool_harness.rs index 156a7a95..b6feffc9 100644 --- a/codex-rs/core/tests/suite/tool_harness.rs +++ b/codex-rs/core/tests/suite/tool_harness.rs @@ -19,6 +19,7 @@ use core_test_support::responses::start_mock_server; use core_test_support::skip_if_no_network; use core_test_support::test_codex::TestCodex; use core_test_support::test_codex::test_codex; +use core_test_support::wait_for_event; use serde_json::Value; use serde_json::json; use wiremock::matchers::any; @@ -99,12 +100,7 @@ async fn shell_tool_executes_command_and_streams_output() -> anyhow::Result<()> }) .await?; - loop { - let event = codex.next_event().await.expect("event"); - if matches!(event.msg, EventMsg::TaskComplete(_)) { - break; - } - } + wait_for_event(&codex, |event| matches!(event, EventMsg::TaskComplete(_))).await; let requests = server.received_requests().await.expect("recorded requests"); assert!(!requests.is_empty(), "expected at least one POST request"); @@ -189,23 +185,21 @@ async fn update_plan_tool_emits_plan_update_event() -> anyhow::Result<()> { .await?; let mut saw_plan_update = false; - - loop { - let event = codex.next_event().await.expect("event"); - match event.msg { - EventMsg::PlanUpdate(update) => { - saw_plan_update = true; - assert_eq!(update.explanation.as_deref(), Some("Tool harness check")); - assert_eq!(update.plan.len(), 2); - assert_eq!(update.plan[0].step, "Inspect workspace"); - assert!(matches!(update.plan[0].status, StepStatus::InProgress)); - assert_eq!(update.plan[1].step, "Report results"); - assert!(matches!(update.plan[1].status, StepStatus::Pending)); - } - EventMsg::TaskComplete(_) => break, - _ => {} + wait_for_event(&codex, |event| match event { + EventMsg::PlanUpdate(update) => { + saw_plan_update = true; + assert_eq!(update.explanation.as_deref(), Some("Tool harness check")); + assert_eq!(update.plan.len(), 2); + assert_eq!(update.plan[0].step, "Inspect workspace"); + assert!(matches!(update.plan[0].status, StepStatus::InProgress)); + assert_eq!(update.plan[1].step, "Report results"); + assert!(matches!(update.plan[1].status, StepStatus::Pending)); + false } - } + EventMsg::TaskComplete(_) => true, + _ => false, + }) + .await; assert!(saw_plan_update, "expected PlanUpdate event"); @@ -286,15 +280,15 @@ async fn update_plan_tool_rejects_malformed_payload() -> anyhow::Result<()> { .await?; let mut saw_plan_update = false; - - loop { - let event = codex.next_event().await.expect("event"); - match event.msg { - EventMsg::PlanUpdate(_) => saw_plan_update = true, - EventMsg::TaskComplete(_) => break, - _ => {} + wait_for_event(&codex, |event| match event { + EventMsg::PlanUpdate(_) => { + saw_plan_update = true; + false } - } + EventMsg::TaskComplete(_) => true, + _ => false, + }) + .await; assert!( !saw_plan_update, @@ -393,22 +387,21 @@ async fn apply_patch_tool_executes_and_emits_patch_events() -> anyhow::Result<() let mut saw_patch_begin = false; let mut patch_end_success = None; - - loop { - let event = codex.next_event().await.expect("event"); - match event.msg { - EventMsg::PatchApplyBegin(begin) => { - saw_patch_begin = true; - assert_eq!(begin.call_id, call_id); - } - EventMsg::PatchApplyEnd(end) => { - assert_eq!(end.call_id, call_id); - patch_end_success = Some(end.success); - } - EventMsg::TaskComplete(_) => break, - _ => {} + wait_for_event(&codex, |event| match event { + EventMsg::PatchApplyBegin(begin) => { + saw_patch_begin = true; + assert_eq!(begin.call_id, call_id); + false } - } + EventMsg::PatchApplyEnd(end) => { + assert_eq!(end.call_id, call_id); + patch_end_success = Some(end.success); + false + } + EventMsg::TaskComplete(_) => true, + _ => false, + }) + .await; assert!(saw_patch_begin, "expected PatchApplyBegin event"); let patch_end_success = @@ -521,12 +514,7 @@ async fn apply_patch_reports_parse_diagnostics() -> anyhow::Result<()> { }) .await?; - loop { - let event = codex.next_event().await.expect("event"); - if matches!(event.msg, EventMsg::TaskComplete(_)) { - break; - } - } + wait_for_event(&codex, |event| matches!(event, EventMsg::TaskComplete(_))).await; let requests = server.received_requests().await.expect("recorded requests"); assert!(!requests.is_empty(), "expected at least one POST request"); diff --git a/codex-rs/core/tests/suite/tools.rs b/codex-rs/core/tests/suite/tools.rs index 99290852..0dcd40c2 100644 --- a/codex-rs/core/tests/suite/tools.rs +++ b/codex-rs/core/tests/suite/tools.rs @@ -19,6 +19,7 @@ use core_test_support::responses::start_mock_server; use core_test_support::skip_if_no_network; use core_test_support::test_codex::TestCodex; use core_test_support::test_codex::test_codex; +use core_test_support::wait_for_event; use serde_json::Value; use serde_json::json; use wiremock::Request; @@ -46,12 +47,10 @@ async fn submit_turn( }) .await?; - loop { - let event = test.codex.next_event().await?; - if matches!(event.msg, EventMsg::TaskComplete(_)) { - break; - } - } + wait_for_event(&test.codex, |event| { + matches!(event, EventMsg::TaskComplete(_)) + }) + .await; Ok(()) } diff --git a/codex-rs/core/tests/suite/unified_exec.rs b/codex-rs/core/tests/suite/unified_exec.rs index c81c0ba9..0676c25b 100644 --- a/codex-rs/core/tests/suite/unified_exec.rs +++ b/codex-rs/core/tests/suite/unified_exec.rs @@ -19,6 +19,7 @@ use core_test_support::skip_if_no_network; use core_test_support::skip_if_sandbox; use core_test_support::test_codex::TestCodex; use core_test_support::test_codex::test_codex; +use core_test_support::wait_for_event; use serde_json::Value; fn extract_output_text(item: &Value) -> Option<&str> { @@ -122,12 +123,7 @@ async fn unified_exec_reuses_session_via_stdin() -> Result<()> { }) .await?; - loop { - let event = codex.next_event().await.expect("event"); - if matches!(event.msg, EventMsg::TaskComplete(_)) { - break; - } - } + wait_for_event(&codex, |event| matches!(event, EventMsg::TaskComplete(_))).await; let requests = server.received_requests().await.expect("recorded requests"); assert!(!requests.is_empty(), "expected at least one POST request"); diff --git a/codex-rs/core/tests/suite/view_image.rs b/codex-rs/core/tests/suite/view_image.rs index 92fbf4ad..4a953b98 100644 --- a/codex-rs/core/tests/suite/view_image.rs +++ b/codex-rs/core/tests/suite/view_image.rs @@ -17,6 +17,7 @@ use core_test_support::responses::start_mock_server; use core_test_support::skip_if_no_network; use core_test_support::test_codex::TestCodex; use core_test_support::test_codex::test_codex; +use core_test_support::wait_for_event; use serde_json::Value; use wiremock::matchers::any; @@ -121,16 +122,20 @@ async fn view_image_tool_attaches_local_image() -> anyhow::Result<()> { .await?; let mut tool_event = None; - loop { - let event = codex.next_event().await.expect("event"); - match event.msg { - EventMsg::ViewImageToolCall(ev) => tool_event = Some(ev), - EventMsg::TaskComplete(_) => break, - _ => {} + wait_for_event(&codex, |event| match event { + EventMsg::ViewImageToolCall(_) => { + tool_event = Some(event.clone()); + false } - } + EventMsg::TaskComplete(_) => true, + _ => false, + }) + .await; - let tool_event = tool_event.expect("view image tool event emitted"); + let tool_event = match tool_event.expect("view image tool event emitted") { + EventMsg::ViewImageToolCall(event) => event, + _ => unreachable!("stored event must be ViewImageToolCall"), + }; assert_eq!(tool_event.call_id, call_id); assert_eq!(tool_event.path, abs_path); @@ -229,12 +234,7 @@ async fn view_image_tool_errors_when_path_is_directory() -> anyhow::Result<()> { }) .await?; - loop { - let event = codex.next_event().await.expect("event"); - if matches!(event.msg, EventMsg::TaskComplete(_)) { - break; - } - } + wait_for_event(&codex, |event| matches!(event, EventMsg::TaskComplete(_))).await; let requests = server.received_requests().await.expect("recorded requests"); assert!( @@ -314,12 +314,7 @@ async fn view_image_tool_errors_when_file_missing() -> anyhow::Result<()> { }) .await?; - loop { - let event = codex.next_event().await.expect("event"); - if matches!(event.msg, EventMsg::TaskComplete(_)) { - break; - } - } + wait_for_event(&codex, |event| matches!(event, EventMsg::TaskComplete(_))).await; let requests = server.received_requests().await.expect("recorded requests"); assert!(