From 674e3d3c90d78508602c720c0f2d304ec5715a26 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Thu, 11 Sep 2025 11:08:51 -0700 Subject: [PATCH] Add Compact and Turn Context to the rollout items (#3444) Adding compact and turn context to the rollout items based on #3440 --- codex-rs/core/src/codex.rs | 28 ++++++++++- codex-rs/core/src/conversation_history.rs | 21 +++++++++ codex-rs/core/src/rollout/list.rs | 6 +++ codex-rs/core/src/rollout/policy.rs | 6 ++- codex-rs/core/src/rollout/recorder.rs | 6 +++ codex-rs/core/tests/suite/compact.rs | 57 +++++++++++++++++++++-- codex-rs/protocol/src/protocol.rs | 17 +++++++ 7 files changed, 132 insertions(+), 9 deletions(-) diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 46d155be..adfaca19 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -19,11 +19,13 @@ use codex_apply_patch::ApplyPatchAction; use codex_apply_patch::MaybeApplyPatchVerified; use codex_apply_patch::maybe_parse_apply_patch_verified; use codex_protocol::mcp_protocol::ConversationId; +use codex_protocol::protocol::CompactedItem; use codex_protocol::protocol::ConversationPathResponseEvent; use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::TaskStartedEvent; use codex_protocol::protocol::TurnAbortReason; use codex_protocol::protocol::TurnAbortedEvent; +use codex_protocol::protocol::TurnContextItem; use futures::prelude::*; use mcp_types::CallToolResult; use serde::Deserialize; @@ -1786,6 +1788,15 @@ async fn try_run_turn( }) }; + let rollout_item = RolloutItem::TurnContext(TurnContextItem { + cwd: turn_context.cwd.clone(), + approval_policy: turn_context.approval_policy, + sandbox_policy: turn_context.sandbox_policy.clone(), + model: turn_context.client.get_model().clone(), + effort: turn_context.client.get_reasoning_effort(), + summary: turn_context.client.get_reasoning_summary(), + }); + sess.persist_rollout_items(&[rollout_item]).await; let mut stream = turn_context.client.clone().stream(&prompt).await?; let mut output = Vec::new(); @@ -1968,10 +1979,14 @@ async fn run_compact_task( sess.remove_task(&sub_id); - { + let rollout_item = { let mut state = sess.state.lock_unchecked(); state.history.keep_last_messages(1); - } + RolloutItem::Compacted(CompactedItem { + message: state.history.last_agent_message(), + }) + }; + sess.persist_rollout_items(&[rollout_item]).await; let event = Event { id: sub_id.clone(), @@ -2997,6 +3012,15 @@ async fn drain_to_completed( sub_id: &str, prompt: &Prompt, ) -> CodexResult<()> { + let rollout_item = RolloutItem::TurnContext(TurnContextItem { + cwd: turn_context.cwd.clone(), + approval_policy: turn_context.approval_policy, + sandbox_policy: turn_context.sandbox_policy.clone(), + model: turn_context.client.get_model(), + effort: turn_context.client.get_reasoning_effort(), + summary: turn_context.client.get_reasoning_summary(), + }); + sess.persist_rollout_items(&[rollout_item]).await; let mut stream = turn_context.client.clone().stream(prompt).await?; loop { let maybe_event = stream.next().await; diff --git a/codex-rs/core/src/conversation_history.rs b/codex-rs/core/src/conversation_history.rs index ed15bd41..3cdbbb33 100644 --- a/codex-rs/core/src/conversation_history.rs +++ b/codex-rs/core/src/conversation_history.rs @@ -1,3 +1,4 @@ +use codex_protocol::models::ContentItem; use codex_protocol::models::ResponseItem; /// Transcript of conversation history @@ -59,6 +60,26 @@ impl ConversationHistory { kept.reverse(); self.items = kept; } + + pub(crate) fn last_agent_message(&self) -> String { + for item in self.items.iter().rev() { + if let ResponseItem::Message { role, content, .. } = item + && role == "assistant" + { + return content + .iter() + .find_map(|ci| { + if let ContentItem::OutputText { text } = ci { + Some(text.clone()) + } else { + None + } + }) + .unwrap_or_default(); + } + } + String::new() + } } /// Anything that is not a system message or "reasoning" message is considered diff --git a/codex-rs/core/src/rollout/list.rs b/codex-rs/core/src/rollout/list.rs index 417638e7..3d696451 100644 --- a/codex-rs/core/src/rollout/list.rs +++ b/codex-rs/core/src/rollout/list.rs @@ -318,6 +318,12 @@ async fn read_head_and_flags( head.push(val); } } + RolloutItem::TurnContext(_) => { + // Not included in `head`; skip. + } + RolloutItem::Compacted(_) => { + // Not included in `head`; skip. + } RolloutItem::EventMsg(ev) => { if matches!(ev, EventMsg::UserMessage(_)) { saw_user_event = true; diff --git a/codex-rs/core/src/rollout/policy.rs b/codex-rs/core/src/rollout/policy.rs index c0d6d3af..c1a5cb23 100644 --- a/codex-rs/core/src/rollout/policy.rs +++ b/codex-rs/core/src/rollout/policy.rs @@ -8,8 +8,10 @@ pub(crate) fn is_persisted_response_item(item: &RolloutItem) -> bool { match item { RolloutItem::ResponseItem(item) => should_persist_response_item(item), RolloutItem::EventMsg(ev) => should_persist_event_msg(ev), - // Always persist session meta - RolloutItem::SessionMeta(_) => true, + // Persist Codex executive markers so we can analyze flows (e.g., compaction, API turns). + RolloutItem::Compacted(_) | RolloutItem::TurnContext(_) | RolloutItem::SessionMeta(_) => { + true + } } } diff --git a/codex-rs/core/src/rollout/recorder.rs b/codex-rs/core/src/rollout/recorder.rs index 63495bd8..580a41ed 100644 --- a/codex-rs/core/src/rollout/recorder.rs +++ b/codex-rs/core/src/rollout/recorder.rs @@ -238,6 +238,12 @@ impl RolloutRecorder { RolloutItem::ResponseItem(item) => { items.push(RolloutItem::ResponseItem(item)); } + RolloutItem::Compacted(item) => { + items.push(RolloutItem::Compacted(item)); + } + RolloutItem::TurnContext(item) => { + items.push(RolloutItem::TurnContext(item)); + } RolloutItem::EventMsg(_ev) => { items.push(RolloutItem::EventMsg(_ev)); } diff --git a/codex-rs/core/tests/suite/compact.rs b/codex-rs/core/tests/suite/compact.rs index f5e854d2..524f819b 100644 --- a/codex-rs/core/tests/suite/compact.rs +++ b/codex-rs/core/tests/suite/compact.rs @@ -3,10 +3,13 @@ use codex_core::CodexAuth; use codex_core::ConversationManager; use codex_core::ModelProviderInfo; +use codex_core::NewConversation; use codex_core::built_in_model_providers; use codex_core::protocol::EventMsg; use codex_core::protocol::InputItem; use codex_core::protocol::Op; +use codex_core::protocol::RolloutItem; +use codex_core::protocol::RolloutLine; use codex_core::spawn::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR; use core_test_support::load_default_config_for_test; use core_test_support::wait_for_event; @@ -142,11 +145,12 @@ async fn summarize_context_three_requests_and_instructions() { let mut config = load_default_config_for_test(&home); config.model_provider = model_provider; let conversation_manager = ConversationManager::with_auth(CodexAuth::from_api_key("dummy")); - let codex = conversation_manager - .new_conversation(config) - .await - .unwrap() - .conversation; + let NewConversation { + conversation: codex, + session_configured, + .. + } = conversation_manager.new_conversation(config).await.unwrap(); + let rollout_path = session_configured.rollout_path; // 1) Normal user input – should hit server once. codex @@ -248,4 +252,47 @@ async fn summarize_context_three_requests_and_instructions() { !messages.iter().any(|(_, t)| t.contains(SUMMARIZE_TRIGGER)), "third request should not include the summarize trigger" ); + + // Shut down Codex to flush rollout entries before inspecting the file. + codex.submit(Op::Shutdown).await.unwrap(); + wait_for_event(&codex, |ev| matches!(ev, EventMsg::ShutdownComplete)).await; + + // Verify rollout contains APITurn entries for each API call and a Compacted entry. + let text = std::fs::read_to_string(&rollout_path).unwrap_or_else(|e| { + panic!( + "failed to read rollout file {}: {e}", + rollout_path.display() + ) + }); + let mut api_turn_count = 0usize; + let mut saw_compacted_summary = false; + for line in text.lines() { + let trimmed = line.trim(); + if trimmed.is_empty() { + continue; + } + let Ok(entry): Result = serde_json::from_str(trimmed) else { + continue; + }; + match entry.item { + RolloutItem::TurnContext(_) => { + api_turn_count += 1; + } + RolloutItem::Compacted(ci) => { + if ci.message == SUMMARY_TEXT { + saw_compacted_summary = true; + } + } + _ => {} + } + } + + assert!( + api_turn_count == 3, + "expected three APITurn entries in rollout" + ); + assert!( + saw_compacted_summary, + "expected a Compacted entry containing the summarizer output" + ); } diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index be312285..2eb3d1e8 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -897,9 +897,26 @@ pub struct SessionMetaLine { pub enum RolloutItem { SessionMeta(SessionMetaLine), ResponseItem(ResponseItem), + Compacted(CompactedItem), + TurnContext(TurnContextItem), EventMsg(EventMsg), } +#[derive(Serialize, Deserialize, Clone, Debug, TS)] +pub struct CompactedItem { + pub message: String, +} + +#[derive(Serialize, Deserialize, Clone, Debug, TS)] +pub struct TurnContextItem { + pub cwd: PathBuf, + pub approval_policy: AskForApproval, + pub sandbox_policy: SandboxPolicy, + pub model: String, + pub effort: ReasoningEffortConfig, + pub summary: ReasoningSummaryConfig, +} + #[derive(Serialize, Deserialize, Clone)] pub struct RolloutLine { pub timestamp: String,