Add /compact (#1527)

- Add operation to summarize the context so far.
- The operation runs a compact task that summarizes the context.
- The operation clear the previous context to free the context window
- The operation didn't use `run_task` to avoid corrupting the session
- Add /compact in the tui



https://github.com/user-attachments/assets/e06c24e5-dcfb-4806-934a-564d425a919c
This commit is contained in:
aibrahim-oai
2025-07-31 21:34:32 -07:00
committed by GitHub
parent ad0295b893
commit e2c994e32a
8 changed files with 474 additions and 3 deletions

View File

@@ -442,6 +442,12 @@ impl Session {
let _ = self.tx_event.send(event).await;
}
/// Build the full turn input by concatenating the current conversation
/// history with additional items for this turn.
pub fn turn_input_with_history(&self, extra: Vec<ResponseItem>) -> Vec<ResponseItem> {
[self.state.lock().unwrap().history.contents(), extra].concat()
}
/// Returns the input if there was no task running to inject into
pub fn inject_input(&self, input: Vec<InputItem>) -> Result<(), Vec<InputItem>> {
let mut state = self.state.lock().unwrap();
@@ -564,6 +570,25 @@ impl AgentTask {
handle,
}
}
fn compact(
sess: Arc<Session>,
sub_id: String,
input: Vec<InputItem>,
compact_instructions: String,
) -> Self {
let handle = tokio::spawn(run_compact_task(
Arc::clone(&sess),
sub_id.clone(),
input,
compact_instructions,
))
.abort_handle();
Self {
sess,
sub_id,
handle,
}
}
fn abort(self) {
if !self.handle.is_finished() {
@@ -884,6 +909,31 @@ async fn submission_loop(
}
});
}
Op::Compact => {
let sess = match sess.as_ref() {
Some(sess) => sess,
None => {
send_no_session_event(sub.id).await;
continue;
}
};
// Create a summarization request as user input
const SUMMARIZATION_PROMPT: &str = include_str!("../../../SUMMARY.md");
// Attempt to inject input into current task
if let Err(items) = sess.inject_input(vec![InputItem::Text {
text: "Start Summarization".to_string(),
}]) {
let task = AgentTask::compact(
sess.clone(),
sub.id,
items,
SUMMARIZATION_PROMPT.to_string(),
);
sess.set_task(task);
}
}
Op::Shutdown => {
info!("Shutting down Codex instance");
@@ -945,7 +995,7 @@ async fn run_task(sess: Arc<Session>, sub_id: String, input: Vec<InputItem>) {
return;
}
let initial_input_for_turn = ResponseInputItem::from(input);
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input);
sess.record_conversation_items(&[initial_input_for_turn.clone().into()])
.await;
@@ -966,8 +1016,7 @@ async fn run_task(sess: Arc<Session>, sub_id: String, input: Vec<InputItem>) {
// conversation history on each turn. The rollout file, however, should
// only record the new items that originated in this turn so that it
// represents an append-only log without duplicates.
let turn_input: Vec<ResponseItem> =
[sess.state.lock().unwrap().history.contents(), pending_input].concat();
let turn_input: Vec<ResponseItem> = sess.turn_input_with_history(pending_input);
let turn_input_messages: Vec<String> = turn_input
.iter()
@@ -1293,6 +1342,88 @@ async fn try_run_turn(
}
}
async fn run_compact_task(
sess: Arc<Session>,
sub_id: String,
input: Vec<InputItem>,
compact_instructions: String,
) {
let start_event = Event {
id: sub_id.clone(),
msg: EventMsg::TaskStarted,
};
if sess.tx_event.send(start_event).await.is_err() {
return;
}
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input);
let turn_input: Vec<ResponseItem> =
sess.turn_input_with_history(vec![initial_input_for_turn.clone().into()]);
let prompt = Prompt {
input: turn_input,
user_instructions: None,
store: !sess.disable_response_storage,
extra_tools: HashMap::new(),
base_instructions_override: Some(compact_instructions.clone()),
};
let max_retries = sess.client.get_provider().stream_max_retries();
let mut retries = 0;
loop {
let attempt_result = drain_to_completed(&sess, &prompt).await;
match attempt_result {
Ok(()) => break,
Err(CodexErr::Interrupted) => return,
Err(e) => {
if retries < max_retries {
retries += 1;
let delay = backoff(retries);
sess.notify_background_event(
&sub_id,
format!(
"stream error: {e}; retrying {retries}/{max_retries} in {delay:?}"
),
)
.await;
tokio::time::sleep(delay).await;
continue;
} else {
let event = Event {
id: sub_id.clone(),
msg: EventMsg::Error(ErrorEvent {
message: e.to_string(),
}),
};
sess.send_event(event).await;
return;
}
}
}
}
sess.remove_task(&sub_id);
let event = Event {
id: sub_id.clone(),
msg: EventMsg::AgentMessage(AgentMessageEvent {
message: "Compact task completed".to_string(),
}),
};
sess.send_event(event).await;
let event = Event {
id: sub_id.clone(),
msg: EventMsg::TaskComplete(TaskCompleteEvent {
last_agent_message: None,
}),
};
sess.send_event(event).await;
let mut state = sess.state.lock().unwrap();
state.history.keep_last_messages(1);
}
async fn handle_response_item(
sess: &Session,
sub_id: &str,
@@ -1858,3 +1989,20 @@ fn get_last_assistant_message_from_turn(responses: &[ResponseItem]) -> Option<St
}
})
}
async fn drain_to_completed(sess: &Session, prompt: &Prompt) -> CodexResult<()> {
let mut stream = sess.client.clone().stream(prompt).await?;
loop {
let maybe_event = stream.next().await;
let Some(event) = maybe_event else {
return Err(CodexErr::Stream(
"stream closed before response.completed".into(),
));
};
match event {
Ok(ResponseEvent::Completed { .. }) => return Ok(()),
Ok(_) => continue,
Err(e) => return Err(e),
}
}
}

View File

@@ -30,6 +30,34 @@ impl ConversationHistory {
}
}
}
pub(crate) fn keep_last_messages(&mut self, n: usize) {
if n == 0 {
self.items.clear();
return;
}
// Collect the last N message items (assistant/user), newest to oldest.
let mut kept: Vec<ResponseItem> = Vec::with_capacity(n);
for item in self.items.iter().rev() {
if let ResponseItem::Message { role, content, .. } = item {
kept.push(ResponseItem::Message {
// we need to remove the id or the model will complain that messages are sent without
// their reasonings
id: None,
role: role.clone(),
content: content.clone(),
});
if kept.len() == n {
break;
}
}
}
// Preserve chronological order (oldest to newest) within the kept slice.
kept.reverse();
self.items = kept;
}
}
/// Anything that is not a system message or "reasoning" message is considered

View File

@@ -121,6 +121,10 @@ pub enum Op {
/// Request a single history entry identified by `log_id` + `offset`.
GetHistoryEntryRequest { offset: usize, log_id: u64 },
/// Request the agent to summarize the current conversation context.
/// The agent will use its existing context (either conversation history or previous response id)
/// to generate a summary which will be returned as an AgentMessage event.
Compact,
/// Request to shut down codex instance.
Shutdown,
}