From e2c994e32a31415e87070bef28ed698968d2e549 Mon Sep 17 00:00:00 2001
From: aibrahim-oai
Date: Thu, 31 Jul 2025 21:34:32 -0700
Subject: [PATCH] Add /compact (#1527)

- Add an operation to summarize the context so far.
- The operation runs a compact task that summarizes the context.
- The operation clears the previous context to free the context window.
- The operation does not go through `run_task`, to avoid corrupting the session.
- Add /compact to the TUI.

https://github.com/user-attachments/assets/e06c24e5-dcfb-4806-934a-564d425a919c
---
 SUMMARY.md                                |  21 ++
 codex-rs/core/src/codex.rs                | 154 ++++++++++++-
 codex-rs/core/src/conversation_history.rs |  28 +++
 codex-rs/core/src/protocol.rs             |   4 +
 codex-rs/core/tests/compact.rs            | 254 ++++++++++++++++++++++
 codex-rs/tui/src/app.rs                   |   8 +
 codex-rs/tui/src/chatwidget.rs            |   6 +
 codex-rs/tui/src/slash_command.rs         |   2 +
 8 files changed, 474 insertions(+), 3 deletions(-)
 create mode 100644 SUMMARY.md
 create mode 100644 codex-rs/core/tests/compact.rs

diff --git a/SUMMARY.md b/SUMMARY.md
new file mode 100644
index 00000000..5cdecdbf
--- /dev/null
+++ b/SUMMARY.md
@@ -0,0 +1,21 @@
+You are a summarization assistant. A conversation follows between a user and a coding-focused AI (Codex). Your task is to generate a clear summary capturing:
+
+• High-level objective or problem being solved
+• Key instructions or design decisions given by the user
+• Main code actions or behaviors from the AI
+• Important variables, functions, modules, or outputs discussed
+• Any unresolved questions or next steps
+
+Produce the summary in a structured format like:
+
+**Objective:** …
+
+**User instructions:** … (bulleted)
+
+**AI actions / code behavior:** … (bulleted)
+
+**Important entities:** … (e.g. function names, variables, files)
+
+**Open issues / next steps:** … (if any)
+
+**Summary (concise):** (one or two sentences)
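Before the code changes, a minimal sketch of how a front end is expected to drive the new operation: submit `Op::Compact`, then wait for `TaskComplete`, after which the session history holds only the summary. This mirrors the integration test added later in this patch; the `wait_for_event` helper comes from the `core_test_support` crate used by that test, and an already-spawned `codex` handle is assumed.

```rust
use codex_core::Codex;
use codex_core::protocol::EventMsg;
use codex_core::protocol::Op;
use core_test_support::wait_for_event;

/// Sketch only: ask the agent to compact the current conversation and wait for
/// the compact task to finish.
async fn compact_conversation(codex: &Codex) {
    // Submit the new op; if a task is already running, the summarization
    // trigger is injected into it, otherwise a dedicated compact task starts.
    codex.submit(Op::Compact).await.unwrap();
    // The compact task emits TaskStarted, an AgentMessage, then TaskComplete.
    wait_for_event(codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
}
```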
diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs
index 3dd1d513..5d48cd45 100644
--- a/codex-rs/core/src/codex.rs
+++ b/codex-rs/core/src/codex.rs
@@ -442,6 +442,12 @@ impl Session {
         let _ = self.tx_event.send(event).await;
     }
 
+    /// Build the full turn input by concatenating the current conversation
+    /// history with additional items for this turn.
+    pub fn turn_input_with_history(&self, extra: Vec<ResponseItem>) -> Vec<ResponseItem> {
+        [self.state.lock().unwrap().history.contents(), extra].concat()
+    }
+
     /// Returns the input if there was no task running to inject into
     pub fn inject_input(&self, input: Vec<InputItem>) -> Result<(), Vec<InputItem>> {
         let mut state = self.state.lock().unwrap();
@@ -564,6 +570,25 @@ impl AgentTask {
            handle,
        }
    }
+
+    fn compact(
+        sess: Arc<Session>,
+        sub_id: String,
+        input: Vec<InputItem>,
+        compact_instructions: String,
+    ) -> Self {
+        let handle = tokio::spawn(run_compact_task(
+            Arc::clone(&sess),
+            sub_id.clone(),
+            input,
+            compact_instructions,
+        ))
+        .abort_handle();
+        Self {
+            sess,
+            sub_id,
+            handle,
+        }
+    }
 
    fn abort(self) {
        if !self.handle.is_finished() {
@@ -884,6 +909,31 @@ async fn submission_loop(
                    }
                });
            }
+            Op::Compact => {
+                let sess = match sess.as_ref() {
+                    Some(sess) => sess,
+                    None => {
+                        send_no_session_event(sub.id).await;
+                        continue;
+                    }
+                };
+
+                // Create a summarization request as user input
+                const SUMMARIZATION_PROMPT: &str = include_str!("../../../SUMMARY.md");
+
+                // Attempt to inject input into current task
+                if let Err(items) = sess.inject_input(vec![InputItem::Text {
+                    text: "Start Summarization".to_string(),
+                }]) {
+                    let task = AgentTask::compact(
+                        sess.clone(),
+                        sub.id,
+                        items,
+                        SUMMARIZATION_PROMPT.to_string(),
+                    );
+                    sess.set_task(task);
+                }
+            }
            Op::Shutdown => {
                info!("Shutting down Codex instance");
@@ -945,7 +995,7 @@ async fn run_task(sess: Arc<Session>, sub_id: String, input: Vec<InputItem>) {
        return;
    }
 
-    let initial_input_for_turn = ResponseInputItem::from(input);
+    let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input);
    sess.record_conversation_items(&[initial_input_for_turn.clone().into()])
        .await;
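The `Op::Compact` handler above relies on the existing `inject_input` contract, where `Err` means "no task was running, here is your input back". A toy sketch of that inject-or-spawn decision, with a hypothetical `Queue` standing in for the real `Session` state (illustration only, not the actual types):

```rust
/// Toy stand-in for the session's pending-input queue.
struct Queue {
    task_running: bool,
    pending: Vec<String>,
}

impl Queue {
    /// Returns the input back if there was no task running to inject it into.
    fn inject_input(&mut self, input: Vec<String>) -> Result<(), Vec<String>> {
        if self.task_running {
            self.pending.extend(input);
            Ok(())
        } else {
            Err(input)
        }
    }
}

fn handle_compact(queue: &mut Queue) {
    match queue.inject_input(vec!["Start Summarization".to_string()]) {
        // A task is already running: the trigger text will be picked up as
        // pending input on its next turn.
        Ok(()) => {}
        // No task running: the returned items seed a dedicated compact task,
        // standing in for `AgentTask::compact(...)` + `sess.set_task(task)`.
        Err(items) => spawn_compact_task(items),
    }
}

fn spawn_compact_task(_items: Vec<String>) {
    // Placeholder for spawning the compact task.
}
```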
@@ -966,8 +1016,7 @@ async fn run_task(sess: Arc<Session>, sub_id: String, input: Vec<InputItem>) {
    // conversation history on each turn. The rollout file, however, should
    // only record the new items that originated in this turn so that it
    // represents an append-only log without duplicates.
-    let turn_input: Vec<ResponseItem> =
-        [sess.state.lock().unwrap().history.contents(), pending_input].concat();
+    let turn_input: Vec<ResponseItem> = sess.turn_input_with_history(pending_input);
 
    let turn_input_messages: Vec<String> = turn_input
        .iter()
@@ -1293,6 +1342,88 @@ async fn try_run_turn(
    }
}
 
+async fn run_compact_task(
+    sess: Arc<Session>,
+    sub_id: String,
+    input: Vec<InputItem>,
+    compact_instructions: String,
+) {
+    let start_event = Event {
+        id: sub_id.clone(),
+        msg: EventMsg::TaskStarted,
+    };
+    if sess.tx_event.send(start_event).await.is_err() {
+        return;
+    }
+
+    let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input);
+    let turn_input: Vec<ResponseItem> =
+        sess.turn_input_with_history(vec![initial_input_for_turn.clone().into()]);
+
+    let prompt = Prompt {
+        input: turn_input,
+        user_instructions: None,
+        store: !sess.disable_response_storage,
+        extra_tools: HashMap::new(),
+        base_instructions_override: Some(compact_instructions.clone()),
+    };
+
+    let max_retries = sess.client.get_provider().stream_max_retries();
+    let mut retries = 0;
+
+    loop {
+        let attempt_result = drain_to_completed(&sess, &prompt).await;
+
+        match attempt_result {
+            Ok(()) => break,
+            Err(CodexErr::Interrupted) => return,
+            Err(e) => {
+                if retries < max_retries {
+                    retries += 1;
+                    let delay = backoff(retries);
+                    sess.notify_background_event(
+                        &sub_id,
+                        format!(
+                            "stream error: {e}; retrying {retries}/{max_retries} in {delay:?}…"
+                        ),
+                    )
+                    .await;
+                    tokio::time::sleep(delay).await;
+                    continue;
+                } else {
+                    let event = Event {
+                        id: sub_id.clone(),
+                        msg: EventMsg::Error(ErrorEvent {
+                            message: e.to_string(),
+                        }),
+                    };
+                    sess.send_event(event).await;
+                    return;
+                }
+            }
+        }
+    }
+
+    sess.remove_task(&sub_id);
+    let event = Event {
+        id: sub_id.clone(),
+        msg: EventMsg::AgentMessage(AgentMessageEvent {
+            message: "Compact task completed".to_string(),
+        }),
+    };
+    sess.send_event(event).await;
+    let event = Event {
+        id: sub_id.clone(),
+        msg: EventMsg::TaskComplete(TaskCompleteEvent {
+            last_agent_message: None,
+        }),
+    };
+    sess.send_event(event).await;
+
+    let mut state = sess.state.lock().unwrap();
+    state.history.keep_last_messages(1);
+}
+
 async fn handle_response_item(
    sess: &Session,
    sub_id: &str,
@@ -1858,3 +1989,20 @@ fn get_last_assistant_message_from_turn(responses: &[ResponseItem]) -> Option<String> {
+
+async fn drain_to_completed(sess: &Session, prompt: &Prompt) -> CodexResult<()> {
+    let mut stream = sess.client.clone().stream(prompt).await?;
+    loop {
+        let maybe_event = stream.next().await;
+        let Some(event) = maybe_event else {
+            return Err(CodexErr::Stream(
+                "stream closed before response.completed".into(),
+            ));
+        };
+        match event {
+            Ok(ResponseEvent::Completed { .. }) => return Ok(()),
+            Ok(_) => continue,
+            Err(e) => return Err(e),
+        }
+    }
+}
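The retry handling in `run_compact_task` follows a common bounded-retry shape. A hedged sketch of that shape with toy types (the real code additionally distinguishes `CodexErr::Interrupted`, reports progress through `notify_background_event`, and computes the delay with `backoff(retries)`):

```rust
use std::time::Duration;

/// Retry a fallible async attempt up to `max_retries` times, sleeping between
/// attempts. Generic sketch only; error type and delay are placeholders.
async fn retry_to_completion<F, Fut>(max_retries: u64, mut attempt: F) -> Result<(), String>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<(), String>>,
{
    let mut retries = 0;
    loop {
        match attempt().await {
            Ok(()) => return Ok(()),
            Err(_) if retries < max_retries => {
                retries += 1;
                // Placeholder delay; the real code uses an exponential backoff.
                tokio::time::sleep(Duration::from_millis(200 * retries)).await;
            }
            Err(e) => return Err(e),
        }
    }
}
```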
diff --git a/codex-rs/core/src/conversation_history.rs b/codex-rs/core/src/conversation_history.rs
index 4cd989cb..f5254f33 100644
--- a/codex-rs/core/src/conversation_history.rs
+++ b/codex-rs/core/src/conversation_history.rs
@@ -30,6 +30,34 @@ impl ConversationHistory {
            }
        }
    }
+
+    pub(crate) fn keep_last_messages(&mut self, n: usize) {
+        if n == 0 {
+            self.items.clear();
+            return;
+        }
+
+        // Collect the last N message items (assistant/user), newest to oldest.
+        let mut kept: Vec<ResponseItem> = Vec::with_capacity(n);
+        for item in self.items.iter().rev() {
+            if let ResponseItem::Message { role, content, .. } = item {
+                kept.push(ResponseItem::Message {
+                    // We need to remove the id, or the model will complain that
+                    // messages are sent without their reasoning items.
+                    id: None,
+                    role: role.clone(),
+                    content: content.clone(),
+                });
+                if kept.len() == n {
+                    break;
+                }
+            }
+        }
+
+        // Preserve chronological order (oldest to newest) within the kept slice.
+        kept.reverse();
+        self.items = kept;
+    }
}
 
 /// Anything that is not a system message or "reasoning" message is considered
diff --git a/codex-rs/core/src/protocol.rs b/codex-rs/core/src/protocol.rs
index bc922eb0..1b4832a2 100644
--- a/codex-rs/core/src/protocol.rs
+++ b/codex-rs/core/src/protocol.rs
@@ -121,6 +121,10 @@ pub enum Op {
    /// Request a single history entry identified by `log_id` + `offset`.
    GetHistoryEntryRequest { offset: usize, log_id: u64 },
 
+    /// Request the agent to summarize the current conversation context.
+    /// The agent will use its existing context (either conversation history or previous response id)
+    /// to generate a summary, which will be returned as an AgentMessage event.
+    Compact,
    /// Request to shut down codex instance.
    Shutdown,
}
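A toy model of what `ConversationHistory::keep_last_messages(1)` leaves behind after compaction, using a simplified message struct rather than the real `ResponseItem` (the real method also skips non-message items such as reasoning and function calls):

```rust
/// Toy stand-in for `ResponseItem::Message`.
#[derive(Clone, Debug, PartialEq)]
struct Msg {
    id: Option<String>,
    role: String,
    text: String,
}

/// Same trimming rule on the toy type: keep only the newest `n` messages, in
/// chronological order, with `id` cleared so the model is not asked for
/// reasoning items that were dropped.
fn keep_last_messages(items: &mut Vec<Msg>, n: usize) {
    let mut kept: Vec<Msg> = items
        .iter()
        .rev()
        .take(n)
        .cloned()
        .map(|m| Msg { id: None, ..m })
        .collect();
    kept.reverse();
    *items = kept;
}

fn main() {
    let mut history = vec![
        Msg { id: Some("m1".into()), role: "user".into(), text: "hello world".into() },
        Msg { id: Some("m2".into()), role: "assistant".into(), text: "FIRST_REPLY".into() },
        Msg { id: Some("m3".into()), role: "assistant".into(), text: "SUMMARY_ONLY_CONTEXT".into() },
    ];
    keep_last_messages(&mut history, 1);
    // Only the newest message (the summary) survives, and its id is cleared.
    assert_eq!(
        history,
        vec![Msg { id: None, role: "assistant".into(), text: "SUMMARY_ONLY_CONTEXT".into() }]
    );
}
```

This is why the integration test below expects the third request to contain exactly one assistant message and none of the original user input.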
diff --git a/codex-rs/core/tests/compact.rs b/codex-rs/core/tests/compact.rs
new file mode 100644
index 00000000..8967c86a
--- /dev/null
+++ b/codex-rs/core/tests/compact.rs
@@ -0,0 +1,254 @@
+#![expect(clippy::unwrap_used)]
+
+use codex_core::Codex;
+use codex_core::CodexSpawnOk;
+use codex_core::ModelProviderInfo;
+use codex_core::built_in_model_providers;
+use codex_core::exec::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR;
+use codex_core::protocol::EventMsg;
+use codex_core::protocol::InputItem;
+use codex_core::protocol::Op;
+use codex_login::CodexAuth;
+use core_test_support::load_default_config_for_test;
+use core_test_support::wait_for_event;
+use serde_json::Value;
+use tempfile::TempDir;
+use wiremock::Mock;
+use wiremock::MockServer;
+use wiremock::ResponseTemplate;
+use wiremock::matchers::method;
+use wiremock::matchers::path;
+
+use pretty_assertions::assert_eq;
+
+// --- Test helpers -----------------------------------------------------------
+
+/// Build an SSE stream body from a list of JSON events.
+fn sse(events: Vec<Value>) -> String {
+    use std::fmt::Write as _;
+    let mut out = String::new();
+    for ev in events {
+        let kind = ev.get("type").and_then(|v| v.as_str()).unwrap();
+        writeln!(&mut out, "event: {kind}").unwrap();
+        if !ev.as_object().map(|o| o.len() == 1).unwrap_or(false) {
+            write!(&mut out, "data: {ev}\n\n").unwrap();
+        } else {
+            out.push('\n');
+        }
+    }
+    out
+}
+
+/// Convenience: SSE event for a completed response with a specific id.
+fn ev_completed(id: &str) -> Value {
+    serde_json::json!({
+        "type": "response.completed",
+        "response": {
+            "id": id,
+            "usage": {"input_tokens":0,"input_tokens_details":null,"output_tokens":0,"output_tokens_details":null,"total_tokens":0}
+        }
+    })
+}
+
+/// Convenience: SSE event for a single assistant message output item.
+fn ev_assistant_message(id: &str, text: &str) -> Value {
+    serde_json::json!({
+        "type": "response.output_item.done",
+        "item": {
+            "type": "message",
+            "role": "assistant",
+            "id": id,
+            "content": [{"type": "output_text", "text": text}]
+        }
+    })
+}
+
+fn sse_response(body: String) -> ResponseTemplate {
+    ResponseTemplate::new(200)
+        .insert_header("content-type", "text/event-stream")
+        .set_body_raw(body, "text/event-stream")
+}
+
+async fn mount_sse_once<M>(server: &MockServer, matcher: M, body: String)
+where
+    M: wiremock::Match + Send + Sync + 'static,
+{
+    Mock::given(method("POST"))
+        .and(path("/v1/responses"))
+        .and(matcher)
+        .respond_with(sse_response(body))
+        .expect(1)
+        .mount(server)
+        .await;
+}
+
+const FIRST_REPLY: &str = "FIRST_REPLY";
+const SUMMARY_TEXT: &str = "SUMMARY_ONLY_CONTEXT";
+const SUMMARIZE_TRIGGER: &str = "Start Summarization";
+const THIRD_USER_MSG: &str = "next turn";
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn summarize_context_three_requests_and_instructions() {
+    if std::env::var(CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR).is_ok() {
+        println!(
+            "Skipping test because it cannot execute when network is disabled in a Codex sandbox."
+        );
+        return;
+    }
+
+    // Set up a mock server that we can inspect after the run.
+    let server = MockServer::start().await;
+
+    // SSE 1: assistant replies normally so it is recorded in history.
+    let sse1 = sse(vec![
+        ev_assistant_message("m1", FIRST_REPLY),
+        ev_completed("r1"),
+    ]);
+
+    // SSE 2: summarizer returns a summary message.
+    let sse2 = sse(vec![
+        ev_assistant_message("m2", SUMMARY_TEXT),
+        ev_completed("r2"),
+    ]);
+
+    // SSE 3: minimal completed; we only need to capture the request body.
+    let sse3 = sse(vec![ev_completed("r3")]);
+
+    // Mount three expectations, one per request, matched by body content.
+    let first_matcher = |req: &wiremock::Request| {
+        let body = std::str::from_utf8(&req.body).unwrap_or("");
+        body.contains("\"text\":\"hello world\"")
+            && !body.contains(&format!("\"text\":\"{SUMMARIZE_TRIGGER}\""))
+    };
+    mount_sse_once(&server, first_matcher, sse1).await;
+
+    let second_matcher = |req: &wiremock::Request| {
+        let body = std::str::from_utf8(&req.body).unwrap_or("");
+        body.contains(&format!("\"text\":\"{SUMMARIZE_TRIGGER}\""))
+    };
+    mount_sse_once(&server, second_matcher, sse2).await;
+
+    let third_matcher = |req: &wiremock::Request| {
+        let body = std::str::from_utf8(&req.body).unwrap_or("");
+        body.contains(&format!("\"text\":\"{THIRD_USER_MSG}\""))
+    };
+    mount_sse_once(&server, third_matcher, sse3).await;
+
+    // Build config pointing to the mock server and spawn Codex.
+    let model_provider = ModelProviderInfo {
+        base_url: Some(format!("{}/v1", server.uri())),
+        ..built_in_model_providers()["openai"].clone()
+    };
+    let home = TempDir::new().unwrap();
+    let mut config = load_default_config_for_test(&home);
+    config.model_provider = model_provider;
+    let ctrl_c = std::sync::Arc::new(tokio::sync::Notify::new());
+    let CodexSpawnOk { codex, .. } = Codex::spawn(
+        config,
+        Some(CodexAuth::from_api_key("dummy".to_string())),
+        ctrl_c.clone(),
+    )
+    .await
+    .unwrap();
+
+    // 1) Normal user input – should hit the server once.
+    codex
+        .submit(Op::UserInput {
+            items: vec![InputItem::Text {
+                text: "hello world".into(),
+            }],
+        })
+        .await
+        .unwrap();
+    wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
+
+    // 2) Summarize – second hit with summarization instructions.
+    codex.submit(Op::Compact).await.unwrap();
+    wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
+
+    // 3) Next user input – third hit; history should include only the summary.
+    codex
+        .submit(Op::UserInput {
+            items: vec![InputItem::Text {
+                text: THIRD_USER_MSG.into(),
+            }],
+        })
+        .await
+        .unwrap();
+    wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
+
+    // Inspect the three captured requests.
+    let requests = server.received_requests().await.unwrap();
+    assert_eq!(requests.len(), 3, "expected exactly three requests");
+
+    let req1 = &requests[0];
+    let req2 = &requests[1];
+    let req3 = &requests[2];
+
+    let body1 = req1.body_json::<Value>().unwrap();
+    let body2 = req2.body_json::<Value>().unwrap();
+    let body3 = req3.body_json::<Value>().unwrap();
+
+    // System instructions should change for the summarization turn.
+    let instr1 = body1.get("instructions").and_then(|v| v.as_str()).unwrap();
+    let instr2 = body2.get("instructions").and_then(|v| v.as_str()).unwrap();
+    assert_ne!(
+        instr1, instr2,
+        "summarization should override base instructions"
+    );
+    assert!(
+        instr2.contains("You are a summarization assistant"),
+        "summarization instructions not applied"
+    );
+
+    // The summarization request should include the injected user input marker.
+    let input2 = body2.get("input").and_then(|v| v.as_array()).unwrap();
+    // The last item is the user message created from the injected input.
+    let last2 = input2.last().unwrap();
+    assert_eq!(last2.get("type").unwrap().as_str().unwrap(), "message");
+    assert_eq!(last2.get("role").unwrap().as_str().unwrap(), "user");
+    let text2 = last2["content"][0]["text"].as_str().unwrap();
+    assert!(text2.contains(SUMMARIZE_TRIGGER));
+
+    // Third request must contain only the summary from step 2 as prior history plus the new user msg.
+    let input3 = body3.get("input").and_then(|v| v.as_array()).unwrap();
+    println!("third request body: {body3}");
+    assert!(
+        input3.len() >= 2,
+        "expected summary + new user message in third request"
+    );
+
+    // Collect all (role, text) message tuples.
+    let mut messages: Vec<(String, String)> = Vec::new();
+    for item in input3 {
+        if item["type"].as_str() == Some("message") {
+            let role = item["role"].as_str().unwrap_or_default().to_string();
+            let text = item["content"][0]["text"]
+                .as_str()
+                .unwrap_or_default()
+                .to_string();
+            messages.push((role, text));
+        }
+    }
+
+    // Exactly one assistant message should remain after compaction and the new user message is present.
+    let assistant_count = messages.iter().filter(|(r, _)| r == "assistant").count();
+    assert_eq!(
+        assistant_count, 1,
+        "exactly one assistant message should remain after compaction"
+    );
+    assert!(
+        messages
+            .iter()
+            .any(|(r, t)| r == "user" && t == THIRD_USER_MSG),
+        "third request should include the new user message"
+    );
+    assert!(
+        !messages.iter().any(|(_, t)| t.contains("hello world")),
+        "third request should not include the original user input"
+    );
+    assert!(
+        !messages.iter().any(|(_, t)| t.contains(SUMMARIZE_TRIGGER)),
+        "third request should not include the summarize trigger"
+    );
+}
diff --git a/codex-rs/tui/src/app.rs b/codex-rs/tui/src/app.rs
index cc3118e8..d4c2f3c1 100644
--- a/codex-rs/tui/src/app.rs
+++ b/codex-rs/tui/src/app.rs
@@ -10,6 +10,8 @@ use crate::tui;
 use codex_core::config::Config;
 use codex_core::protocol::Event;
 use codex_core::protocol::EventMsg;
+use codex_core::protocol::ExecApprovalRequestEvent;
+use codex_core::protocol::Op;
 use color_eyre::eyre::Result;
 use crossterm::SynchronizedUpdate;
 use crossterm::event::KeyCode;
@@ -298,6 +300,12 @@ impl App<'_> {
                self.app_state = AppState::Chat { widget: new_widget };
                self.app_event_tx.send(AppEvent::RequestRedraw);
            }
+            SlashCommand::Compact => {
+                if let AppState::Chat { widget } = &mut self.app_state {
+                    widget.clear_token_usage();
+                    self.app_event_tx.send(AppEvent::CodexOp(Op::Compact));
+                }
+            }
            SlashCommand::Quit => {
                break;
            }
diff --git a/codex-rs/tui/src/chatwidget.rs b/codex-rs/tui/src/chatwidget.rs
index 4c415777..c777af52 100644
--- a/codex-rs/tui/src/chatwidget.rs
+++ b/codex-rs/tui/src/chatwidget.rs
@@ -502,6 +502,12 @@ impl ChatWidget<'_> {
    pub(crate) fn token_usage(&self) -> &TokenUsage {
        &self.token_usage
    }
+
+    pub(crate) fn clear_token_usage(&mut self) {
+        self.token_usage = TokenUsage::default();
+        self.bottom_pane
+            .set_token_usage(self.token_usage.clone(), self.config.model_context_window);
+    }
}
 
 impl WidgetRef for &ChatWidget<'_> {
diff --git a/codex-rs/tui/src/slash_command.rs b/codex-rs/tui/src/slash_command.rs
index 7df1bcbd..d82a1660 100644
--- a/codex-rs/tui/src/slash_command.rs
+++ b/codex-rs/tui/src/slash_command.rs
@@ -13,6 +13,7 @@ pub enum SlashCommand {
    // DO NOT ALPHA-SORT! Enum order is presentation order in the popup, so
    // more frequently used commands should be listed first.
    New,
+    Compact,
    Diff,
    Quit,
    #[cfg(debug_assertions)]
@@ -24,6 +25,7 @@ impl SlashCommand {
    pub fn description(self) -> &'static str {
        match self {
            SlashCommand::New => "Start a new chat.",
+            SlashCommand::Compact => "Compact the chat history.",
            SlashCommand::Quit => "Exit the application.",
            SlashCommand::Diff => {
                "Show git diff of the working directory (including untracked files)"