From 687a13bbe571a613b2a8f749390e11f0c2d6ad2c Mon Sep 17 00:00:00 2001 From: jif-oai Date: Wed, 8 Oct 2025 18:11:08 +0100 Subject: [PATCH] feat: truncate on compact (#4942) Truncate the message during compaction if it is just too large Do it iteratively as tokenization is basically free on server-side --- codex-rs/core/src/codex/compact.rs | 27 ++++-- codex-rs/core/tests/suite/compact.rs | 127 +++++++++++++++++++++++++++ 2 files changed, 148 insertions(+), 6 deletions(-) diff --git a/codex-rs/core/src/codex/compact.rs b/codex-rs/core/src/codex/compact.rs index 40c9da7b..d35df99c 100644 --- a/codex-rs/core/src/codex/compact.rs +++ b/codex-rs/core/src/codex/compact.rs @@ -70,14 +70,10 @@ async fn run_compact_task_inner( input: Vec, ) { let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input); - let turn_input = sess + let mut turn_input = sess .turn_input_with_history(vec![initial_input_for_turn.clone().into()]) .await; - - let prompt = Prompt { - input: turn_input, - ..Default::default() - }; + let mut truncated_count = 0usize; let max_retries = turn_context.client.get_provider().stream_max_retries(); let mut retries = 0; @@ -93,17 +89,36 @@ async fn run_compact_task_inner( sess.persist_rollout_items(&[rollout_item]).await; loop { + let prompt = Prompt { + input: turn_input.clone(), + ..Default::default() + }; let attempt_result = drain_to_completed(&sess, turn_context.as_ref(), &sub_id, &prompt).await; match attempt_result { Ok(()) => { + if truncated_count > 0 { + sess.notify_background_event( + &sub_id, + format!( + "Trimmed {truncated_count} older conversation item(s) before compacting so the prompt fits the model context window." + ), + ) + .await; + } break; } Err(CodexErr::Interrupted) => { return; } Err(e @ CodexErr::ContextWindowExceeded) => { + if turn_input.len() > 1 { + turn_input.remove(0); + truncated_count += 1; + retries = 0; + continue; + } sess.set_total_tokens_full(&sub_id, turn_context.as_ref()) .await; let event = Event { diff --git a/codex-rs/core/tests/suite/compact.rs b/codex-rs/core/tests/suite/compact.rs index f6db0834..82750e85 100644 --- a/codex-rs/core/tests/suite/compact.rs +++ b/codex-rs/core/tests/suite/compact.rs @@ -22,6 +22,7 @@ use core_test_support::responses::ev_function_call; use core_test_support::responses::mount_sse_once_match; use core_test_support::responses::mount_sse_sequence; use core_test_support::responses::sse; +use core_test_support::responses::sse_failed; use core_test_support::responses::start_mock_server; use pretty_assertions::assert_eq; // --- Test helpers ----------------------------------------------------------- @@ -38,6 +39,8 @@ const SECOND_LARGE_REPLY: &str = "SECOND_LARGE_REPLY"; const FIRST_AUTO_SUMMARY: &str = "FIRST_AUTO_SUMMARY"; const SECOND_AUTO_SUMMARY: &str = "SECOND_AUTO_SUMMARY"; const FINAL_REPLY: &str = "FINAL_REPLY"; +const CONTEXT_LIMIT_MESSAGE: &str = + "Your input exceeds the context window of this model. Please adjust your input and try again."; const DUMMY_FUNCTION_NAME: &str = "unsupported_tool"; const DUMMY_CALL_ID: &str = "call-multi-auto"; @@ -622,6 +625,130 @@ async fn auto_compact_stops_after_failed_attempt() { ); } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn manual_compact_retries_after_context_window_error() { + skip_if_no_network!(); + + let server = start_mock_server().await; + + let user_turn = sse(vec![ + ev_assistant_message("m1", FIRST_REPLY), + ev_completed("r1"), + ]); + let compact_failed = sse_failed( + "resp-fail", + "context_length_exceeded", + CONTEXT_LIMIT_MESSAGE, + ); + let compact_succeeds = sse(vec![ + ev_assistant_message("m2", SUMMARY_TEXT), + ev_completed("r2"), + ]); + + let request_log = mount_sse_sequence( + &server, + vec![ + user_turn.clone(), + compact_failed.clone(), + compact_succeeds.clone(), + ], + ) + .await; + + let model_provider = ModelProviderInfo { + base_url: Some(format!("{}/v1", server.uri())), + ..built_in_model_providers()["openai"].clone() + }; + + let home = TempDir::new().unwrap(); + let mut config = load_default_config_for_test(&home); + config.model_provider = model_provider; + config.model_auto_compact_token_limit = Some(200_000); + let codex = ConversationManager::with_auth(CodexAuth::from_api_key("dummy")) + .new_conversation(config) + .await + .unwrap() + .conversation; + + codex + .submit(Op::UserInput { + items: vec![InputItem::Text { + text: "first turn".into(), + }], + }) + .await + .unwrap(); + wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; + + codex.submit(Op::Compact).await.unwrap(); + + let EventMsg::BackgroundEvent(event) = + wait_for_event(&codex, |ev| matches!(ev, EventMsg::BackgroundEvent(_))).await + else { + panic!("expected background event after compact retry"); + }; + assert!( + event.message.contains("Trimmed 1 older conversation item"), + "background event should mention trimmed item count: {}", + event.message + ); + wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; + + let requests = request_log.requests(); + assert_eq!( + requests.len(), + 3, + "expected user turn and two compact attempts" + ); + + let compact_attempt = requests[1].body_json(); + let retry_attempt = requests[2].body_json(); + + let compact_input = compact_attempt["input"] + .as_array() + .unwrap_or_else(|| panic!("compact attempt missing input array: {compact_attempt}")); + let retry_input = retry_attempt["input"] + .as_array() + .unwrap_or_else(|| panic!("retry attempt missing input array: {retry_attempt}")); + assert_eq!( + compact_input + .last() + .and_then(|item| item.get("content")) + .and_then(|v| v.as_array()) + .and_then(|items| items.first()) + .and_then(|entry| entry.get("text")) + .and_then(|text| text.as_str()), + Some(SUMMARIZATION_PROMPT), + "compact attempt should include summarization prompt" + ); + assert_eq!( + retry_input + .last() + .and_then(|item| item.get("content")) + .and_then(|v| v.as_array()) + .and_then(|items| items.first()) + .and_then(|entry| entry.get("text")) + .and_then(|text| text.as_str()), + Some(SUMMARIZATION_PROMPT), + "retry attempt should include summarization prompt" + ); + assert_eq!( + retry_input.len(), + compact_input.len().saturating_sub(1), + "retry should drop exactly one history item (before {} vs after {})", + compact_input.len(), + retry_input.len() + ); + if let (Some(first_before), Some(first_after)) = (compact_input.first(), retry_input.first()) { + assert_ne!( + first_before, first_after, + "retry should drop the oldest conversation item" + ); + } else { + panic!("expected non-empty compact inputs"); + } +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn auto_compact_allows_multiple_attempts_when_interleaved_with_other_turn_events() { skip_if_no_network!();