Prefer wait_for_event over wait_for_event_with_timeout (#6349)

This commit is contained in:
pakrym-oai
2025-11-06 18:11:11 -08:00
committed by GitHub
parent 316352be94
commit e8905f6d20
9 changed files with 124 additions and 356 deletions

View File

@@ -13,7 +13,7 @@ use core_test_support::responses::mount_sse_sequence;
use core_test_support::responses::sse;
use core_test_support::responses::start_mock_server;
use core_test_support::test_codex::test_codex;
use core_test_support::wait_for_event_with_timeout;
use core_test_support::wait_for_event;
use regex_lite::Regex;
use serde_json::json;
@@ -42,8 +42,6 @@ async fn interrupt_long_running_tool_emits_turn_aborted() {
let codex = test_codex().build(&server).await.unwrap().codex;
let wait_timeout = Duration::from_secs(5);
// Kick off a turn that triggers the function call.
codex
.submit(Op::UserInput {
@@ -55,22 +53,12 @@ async fn interrupt_long_running_tool_emits_turn_aborted() {
.unwrap();
// Wait until the exec begins to avoid a race, then interrupt.
wait_for_event_with_timeout(
&codex,
|ev| matches!(ev, EventMsg::ExecCommandBegin(_)),
wait_timeout,
)
.await;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ExecCommandBegin(_))).await;
codex.submit(Op::Interrupt).await.unwrap();
// Expect TurnAborted soon after.
wait_for_event_with_timeout(
&codex,
|ev| matches!(ev, EventMsg::TurnAborted(_)),
wait_timeout,
)
.await;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnAborted(_))).await;
}
/// After an interrupt we expect the next request to the model to include both
@@ -107,8 +95,6 @@ async fn interrupt_tool_records_history_entries() {
let fixture = test_codex().build(&server).await.unwrap();
let codex = Arc::clone(&fixture.codex);
let wait_timeout = Duration::from_millis(100);
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
@@ -118,22 +104,12 @@ async fn interrupt_tool_records_history_entries() {
.await
.unwrap();
wait_for_event_with_timeout(
&codex,
|ev| matches!(ev, EventMsg::ExecCommandBegin(_)),
wait_timeout,
)
.await;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ExecCommandBegin(_))).await;
tokio::time::sleep(Duration::from_secs_f32(0.1)).await;
codex.submit(Op::Interrupt).await.unwrap();
wait_for_event_with_timeout(
&codex,
|ev| matches!(ev, EventMsg::TurnAborted(_)),
wait_timeout,
)
.await;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnAborted(_))).await;
codex
.submit(Op::UserInput {
@@ -144,12 +120,7 @@ async fn interrupt_tool_records_history_entries() {
.await
.unwrap();
wait_for_event_with_timeout(
&codex,
|ev| matches!(ev, EventMsg::TaskComplete(_)),
wait_timeout,
)
.await;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
let requests = response_mock.requests();
assert!(

View File

@@ -23,14 +23,12 @@ use core_test_support::skip_if_no_network;
use core_test_support::test_codex::TestCodex;
use core_test_support::test_codex::test_codex;
use core_test_support::wait_for_event;
use core_test_support::wait_for_event_with_timeout;
use pretty_assertions::assert_eq;
use serde_json::Value;
use serde_json::json;
use std::env;
use std::fs;
use std::path::PathBuf;
use std::time::Duration;
use wiremock::Mock;
use wiremock::MockServer;
use wiremock::ResponseTemplate;
@@ -423,16 +421,12 @@ async fn expect_exec_approval(
test: &TestCodex,
expected_command: &[String],
) -> ExecApprovalRequestEvent {
let event = wait_for_event_with_timeout(
&test.codex,
|event| {
matches!(
event,
EventMsg::ExecApprovalRequest(_) | EventMsg::TaskComplete(_)
)
},
Duration::from_secs(5),
)
let event = wait_for_event(&test.codex, |event| {
matches!(
event,
EventMsg::ExecApprovalRequest(_) | EventMsg::TaskComplete(_)
)
})
.await;
match event {
@@ -449,16 +443,12 @@ async fn expect_patch_approval(
test: &TestCodex,
expected_call_id: &str,
) -> ApplyPatchApprovalRequestEvent {
let event = wait_for_event_with_timeout(
&test.codex,
|event| {
matches!(
event,
EventMsg::ApplyPatchApprovalRequest(_) | EventMsg::TaskComplete(_)
)
},
Duration::from_secs(5),
)
let event = wait_for_event(&test.codex, |event| {
matches!(
event,
EventMsg::ApplyPatchApprovalRequest(_) | EventMsg::TaskComplete(_)
)
})
.await;
match event {
@@ -472,16 +462,12 @@ async fn expect_patch_approval(
}
async fn wait_for_completion_without_approval(test: &TestCodex) {
let event = wait_for_event_with_timeout(
&test.codex,
|event| {
matches!(
event,
EventMsg::ExecApprovalRequest(_) | EventMsg::TaskComplete(_)
)
},
Duration::from_secs(5),
)
let event = wait_for_event(&test.codex, |event| {
matches!(
event,
EventMsg::ExecApprovalRequest(_) | EventMsg::TaskComplete(_)
)
})
.await;
match event {

View File

@@ -32,7 +32,6 @@ use core_test_support::skip_if_no_network;
use core_test_support::test_codex::TestCodex;
use core_test_support::test_codex::test_codex;
use core_test_support::wait_for_event;
use core_test_support::wait_for_event_with_timeout;
use futures::StreamExt;
use serde_json::json;
use std::io::Write;
@@ -1117,26 +1116,20 @@ async fn context_window_error_sets_total_tokens_to_model_window() -> anyhow::Res
})
.await?;
use std::time::Duration;
let token_event = wait_for_event_with_timeout(
&codex,
|event| {
matches!(
event,
EventMsg::TokenCount(payload)
if payload.info.as_ref().is_some_and(|info| {
info.model_context_window == Some(info.total_token_usage.total_tokens)
&& info.total_token_usage.total_tokens > 0
})
)
},
Duration::from_secs(5),
)
let token_event = wait_for_event(&codex, |event| {
matches!(
event,
EventMsg::TokenCount(payload)
if payload.info.as_ref().is_some_and(|info| {
info.model_context_window == Some(info.total_token_usage.total_tokens)
&& info.total_token_usage.total_tokens > 0
})
)
})
.await;
let EventMsg::TokenCount(token_payload) = token_event else {
unreachable!("wait_for_event_with_timeout returned unexpected event");
unreachable!("wait_for_event returned unexpected event");
};
let info = token_payload

View File

@@ -14,8 +14,7 @@ use core_test_support::responses::sse;
use core_test_support::responses::start_mock_server;
use core_test_support::test_codex::TestCodex;
use core_test_support::test_codex::test_codex;
use core_test_support::wait_for_event_with_timeout;
use std::time::Duration;
use core_test_support::wait_for_event;
use tracing_test::traced_test;
use core_test_support::responses::ev_local_shell_call;
@@ -38,12 +37,7 @@ async fn responses_api_emits_api_request_event() {
.await
.unwrap();
wait_for_event_with_timeout(
&codex,
|ev| matches!(ev, EventMsg::TaskComplete(_)),
Duration::from_secs(5),
)
.await;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
logs_assert(|lines: &[&str]| {
lines
@@ -84,12 +78,7 @@ async fn process_sse_emits_tracing_for_output_item() {
.await
.unwrap();
wait_for_event_with_timeout(
&codex,
|ev| matches!(ev, EventMsg::TaskComplete(_)),
Duration::from_secs(5),
)
.await;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
logs_assert(|lines: &[&str]| {
lines
@@ -128,12 +117,7 @@ async fn process_sse_emits_failed_event_on_parse_error() {
.await
.unwrap();
wait_for_event_with_timeout(
&codex,
|ev| matches!(ev, EventMsg::TaskComplete(_)),
Duration::from_secs(5),
)
.await;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
logs_assert(|lines: &[&str]| {
lines
@@ -173,12 +157,7 @@ async fn process_sse_records_failed_event_when_stream_closes_without_completed()
.await
.unwrap();
wait_for_event_with_timeout(
&codex,
|ev| matches!(ev, EventMsg::TaskComplete(_)),
Duration::from_secs(5),
)
.await;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
logs_assert(|lines: &[&str]| {
lines
@@ -230,12 +209,7 @@ async fn process_sse_failed_event_records_response_error_message() {
.await
.unwrap();
wait_for_event_with_timeout(
&codex,
|ev| matches!(ev, EventMsg::TaskComplete(_)),
Duration::from_secs(5),
)
.await;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
logs_assert(|lines: &[&str]| {
lines
@@ -285,12 +259,7 @@ async fn process_sse_failed_event_logs_parse_error() {
.await
.unwrap();
wait_for_event_with_timeout(
&codex,
|ev| matches!(ev, EventMsg::TaskComplete(_)),
Duration::from_secs(5),
)
.await;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
logs_assert(|lines: &[&str]| {
lines
@@ -335,12 +304,7 @@ async fn process_sse_failed_event_logs_missing_error() {
.await
.unwrap();
wait_for_event_with_timeout(
&codex,
|ev| matches!(ev, EventMsg::TaskComplete(_)),
Duration::from_secs(5),
)
.await;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
logs_assert(|lines: &[&str]| {
lines
@@ -385,12 +349,7 @@ async fn process_sse_failed_event_logs_response_completed_parse_error() {
.await
.unwrap();
wait_for_event_with_timeout(
&codex,
|ev| matches!(ev, EventMsg::TaskComplete(_)),
Duration::from_secs(5),
)
.await;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
logs_assert(|lines: &[&str]| {
lines
@@ -440,12 +399,7 @@ async fn process_sse_emits_completed_telemetry() {
.await
.unwrap();
wait_for_event_with_timeout(
&codex,
|ev| matches!(ev, EventMsg::TaskComplete(_)),
Duration::from_secs(5),
)
.await;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
logs_assert(|lines: &[&str]| {
lines
@@ -500,12 +454,7 @@ async fn handle_response_item_records_tool_result_for_custom_tool_call() {
.await
.unwrap();
wait_for_event_with_timeout(
&codex,
|ev| matches!(ev, EventMsg::TokenCount(_)),
Duration::from_secs(5),
)
.await;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TokenCount(_))).await;
logs_assert(|lines: &[&str]| {
let line = lines
@@ -564,12 +513,7 @@ async fn handle_response_item_records_tool_result_for_function_call() {
.await
.unwrap();
wait_for_event_with_timeout(
&codex,
|ev| matches!(ev, EventMsg::TokenCount(_)),
Duration::from_secs(5),
)
.await;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TokenCount(_))).await;
logs_assert(|lines: &[&str]| {
let line = lines
@@ -638,12 +582,7 @@ async fn handle_response_item_records_tool_result_for_local_shell_missing_ids()
.await
.unwrap();
wait_for_event_with_timeout(
&codex,
|ev| matches!(ev, EventMsg::TokenCount(_)),
Duration::from_secs(5),
)
.await;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TokenCount(_))).await;
logs_assert(|lines: &[&str]| {
let line = lines
@@ -696,12 +635,7 @@ async fn handle_response_item_records_tool_result_for_local_shell_call() {
.await
.unwrap();
wait_for_event_with_timeout(
&codex,
|ev| matches!(ev, EventMsg::TokenCount(_)),
Duration::from_secs(5),
)
.await;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TokenCount(_))).await;
logs_assert(|lines: &[&str]| {
let line = lines
@@ -794,12 +728,7 @@ async fn handle_container_exec_autoapprove_from_config_records_tool_decision() {
.await
.unwrap();
wait_for_event_with_timeout(
&codex,
|ev| matches!(ev, EventMsg::TokenCount(_)),
Duration::from_secs(5),
)
.await;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TokenCount(_))).await;
logs_assert(tool_decision_assertion(
"auto_config_call",
@@ -840,12 +769,7 @@ async fn handle_container_exec_user_approved_records_tool_decision() {
.await
.unwrap();
wait_for_event_with_timeout(
&codex,
|ev| matches!(ev, EventMsg::ExecApprovalRequest(_)),
Duration::from_secs(5),
)
.await;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ExecApprovalRequest(_))).await;
codex
.submit(Op::ExecApproval {
@@ -855,12 +779,7 @@ async fn handle_container_exec_user_approved_records_tool_decision() {
.await
.unwrap();
wait_for_event_with_timeout(
&codex,
|ev| matches!(ev, EventMsg::TokenCount(_)),
Duration::from_secs(5),
)
.await;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TokenCount(_))).await;
logs_assert(tool_decision_assertion(
"user_approved_call",
@@ -902,12 +821,7 @@ async fn handle_container_exec_user_approved_for_session_records_tool_decision()
.await
.unwrap();
wait_for_event_with_timeout(
&codex,
|ev| matches!(ev, EventMsg::ExecApprovalRequest(_)),
Duration::from_secs(5),
)
.await;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ExecApprovalRequest(_))).await;
codex
.submit(Op::ExecApproval {
@@ -917,12 +831,7 @@ async fn handle_container_exec_user_approved_for_session_records_tool_decision()
.await
.unwrap();
wait_for_event_with_timeout(
&codex,
|ev| matches!(ev, EventMsg::TokenCount(_)),
Duration::from_secs(5),
)
.await;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TokenCount(_))).await;
logs_assert(tool_decision_assertion(
"user_approved_session_call",
@@ -964,12 +873,7 @@ async fn handle_sandbox_error_user_approves_retry_records_tool_decision() {
.await
.unwrap();
wait_for_event_with_timeout(
&codex,
|ev| matches!(ev, EventMsg::ExecApprovalRequest(_)),
Duration::from_secs(5),
)
.await;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ExecApprovalRequest(_))).await;
codex
.submit(Op::ExecApproval {
@@ -979,12 +883,7 @@ async fn handle_sandbox_error_user_approves_retry_records_tool_decision() {
.await
.unwrap();
wait_for_event_with_timeout(
&codex,
|ev| matches!(ev, EventMsg::TokenCount(_)),
Duration::from_secs(5),
)
.await;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TokenCount(_))).await;
logs_assert(tool_decision_assertion(
"sandbox_retry_call",
@@ -1026,12 +925,7 @@ async fn handle_container_exec_user_denies_records_tool_decision() {
.await
.unwrap();
wait_for_event_with_timeout(
&codex,
|ev| matches!(ev, EventMsg::ExecApprovalRequest(_)),
Duration::from_secs(5),
)
.await;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ExecApprovalRequest(_))).await;
codex
.submit(Op::ExecApproval {
@@ -1041,12 +935,7 @@ async fn handle_container_exec_user_denies_records_tool_decision() {
.await
.unwrap();
wait_for_event_with_timeout(
&codex,
|ev| matches!(ev, EventMsg::TokenCount(_)),
Duration::from_secs(5),
)
.await;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TokenCount(_))).await;
logs_assert(tool_decision_assertion(
"user_denied_call",
@@ -1088,12 +977,7 @@ async fn handle_sandbox_error_user_approves_for_session_records_tool_decision()
.await
.unwrap();
wait_for_event_with_timeout(
&codex,
|ev| matches!(ev, EventMsg::ExecApprovalRequest(_)),
Duration::from_secs(5),
)
.await;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ExecApprovalRequest(_))).await;
codex
.submit(Op::ExecApproval {
@@ -1103,12 +987,7 @@ async fn handle_sandbox_error_user_approves_for_session_records_tool_decision()
.await
.unwrap();
wait_for_event_with_timeout(
&codex,
|ev| matches!(ev, EventMsg::TokenCount(_)),
Duration::from_secs(5),
)
.await;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TokenCount(_))).await;
logs_assert(tool_decision_assertion(
"sandbox_session_call",
@@ -1150,12 +1029,7 @@ async fn handle_sandbox_error_user_denies_records_tool_decision() {
.await
.unwrap();
wait_for_event_with_timeout(
&codex,
|ev| matches!(ev, EventMsg::ExecApprovalRequest(_)),
Duration::from_secs(5),
)
.await;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ExecApprovalRequest(_))).await;
codex
.submit(Op::ExecApproval {
@@ -1165,12 +1039,7 @@ async fn handle_sandbox_error_user_denies_records_tool_decision() {
.await
.unwrap();
wait_for_event_with_timeout(
&codex,
|ev| matches!(ev, EventMsg::TokenCount(_)),
Duration::from_secs(5),
)
.await;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TokenCount(_))).await;
logs_assert(tool_decision_assertion(
"sandbox_deny_call",

View File

@@ -23,7 +23,6 @@ use core_test_support::load_default_config_for_test;
use core_test_support::load_sse_fixture_with_id_from_str;
use core_test_support::skip_if_no_network;
use core_test_support::wait_for_event;
use core_test_support::wait_for_event_with_timeout;
use pretty_assertions::assert_eq;
use std::path::PathBuf;
use std::sync::Arc;
@@ -246,37 +245,31 @@ async fn review_filters_agent_message_related_events() {
let mut saw_exited = false;
// Drain until TaskComplete; assert filtered events never surface.
wait_for_event_with_timeout(
&codex,
|event| match event {
EventMsg::TaskComplete(_) => true,
EventMsg::EnteredReviewMode(_) => {
saw_entered = true;
false
wait_for_event(&codex, |event| match event {
EventMsg::TaskComplete(_) => true,
EventMsg::EnteredReviewMode(_) => {
saw_entered = true;
false
}
EventMsg::ExitedReviewMode(_) => {
saw_exited = true;
false
}
// The following must be filtered by review flow
EventMsg::AgentMessageContentDelta(_) => {
panic!("unexpected AgentMessageContentDelta surfaced during review")
}
EventMsg::AgentMessageDelta(_) => {
panic!("unexpected AgentMessageDelta surfaced during review")
}
EventMsg::ItemCompleted(ev) => match &ev.item {
codex_protocol::items::TurnItem::AgentMessage(_) => {
panic!("unexpected ItemCompleted for TurnItem::AgentMessage surfaced during review")
}
EventMsg::ExitedReviewMode(_) => {
saw_exited = true;
false
}
// The following must be filtered by review flow
EventMsg::AgentMessageContentDelta(_) => {
panic!("unexpected AgentMessageContentDelta surfaced during review")
}
EventMsg::AgentMessageDelta(_) => {
panic!("unexpected AgentMessageDelta surfaced during review")
}
EventMsg::ItemCompleted(ev) => match &ev.item {
codex_protocol::items::TurnItem::AgentMessage(_) => {
panic!(
"unexpected ItemCompleted for TurnItem::AgentMessage surfaced during review"
)
}
_ => false,
},
_ => false,
},
tokio::time::Duration::from_secs(5),
)
_ => false,
})
.await;
assert!(saw_entered && saw_exited, "missing review lifecycle events");
@@ -335,25 +328,21 @@ async fn review_does_not_emit_agent_message_on_structured_output() {
// Drain events until TaskComplete; ensure none are AgentMessage.
let mut saw_entered = false;
let mut saw_exited = false;
wait_for_event_with_timeout(
&codex,
|event| match event {
EventMsg::TaskComplete(_) => true,
EventMsg::AgentMessage(_) => {
panic!("unexpected AgentMessage during review with structured output")
}
EventMsg::EnteredReviewMode(_) => {
saw_entered = true;
false
}
EventMsg::ExitedReviewMode(_) => {
saw_exited = true;
false
}
_ => false,
},
tokio::time::Duration::from_secs(5),
)
wait_for_event(&codex, |event| match event {
EventMsg::TaskComplete(_) => true,
EventMsg::AgentMessage(_) => {
panic!("unexpected AgentMessage during review with structured output")
}
EventMsg::EnteredReviewMode(_) => {
saw_entered = true;
false
}
EventMsg::ExitedReviewMode(_) => {
saw_exited = true;
false
}
_ => false,
})
.await;
assert!(saw_entered && saw_exited, "missing review lifecycle events");

View File

@@ -25,7 +25,6 @@ use core_test_support::responses::mount_sse_once_match;
use core_test_support::skip_if_no_network;
use core_test_support::test_codex::test_codex;
use core_test_support::wait_for_event;
use core_test_support::wait_for_event_with_timeout;
use escargot::CargoBuild;
use mcp_types::ContentBlock;
use serde_json::Value;
@@ -125,11 +124,9 @@ async fn stdio_server_round_trip() -> anyhow::Result<()> {
})
.await?;
let begin_event = wait_for_event_with_timeout(
&fixture.codex,
|ev| matches!(ev, EventMsg::McpToolCallBegin(_)),
Duration::from_secs(10),
)
let begin_event = wait_for_event(&fixture.codex, |ev| {
matches!(ev, EventMsg::McpToolCallBegin(_))
})
.await;
let EventMsg::McpToolCallBegin(begin) = begin_event else {
@@ -268,11 +265,9 @@ async fn stdio_image_responses_round_trip() -> anyhow::Result<()> {
.await?;
// Wait for tool begin/end and final completion.
let begin_event = wait_for_event_with_timeout(
&fixture.codex,
|ev| matches!(ev, EventMsg::McpToolCallBegin(_)),
Duration::from_secs(10),
)
let begin_event = wait_for_event(&fixture.codex, |ev| {
matches!(ev, EventMsg::McpToolCallBegin(_))
})
.await;
let EventMsg::McpToolCallBegin(begin) = begin_event else {
unreachable!("begin");
@@ -465,11 +460,9 @@ async fn stdio_image_completions_round_trip() -> anyhow::Result<()> {
})
.await?;
let begin_event = wait_for_event_with_timeout(
&fixture.codex,
|ev| matches!(ev, EventMsg::McpToolCallBegin(_)),
Duration::from_secs(10),
)
let begin_event = wait_for_event(&fixture.codex, |ev| {
matches!(ev, EventMsg::McpToolCallBegin(_))
})
.await;
let EventMsg::McpToolCallBegin(begin) = begin_event else {
unreachable!("begin");
@@ -609,11 +602,9 @@ async fn stdio_server_propagates_whitelisted_env_vars() -> anyhow::Result<()> {
})
.await?;
let begin_event = wait_for_event_with_timeout(
&fixture.codex,
|ev| matches!(ev, EventMsg::McpToolCallBegin(_)),
Duration::from_secs(10),
)
let begin_event = wait_for_event(&fixture.codex, |ev| {
matches!(ev, EventMsg::McpToolCallBegin(_))
})
.await;
let EventMsg::McpToolCallBegin(begin) = begin_event else {
@@ -762,11 +753,9 @@ async fn streamable_http_tool_call_round_trip() -> anyhow::Result<()> {
})
.await?;
let begin_event = wait_for_event_with_timeout(
&fixture.codex,
|ev| matches!(ev, EventMsg::McpToolCallBegin(_)),
Duration::from_secs(10),
)
let begin_event = wait_for_event(&fixture.codex, |ev| {
matches!(ev, EventMsg::McpToolCallBegin(_))
})
.await;
let EventMsg::McpToolCallBegin(begin) = begin_event else {
@@ -947,11 +936,9 @@ async fn streamable_http_with_oauth_round_trip() -> anyhow::Result<()> {
})
.await?;
let begin_event = wait_for_event_with_timeout(
&fixture.codex,
|ev| matches!(ev, EventMsg::McpToolCallBegin(_)),
Duration::from_secs(10),
)
let begin_event = wait_for_event(&fixture.codex, |ev| {
matches!(ev, EventMsg::McpToolCallBegin(_))
})
.await;
let EventMsg::McpToolCallBegin(begin) = begin_event else {

View File

@@ -1,5 +1,3 @@
use std::time::Duration;
use codex_core::ModelProviderInfo;
use codex_core::WireApi;
use codex_core::protocol::EventMsg;
@@ -9,7 +7,7 @@ use core_test_support::load_sse_fixture_with_id;
use core_test_support::skip_if_no_network;
use core_test_support::test_codex::TestCodex;
use core_test_support::test_codex::test_codex;
use core_test_support::wait_for_event_with_timeout;
use core_test_support::wait_for_event;
use wiremock::Mock;
use wiremock::MockServer;
use wiremock::ResponseTemplate;
@@ -96,19 +94,9 @@ async fn continue_after_stream_error() {
.unwrap();
// Expect an Error followed by TaskComplete so the session is released.
wait_for_event_with_timeout(
&codex,
|ev| matches!(ev, EventMsg::Error(_)),
Duration::from_secs(5),
)
.await;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::Error(_))).await;
wait_for_event_with_timeout(
&codex,
|ev| matches!(ev, EventMsg::TaskComplete(_)),
Duration::from_secs(5),
)
.await;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
// 2) Second turn: now send another prompt that should succeed using the
// mock server SSE stream. If the agent failed to clear the running task on
@@ -122,10 +110,5 @@ async fn continue_after_stream_error() {
.await
.unwrap();
wait_for_event_with_timeout(
&codex,
|ev| matches!(ev, EventMsg::TaskComplete(_)),
Duration::from_secs(5),
)
.await;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
}

View File

@@ -1,8 +1,6 @@
//! Verifies that the agent retries when the SSE stream terminates before
//! delivering a `response.completed` event.
use std::time::Duration;
use codex_core::ModelProviderInfo;
use codex_core::WireApi;
use codex_core::protocol::EventMsg;
@@ -13,7 +11,7 @@ use core_test_support::load_sse_fixture_with_id;
use core_test_support::skip_if_no_network;
use core_test_support::test_codex::TestCodex;
use core_test_support::test_codex::test_codex;
use core_test_support::wait_for_event_with_timeout;
use core_test_support::wait_for_event;
use wiremock::Mock;
use wiremock::MockServer;
use wiremock::Request;
@@ -103,10 +101,5 @@ async fn retries_on_early_close() {
.unwrap();
// Wait until TaskComplete (should succeed after retry).
wait_for_event_with_timeout(
&codex,
|event| matches!(event, EventMsg::TaskComplete(_)),
Duration::from_secs(10),
)
.await;
wait_for_event(&codex, |event| matches!(event, EventMsg::TaskComplete(_))).await;
}

View File

@@ -25,7 +25,6 @@ use core_test_support::test_codex::TestCodex;
use core_test_support::test_codex::test_codex;
use core_test_support::wait_for_event;
use core_test_support::wait_for_event_match;
use core_test_support::wait_for_event_with_timeout;
use regex_lite::Regex;
use serde_json::Value;
use serde_json::json;
@@ -482,8 +481,6 @@ async fn unified_exec_emits_output_delta_for_write_stdin() -> Result<()> {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn unified_exec_skips_begin_event_for_empty_input() -> Result<()> {
use tokio::time::Duration;
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
@@ -559,7 +556,7 @@ async fn unified_exec_skips_begin_event_for_empty_input() -> Result<()> {
let mut begin_events = Vec::new();
loop {
let event_msg = wait_for_event_with_timeout(&codex, |_| true, Duration::from_secs(2)).await;
let event_msg = wait_for_event(&codex, |_| true).await;
match event_msg {
EventMsg::ExecCommandBegin(event) => begin_events.push(event),
EventMsg::TaskComplete(_) => break,