Introduce rollout items (#3380)
This PR introduces Rollout items. This enable us to rollout eventmsgs and session meta. This is mostly #3214 with rebase on main
This commit is contained in:
@@ -10,6 +10,7 @@ use std::time::Duration;
|
||||
|
||||
use crate::AuthManager;
|
||||
use crate::event_mapping::map_response_item_to_event_messages;
|
||||
use crate::rollout::recorder::RolloutItem;
|
||||
use async_channel::Receiver;
|
||||
use async_channel::Sender;
|
||||
use codex_apply_patch::ApplyPatchAction;
|
||||
@@ -203,9 +204,6 @@ impl Codex {
|
||||
error!("Failed to create session: {e:#}");
|
||||
CodexErr::InternalAgentDied
|
||||
})?;
|
||||
session
|
||||
.record_initial_history(&turn_context, conversation_history)
|
||||
.await;
|
||||
let conversation_id = session.conversation_id;
|
||||
|
||||
// This task will run until Op::Shutdown is received.
|
||||
@@ -494,13 +492,9 @@ impl Session {
|
||||
|
||||
// Dispatch the SessionConfiguredEvent first and then report any errors.
|
||||
// If resuming, include converted initial messages in the payload so UIs can render them immediately.
|
||||
let initial_messages = match &initial_history {
|
||||
InitialHistory::New => None,
|
||||
InitialHistory::Forked(items) => Some(sess.build_initial_messages(items)),
|
||||
InitialHistory::Resumed(resumed_history) => {
|
||||
Some(sess.build_initial_messages(&resumed_history.history))
|
||||
}
|
||||
};
|
||||
let initial_messages = initial_history.get_event_msgs();
|
||||
sess.record_initial_history(&turn_context, initial_history)
|
||||
.await;
|
||||
|
||||
let events = std::iter::once(Event {
|
||||
id: INITIAL_SUBMIT_ID.to_owned(),
|
||||
@@ -515,9 +509,7 @@ impl Session {
|
||||
})
|
||||
.chain(post_session_configured_error_events.into_iter());
|
||||
for event in events {
|
||||
if let Err(e) = tx_event.send(event).await {
|
||||
error!("failed to send event: {e:?}");
|
||||
}
|
||||
sess.send_event(event).await;
|
||||
}
|
||||
|
||||
Ok((sess, turn_context))
|
||||
@@ -547,53 +539,33 @@ impl Session {
|
||||
) {
|
||||
match conversation_history {
|
||||
InitialHistory::New => {
|
||||
self.record_initial_history_new(turn_context).await;
|
||||
// Build and record initial items (user instructions + environment context)
|
||||
let items = self.build_initial_context(turn_context);
|
||||
self.record_conversation_items(&items).await;
|
||||
}
|
||||
InitialHistory::Forked(items) => {
|
||||
self.record_initial_history_from_items(items).await;
|
||||
}
|
||||
InitialHistory::Resumed(resumed_history) => {
|
||||
self.record_initial_history_from_items(resumed_history.history)
|
||||
.await;
|
||||
InitialHistory::Resumed(_) | InitialHistory::Forked(_) => {
|
||||
let rollout_items = conversation_history.get_rollout_items();
|
||||
let persist = matches!(conversation_history, InitialHistory::Forked(_));
|
||||
|
||||
// Always add response items to conversation history
|
||||
let response_items = conversation_history.get_response_items();
|
||||
if !response_items.is_empty() {
|
||||
self.record_into_history(&response_items);
|
||||
}
|
||||
|
||||
// If persisting, persist all rollout items as-is (recorder filters)
|
||||
if persist && !rollout_items.is_empty() {
|
||||
self.persist_rollout_items(&rollout_items).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn record_initial_history_new(&self, turn_context: &TurnContext) {
|
||||
// record the initial user instructions and environment context,
|
||||
// regardless of whether we restored items.
|
||||
// TODO: Those items shouldn't be "user messages" IMO. Maybe developer messages.
|
||||
let mut conversation_items = Vec::<ResponseItem>::with_capacity(2);
|
||||
if let Some(user_instructions) = turn_context.user_instructions.as_deref() {
|
||||
conversation_items.push(UserInstructions::new(user_instructions.to_string()).into());
|
||||
}
|
||||
conversation_items.push(ResponseItem::from(EnvironmentContext::new(
|
||||
Some(turn_context.cwd.clone()),
|
||||
Some(turn_context.approval_policy),
|
||||
Some(turn_context.sandbox_policy.clone()),
|
||||
Some(self.user_shell.clone()),
|
||||
)));
|
||||
self.record_conversation_items(&conversation_items).await;
|
||||
}
|
||||
|
||||
async fn record_initial_history_from_items(&self, items: Vec<ResponseItem>) {
|
||||
self.record_conversation_items_internal(&items, false).await;
|
||||
}
|
||||
|
||||
/// build the initial messages vector for SessionConfigured by converting
|
||||
/// ResponseItems into EventMsg.
|
||||
fn build_initial_messages(&self, items: &[ResponseItem]) -> Vec<EventMsg> {
|
||||
items
|
||||
.iter()
|
||||
.flat_map(|item| {
|
||||
map_response_item_to_event_messages(item, self.show_raw_agent_reasoning)
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Sends the given event to the client and swallows the send event, if
|
||||
/// any, logging it as an error.
|
||||
/// Persist the event to rollout and send it to clients.
|
||||
pub(crate) async fn send_event(&self, event: Event) {
|
||||
// Persist the event into rollout (recorder filters as needed)
|
||||
let rollout_items = vec![RolloutItem::EventMsg(event.msg.clone())];
|
||||
self.persist_rollout_items(&rollout_items).await;
|
||||
if let Err(e) = self.tx_event.send(event).await {
|
||||
error!("failed to send tool call event: {e}");
|
||||
}
|
||||
@@ -627,7 +599,7 @@ impl Session {
|
||||
reason,
|
||||
}),
|
||||
};
|
||||
let _ = self.tx_event.send(event).await;
|
||||
self.send_event(event).await;
|
||||
rx_approve
|
||||
}
|
||||
|
||||
@@ -659,7 +631,7 @@ impl Session {
|
||||
grant_root,
|
||||
}),
|
||||
};
|
||||
let _ = self.tx_event.send(event).await;
|
||||
self.send_event(event).await;
|
||||
rx_approve
|
||||
}
|
||||
|
||||
@@ -683,36 +655,76 @@ impl Session {
|
||||
state.approved_commands.insert(cmd);
|
||||
}
|
||||
|
||||
/// Records items to both the rollout and the chat completions/ZDR
|
||||
/// transcript, if enabled.
|
||||
/// Records input items: always append to conversation history and
|
||||
/// persist these response items to rollout.
|
||||
async fn record_conversation_items(&self, items: &[ResponseItem]) {
|
||||
self.record_conversation_items_internal(items, true).await;
|
||||
self.record_into_history(items);
|
||||
self.persist_rollout_response_items(items).await;
|
||||
}
|
||||
|
||||
async fn record_conversation_items_internal(&self, items: &[ResponseItem], persist: bool) {
|
||||
debug!("Recording items for conversation: {items:?}");
|
||||
if persist {
|
||||
self.record_state_snapshot(items).await;
|
||||
/// Append ResponseItems to the in-memory conversation history only.
|
||||
fn record_into_history(&self, items: &[ResponseItem]) {
|
||||
self.state
|
||||
.lock_unchecked()
|
||||
.history
|
||||
.record_items(items.iter());
|
||||
}
|
||||
|
||||
async fn persist_rollout_response_items(&self, items: &[ResponseItem]) {
|
||||
let rollout_items: Vec<RolloutItem> = items
|
||||
.iter()
|
||||
.cloned()
|
||||
.map(RolloutItem::ResponseItem)
|
||||
.collect();
|
||||
self.persist_rollout_items(&rollout_items).await;
|
||||
}
|
||||
|
||||
fn build_initial_context(&self, turn_context: &TurnContext) -> Vec<ResponseItem> {
|
||||
let mut items = Vec::<ResponseItem>::with_capacity(2);
|
||||
if let Some(user_instructions) = turn_context.user_instructions.as_deref() {
|
||||
items.push(UserInstructions::new(user_instructions.to_string()).into());
|
||||
}
|
||||
|
||||
self.state.lock_unchecked().history.record_items(items);
|
||||
items.push(ResponseItem::from(EnvironmentContext::new(
|
||||
Some(turn_context.cwd.clone()),
|
||||
Some(turn_context.approval_policy),
|
||||
Some(turn_context.sandbox_policy.clone()),
|
||||
Some(self.user_shell.clone()),
|
||||
)));
|
||||
items
|
||||
}
|
||||
|
||||
async fn record_state_snapshot(&self, items: &[ResponseItem]) {
|
||||
let snapshot = { crate::rollout::SessionStateSnapshot {} };
|
||||
|
||||
async fn persist_rollout_items(&self, items: &[RolloutItem]) {
|
||||
let recorder = {
|
||||
let guard = self.rollout.lock_unchecked();
|
||||
guard.as_ref().cloned()
|
||||
};
|
||||
if let Some(rec) = recorder
|
||||
&& let Err(e) = rec.record_items(items).await
|
||||
{
|
||||
error!("failed to record rollout items: {e:#}");
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(rec) = recorder {
|
||||
if let Err(e) = rec.record_state(snapshot).await {
|
||||
error!("failed to record rollout state: {e:#}");
|
||||
}
|
||||
if let Err(e) = rec.record_items(items).await {
|
||||
error!("failed to record rollout items: {e:#}");
|
||||
}
|
||||
/// Record a user input item to conversation history and also persist a
|
||||
/// corresponding UserMessage EventMsg to rollout.
|
||||
async fn record_input_and_rollout_usermsg(&self, response_input: &ResponseInputItem) {
|
||||
let response_item: ResponseItem = response_input.clone().into();
|
||||
// Add to conversation history and persist response item to rollout
|
||||
self.record_conversation_items(std::slice::from_ref(&response_item))
|
||||
.await;
|
||||
|
||||
// Derive user message events and persist only UserMessage to rollout
|
||||
let msgs =
|
||||
map_response_item_to_event_messages(&response_item, self.show_raw_agent_reasoning);
|
||||
let user_msgs: Vec<RolloutItem> = msgs
|
||||
.into_iter()
|
||||
.filter_map(|m| match m {
|
||||
EventMsg::UserMessage(ev) => Some(RolloutItem::EventMsg(EventMsg::UserMessage(ev))),
|
||||
_ => None,
|
||||
})
|
||||
.collect();
|
||||
if !user_msgs.is_empty() {
|
||||
self.persist_rollout_items(&user_msgs).await;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -755,7 +767,7 @@ impl Session {
|
||||
id: sub_id.to_string(),
|
||||
msg,
|
||||
};
|
||||
let _ = self.tx_event.send(event).await;
|
||||
self.send_event(event).await;
|
||||
}
|
||||
|
||||
async fn on_exec_command_end(
|
||||
@@ -802,7 +814,7 @@ impl Session {
|
||||
id: sub_id.to_string(),
|
||||
msg,
|
||||
};
|
||||
let _ = self.tx_event.send(event).await;
|
||||
self.send_event(event).await;
|
||||
|
||||
// If this is an apply_patch, after we emit the end patch, emit a second event
|
||||
// with the full turn diff if there is one.
|
||||
@@ -814,7 +826,7 @@ impl Session {
|
||||
id: sub_id.into(),
|
||||
msg,
|
||||
};
|
||||
let _ = self.tx_event.send(event).await;
|
||||
self.send_event(event).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -880,7 +892,7 @@ impl Session {
|
||||
message: message.into(),
|
||||
}),
|
||||
};
|
||||
let _ = self.tx_event.send(event).await;
|
||||
self.send_event(event).await;
|
||||
}
|
||||
|
||||
async fn notify_stream_error(&self, sub_id: &str, message: impl Into<String>) {
|
||||
@@ -890,7 +902,7 @@ impl Session {
|
||||
message: message.into(),
|
||||
}),
|
||||
};
|
||||
let _ = self.tx_event.send(event).await;
|
||||
self.send_event(event).await;
|
||||
}
|
||||
|
||||
/// Build the full turn input by concatenating the current conversation
|
||||
@@ -1053,9 +1065,9 @@ impl AgentTask {
|
||||
id: self.sub_id,
|
||||
msg: EventMsg::TurnAborted(TurnAbortedEvent { reason }),
|
||||
};
|
||||
let tx_event = self.sess.tx_event.clone();
|
||||
let sess = self.sess.clone();
|
||||
tokio::spawn(async move {
|
||||
tx_event.send(event).await.ok();
|
||||
sess.send_event(event).await;
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -1260,7 +1272,7 @@ async fn submission_loop(
|
||||
|
||||
Op::GetHistoryEntryRequest { offset, log_id } => {
|
||||
let config = config.clone();
|
||||
let tx_event = sess.tx_event.clone();
|
||||
let sess_clone = sess.clone();
|
||||
let sub_id = sub.id.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
@@ -1288,13 +1300,10 @@ async fn submission_loop(
|
||||
),
|
||||
};
|
||||
|
||||
if let Err(e) = tx_event.send(event).await {
|
||||
warn!("failed to send GetHistoryEntryResponse event: {e}");
|
||||
}
|
||||
sess_clone.send_event(event).await;
|
||||
});
|
||||
}
|
||||
Op::ListMcpTools => {
|
||||
let tx_event = sess.tx_event.clone();
|
||||
let sub_id = sub.id.clone();
|
||||
|
||||
// This is a cheap lookup from the connection manager's cache.
|
||||
@@ -1305,12 +1314,9 @@ async fn submission_loop(
|
||||
crate::protocol::McpListToolsResponseEvent { tools },
|
||||
),
|
||||
};
|
||||
if let Err(e) = tx_event.send(event).await {
|
||||
warn!("failed to send McpListToolsResponse event: {e}");
|
||||
}
|
||||
sess.send_event(event).await;
|
||||
}
|
||||
Op::ListCustomPrompts => {
|
||||
let tx_event = sess.tx_event.clone();
|
||||
let sub_id = sub.id.clone();
|
||||
|
||||
let custom_prompts: Vec<CustomPrompt> =
|
||||
@@ -1326,9 +1332,7 @@ async fn submission_loop(
|
||||
custom_prompts,
|
||||
}),
|
||||
};
|
||||
if let Err(e) = tx_event.send(event).await {
|
||||
warn!("failed to send ListCustomPromptsResponse event: {e}");
|
||||
}
|
||||
sess.send_event(event).await;
|
||||
}
|
||||
Op::Compact => {
|
||||
// Create a summarization request as user input
|
||||
@@ -1364,22 +1368,17 @@ async fn submission_loop(
|
||||
message: "Failed to shutdown rollout recorder".to_string(),
|
||||
}),
|
||||
};
|
||||
if let Err(e) = sess.tx_event.send(event).await {
|
||||
warn!("failed to send error message: {e:?}");
|
||||
}
|
||||
sess.send_event(event).await;
|
||||
}
|
||||
|
||||
let event = Event {
|
||||
id: sub.id.clone(),
|
||||
msg: EventMsg::ShutdownComplete,
|
||||
};
|
||||
if let Err(e) = sess.tx_event.send(event).await {
|
||||
warn!("failed to send Shutdown event: {e}");
|
||||
}
|
||||
sess.send_event(event).await;
|
||||
break;
|
||||
}
|
||||
Op::GetHistory => {
|
||||
let tx_event = sess.tx_event.clone();
|
||||
let sub_id = sub.id.clone();
|
||||
|
||||
let event = Event {
|
||||
@@ -1389,9 +1388,7 @@ async fn submission_loop(
|
||||
entries: sess.state.lock_unchecked().history.contents(),
|
||||
}),
|
||||
};
|
||||
if let Err(e) = tx_event.send(event).await {
|
||||
warn!("failed to send ConversationHistory event: {e}");
|
||||
}
|
||||
sess.send_event(event).await;
|
||||
}
|
||||
_ => {
|
||||
// Ignore unknown ops; enum is non_exhaustive to allow extensions.
|
||||
@@ -1429,12 +1426,10 @@ async fn run_task(
|
||||
model_context_window: turn_context.client.get_model_context_window(),
|
||||
}),
|
||||
};
|
||||
if sess.tx_event.send(event).await.is_err() {
|
||||
return;
|
||||
}
|
||||
sess.send_event(event).await;
|
||||
|
||||
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input);
|
||||
sess.record_conversation_items(&[initial_input_for_turn.clone().into()])
|
||||
sess.record_input_and_rollout_usermsg(&initial_input_for_turn)
|
||||
.await;
|
||||
|
||||
let mut last_agent_message: Option<String> = None;
|
||||
@@ -1603,7 +1598,7 @@ async fn run_task(
|
||||
message: e.to_string(),
|
||||
}),
|
||||
};
|
||||
sess.tx_event.send(event).await.ok();
|
||||
sess.send_event(event).await;
|
||||
// let the user continue the conversation
|
||||
break;
|
||||
}
|
||||
@@ -1614,7 +1609,7 @@ async fn run_task(
|
||||
id: sub_id,
|
||||
msg: EventMsg::TaskComplete(TaskCompleteEvent { last_agent_message }),
|
||||
};
|
||||
sess.tx_event.send(event).await.ok();
|
||||
sess.send_event(event).await;
|
||||
}
|
||||
|
||||
async fn run_turn(
|
||||
@@ -1812,13 +1807,12 @@ async fn try_run_turn(
|
||||
st.token_info = info.clone();
|
||||
info
|
||||
};
|
||||
sess.tx_event
|
||||
.send(Event {
|
||||
let _ = sess
|
||||
.send_event(Event {
|
||||
id: sub_id.to_string(),
|
||||
msg: EventMsg::TokenCount(crate::protocol::TokenCountEvent { info }),
|
||||
})
|
||||
.await
|
||||
.ok();
|
||||
.await;
|
||||
|
||||
let unified_diff = turn_diff_tracker.get_unified_diff();
|
||||
if let Ok(Some(unified_diff)) = unified_diff {
|
||||
@@ -1827,7 +1821,7 @@ async fn try_run_turn(
|
||||
id: sub_id.to_string(),
|
||||
msg,
|
||||
};
|
||||
let _ = sess.tx_event.send(event).await;
|
||||
sess.send_event(event).await;
|
||||
}
|
||||
|
||||
return Ok(output);
|
||||
@@ -1837,21 +1831,21 @@ async fn try_run_turn(
|
||||
id: sub_id.to_string(),
|
||||
msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta }),
|
||||
};
|
||||
sess.tx_event.send(event).await.ok();
|
||||
sess.send_event(event).await;
|
||||
}
|
||||
ResponseEvent::ReasoningSummaryDelta(delta) => {
|
||||
let event = Event {
|
||||
id: sub_id.to_string(),
|
||||
msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta }),
|
||||
};
|
||||
sess.tx_event.send(event).await.ok();
|
||||
sess.send_event(event).await;
|
||||
}
|
||||
ResponseEvent::ReasoningSummaryPartAdded => {
|
||||
let event = Event {
|
||||
id: sub_id.to_string(),
|
||||
msg: EventMsg::AgentReasoningSectionBreak(AgentReasoningSectionBreakEvent {}),
|
||||
};
|
||||
sess.tx_event.send(event).await.ok();
|
||||
sess.send_event(event).await;
|
||||
}
|
||||
ResponseEvent::ReasoningContentDelta(delta) => {
|
||||
if sess.show_raw_agent_reasoning {
|
||||
@@ -1861,7 +1855,7 @@ async fn try_run_turn(
|
||||
AgentReasoningRawContentDeltaEvent { delta },
|
||||
),
|
||||
};
|
||||
sess.tx_event.send(event).await.ok();
|
||||
sess.send_event(event).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1882,9 +1876,7 @@ async fn run_compact_task(
|
||||
model_context_window,
|
||||
}),
|
||||
};
|
||||
if sess.tx_event.send(start_event).await.is_err() {
|
||||
return;
|
||||
}
|
||||
sess.send_event(start_event).await;
|
||||
|
||||
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input);
|
||||
let turn_input: Vec<ResponseItem> =
|
||||
@@ -2062,7 +2054,7 @@ async fn handle_response_item(
|
||||
id: sub_id.to_string(),
|
||||
msg,
|
||||
};
|
||||
sess.tx_event.send(event).await.ok();
|
||||
sess.send_event(event).await;
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user