use std::sync::Arc; use super::Session; use super::TurnContext; use super::filter_model_visible_history; use super::get_last_assistant_message_from_turn; use crate::Prompt; use crate::client_common::ResponseEvent; use crate::error::CodexErr; use crate::error::Result as CodexResult; use crate::protocol::AgentMessageEvent; use crate::protocol::CompactedItem; use crate::protocol::ErrorEvent; use crate::protocol::EventMsg; use crate::protocol::TaskStartedEvent; use crate::protocol::TurnContextItem; use crate::state::TaskKind; use crate::truncate::truncate_middle; use crate::util::backoff; use askama::Template; use codex_protocol::items::TurnItem; use codex_protocol::models::ContentItem; use codex_protocol::models::ResponseInputItem; use codex_protocol::models::ResponseItem; use codex_protocol::protocol::RolloutItem; use codex_protocol::user_input::UserInput; use futures::prelude::*; use tracing::error; pub const SUMMARIZATION_PROMPT: &str = include_str!("../../templates/compact/prompt.md"); const COMPACT_USER_MESSAGE_MAX_TOKENS: usize = 20_000; #[derive(Template)] #[template(path = "compact/history_bridge.md", escape = "none")] struct HistoryBridgeTemplate<'a> { user_messages_text: &'a str, summary_text: &'a str, } pub(crate) async fn run_inline_auto_compact_task( sess: Arc, turn_context: Arc, ) { let input = vec![UserInput::Text { text: SUMMARIZATION_PROMPT.to_string(), }]; run_compact_task_inner(sess, turn_context, input).await; } pub(crate) async fn run_compact_task( sess: Arc, turn_context: Arc, input: Vec, ) -> Option { let start_event = EventMsg::TaskStarted(TaskStartedEvent { model_context_window: turn_context.client.get_model_context_window(), }); sess.send_event(&turn_context, start_event).await; run_compact_task_inner(sess.clone(), turn_context, input).await; None } async fn run_compact_task_inner( sess: Arc, turn_context: Arc, input: Vec, ) { let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input); let mut history = sess.clone_history().await; history.record_items(&[initial_input_for_turn.into()]); let mut truncated_count = 0usize; let max_retries = turn_context.client.get_provider().stream_max_retries(); let mut retries = 0; let rollout_item = RolloutItem::TurnContext(TurnContextItem { cwd: turn_context.cwd.clone(), approval_policy: turn_context.approval_policy, sandbox_policy: turn_context.sandbox_policy.clone(), model: turn_context.client.get_model(), effort: turn_context.client.get_reasoning_effort(), summary: turn_context.client.get_reasoning_summary(), }); sess.persist_rollout_items(&[rollout_item]).await; loop { let turn_input = history.get_history(); let prompt_input = filter_model_visible_history(turn_input.clone()); let prompt = Prompt { input: prompt_input.clone(), ..Default::default() }; let attempt_result = drain_to_completed(&sess, turn_context.as_ref(), &prompt).await; match attempt_result { Ok(()) => { if truncated_count > 0 { sess.notify_background_event( turn_context.as_ref(), format!( "Trimmed {truncated_count} older conversation item(s) before compacting so the prompt fits the model context window." ), ) .await; } break; } Err(CodexErr::Interrupted) => { return; } Err(e @ CodexErr::ContextWindowExceeded) => { if prompt_input.len() > 1 { // Trim from the beginning to preserve cache (prefix-based) and keep recent messages intact. error!( "Context window exceeded while compacting; removing oldest history item. Error: {e}" ); history.remove_first_item(); truncated_count += 1; retries = 0; continue; } sess.set_total_tokens_full(turn_context.as_ref()).await; let event = EventMsg::Error(ErrorEvent { message: e.to_string(), }); sess.send_event(&turn_context, event).await; return; } Err(e) => { if retries < max_retries { retries += 1; let delay = backoff(retries); sess.notify_stream_error( turn_context.as_ref(), format!("Reconnecting... {retries}/{max_retries}"), ) .await; tokio::time::sleep(delay).await; continue; } else { let event = EventMsg::Error(ErrorEvent { message: e.to_string(), }); sess.send_event(&turn_context, event).await; return; } } } } let history_snapshot = sess.history_snapshot().await; let summary_text = get_last_assistant_message_from_turn(&history_snapshot).unwrap_or_default(); let user_messages = collect_user_messages(&history_snapshot); let initial_context = sess.build_initial_context(turn_context.as_ref()); let mut new_history = build_compacted_history(initial_context, &user_messages, &summary_text); let ghost_snapshots: Vec = history_snapshot .iter() .filter(|item| matches!(item, ResponseItem::GhostSnapshot { .. })) .cloned() .collect(); new_history.extend(ghost_snapshots); sess.replace_history(new_history).await; let rollout_item = RolloutItem::Compacted(CompactedItem { message: summary_text.clone(), }); sess.persist_rollout_items(&[rollout_item]).await; let event = EventMsg::AgentMessage(AgentMessageEvent { message: "Compact task completed".to_string(), }); sess.send_event(&turn_context, event).await; } pub fn content_items_to_text(content: &[ContentItem]) -> Option { let mut pieces = Vec::new(); for item in content { match item { ContentItem::InputText { text } | ContentItem::OutputText { text } => { if !text.is_empty() { pieces.push(text.as_str()); } } ContentItem::InputImage { .. } => {} } } if pieces.is_empty() { None } else { Some(pieces.join("\n")) } } pub(crate) fn collect_user_messages(items: &[ResponseItem]) -> Vec { items .iter() .filter_map(|item| match crate::event_mapping::parse_turn_item(item) { Some(TurnItem::UserMessage(user)) => Some(user.message()), _ => None, }) .collect() } pub(crate) fn build_compacted_history( initial_context: Vec, user_messages: &[String], summary_text: &str, ) -> Vec { build_compacted_history_with_limit( initial_context, user_messages, summary_text, COMPACT_USER_MESSAGE_MAX_TOKENS * 4, ) } fn build_compacted_history_with_limit( mut history: Vec, user_messages: &[String], summary_text: &str, max_bytes: usize, ) -> Vec { let mut user_messages_text = if user_messages.is_empty() { "(none)".to_string() } else { user_messages.join("\n\n") }; // Truncate the concatenated prior user messages so the bridge message // stays well under the context window (approx. 4 bytes/token). if user_messages_text.len() > max_bytes { user_messages_text = truncate_middle(&user_messages_text, max_bytes).0; } let summary_text = if summary_text.is_empty() { "(no summary available)".to_string() } else { summary_text.to_string() }; let Ok(bridge) = HistoryBridgeTemplate { user_messages_text: &user_messages_text, summary_text: &summary_text, } .render() else { return vec![]; }; history.push(ResponseItem::Message { id: None, role: "user".to_string(), content: vec![ContentItem::InputText { text: bridge }], }); history } async fn drain_to_completed( sess: &Session, turn_context: &TurnContext, prompt: &Prompt, ) -> CodexResult<()> { let mut stream = turn_context .client .clone() .stream_with_task_kind(prompt, TaskKind::Compact) .await?; loop { let maybe_event = stream.next().await; let Some(event) = maybe_event else { return Err(CodexErr::Stream( "stream closed before response.completed".into(), None, )); }; match event { Ok(ResponseEvent::OutputItemDone(item)) => { sess.record_into_history(std::slice::from_ref(&item)).await; } Ok(ResponseEvent::RateLimits(snapshot)) => { sess.update_rate_limits(turn_context, snapshot).await; } Ok(ResponseEvent::Completed { token_usage, .. }) => { sess.update_token_usage_info(turn_context, token_usage.as_ref()) .await; return Ok(()); } Ok(_) => continue, Err(e) => return Err(e), } } } #[cfg(test)] mod tests { use super::*; use pretty_assertions::assert_eq; #[test] fn content_items_to_text_joins_non_empty_segments() { let items = vec![ ContentItem::InputText { text: "hello".to_string(), }, ContentItem::OutputText { text: String::new(), }, ContentItem::OutputText { text: "world".to_string(), }, ]; let joined = content_items_to_text(&items); assert_eq!(Some("hello\nworld".to_string()), joined); } #[test] fn content_items_to_text_ignores_image_only_content() { let items = vec![ContentItem::InputImage { image_url: "file://image.png".to_string(), }]; let joined = content_items_to_text(&items); assert_eq!(None, joined); } #[test] fn collect_user_messages_extracts_user_text_only() { let items = vec![ ResponseItem::Message { id: Some("assistant".to_string()), role: "assistant".to_string(), content: vec![ContentItem::OutputText { text: "ignored".to_string(), }], }, ResponseItem::Message { id: Some("user".to_string()), role: "user".to_string(), content: vec![ContentItem::InputText { text: "first".to_string(), }], }, ResponseItem::Other, ]; let collected = collect_user_messages(&items); assert_eq!(vec!["first".to_string()], collected); } #[test] fn collect_user_messages_filters_session_prefix_entries() { let items = vec![ ResponseItem::Message { id: None, role: "user".to_string(), content: vec![ContentItem::InputText { text: "do things".to_string(), }], }, ResponseItem::Message { id: None, role: "user".to_string(), content: vec![ContentItem::InputText { text: "cwd=/tmp".to_string(), }], }, ResponseItem::Message { id: None, role: "user".to_string(), content: vec![ContentItem::InputText { text: "real user message".to_string(), }], }, ]; let collected = collect_user_messages(&items); assert_eq!(vec!["real user message".to_string()], collected); } #[test] fn build_compacted_history_truncates_overlong_user_messages() { // Use a small truncation limit so the test remains fast while still validating // that oversized user content is truncated. let max_bytes = 128; let big = "X".repeat(max_bytes + 50); let history = super::build_compacted_history_with_limit( Vec::new(), std::slice::from_ref(&big), "SUMMARY", max_bytes, ); // Expect exactly one bridge message added to history (plus any initial context we provided, which is none). assert_eq!(history.len(), 1); // Extract the text content of the bridge message. let bridge_text = match &history[0] { ResponseItem::Message { role, content, .. } if role == "user" => { content_items_to_text(content).unwrap_or_default() } other => panic!("unexpected item in history: {other:?}"), }; // The bridge should contain the truncation marker and not the full original payload. assert!( bridge_text.contains("tokens truncated"), "expected truncation marker in bridge message" ); assert!( !bridge_text.contains(&big), "bridge should not include the full oversized user text" ); assert!( bridge_text.contains("SUMMARY"), "bridge should include the provided summary text" ); } }