Add Compact and Turn Context to the rollout items (#3444)
Adding compact and turn context to the rollout items based on #3440
This commit is contained in:
@@ -19,11 +19,13 @@ use codex_apply_patch::ApplyPatchAction;
|
||||
use codex_apply_patch::MaybeApplyPatchVerified;
|
||||
use codex_apply_patch::maybe_parse_apply_patch_verified;
|
||||
use codex_protocol::mcp_protocol::ConversationId;
|
||||
use codex_protocol::protocol::CompactedItem;
|
||||
use codex_protocol::protocol::ConversationPathResponseEvent;
|
||||
use codex_protocol::protocol::RolloutItem;
|
||||
use codex_protocol::protocol::TaskStartedEvent;
|
||||
use codex_protocol::protocol::TurnAbortReason;
|
||||
use codex_protocol::protocol::TurnAbortedEvent;
|
||||
use codex_protocol::protocol::TurnContextItem;
|
||||
use futures::prelude::*;
|
||||
use mcp_types::CallToolResult;
|
||||
use serde::Deserialize;
|
||||
@@ -1786,6 +1788,15 @@ async fn try_run_turn(
|
||||
})
|
||||
};
|
||||
|
||||
let rollout_item = RolloutItem::TurnContext(TurnContextItem {
|
||||
cwd: turn_context.cwd.clone(),
|
||||
approval_policy: turn_context.approval_policy,
|
||||
sandbox_policy: turn_context.sandbox_policy.clone(),
|
||||
model: turn_context.client.get_model().clone(),
|
||||
effort: turn_context.client.get_reasoning_effort(),
|
||||
summary: turn_context.client.get_reasoning_summary(),
|
||||
});
|
||||
sess.persist_rollout_items(&[rollout_item]).await;
|
||||
let mut stream = turn_context.client.clone().stream(&prompt).await?;
|
||||
|
||||
let mut output = Vec::new();
|
||||
@@ -1968,10 +1979,14 @@ async fn run_compact_task(
|
||||
|
||||
sess.remove_task(&sub_id);
|
||||
|
||||
{
|
||||
let rollout_item = {
|
||||
let mut state = sess.state.lock_unchecked();
|
||||
state.history.keep_last_messages(1);
|
||||
}
|
||||
RolloutItem::Compacted(CompactedItem {
|
||||
message: state.history.last_agent_message(),
|
||||
})
|
||||
};
|
||||
sess.persist_rollout_items(&[rollout_item]).await;
|
||||
|
||||
let event = Event {
|
||||
id: sub_id.clone(),
|
||||
@@ -2997,6 +3012,15 @@ async fn drain_to_completed(
|
||||
sub_id: &str,
|
||||
prompt: &Prompt,
|
||||
) -> CodexResult<()> {
|
||||
let rollout_item = RolloutItem::TurnContext(TurnContextItem {
|
||||
cwd: turn_context.cwd.clone(),
|
||||
approval_policy: turn_context.approval_policy,
|
||||
sandbox_policy: turn_context.sandbox_policy.clone(),
|
||||
model: turn_context.client.get_model(),
|
||||
effort: turn_context.client.get_reasoning_effort(),
|
||||
summary: turn_context.client.get_reasoning_summary(),
|
||||
});
|
||||
sess.persist_rollout_items(&[rollout_item]).await;
|
||||
let mut stream = turn_context.client.clone().stream(prompt).await?;
|
||||
loop {
|
||||
let maybe_event = stream.next().await;
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use codex_protocol::models::ContentItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
|
||||
/// Transcript of conversation history
|
||||
@@ -59,6 +60,26 @@ impl ConversationHistory {
|
||||
kept.reverse();
|
||||
self.items = kept;
|
||||
}
|
||||
|
||||
pub(crate) fn last_agent_message(&self) -> String {
|
||||
for item in self.items.iter().rev() {
|
||||
if let ResponseItem::Message { role, content, .. } = item
|
||||
&& role == "assistant"
|
||||
{
|
||||
return content
|
||||
.iter()
|
||||
.find_map(|ci| {
|
||||
if let ContentItem::OutputText { text } = ci {
|
||||
Some(text.clone())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.unwrap_or_default();
|
||||
}
|
||||
}
|
||||
String::new()
|
||||
}
|
||||
}
|
||||
|
||||
/// Anything that is not a system message or "reasoning" message is considered
|
||||
|
||||
@@ -318,6 +318,12 @@ async fn read_head_and_flags(
|
||||
head.push(val);
|
||||
}
|
||||
}
|
||||
RolloutItem::TurnContext(_) => {
|
||||
// Not included in `head`; skip.
|
||||
}
|
||||
RolloutItem::Compacted(_) => {
|
||||
// Not included in `head`; skip.
|
||||
}
|
||||
RolloutItem::EventMsg(ev) => {
|
||||
if matches!(ev, EventMsg::UserMessage(_)) {
|
||||
saw_user_event = true;
|
||||
|
||||
@@ -8,8 +8,10 @@ pub(crate) fn is_persisted_response_item(item: &RolloutItem) -> bool {
|
||||
match item {
|
||||
RolloutItem::ResponseItem(item) => should_persist_response_item(item),
|
||||
RolloutItem::EventMsg(ev) => should_persist_event_msg(ev),
|
||||
// Always persist session meta
|
||||
RolloutItem::SessionMeta(_) => true,
|
||||
// Persist Codex executive markers so we can analyze flows (e.g., compaction, API turns).
|
||||
RolloutItem::Compacted(_) | RolloutItem::TurnContext(_) | RolloutItem::SessionMeta(_) => {
|
||||
true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -238,6 +238,12 @@ impl RolloutRecorder {
|
||||
RolloutItem::ResponseItem(item) => {
|
||||
items.push(RolloutItem::ResponseItem(item));
|
||||
}
|
||||
RolloutItem::Compacted(item) => {
|
||||
items.push(RolloutItem::Compacted(item));
|
||||
}
|
||||
RolloutItem::TurnContext(item) => {
|
||||
items.push(RolloutItem::TurnContext(item));
|
||||
}
|
||||
RolloutItem::EventMsg(_ev) => {
|
||||
items.push(RolloutItem::EventMsg(_ev));
|
||||
}
|
||||
|
||||
@@ -3,10 +3,13 @@
|
||||
use codex_core::CodexAuth;
|
||||
use codex_core::ConversationManager;
|
||||
use codex_core::ModelProviderInfo;
|
||||
use codex_core::NewConversation;
|
||||
use codex_core::built_in_model_providers;
|
||||
use codex_core::protocol::EventMsg;
|
||||
use codex_core::protocol::InputItem;
|
||||
use codex_core::protocol::Op;
|
||||
use codex_core::protocol::RolloutItem;
|
||||
use codex_core::protocol::RolloutLine;
|
||||
use codex_core::spawn::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR;
|
||||
use core_test_support::load_default_config_for_test;
|
||||
use core_test_support::wait_for_event;
|
||||
@@ -142,11 +145,12 @@ async fn summarize_context_three_requests_and_instructions() {
|
||||
let mut config = load_default_config_for_test(&home);
|
||||
config.model_provider = model_provider;
|
||||
let conversation_manager = ConversationManager::with_auth(CodexAuth::from_api_key("dummy"));
|
||||
let codex = conversation_manager
|
||||
.new_conversation(config)
|
||||
.await
|
||||
.unwrap()
|
||||
.conversation;
|
||||
let NewConversation {
|
||||
conversation: codex,
|
||||
session_configured,
|
||||
..
|
||||
} = conversation_manager.new_conversation(config).await.unwrap();
|
||||
let rollout_path = session_configured.rollout_path;
|
||||
|
||||
// 1) Normal user input – should hit server once.
|
||||
codex
|
||||
@@ -248,4 +252,47 @@ async fn summarize_context_three_requests_and_instructions() {
|
||||
!messages.iter().any(|(_, t)| t.contains(SUMMARIZE_TRIGGER)),
|
||||
"third request should not include the summarize trigger"
|
||||
);
|
||||
|
||||
// Shut down Codex to flush rollout entries before inspecting the file.
|
||||
codex.submit(Op::Shutdown).await.unwrap();
|
||||
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ShutdownComplete)).await;
|
||||
|
||||
// Verify rollout contains APITurn entries for each API call and a Compacted entry.
|
||||
let text = std::fs::read_to_string(&rollout_path).unwrap_or_else(|e| {
|
||||
panic!(
|
||||
"failed to read rollout file {}: {e}",
|
||||
rollout_path.display()
|
||||
)
|
||||
});
|
||||
let mut api_turn_count = 0usize;
|
||||
let mut saw_compacted_summary = false;
|
||||
for line in text.lines() {
|
||||
let trimmed = line.trim();
|
||||
if trimmed.is_empty() {
|
||||
continue;
|
||||
}
|
||||
let Ok(entry): Result<RolloutLine, _> = serde_json::from_str(trimmed) else {
|
||||
continue;
|
||||
};
|
||||
match entry.item {
|
||||
RolloutItem::TurnContext(_) => {
|
||||
api_turn_count += 1;
|
||||
}
|
||||
RolloutItem::Compacted(ci) => {
|
||||
if ci.message == SUMMARY_TEXT {
|
||||
saw_compacted_summary = true;
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
assert!(
|
||||
api_turn_count == 3,
|
||||
"expected three APITurn entries in rollout"
|
||||
);
|
||||
assert!(
|
||||
saw_compacted_summary,
|
||||
"expected a Compacted entry containing the summarizer output"
|
||||
);
|
||||
}
|
||||
|
||||
@@ -897,9 +897,26 @@ pub struct SessionMetaLine {
|
||||
pub enum RolloutItem {
|
||||
SessionMeta(SessionMetaLine),
|
||||
ResponseItem(ResponseItem),
|
||||
Compacted(CompactedItem),
|
||||
TurnContext(TurnContextItem),
|
||||
EventMsg(EventMsg),
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug, TS)]
|
||||
pub struct CompactedItem {
|
||||
pub message: String,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug, TS)]
|
||||
pub struct TurnContextItem {
|
||||
pub cwd: PathBuf,
|
||||
pub approval_policy: AskForApproval,
|
||||
pub sandbox_policy: SandboxPolicy,
|
||||
pub model: String,
|
||||
pub effort: ReasoningEffortConfig,
|
||||
pub summary: ReasoningSummaryConfig,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone)]
|
||||
pub struct RolloutLine {
|
||||
pub timestamp: String,
|
||||
|
||||
Reference in New Issue
Block a user