feat: truncate on compact (#4942)
Truncate the conversation input during compaction if it is too large to fit the model's context window. Do it iteratively, as re-tokenizing the shorter prompt is essentially free on the server side.
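The core of the change is a retry loop that drops the oldest history item whenever the compaction request overflows the context window. As a rough, self-contained illustration of that approach (hypothetical types and function names only, not the actual codex-rs code):

// Minimal sketch of the idea: when compaction overflows the context window,
// drop the oldest history item and retry until the request fits.

#[derive(Debug)]
enum CompactError {
    ContextWindowExceeded,
}

// Stand-in for one item of conversation history (illustrative only).
#[derive(Clone, Debug)]
struct HistoryItem(String);

// Pretend model call: fails while the prompt still holds too many items.
fn try_compact(history: &[HistoryItem], max_items: usize) -> Result<String, CompactError> {
    if history.len() > max_items {
        Err(CompactError::ContextWindowExceeded)
    } else {
        Ok(format!("summary of {} item(s)", history.len()))
    }
}

// Iteratively trim the oldest item until compaction succeeds, keeping at least one item.
fn compact_with_truncation(mut history: Vec<HistoryItem>, max_items: usize) -> Option<String> {
    let mut truncated = 0usize;
    loop {
        match try_compact(&history, max_items) {
            Ok(summary) => {
                if truncated > 0 {
                    println!("Trimmed {truncated} older conversation item(s) before compacting.");
                }
                return Some(summary);
            }
            Err(CompactError::ContextWindowExceeded) if history.len() > 1 => {
                history.remove(0); // drop the oldest item first
                truncated += 1;
            }
            // Cannot shrink further: give up rather than loop forever.
            Err(_) => return None,
        }
    }
}

fn main() {
    let history: Vec<HistoryItem> = (0..5).map(|i| HistoryItem(format!("turn {i}"))).collect();
    println!("{:?}", compact_with_truncation(history, 3));
}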
@@ -70,14 +70,10 @@ async fn run_compact_task_inner(
     input: Vec<InputItem>,
 ) {
     let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input);
-    let turn_input = sess
+    let mut turn_input = sess
         .turn_input_with_history(vec![initial_input_for_turn.clone().into()])
         .await;
+    let mut truncated_count = 0usize;
-    let prompt = Prompt {
-        input: turn_input,
-        ..Default::default()
-    };
-
     let max_retries = turn_context.client.get_provider().stream_max_retries();
     let mut retries = 0;
@@ -93,17 +89,36 @@ async fn run_compact_task_inner(
     sess.persist_rollout_items(&[rollout_item]).await;

     loop {
+        let prompt = Prompt {
+            input: turn_input.clone(),
+            ..Default::default()
+        };
         let attempt_result =
             drain_to_completed(&sess, turn_context.as_ref(), &sub_id, &prompt).await;

         match attempt_result {
             Ok(()) => {
+                if truncated_count > 0 {
+                    sess.notify_background_event(
+                        &sub_id,
+                        format!(
+                            "Trimmed {truncated_count} older conversation item(s) before compacting so the prompt fits the model context window."
+                        ),
+                    )
+                    .await;
+                }
                 break;
             }
             Err(CodexErr::Interrupted) => {
                 return;
             }
             Err(e @ CodexErr::ContextWindowExceeded) => {
+                if turn_input.len() > 1 {
+                    turn_input.remove(0);
+                    truncated_count += 1;
+                    retries = 0;
+                    continue;
+                }
                 sess.set_total_tokens_full(&sub_id, turn_context.as_ref())
                     .await;
                 let event = Event {
@@ -22,6 +22,7 @@ use core_test_support::responses::ev_function_call;
 use core_test_support::responses::mount_sse_once_match;
 use core_test_support::responses::mount_sse_sequence;
 use core_test_support::responses::sse;
+use core_test_support::responses::sse_failed;
 use core_test_support::responses::start_mock_server;
 use pretty_assertions::assert_eq;
 // --- Test helpers -----------------------------------------------------------
@@ -38,6 +39,8 @@ const SECOND_LARGE_REPLY: &str = "SECOND_LARGE_REPLY";
 const FIRST_AUTO_SUMMARY: &str = "FIRST_AUTO_SUMMARY";
 const SECOND_AUTO_SUMMARY: &str = "SECOND_AUTO_SUMMARY";
 const FINAL_REPLY: &str = "FINAL_REPLY";
+const CONTEXT_LIMIT_MESSAGE: &str =
+    "Your input exceeds the context window of this model. Please adjust your input and try again.";
 const DUMMY_FUNCTION_NAME: &str = "unsupported_tool";
 const DUMMY_CALL_ID: &str = "call-multi-auto";

@@ -622,6 +625,130 @@ async fn auto_compact_stops_after_failed_attempt() {
     );
 }

+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn manual_compact_retries_after_context_window_error() {
+    skip_if_no_network!();
+
+    let server = start_mock_server().await;
+
+    let user_turn = sse(vec![
+        ev_assistant_message("m1", FIRST_REPLY),
+        ev_completed("r1"),
+    ]);
+    let compact_failed = sse_failed(
+        "resp-fail",
+        "context_length_exceeded",
+        CONTEXT_LIMIT_MESSAGE,
+    );
+    let compact_succeeds = sse(vec![
+        ev_assistant_message("m2", SUMMARY_TEXT),
+        ev_completed("r2"),
+    ]);
+
+    let request_log = mount_sse_sequence(
+        &server,
+        vec![
+            user_turn.clone(),
+            compact_failed.clone(),
+            compact_succeeds.clone(),
+        ],
+    )
+    .await;
+
+    let model_provider = ModelProviderInfo {
+        base_url: Some(format!("{}/v1", server.uri())),
+        ..built_in_model_providers()["openai"].clone()
+    };
+
+    let home = TempDir::new().unwrap();
+    let mut config = load_default_config_for_test(&home);
+    config.model_provider = model_provider;
+    config.model_auto_compact_token_limit = Some(200_000);
+    let codex = ConversationManager::with_auth(CodexAuth::from_api_key("dummy"))
+        .new_conversation(config)
+        .await
+        .unwrap()
+        .conversation;
+
+    codex
+        .submit(Op::UserInput {
+            items: vec![InputItem::Text {
+                text: "first turn".into(),
+            }],
+        })
+        .await
+        .unwrap();
+    wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
+
+    codex.submit(Op::Compact).await.unwrap();
+
+    let EventMsg::BackgroundEvent(event) =
+        wait_for_event(&codex, |ev| matches!(ev, EventMsg::BackgroundEvent(_))).await
+    else {
+        panic!("expected background event after compact retry");
+    };
+    assert!(
+        event.message.contains("Trimmed 1 older conversation item"),
+        "background event should mention trimmed item count: {}",
+        event.message
+    );
+    wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
+
+    let requests = request_log.requests();
+    assert_eq!(
+        requests.len(),
+        3,
+        "expected user turn and two compact attempts"
+    );
+
+    let compact_attempt = requests[1].body_json();
+    let retry_attempt = requests[2].body_json();
+
+    let compact_input = compact_attempt["input"]
+        .as_array()
+        .unwrap_or_else(|| panic!("compact attempt missing input array: {compact_attempt}"));
+    let retry_input = retry_attempt["input"]
+        .as_array()
+        .unwrap_or_else(|| panic!("retry attempt missing input array: {retry_attempt}"));
+    assert_eq!(
+        compact_input
+            .last()
+            .and_then(|item| item.get("content"))
+            .and_then(|v| v.as_array())
+            .and_then(|items| items.first())
+            .and_then(|entry| entry.get("text"))
+            .and_then(|text| text.as_str()),
+        Some(SUMMARIZATION_PROMPT),
+        "compact attempt should include summarization prompt"
+    );
+    assert_eq!(
+        retry_input
+            .last()
+            .and_then(|item| item.get("content"))
+            .and_then(|v| v.as_array())
+            .and_then(|items| items.first())
+            .and_then(|entry| entry.get("text"))
+            .and_then(|text| text.as_str()),
+        Some(SUMMARIZATION_PROMPT),
+        "retry attempt should include summarization prompt"
+    );
+    assert_eq!(
+        retry_input.len(),
+        compact_input.len().saturating_sub(1),
+        "retry should drop exactly one history item (before {} vs after {})",
+        compact_input.len(),
+        retry_input.len()
+    );
+    if let (Some(first_before), Some(first_after)) = (compact_input.first(), retry_input.first()) {
+        assert_ne!(
+            first_before, first_after,
+            "retry should drop the oldest conversation item"
+        );
+    } else {
+        panic!("expected non-empty compact inputs");
+    }
+}
+
 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
 async fn auto_compact_allows_multiple_attempts_when_interleaved_with_other_turn_events() {
     skip_if_no_network!();