From 6af83d86ffd6176b1949f877cae76735f4584a9c Mon Sep 17 00:00:00 2001 From: Anton Panasenko Date: Fri, 24 Oct 2025 15:41:52 -0700 Subject: [PATCH] [codex][app-server] introduce codex/event/raw_item events (#5578) --- codex-rs/app-server-protocol/src/protocol.rs | 2 + .../app-server/src/codex_message_processor.rs | 10 +- .../suite/codex_message_processor_flow.rs | 15 +- .../tests/suite/create_conversation.rs | 5 +- codex-rs/app-server/tests/suite/interrupt.rs | 5 +- .../app-server/tests/suite/send_message.rs | 211 +++++++++++++++++- codex-rs/core/src/codex.rs | 67 ++++-- codex-rs/core/src/response_processing.rs | 4 +- codex-rs/core/src/rollout/policy.rs | 1 + .../src/event_processor_with_human_output.rs | 1 + codex-rs/mcp-server/src/codex_tool_runner.rs | 1 + codex-rs/protocol/src/protocol.rs | 2 + codex-rs/tui/src/chatwidget.rs | 4 +- 13 files changed, 300 insertions(+), 28 deletions(-) diff --git a/codex-rs/app-server-protocol/src/protocol.rs b/codex-rs/app-server-protocol/src/protocol.rs index 7340871f..71a4d77f 100644 --- a/codex-rs/app-server-protocol/src/protocol.rs +++ b/codex-rs/app-server-protocol/src/protocol.rs @@ -717,6 +717,8 @@ pub struct SendUserMessageResponse {} #[serde(rename_all = "camelCase")] pub struct AddConversationListenerParams { pub conversation_id: ConversationId, + #[serde(default)] + pub experimental_raw_events: bool, } #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index ff1a43f5..b2b242fb 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -1256,7 +1256,10 @@ impl CodexMessageProcessor { request_id: RequestId, params: AddConversationListenerParams, ) { - let AddConversationListenerParams { conversation_id } = params; + let AddConversationListenerParams { + conversation_id, + experimental_raw_events, + } = params; let 
Ok(conversation) = self .conversation_manager .get_conversation(conversation_id) @@ -1293,6 +1296,11 @@ impl CodexMessageProcessor { } }; + if let EventMsg::RawResponseItem(_) = &event.msg + && !experimental_raw_events { + continue; + } + // For now, we send a notification for every event, // JSON-serializing the `Event` as-is, but these should // be migrated to be variants of `ServerNotification` diff --git a/codex-rs/app-server/tests/suite/codex_message_processor_flow.rs b/codex-rs/app-server/tests/suite/codex_message_processor_flow.rs index 6d3e2b42..5c010bf0 100644 --- a/codex-rs/app-server/tests/suite/codex_message_processor_flow.rs +++ b/codex-rs/app-server/tests/suite/codex_message_processor_flow.rs @@ -103,7 +103,10 @@ async fn test_codex_jsonrpc_conversation_flow() { // 2) addConversationListener let add_listener_id = mcp - .send_add_conversation_listener_request(AddConversationListenerParams { conversation_id }) + .send_add_conversation_listener_request(AddConversationListenerParams { + conversation_id, + experimental_raw_events: false, + }) .await .expect("send addConversationListener"); let add_listener_resp: JSONRPCResponse = timeout( @@ -252,7 +255,10 @@ async fn test_send_user_turn_changes_approval_policy_behavior() { // 2) addConversationListener let add_listener_id = mcp - .send_add_conversation_listener_request(AddConversationListenerParams { conversation_id }) + .send_add_conversation_listener_request(AddConversationListenerParams { + conversation_id, + experimental_raw_events: false, + }) .await .expect("send addConversationListener"); let _: AddConversationSubscriptionResponse = @@ -459,7 +465,10 @@ async fn test_send_user_turn_updates_sandbox_and_cwd_between_turns() { .expect("deserialize newConversation response"); let add_listener_id = mcp - .send_add_conversation_listener_request(AddConversationListenerParams { conversation_id }) + .send_add_conversation_listener_request(AddConversationListenerParams { + conversation_id, + 
experimental_raw_events: false, + }) .await .expect("send addConversationListener"); timeout( diff --git a/codex-rs/app-server/tests/suite/create_conversation.rs b/codex-rs/app-server/tests/suite/create_conversation.rs index 37a0db84..f507baa3 100644 --- a/codex-rs/app-server/tests/suite/create_conversation.rs +++ b/codex-rs/app-server/tests/suite/create_conversation.rs @@ -67,7 +67,10 @@ async fn test_conversation_create_and_send_message_ok() { // Add a listener so we receive notifications for this conversation (not strictly required for this test). let add_listener_id = mcp - .send_add_conversation_listener_request(AddConversationListenerParams { conversation_id }) + .send_add_conversation_listener_request(AddConversationListenerParams { + conversation_id, + experimental_raw_events: false, + }) .await .expect("send addConversationListener"); let _sub: AddConversationSubscriptionResponse = diff --git a/codex-rs/app-server/tests/suite/interrupt.rs b/codex-rs/app-server/tests/suite/interrupt.rs index 2500d20f..1c4c05e3 100644 --- a/codex-rs/app-server/tests/suite/interrupt.rs +++ b/codex-rs/app-server/tests/suite/interrupt.rs @@ -88,7 +88,10 @@ async fn shell_command_interruption() -> anyhow::Result<()> { // 2) addConversationListener let add_listener_id = mcp - .send_add_conversation_listener_request(AddConversationListenerParams { conversation_id }) + .send_add_conversation_listener_request(AddConversationListenerParams { + conversation_id, + experimental_raw_events: false, + }) .await?; let _add_listener_resp: JSONRPCResponse = timeout( DEFAULT_READ_TIMEOUT, diff --git a/codex-rs/app-server/tests/suite/send_message.rs b/codex-rs/app-server/tests/suite/send_message.rs index 22fb02dc..b3f04c33 100644 --- a/codex-rs/app-server/tests/suite/send_message.rs +++ b/codex-rs/app-server/tests/suite/send_message.rs @@ -15,6 +15,8 @@ use codex_app_server_protocol::RequestId; use codex_app_server_protocol::SendUserMessageParams; use 
codex_app_server_protocol::SendUserMessageResponse; use codex_protocol::ConversationId; +use codex_protocol::models::ContentItem; +use codex_protocol::models::ResponseItem; use pretty_assertions::assert_eq; use tempfile::TempDir; use tokio::time::timeout; @@ -62,7 +64,10 @@ async fn test_send_message_success() { // 2) addConversationListener let add_listener_id = mcp - .send_add_conversation_listener_request(AddConversationListenerParams { conversation_id }) + .send_add_conversation_listener_request(AddConversationListenerParams { + conversation_id, + experimental_raw_events: false, + }) .await .expect("send addConversationListener"); let add_listener_resp: JSONRPCResponse = timeout( @@ -124,6 +129,105 @@ async fn send_message(message: &str, conversation_id: ConversationId, mcp: &mut .expect("should have conversationId"), &serde_json::Value::String(conversation_id.to_string()) ); + + let raw_attempt = tokio::time::timeout( + std::time::Duration::from_millis(200), + mcp.read_stream_until_notification_message("codex/event/raw_response_item"), + ) + .await; + assert!( + raw_attempt.is_err(), + "unexpected raw item notification when not opted in" + ); +} + +#[tokio::test] +async fn test_send_message_raw_notifications_opt_in() { + let responses = vec![ + create_final_assistant_message_sse_response("Done").expect("build mock assistant message"), + ]; + let server = create_mock_chat_completions_server(responses).await; + + let codex_home = TempDir::new().expect("create temp dir"); + create_config_toml(codex_home.path(), &server.uri()).expect("write config.toml"); + + let mut mcp = McpProcess::new(codex_home.path()) + .await + .expect("spawn mcp process"); + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()) + .await + .expect("init timed out") + .expect("init failed"); + + let new_conv_id = mcp + .send_new_conversation_request(NewConversationParams::default()) + .await + .expect("send newConversation"); + let new_conv_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, 
+ mcp.read_stream_until_response_message(RequestId::Integer(new_conv_id)), + ) + .await + .expect("newConversation timeout") + .expect("newConversation resp"); + let NewConversationResponse { + conversation_id, .. + } = to_response::<_>(new_conv_resp).expect("deserialize newConversation response"); + + let add_listener_id = mcp + .send_add_conversation_listener_request(AddConversationListenerParams { + conversation_id, + experimental_raw_events: true, + }) + .await + .expect("send addConversationListener"); + let add_listener_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(add_listener_id)), + ) + .await + .expect("addConversationListener timeout") + .expect("addConversationListener resp"); + let AddConversationSubscriptionResponse { subscription_id: _ } = + to_response::<_>(add_listener_resp).expect("deserialize addConversationListener response"); + + let send_id = mcp + .send_send_user_message_request(SendUserMessageParams { + conversation_id, + items: vec![InputItem::Text { + text: "Hello".to_string(), + }], + }) + .await + .expect("send sendUserMessage"); + + let instructions = read_raw_response_item(&mut mcp, conversation_id).await; + assert_instructions_message(&instructions); + + let environment = read_raw_response_item(&mut mcp, conversation_id).await; + assert_environment_message(&environment); + + let response: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(send_id)), + ) + .await + .expect("sendUserMessage response timeout") + .expect("sendUserMessage response error"); + let _ok: SendUserMessageResponse = to_response::<SendUserMessageResponse>(response) + .expect("deserialize sendUserMessage response"); + + let user_message = read_raw_response_item(&mut mcp, conversation_id).await; + assert_user_message(&user_message, "Hello"); + + let assistant_message = read_raw_response_item(&mut mcp, conversation_id).await; + 
assert_assistant_message(&assistant_message, "Done"); + + let _ = tokio::time::timeout( + std::time::Duration::from_millis(250), + mcp.read_stream_until_notification_message("codex/event/task_complete"), + ) + .await; } #[tokio::test] @@ -184,3 +288,108 @@ stream_max_retries = 0 ), ) } + +#[expect(clippy::expect_used)] +async fn read_raw_response_item( + mcp: &mut McpProcess, + conversation_id: ConversationId, +) -> ResponseItem { + let raw_notification: JSONRPCNotification = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("codex/event/raw_response_item"), + ) + .await + .expect("codex/event/raw_response_item notification timeout") + .expect("codex/event/raw_response_item notification resp"); + + let serde_json::Value::Object(params) = raw_notification + .params + .expect("codex/event/raw_response_item should have params") + else { + panic!("codex/event/raw_response_item should have params"); + }; + + let conversation_id_value = params + .get("conversationId") + .and_then(|value| value.as_str()) + .expect("raw response item should include conversationId"); + + assert_eq!( + conversation_id_value, + conversation_id.to_string(), + "raw response item conversation mismatch" + ); + + let msg_value = params + .get("msg") + .cloned() + .expect("raw response item should include msg payload"); + + serde_json::from_value(msg_value).expect("deserialize raw response item") +} + +fn assert_instructions_message(item: &ResponseItem) { + match item { + ResponseItem::Message { role, content, .. } => { + assert_eq!(role, "user"); + let texts = content_texts(content); + assert!( + texts + .iter() + .any(|text| text.contains("")), + "expected instructions message, got {texts:?}" + ); + } + other => panic!("expected instructions message, got {other:?}"), + } +} + +fn assert_environment_message(item: &ResponseItem) { + match item { + ResponseItem::Message { role, content, .. 
} => { + assert_eq!(role, "user"); + let texts = content_texts(content); + assert!( + texts + .iter() + .any(|text| text.contains("")), + "expected environment context message, got {texts:?}" + ); + } + other => panic!("expected environment message, got {other:?}"), + } +} + +fn assert_user_message(item: &ResponseItem, expected_text: &str) { + match item { + ResponseItem::Message { role, content, .. } => { + assert_eq!(role, "user"); + let texts = content_texts(content); + assert_eq!(texts, vec![expected_text]); + } + other => panic!("expected user message, got {other:?}"), + } +} + +fn assert_assistant_message(item: &ResponseItem, expected_text: &str) { + match item { + ResponseItem::Message { role, content, .. } => { + assert_eq!(role, "assistant"); + let texts = content_texts(content); + assert_eq!(texts, vec![expected_text]); + } + other => panic!("expected assistant message, got {other:?}"), + } +} + +fn content_texts(content: &[ContentItem]) -> Vec<&str> { + content + .iter() + .filter_map(|item| match item { + ContentItem::InputText { text } | ContentItem::OutputText { text } => { + Some(text.as_str()) + } + _ => None, + }) + .collect() +} diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 4eba28c4..d89ce5f8 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -570,7 +570,6 @@ impl Session { // Dispatch the SessionConfiguredEvent first and then report any errors. // If resuming, include converted initial messages in the payload so UIs can render them immediately. let initial_messages = initial_history.get_event_msgs(); - sess.record_initial_history(initial_history).await; let events = std::iter::once(Event { id: INITIAL_SUBMIT_ID.to_owned(), @@ -589,6 +588,9 @@ impl Session { sess.send_event_raw(event).await; } + // record_initial_history can emit events. We record only after the SessionConfiguredEvent is emitted. 
+ sess.record_initial_history(initial_history).await; + Ok(sess) } @@ -609,7 +611,7 @@ impl Session { InitialHistory::New => { // Build and record initial items (user instructions + environment context) let items = self.build_initial_context(&turn_context); - self.record_conversation_items(&items).await; + self.record_conversation_items(&turn_context, &items).await; } InitialHistory::Resumed(_) | InitialHistory::Forked(_) => { let rollout_items = conversation_history.get_rollout_items(); @@ -886,9 +888,14 @@ impl Session { /// Records input items: always append to conversation history and /// persist these response items to rollout. - pub(crate) async fn record_conversation_items(&self, items: &[ResponseItem]) { + pub(crate) async fn record_conversation_items( + &self, + turn_context: &TurnContext, + items: &[ResponseItem], + ) { self.record_into_history(items).await; self.persist_rollout_response_items(items).await; + self.send_raw_response_items(turn_context, items).await; } fn reconstruct_history_from_rollout( @@ -938,6 +945,13 @@ impl Session { self.persist_rollout_items(&rollout_items).await; } + async fn send_raw_response_items(&self, turn_context: &TurnContext, items: &[ResponseItem]) { + for item in items { + self.send_event(turn_context, EventMsg::RawResponseItem(item.clone())) + .await; + } + } + pub(crate) fn build_initial_context(&self, turn_context: &TurnContext) -> Vec { let mut items = Vec::::with_capacity(2); if let Some(user_instructions) = turn_context.user_instructions.as_deref() { @@ -1033,7 +1047,7 @@ impl Session { ) { let response_item: ResponseItem = response_input.clone().into(); // Add to conversation history and persist response item to rollout - self.record_conversation_items(std::slice::from_ref(&response_item)) + self.record_conversation_items(turn_context, std::slice::from_ref(&response_item)) .await; // Derive user message events and persist only UserMessage to rollout @@ -1224,8 +1238,11 @@ async fn submission_loop(sess: Arc, 
config: Arc, rx_sub: Receiv if let Some(env_item) = sess .build_environment_update_item(previous_context.as_ref(), &current_context) { - sess.record_conversation_items(std::slice::from_ref(&env_item)) - .await; + sess.record_conversation_items( + &current_context, + std::slice::from_ref(&env_item), + ) + .await; } sess.spawn_task(Arc::clone(&current_context), items, RegularTask) @@ -1597,7 +1614,8 @@ pub(crate) async fn run_task( } review_thread_history.get_history() } else { - sess.record_conversation_items(&pending_input).await; + sess.record_conversation_items(&turn_context, &pending_input) + .await; sess.history_snapshot().await }; @@ -1644,6 +1662,7 @@ pub(crate) async fn run_task( is_review_mode, &mut review_thread_history, &sess, + &turn_context, ) .await; @@ -1692,6 +1711,7 @@ pub(crate) async fn run_task( is_review_mode, &mut review_thread_history, &sess, + &turn_context, ) .await; // Aborted turn is reported via a different event. @@ -2202,11 +2222,14 @@ pub(crate) async fn exit_review_mode( } session - .record_conversation_items(&[ResponseItem::Message { - id: None, - role: "user".to_string(), - content: vec![ContentItem::InputText { text: user_message }], - }]) + .record_conversation_items( + &turn_context, + &[ResponseItem::Message { + id: None, + role: "user".to_string(), + content: vec![ContentItem::InputText { text: user_message }], + }], + ) .await; } @@ -2801,13 +2824,19 @@ mod tests { EventMsg::ExitedReviewMode(ev) => assert!(ev.review_output.is_none()), other => panic!("unexpected first event: {other:?}"), } - let second = tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv()) - .await - .expect("timeout waiting for second event") - .expect("second event"); - match second.msg { - EventMsg::TurnAborted(e) => assert_eq!(TurnAbortReason::Interrupted, e.reason), - other => panic!("unexpected second event: {other:?}"), + loop { + let evt = tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv()) + .await + .expect("timeout waiting for next 
event") + .expect("event"); + match evt.msg { + EventMsg::RawResponseItem(_) => continue, + EventMsg::TurnAborted(e) => { + assert_eq!(TurnAbortReason::Interrupted, e.reason); + break; + } + other => panic!("unexpected second event: {other:?}"), + } } let history = sess.history_snapshot().await; diff --git a/codex-rs/core/src/response_processing.rs b/codex-rs/core/src/response_processing.rs index b9139ce6..e18fdd45 100644 --- a/codex-rs/core/src/response_processing.rs +++ b/codex-rs/core/src/response_processing.rs @@ -1,4 +1,5 @@ use crate::codex::Session; +use crate::codex::TurnContext; use crate::conversation_history::ConversationHistory; use codex_protocol::models::FunctionCallOutputPayload; use codex_protocol::models::ResponseInputItem; @@ -13,6 +14,7 @@ pub(crate) async fn process_items( is_review_mode: bool, review_thread_history: &mut ConversationHistory, sess: &Session, + turn_context: &TurnContext, ) -> (Vec, Vec) { let mut items_to_record_in_conversation_history = Vec::::new(); let mut responses = Vec::::new(); @@ -104,7 +106,7 @@ pub(crate) async fn process_items( if is_review_mode { review_thread_history.record_items(items_to_record_in_conversation_history.iter()); } else { - sess.record_conversation_items(&items_to_record_in_conversation_history) + sess.record_conversation_items(turn_context, &items_to_record_in_conversation_history) .await; } } diff --git a/codex-rs/core/src/rollout/policy.rs b/codex-rs/core/src/rollout/policy.rs index fdf1f5cc..ea2954fa 100644 --- a/codex-rs/core/src/rollout/policy.rs +++ b/codex-rs/core/src/rollout/policy.rs @@ -50,6 +50,7 @@ pub(crate) fn should_persist_event_msg(ev: &EventMsg) -> bool { | EventMsg::AgentReasoningDelta(_) | EventMsg::AgentReasoningRawContentDelta(_) | EventMsg::AgentReasoningSectionBreak(_) + | EventMsg::RawResponseItem(_) | EventMsg::SessionConfigured(_) | EventMsg::McpToolCallBegin(_) | EventMsg::McpToolCallEnd(_) diff --git a/codex-rs/exec/src/event_processor_with_human_output.rs 
b/codex-rs/exec/src/event_processor_with_human_output.rs index 9a85c5de..b07cd16d 100644 --- a/codex-rs/exec/src/event_processor_with_human_output.rs +++ b/codex-rs/exec/src/event_processor_with_human_output.rs @@ -519,6 +519,7 @@ impl EventProcessor for EventProcessorWithHumanOutput { EventMsg::AgentReasoningRawContentDelta(_) => {} EventMsg::ItemStarted(_) => {} EventMsg::ItemCompleted(_) => {} + EventMsg::RawResponseItem(_) => {} } CodexStatus::Running } diff --git a/codex-rs/mcp-server/src/codex_tool_runner.rs b/codex-rs/mcp-server/src/codex_tool_runner.rs index 05d653af..a6af754d 100644 --- a/codex-rs/mcp-server/src/codex_tool_runner.rs +++ b/codex-rs/mcp-server/src/codex_tool_runner.rs @@ -285,6 +285,7 @@ async fn run_codex_tool_session_inner( | EventMsg::UserMessage(_) | EventMsg::ShutdownComplete | EventMsg::ViewImageToolCall(_) + | EventMsg::RawResponseItem(_) | EventMsg::EnteredReviewMode(_) | EventMsg::ItemStarted(_) | EventMsg::ItemCompleted(_) diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index cd10c278..2334561e 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -527,6 +527,8 @@ pub enum EventMsg { /// Exited review mode with an optional final result to apply. ExitedReviewMode(ExitedReviewModeEvent), + RawResponseItem(ResponseItem), + ItemStarted(ItemStartedEvent), ItemCompleted(ItemCompletedEvent), } diff --git a/codex-rs/tui/src/chatwidget.rs b/codex-rs/tui/src/chatwidget.rs index 64f680d2..833ae5d8 100644 --- a/codex-rs/tui/src/chatwidget.rs +++ b/codex-rs/tui/src/chatwidget.rs @@ -1498,7 +1498,9 @@ impl ChatWidget { self.on_entered_review_mode(review_request) } EventMsg::ExitedReviewMode(review) => self.on_exited_review_mode(review), - EventMsg::ItemStarted(_) | EventMsg::ItemCompleted(_) => {} + EventMsg::RawResponseItem(_) + | EventMsg::ItemStarted(_) + | EventMsg::ItemCompleted(_) => {} } }