Ignore agent message deltas in review mode (#5937)
The deltas reproduce the whole JSON output, so ignore them.
This commit is contained in:
@@ -4,6 +4,8 @@ use async_trait::async_trait;
|
|||||||
use codex_protocol::items::TurnItem;
|
use codex_protocol::items::TurnItem;
|
||||||
use codex_protocol::models::ContentItem;
|
use codex_protocol::models::ContentItem;
|
||||||
use codex_protocol::models::ResponseItem;
|
use codex_protocol::models::ResponseItem;
|
||||||
|
use codex_protocol::protocol::AgentMessageContentDeltaEvent;
|
||||||
|
use codex_protocol::protocol::AgentMessageDeltaEvent;
|
||||||
use codex_protocol::protocol::Event;
|
use codex_protocol::protocol::Event;
|
||||||
use codex_protocol::protocol::EventMsg;
|
use codex_protocol::protocol::EventMsg;
|
||||||
use codex_protocol::protocol::ExitedReviewModeEvent;
|
use codex_protocol::protocol::ExitedReviewModeEvent;
|
||||||
@@ -111,13 +113,15 @@ async fn process_review_events(
|
|||||||
}
|
}
|
||||||
prev_agent_message = Some(event);
|
prev_agent_message = Some(event);
|
||||||
}
|
}
|
||||||
// Suppress ItemCompleted for assistant messages: forwarding it would
|
// Suppress ItemCompleted only for assistant messages: forwarding it
|
||||||
// trigger legacy AgentMessage via as_legacy_events(), which this
|
// would trigger legacy AgentMessage via as_legacy_events(), which this
|
||||||
// review flow intentionally hides in favor of structured output.
|
// review flow intentionally hides in favor of structured output.
|
||||||
EventMsg::ItemCompleted(ItemCompletedEvent {
|
EventMsg::ItemCompleted(ItemCompletedEvent {
|
||||||
item: TurnItem::AgentMessage(_),
|
item: TurnItem::AgentMessage(_),
|
||||||
..
|
..
|
||||||
}) => {}
|
})
|
||||||
|
| EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { .. })
|
||||||
|
| EventMsg::AgentMessageContentDelta(AgentMessageContentDeltaEvent { .. }) => {}
|
||||||
EventMsg::TaskComplete(task_complete) => {
|
EventMsg::TaskComplete(task_complete) => {
|
||||||
// Parse review output from the last agent message (if present).
|
// Parse review output from the last agent message (if present).
|
||||||
let out = task_complete
|
let out = task_complete
|
||||||
|
|||||||
@@ -204,6 +204,85 @@ async fn review_op_with_plain_text_emits_review_fallback() {
|
|||||||
server.verify().await;
|
server.verify().await;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Ensure review flow suppresses assistant-specific streaming/completion events:
|
||||||
|
/// - AgentMessageContentDelta
|
||||||
|
/// - AgentMessageDelta (legacy)
|
||||||
|
/// - ItemCompleted for TurnItem::AgentMessage
|
||||||
|
// Windows CI only: bump to 4 workers to prevent SSE/event starvation and test timeouts.
|
||||||
|
#[cfg_attr(windows, tokio::test(flavor = "multi_thread", worker_threads = 4))]
|
||||||
|
#[cfg_attr(not(windows), tokio::test(flavor = "multi_thread", worker_threads = 2))]
|
||||||
|
async fn review_filters_agent_message_related_events() {
|
||||||
|
skip_if_no_network!();
|
||||||
|
|
||||||
|
// Stream simulating a typing assistant message with deltas and finalization.
|
||||||
|
let sse_raw = r#"[
|
||||||
|
{"type":"response.output_item.added", "item":{
|
||||||
|
"type":"message", "role":"assistant", "id":"msg-1",
|
||||||
|
"content":[{"type":"output_text","text":""}]
|
||||||
|
}},
|
||||||
|
{"type":"response.output_text.delta", "delta":"Hi"},
|
||||||
|
{"type":"response.output_text.delta", "delta":" there"},
|
||||||
|
{"type":"response.output_item.done", "item":{
|
||||||
|
"type":"message", "role":"assistant", "id":"msg-1",
|
||||||
|
"content":[{"type":"output_text","text":"Hi there"}]
|
||||||
|
}},
|
||||||
|
{"type":"response.completed", "response": {"id": "__ID__"}}
|
||||||
|
]"#;
|
||||||
|
let server = start_responses_server_with_sse(sse_raw, 1).await;
|
||||||
|
let codex_home = TempDir::new().unwrap();
|
||||||
|
let codex = new_conversation_for_server(&server, &codex_home, |_| {}).await;
|
||||||
|
|
||||||
|
codex
|
||||||
|
.submit(Op::Review {
|
||||||
|
review_request: ReviewRequest {
|
||||||
|
prompt: "Filter streaming events".to_string(),
|
||||||
|
user_facing_hint: "Filter streaming events".to_string(),
|
||||||
|
},
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let mut saw_entered = false;
|
||||||
|
let mut saw_exited = false;
|
||||||
|
|
||||||
|
// Drain until TaskComplete; assert filtered events never surface.
|
||||||
|
wait_for_event_with_timeout(
|
||||||
|
&codex,
|
||||||
|
|event| match event {
|
||||||
|
EventMsg::TaskComplete(_) => true,
|
||||||
|
EventMsg::EnteredReviewMode(_) => {
|
||||||
|
saw_entered = true;
|
||||||
|
false
|
||||||
|
}
|
||||||
|
EventMsg::ExitedReviewMode(_) => {
|
||||||
|
saw_exited = true;
|
||||||
|
false
|
||||||
|
}
|
||||||
|
// The following must be filtered by review flow
|
||||||
|
EventMsg::AgentMessageContentDelta(_) => {
|
||||||
|
panic!("unexpected AgentMessageContentDelta surfaced during review")
|
||||||
|
}
|
||||||
|
EventMsg::AgentMessageDelta(_) => {
|
||||||
|
panic!("unexpected AgentMessageDelta surfaced during review")
|
||||||
|
}
|
||||||
|
EventMsg::ItemCompleted(ev) => match &ev.item {
|
||||||
|
codex_protocol::items::TurnItem::AgentMessage(_) => {
|
||||||
|
panic!(
|
||||||
|
"unexpected ItemCompleted for TurnItem::AgentMessage surfaced during review"
|
||||||
|
)
|
||||||
|
}
|
||||||
|
_ => false,
|
||||||
|
},
|
||||||
|
_ => false,
|
||||||
|
},
|
||||||
|
tokio::time::Duration::from_secs(5),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
assert!(saw_entered && saw_exited, "missing review lifecycle events");
|
||||||
|
|
||||||
|
server.verify().await;
|
||||||
|
}
|
||||||
|
|
||||||
/// When the model returns structured JSON in a review, ensure no AgentMessage
|
/// When the model returns structured JSON in a review, ensure no AgentMessage
|
||||||
/// is emitted; the UI consumes the structured result via ExitedReviewMode.
|
/// is emitted; the UI consumes the structured result via ExitedReviewMode.
|
||||||
// Windows CI only: bump to 4 workers to prevent SSE/event starvation and test timeouts.
|
// Windows CI only: bump to 4 workers to prevent SSE/event starvation and test timeouts.
|
||||||
|
|||||||
Reference in New Issue
Block a user