diff --git a/codex-rs/core/src/tasks/review.rs b/codex-rs/core/src/tasks/review.rs index 08576cad..286eefa6 100644 --- a/codex-rs/core/src/tasks/review.rs +++ b/codex-rs/core/src/tasks/review.rs @@ -4,6 +4,8 @@ use async_trait::async_trait; use codex_protocol::items::TurnItem; use codex_protocol::models::ContentItem; use codex_protocol::models::ResponseItem; +use codex_protocol::protocol::AgentMessageContentDeltaEvent; +use codex_protocol::protocol::AgentMessageDeltaEvent; use codex_protocol::protocol::Event; use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::ExitedReviewModeEvent; @@ -111,13 +113,15 @@ async fn process_review_events( } prev_agent_message = Some(event); } - // Suppress ItemCompleted for assistant messages: forwarding it would - // trigger legacy AgentMessage via as_legacy_events(), which this + // Suppress assistant-message ItemCompleted and streaming delta events: forwarding + // ItemCompleted would trigger legacy AgentMessage via as_legacy_events(), which this // review flow intentionally hides in favor of structured output. EventMsg::ItemCompleted(ItemCompletedEvent { item: TurnItem::AgentMessage(_), .. - }) => {} + }) + | EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { .. }) + | EventMsg::AgentMessageContentDelta(AgentMessageContentDeltaEvent { .. }) => {} EventMsg::TaskComplete(task_complete) => { // Parse review output from the last agent message (if present). 
let out = task_complete diff --git a/codex-rs/core/tests/suite/review.rs b/codex-rs/core/tests/suite/review.rs index fdf7669d..093fd992 100644 --- a/codex-rs/core/tests/suite/review.rs +++ b/codex-rs/core/tests/suite/review.rs @@ -204,6 +204,85 @@ async fn review_op_with_plain_text_emits_review_fallback() { server.verify().await; } +/// Ensure review flow suppresses assistant-specific streaming/completion events: +/// - AgentMessageContentDelta +/// - AgentMessageDelta (legacy) +/// - ItemCompleted for TurnItem::AgentMessage +// Windows CI only: bump to 4 workers to prevent SSE/event starvation and test timeouts. +#[cfg_attr(windows, tokio::test(flavor = "multi_thread", worker_threads = 4))] +#[cfg_attr(not(windows), tokio::test(flavor = "multi_thread", worker_threads = 2))] +async fn review_filters_agent_message_related_events() { + skip_if_no_network!(); + + // SSE stream simulating an assistant message streamed as text deltas and then finalized. + let sse_raw = r#"[ + {"type":"response.output_item.added", "item":{ + "type":"message", "role":"assistant", "id":"msg-1", + "content":[{"type":"output_text","text":""}] + }}, + {"type":"response.output_text.delta", "delta":"Hi"}, + {"type":"response.output_text.delta", "delta":" there"}, + {"type":"response.output_item.done", "item":{ + "type":"message", "role":"assistant", "id":"msg-1", + "content":[{"type":"output_text","text":"Hi there"}] + }}, + {"type":"response.completed", "response": {"id": "__ID__"}} + ]"#; + let server = start_responses_server_with_sse(sse_raw, 1).await; + let codex_home = TempDir::new().unwrap(); + let codex = new_conversation_for_server(&server, &codex_home, |_| {}).await; + + codex + .submit(Op::Review { + review_request: ReviewRequest { + prompt: "Filter streaming events".to_string(), + user_facing_hint: "Filter streaming events".to_string(), + }, + }) + .await + .unwrap(); + + let mut saw_entered = false; + let mut saw_exited = false; + + // Drain until TaskComplete; assert filtered events never 
surface. + wait_for_event_with_timeout( + &codex, + |event| match event { + EventMsg::TaskComplete(_) => true, + EventMsg::EnteredReviewMode(_) => { + saw_entered = true; + false + } + EventMsg::ExitedReviewMode(_) => { + saw_exited = true; + false + } + // The assistant-message events below must be filtered out by the review flow. + EventMsg::AgentMessageContentDelta(_) => { + panic!("unexpected AgentMessageContentDelta surfaced during review") + } + EventMsg::AgentMessageDelta(_) => { + panic!("unexpected AgentMessageDelta surfaced during review") + } + EventMsg::ItemCompleted(ev) => match &ev.item { + codex_protocol::items::TurnItem::AgentMessage(_) => { + panic!( + "unexpected ItemCompleted for TurnItem::AgentMessage surfaced during review" + ) + } + _ => false, + }, + _ => false, + }, + tokio::time::Duration::from_secs(5), + ) + .await; + assert!(saw_entered && saw_exited, "missing review lifecycle events"); + + server.verify().await; +} + /// When the model returns structured JSON in a review, ensure no AgentMessage /// is emitted; the UI consumes the structured result via ExitedReviewMode. // Windows CI only: bump to 4 workers to prevent SSE/event starvation and test timeouts.