[codex][app-server] introduce codex/event/raw_item events (#5578)

This commit is contained in:
Anton Panasenko
2025-10-24 15:41:52 -07:00
committed by GitHub
parent e2e1b65da6
commit 6af83d86ff
13 changed files with 300 additions and 28 deletions

View File

@@ -570,7 +570,6 @@ impl Session {
// Dispatch the SessionConfiguredEvent first and then report any errors.
// If resuming, include converted initial messages in the payload so UIs can render them immediately.
let initial_messages = initial_history.get_event_msgs();
sess.record_initial_history(initial_history).await;
let events = std::iter::once(Event {
id: INITIAL_SUBMIT_ID.to_owned(),
@@ -589,6 +588,9 @@ impl Session {
sess.send_event_raw(event).await;
}
// record_initial_history can emit events. We record only after the SessionConfiguredEvent is emitted.
sess.record_initial_history(initial_history).await;
Ok(sess)
}
@@ -609,7 +611,7 @@ impl Session {
InitialHistory::New => {
// Build and record initial items (user instructions + environment context)
let items = self.build_initial_context(&turn_context);
self.record_conversation_items(&items).await;
self.record_conversation_items(&turn_context, &items).await;
}
InitialHistory::Resumed(_) | InitialHistory::Forked(_) => {
let rollout_items = conversation_history.get_rollout_items();
@@ -886,9 +888,14 @@ impl Session {
/// Records input items: always append to conversation history and
/// persist these response items to rollout.
pub(crate) async fn record_conversation_items(&self, items: &[ResponseItem]) {
pub(crate) async fn record_conversation_items(
&self,
turn_context: &TurnContext,
items: &[ResponseItem],
) {
self.record_into_history(items).await;
self.persist_rollout_response_items(items).await;
self.send_raw_response_items(turn_context, items).await;
}
fn reconstruct_history_from_rollout(
@@ -938,6 +945,13 @@ impl Session {
self.persist_rollout_items(&rollout_items).await;
}
async fn send_raw_response_items(&self, turn_context: &TurnContext, items: &[ResponseItem]) {
for item in items {
self.send_event(turn_context, EventMsg::RawResponseItem(item.clone()))
.await;
}
}
pub(crate) fn build_initial_context(&self, turn_context: &TurnContext) -> Vec<ResponseItem> {
let mut items = Vec::<ResponseItem>::with_capacity(2);
if let Some(user_instructions) = turn_context.user_instructions.as_deref() {
@@ -1033,7 +1047,7 @@ impl Session {
) {
let response_item: ResponseItem = response_input.clone().into();
// Add to conversation history and persist response item to rollout
self.record_conversation_items(std::slice::from_ref(&response_item))
self.record_conversation_items(turn_context, std::slice::from_ref(&response_item))
.await;
// Derive user message events and persist only UserMessage to rollout
@@ -1224,8 +1238,11 @@ async fn submission_loop(sess: Arc<Session>, config: Arc<Config>, rx_sub: Receiv
if let Some(env_item) = sess
.build_environment_update_item(previous_context.as_ref(), &current_context)
{
sess.record_conversation_items(std::slice::from_ref(&env_item))
.await;
sess.record_conversation_items(
&current_context,
std::slice::from_ref(&env_item),
)
.await;
}
sess.spawn_task(Arc::clone(&current_context), items, RegularTask)
@@ -1597,7 +1614,8 @@ pub(crate) async fn run_task(
}
review_thread_history.get_history()
} else {
sess.record_conversation_items(&pending_input).await;
sess.record_conversation_items(&turn_context, &pending_input)
.await;
sess.history_snapshot().await
};
@@ -1644,6 +1662,7 @@ pub(crate) async fn run_task(
is_review_mode,
&mut review_thread_history,
&sess,
&turn_context,
)
.await;
@@ -1692,6 +1711,7 @@ pub(crate) async fn run_task(
is_review_mode,
&mut review_thread_history,
&sess,
&turn_context,
)
.await;
// Aborted turn is reported via a different event.
@@ -2202,11 +2222,14 @@ pub(crate) async fn exit_review_mode(
}
session
.record_conversation_items(&[ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText { text: user_message }],
}])
.record_conversation_items(
&turn_context,
&[ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText { text: user_message }],
}],
)
.await;
}
@@ -2801,13 +2824,19 @@ mod tests {
EventMsg::ExitedReviewMode(ev) => assert!(ev.review_output.is_none()),
other => panic!("unexpected first event: {other:?}"),
}
let second = tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv())
.await
.expect("timeout waiting for second event")
.expect("second event");
match second.msg {
EventMsg::TurnAborted(e) => assert_eq!(TurnAbortReason::Interrupted, e.reason),
other => panic!("unexpected second event: {other:?}"),
loop {
let evt = tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv())
.await
.expect("timeout waiting for next event")
.expect("event");
match evt.msg {
EventMsg::RawResponseItem(_) => continue,
EventMsg::TurnAborted(e) => {
assert_eq!(TurnAbortReason::Interrupted, e.reason);
break;
}
other => panic!("unexpected second event: {other:?}"),
}
}
let history = sess.history_snapshot().await;