use std::sync::Arc; use super::Session; use super::TurnContext; use super::get_last_assistant_message_from_turn; use crate::Prompt; use crate::client_common::ResponseEvent; use crate::error::CodexErr; use crate::error::Result as CodexResult; use crate::protocol::AgentMessageEvent; use crate::protocol::CompactedItem; use crate::protocol::ErrorEvent; use crate::protocol::Event; use crate::protocol::EventMsg; use crate::protocol::InputItem; use crate::protocol::InputMessageKind; use crate::protocol::TaskStartedEvent; use crate::protocol::TurnContextItem; use crate::state::TaskKind; use crate::truncate::truncate_middle; use crate::util::backoff; use askama::Template; use codex_protocol::models::ContentItem; use codex_protocol::models::ResponseInputItem; use codex_protocol::models::ResponseItem; use codex_protocol::protocol::RolloutItem; use futures::prelude::*; pub const SUMMARIZATION_PROMPT: &str = include_str!("../../templates/compact/prompt.md"); const COMPACT_USER_MESSAGE_MAX_TOKENS: usize = 20_000; #[derive(Template)] #[template(path = "compact/history_bridge.md", escape = "none")] struct HistoryBridgeTemplate<'a> { user_messages_text: &'a str, summary_text: &'a str, } pub(crate) async fn run_inline_auto_compact_task( sess: Arc, turn_context: Arc, ) { let sub_id = sess.next_internal_sub_id(); let input = vec![InputItem::Text { text: SUMMARIZATION_PROMPT.to_string(), }]; run_compact_task_inner(sess, turn_context, sub_id, input).await; } pub(crate) async fn run_compact_task( sess: Arc, turn_context: Arc, sub_id: String, input: Vec, ) -> Option { let start_event = Event { id: sub_id.clone(), msg: EventMsg::TaskStarted(TaskStartedEvent { model_context_window: turn_context.client.get_model_context_window(), }), }; sess.send_event(start_event).await; run_compact_task_inner(sess.clone(), turn_context, sub_id.clone(), input).await; None } async fn run_compact_task_inner( sess: Arc, turn_context: Arc, sub_id: String, input: Vec, ) { let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input); let mut turn_input = sess .turn_input_with_history(vec![initial_input_for_turn.clone().into()]) .await; let mut truncated_count = 0usize; let max_retries = turn_context.client.get_provider().stream_max_retries(); let mut retries = 0; let rollout_item = RolloutItem::TurnContext(TurnContextItem { cwd: turn_context.cwd.clone(), approval_policy: turn_context.approval_policy, sandbox_policy: turn_context.sandbox_policy.clone(), model: turn_context.client.get_model(), effort: turn_context.client.get_reasoning_effort(), summary: turn_context.client.get_reasoning_summary(), }); sess.persist_rollout_items(&[rollout_item]).await; loop { let prompt = Prompt { input: turn_input.clone(), ..Default::default() }; let attempt_result = drain_to_completed(&sess, turn_context.as_ref(), &sub_id, &prompt).await; match attempt_result { Ok(()) => { if truncated_count > 0 { sess.notify_background_event( &sub_id, format!( "Trimmed {truncated_count} older conversation item(s) before compacting so the prompt fits the model context window." ), ) .await; } break; } Err(CodexErr::Interrupted) => { return; } Err(e @ CodexErr::ContextWindowExceeded) => { if turn_input.len() > 1 { turn_input.remove(0); truncated_count += 1; retries = 0; continue; } sess.set_total_tokens_full(&sub_id, turn_context.as_ref()) .await; let event = Event { id: sub_id.clone(), msg: EventMsg::Error(ErrorEvent { message: e.to_string(), }), }; sess.send_event(event).await; return; } Err(e) => { if retries < max_retries { retries += 1; let delay = backoff(retries); sess.notify_stream_error( &sub_id, format!("Re-connecting... {retries}/{max_retries}"), ) .await; tokio::time::sleep(delay).await; continue; } else { let event = Event { id: sub_id.clone(), msg: EventMsg::Error(ErrorEvent { message: e.to_string(), }), }; sess.send_event(event).await; return; } } } } let history_snapshot = sess.history_snapshot().await; let summary_text = get_last_assistant_message_from_turn(&history_snapshot).unwrap_or_default(); let user_messages = collect_user_messages(&history_snapshot); let initial_context = sess.build_initial_context(turn_context.as_ref()); let new_history = build_compacted_history(initial_context, &user_messages, &summary_text); sess.replace_history(new_history).await; let rollout_item = RolloutItem::Compacted(CompactedItem { message: summary_text.clone(), }); sess.persist_rollout_items(&[rollout_item]).await; let event = Event { id: sub_id.clone(), msg: EventMsg::AgentMessage(AgentMessageEvent { message: "Compact task completed".to_string(), }), }; sess.send_event(event).await; } pub fn content_items_to_text(content: &[ContentItem]) -> Option { let mut pieces = Vec::new(); for item in content { match item { ContentItem::InputText { text } | ContentItem::OutputText { text } => { if !text.is_empty() { pieces.push(text.as_str()); } } ContentItem::InputImage { .. } => {} } } if pieces.is_empty() { None } else { Some(pieces.join("\n")) } } pub(crate) fn collect_user_messages(items: &[ResponseItem]) -> Vec { items .iter() .filter_map(|item| match item { ResponseItem::Message { role, content, .. } if role == "user" => { content_items_to_text(content) } _ => None, }) .filter(|text| !is_session_prefix_message(text)) .collect() } pub fn is_session_prefix_message(text: &str) -> bool { matches!( InputMessageKind::from(("user", text)), InputMessageKind::UserInstructions | InputMessageKind::EnvironmentContext ) } pub(crate) fn build_compacted_history( initial_context: Vec, user_messages: &[String], summary_text: &str, ) -> Vec { let mut history = initial_context; let mut user_messages_text = if user_messages.is_empty() { "(none)".to_string() } else { user_messages.join("\n\n") }; // Truncate the concatenated prior user messages so the bridge message // stays well under the context window (approx. 4 bytes/token). let max_bytes = COMPACT_USER_MESSAGE_MAX_TOKENS * 4; if user_messages_text.len() > max_bytes { user_messages_text = truncate_middle(&user_messages_text, max_bytes).0; } let summary_text = if summary_text.is_empty() { "(no summary available)".to_string() } else { summary_text.to_string() }; let Ok(bridge) = HistoryBridgeTemplate { user_messages_text: &user_messages_text, summary_text: &summary_text, } .render() else { return vec![]; }; history.push(ResponseItem::Message { id: None, role: "user".to_string(), content: vec![ContentItem::InputText { text: bridge }], }); history } async fn drain_to_completed( sess: &Session, turn_context: &TurnContext, sub_id: &str, prompt: &Prompt, ) -> CodexResult<()> { let mut stream = turn_context .client .clone() .stream_with_task_kind(prompt, TaskKind::Compact) .await?; loop { let maybe_event = stream.next().await; let Some(event) = maybe_event else { return Err(CodexErr::Stream( "stream closed before response.completed".into(), None, )); }; match event { Ok(ResponseEvent::OutputItemDone(item)) => { sess.record_into_history(std::slice::from_ref(&item)).await; } Ok(ResponseEvent::RateLimits(snapshot)) => { sess.update_rate_limits(sub_id, snapshot).await; } Ok(ResponseEvent::Completed { token_usage, .. }) => { sess.update_token_usage_info(sub_id, turn_context, token_usage.as_ref()) .await; return Ok(()); } Ok(_) => continue, Err(e) => return Err(e), } } } #[cfg(test)] mod tests { use super::*; use pretty_assertions::assert_eq; #[test] fn content_items_to_text_joins_non_empty_segments() { let items = vec![ ContentItem::InputText { text: "hello".to_string(), }, ContentItem::OutputText { text: String::new(), }, ContentItem::OutputText { text: "world".to_string(), }, ]; let joined = content_items_to_text(&items); assert_eq!(Some("hello\nworld".to_string()), joined); } #[test] fn content_items_to_text_ignores_image_only_content() { let items = vec![ContentItem::InputImage { image_url: "file://image.png".to_string(), }]; let joined = content_items_to_text(&items); assert_eq!(None, joined); } #[test] fn collect_user_messages_extracts_user_text_only() { let items = vec![ ResponseItem::Message { id: Some("assistant".to_string()), role: "assistant".to_string(), content: vec![ContentItem::OutputText { text: "ignored".to_string(), }], }, ResponseItem::Message { id: Some("user".to_string()), role: "user".to_string(), content: vec![ ContentItem::InputText { text: "first".to_string(), }, ContentItem::OutputText { text: "second".to_string(), }, ], }, ResponseItem::Other, ]; let collected = collect_user_messages(&items); assert_eq!(vec!["first\nsecond".to_string()], collected); } #[test] fn collect_user_messages_filters_session_prefix_entries() { let items = vec![ ResponseItem::Message { id: None, role: "user".to_string(), content: vec![ContentItem::InputText { text: "do things".to_string(), }], }, ResponseItem::Message { id: None, role: "user".to_string(), content: vec![ContentItem::InputText { text: "cwd=/tmp".to_string(), }], }, ResponseItem::Message { id: None, role: "user".to_string(), content: vec![ContentItem::InputText { text: "real user message".to_string(), }], }, ]; let collected = collect_user_messages(&items); assert_eq!(vec!["real user message".to_string()], collected); } #[test] fn build_compacted_history_truncates_overlong_user_messages() { // Prepare a very large prior user message so the aggregated // `user_messages_text` exceeds the truncation threshold used by // `build_compacted_history` (80k bytes). let big = "X".repeat(200_000); let history = build_compacted_history(Vec::new(), std::slice::from_ref(&big), "SUMMARY"); // Expect exactly one bridge message added to history (plus any initial context we provided, which is none). assert_eq!(history.len(), 1); // Extract the text content of the bridge message. let bridge_text = match &history[0] { ResponseItem::Message { role, content, .. } if role == "user" => { content_items_to_text(content).unwrap_or_default() } other => panic!("unexpected item in history: {other:?}"), }; // The bridge should contain the truncation marker and not the full original payload. assert!( bridge_text.contains("tokens truncated"), "expected truncation marker in bridge message" ); assert!( !bridge_text.contains(&big), "bridge should not include the full oversized user text" ); assert!( bridge_text.contains("SUMMARY"), "bridge should include the provided summary text" ); } }