2025-09-12 13:07:10 -07:00
use std ::sync ::Arc ;
use super ::Session ;
use super ::TurnContext ;
use super ::get_last_assistant_message_from_turn ;
use crate ::Prompt ;
use crate ::client_common ::ResponseEvent ;
use crate ::error ::CodexErr ;
use crate ::error ::Result as CodexResult ;
use crate ::protocol ::AgentMessageEvent ;
use crate ::protocol ::CompactedItem ;
use crate ::protocol ::ErrorEvent ;
use crate ::protocol ::EventMsg ;
use crate ::protocol ::TaskStartedEvent ;
use crate ::protocol ::TurnContextItem ;
2025-10-31 13:27:33 -07:00
use crate ::protocol ::WarningEvent ;
2025-09-22 16:12:26 -07:00
use crate ::truncate ::truncate_middle ;
2025-09-12 13:07:10 -07:00
use crate ::util ::backoff ;
use askama ::Template ;
2025-10-22 10:14:50 -07:00
use codex_protocol ::items ::TurnItem ;
2025-09-12 13:07:10 -07:00
use codex_protocol ::models ::ContentItem ;
use codex_protocol ::models ::ResponseInputItem ;
use codex_protocol ::models ::ResponseItem ;
use codex_protocol ::protocol ::RolloutItem ;
2025-10-20 13:34:44 -07:00
use codex_protocol ::user_input ::UserInput ;
2025-09-12 13:07:10 -07:00
use futures ::prelude ::* ;
2025-10-22 13:08:46 -07:00
use tracing ::error ;
2025-09-12 13:07:10 -07:00
2025-09-23 17:59:17 +01:00
pub const SUMMARIZATION_PROMPT : & str = include_str! ( " ../../templates/compact/prompt.md " ) ;
2025-09-22 16:12:26 -07:00
/// Token budget for the concatenated prior user messages carried into the
/// compacted-history bridge message. `build_compacted_history` turns it into a
/// byte cap at roughly 4 bytes per token.
const COMPACT_USER_MESSAGE_MAX_TOKENS: usize = 20 * 1_000;
2025-09-12 13:07:10 -07:00
#[ derive(Template) ]
#[ template(path = " compact/history_bridge.md " , escape = " none " ) ]
struct HistoryBridgeTemplate < ' a > {
user_messages_text : & ' a str ,
summary_text : & ' a str ,
}
2025-09-26 15:49:08 +02:00
pub ( crate ) async fn run_inline_auto_compact_task (
2025-09-12 13:07:10 -07:00
sess : Arc < Session > ,
turn_context : Arc < TurnContext > ,
) {
2025-10-30 14:24:24 +00:00
let prompt = turn_context . compact_prompt ( ) . to_string ( ) ;
let input = vec! [ UserInput ::Text { text : prompt } ] ;
2025-10-21 08:04:16 -07:00
run_compact_task_inner ( sess , turn_context , input ) . await ;
2025-09-12 13:07:10 -07:00
}
2025-09-26 15:49:08 +02:00
pub ( crate ) async fn run_compact_task (
2025-09-12 13:07:10 -07:00
sess : Arc < Session > ,
turn_context : Arc < TurnContext > ,
2025-10-20 13:34:44 -07:00
input : Vec < UserInput > ,
2025-09-26 15:49:08 +02:00
) -> Option < String > {
2025-10-21 08:04:16 -07:00
let start_event = EventMsg ::TaskStarted ( TaskStartedEvent {
model_context_window : turn_context . client . get_model_context_window ( ) ,
} ) ;
sess . send_event ( & turn_context , start_event ) . await ;
run_compact_task_inner ( sess . clone ( ) , turn_context , input ) . await ;
2025-09-26 15:49:08 +02:00
None
2025-09-12 13:07:10 -07:00
}
async fn run_compact_task_inner (
sess : Arc < Session > ,
turn_context : Arc < TurnContext > ,
2025-10-20 13:34:44 -07:00
input : Vec < UserInput > ,
2025-09-12 13:07:10 -07:00
) {
let initial_input_for_turn : ResponseInputItem = ResponseInputItem ::from ( input ) ;
2025-10-22 13:08:46 -07:00
let mut history = sess . clone_history ( ) . await ;
history . record_items ( & [ initial_input_for_turn . into ( ) ] ) ;
2025-10-08 18:11:08 +01:00
let mut truncated_count = 0 usize ;
2025-09-12 13:07:10 -07:00
let max_retries = turn_context . client . get_provider ( ) . stream_max_retries ( ) ;
let mut retries = 0 ;
let rollout_item = RolloutItem ::TurnContext ( TurnContextItem {
cwd : turn_context . cwd . clone ( ) ,
approval_policy : turn_context . approval_policy ,
sandbox_policy : turn_context . sandbox_policy . clone ( ) ,
model : turn_context . client . get_model ( ) ,
effort : turn_context . client . get_reasoning_effort ( ) ,
summary : turn_context . client . get_reasoning_summary ( ) ,
} ) ;
sess . persist_rollout_items ( & [ rollout_item ] ) . await ;
loop {
2025-10-28 11:39:34 -07:00
let turn_input = history . get_history_for_prompt ( ) ;
2025-10-08 18:11:08 +01:00
let prompt = Prompt {
2025-10-28 11:39:34 -07:00
input : turn_input . clone ( ) ,
2025-10-08 18:11:08 +01:00
.. Default ::default ( )
} ;
2025-10-21 08:04:16 -07:00
let attempt_result = drain_to_completed ( & sess , turn_context . as_ref ( ) , & prompt ) . await ;
2025-09-12 13:07:10 -07:00
match attempt_result {
Ok ( ( ) ) = > {
2025-10-08 18:11:08 +01:00
if truncated_count > 0 {
sess . notify_background_event (
2025-10-21 08:04:16 -07:00
turn_context . as_ref ( ) ,
2025-10-08 18:11:08 +01:00
format! (
" Trimmed {truncated_count} older conversation item(s) before compacting so the prompt fits the model context window. "
) ,
)
. await ;
}
2025-09-12 13:07:10 -07:00
break ;
}
Err ( CodexErr ::Interrupted ) = > {
return ;
}
2025-10-04 18:40:06 -07:00
Err ( e @ CodexErr ::ContextWindowExceeded ) = > {
2025-10-28 11:39:34 -07:00
if turn_input . len ( ) > 1 {
2025-10-22 13:08:46 -07:00
// Trim from the beginning to preserve cache (prefix-based) and keep recent messages intact.
error! (
" Context window exceeded while compacting; removing oldest history item. Error: {e} "
) ;
history . remove_first_item ( ) ;
2025-10-08 18:11:08 +01:00
truncated_count + = 1 ;
retries = 0 ;
continue ;
}
2025-10-21 08:04:16 -07:00
sess . set_total_tokens_full ( turn_context . as_ref ( ) ) . await ;
let event = EventMsg ::Error ( ErrorEvent {
message : e . to_string ( ) ,
} ) ;
sess . send_event ( & turn_context , event ) . await ;
2025-10-04 18:40:06 -07:00
return ;
}
2025-09-12 13:07:10 -07:00
Err ( e ) = > {
if retries < max_retries {
retries + = 1 ;
let delay = backoff ( retries ) ;
sess . notify_stream_error (
2025-10-21 08:04:16 -07:00
turn_context . as_ref ( ) ,
2025-10-27 12:09:53 -05:00
format! ( " Reconnecting... {retries} / {max_retries} " ) ,
2025-09-12 13:07:10 -07:00
)
. await ;
tokio ::time ::sleep ( delay ) . await ;
continue ;
} else {
2025-10-21 08:04:16 -07:00
let event = EventMsg ::Error ( ErrorEvent {
message : e . to_string ( ) ,
} ) ;
sess . send_event ( & turn_context , event ) . await ;
2025-09-12 13:07:10 -07:00
return ;
}
}
}
}
2025-10-28 11:39:34 -07:00
let history_snapshot = sess . clone_history ( ) . await . get_history ( ) ;
2025-09-12 13:07:10 -07:00
let summary_text = get_last_assistant_message_from_turn ( & history_snapshot ) . unwrap_or_default ( ) ;
let user_messages = collect_user_messages ( & history_snapshot ) ;
2025-09-14 09:23:31 -04:00
let initial_context = sess . build_initial_context ( turn_context . as_ref ( ) ) ;
2025-10-27 11:48:01 +00:00
let mut new_history = build_compacted_history ( initial_context , & user_messages , & summary_text ) ;
let ghost_snapshots : Vec < ResponseItem > = history_snapshot
. iter ( )
. filter ( | item | matches! ( item , ResponseItem ::GhostSnapshot { .. } ) )
. cloned ( )
. collect ( ) ;
new_history . extend ( ghost_snapshots ) ;
ref: full state refactor (#4174)
## Current State Observations
- `Session` currently holds many unrelated responsibilities (history,
approval queues, task handles, rollout recorder, shell discovery, token
tracking, etc.), making it hard to reason about ownership and lifetimes.
- The anonymous `State` struct inside `codex.rs` mixes session-long data
with turn-scoped queues and approval bookkeeping.
- Turn execution (`run_task`) relies on ad-hoc local variables that
should conceptually belong to a per-turn state object.
- External modules (`codex::compact`, tests) frequently poke the raw
`Session.state` mutex, which couples them to implementation details.
- Interrupts, approvals, and rollout persistence all have bespoke
cleanup paths, contributing to subtle bugs when a turn is aborted
mid-flight.
## Desired End State
- Keep a slim `Session` object that acts as the orchestrator and façade.
It should expose a focused API (submit, approvals, interrupts, event
emission) without storing unrelated fields directly.
- Introduce a `state` module that encapsulates all mutable data
structures:
- `SessionState`: session-persistent data (history, approved commands,
token/rate-limit info, maybe user preferences).
- `ActiveTurn`: metadata for the currently running turn (sub-id, task
kind, abort handle) and an `Arc<TurnState>`.
- `TurnState`: all turn-scoped pieces (pending inputs, approval waiters,
diff tracker, review history, auto-compact flags, last agent message,
outstanding tool call bookkeeping).
- Group long-lived helpers/managers into a dedicated `SessionServices`
struct so `Session` does not accumulate "random" fields.
- Provide clear, lock-safe APIs so other modules never touch raw
mutexes.
- Ensure every turn creates/drops a `TurnState` and that
interrupts/finishes delegate cleanup to it.
2025-09-25 11:16:06 +01:00
sess . replace_history ( new_history ) . await ;
2025-09-12 13:07:10 -07:00
let rollout_item = RolloutItem ::Compacted ( CompactedItem {
message : summary_text . clone ( ) ,
} ) ;
sess . persist_rollout_items ( & [ rollout_item ] ) . await ;
2025-10-21 08:04:16 -07:00
let event = EventMsg ::AgentMessage ( AgentMessageEvent {
message : " Compact task completed " . to_string ( ) ,
} ) ;
sess . send_event ( & turn_context , event ) . await ;
2025-10-31 13:27:33 -07:00
let warning = EventMsg ::Warning ( WarningEvent {
message : " Heads up: Long conversations and multiple compactions can cause the model to be less accurate. Start new a new conversation when possible to keep conversations small and targeted. " . to_string ( ) ,
} ) ;
sess . send_event ( & turn_context , warning ) . await ;
2025-09-12 13:07:10 -07:00
}
2025-09-18 13:55:53 -07:00
pub fn content_items_to_text ( content : & [ ContentItem ] ) -> Option < String > {
2025-09-12 13:07:10 -07:00
let mut pieces = Vec ::new ( ) ;
for item in content {
match item {
ContentItem ::InputText { text } | ContentItem ::OutputText { text } = > {
if ! text . is_empty ( ) {
pieces . push ( text . as_str ( ) ) ;
}
}
ContentItem ::InputImage { .. } = > { }
}
}
if pieces . is_empty ( ) {
None
} else {
Some ( pieces . join ( " \n " ) )
}
}
2025-09-14 09:23:31 -04:00
pub ( crate ) fn collect_user_messages ( items : & [ ResponseItem ] ) -> Vec < String > {
2025-09-12 13:07:10 -07:00
items
. iter ( )
2025-10-22 10:14:50 -07:00
. filter_map ( | item | match crate ::event_mapping ::parse_turn_item ( item ) {
Some ( TurnItem ::UserMessage ( user ) ) = > Some ( user . message ( ) ) ,
2025-09-12 13:07:10 -07:00
_ = > None ,
} )
. collect ( )
}
2025-09-14 09:23:31 -04:00
pub ( crate ) fn build_compacted_history (
initial_context : Vec < ResponseItem > ,
2025-09-12 13:07:10 -07:00
user_messages : & [ String ] ,
summary_text : & str ,
) -> Vec < ResponseItem > {
2025-10-26 01:46:08 -05:00
build_compacted_history_with_limit (
initial_context ,
user_messages ,
summary_text ,
COMPACT_USER_MESSAGE_MAX_TOKENS * 4 ,
)
}
fn build_compacted_history_with_limit (
mut history : Vec < ResponseItem > ,
user_messages : & [ String ] ,
summary_text : & str ,
max_bytes : usize ,
) -> Vec < ResponseItem > {
2025-09-22 16:12:26 -07:00
let mut user_messages_text = if user_messages . is_empty ( ) {
2025-09-12 13:07:10 -07:00
" (none) " . to_string ( )
} else {
user_messages . join ( " \n \n " )
} ;
2025-09-22 16:12:26 -07:00
// Truncate the concatenated prior user messages so the bridge message
// stays well under the context window (approx. 4 bytes/token).
if user_messages_text . len ( ) > max_bytes {
user_messages_text = truncate_middle ( & user_messages_text , max_bytes ) . 0 ;
}
2025-09-12 13:07:10 -07:00
let summary_text = if summary_text . is_empty ( ) {
" (no summary available) " . to_string ( )
} else {
summary_text . to_string ( )
} ;
let Ok ( bridge ) = HistoryBridgeTemplate {
user_messages_text : & user_messages_text ,
summary_text : & summary_text ,
}
. render ( ) else {
return vec! [ ] ;
} ;
history . push ( ResponseItem ::Message {
id : None ,
role : " user " . to_string ( ) ,
content : vec ! [ ContentItem ::InputText { text : bridge } ] ,
} ) ;
history
}
async fn drain_to_completed (
sess : & Session ,
turn_context : & TurnContext ,
prompt : & Prompt ,
) -> CodexResult < ( ) > {
2025-10-29 14:04:25 -07:00
let mut stream = turn_context . client . clone ( ) . stream ( prompt ) . await ? ;
2025-09-12 13:07:10 -07:00
loop {
let maybe_event = stream . next ( ) . await ;
let Some ( event ) = maybe_event else {
return Err ( CodexErr ::Stream (
" stream closed before response.completed " . into ( ) ,
None ,
) ) ;
} ;
match event {
Ok ( ResponseEvent ::OutputItemDone ( item ) ) = > {
ref: full state refactor (#4174)
## Current State Observations
- `Session` currently holds many unrelated responsibilities (history,
approval queues, task handles, rollout recorder, shell discovery, token
tracking, etc.), making it hard to reason about ownership and lifetimes.
- The anonymous `State` struct inside `codex.rs` mixes session-long data
with turn-scoped queues and approval bookkeeping.
- Turn execution (`run_task`) relies on ad-hoc local variables that
should conceptually belong to a per-turn state object.
- External modules (`codex::compact`, tests) frequently poke the raw
`Session.state` mutex, which couples them to implementation details.
- Interrupts, approvals, and rollout persistence all have bespoke
cleanup paths, contributing to subtle bugs when a turn is aborted
mid-flight.
## Desired End State
- Keep a slim `Session` object that acts as the orchestrator and façade.
It should expose a focused API (submit, approvals, interrupts, event
emission) without storing unrelated fields directly.
- Introduce a `state` module that encapsulates all mutable data
structures:
- `SessionState`: session-persistent data (history, approved commands,
token/rate-limit info, maybe user preferences).
- `ActiveTurn`: metadata for the currently running turn (sub-id, task
kind, abort handle) and an `Arc<TurnState>`.
- `TurnState`: all turn-scoped pieces (pending inputs, approval waiters,
diff tracker, review history, auto-compact flags, last agent message,
outstanding tool call bookkeeping).
- Group long-lived helpers/managers into a dedicated `SessionServices`
struct so `Session` does not accumulate "random" fields.
- Provide clear, lock-safe APIs so other modules never touch raw
mutexes.
- Ensure every turn creates/drops a `TurnState` and that
interrupts/finishes delegate cleanup to it.
2025-09-25 11:16:06 +01:00
sess . record_into_history ( std ::slice ::from_ref ( & item ) ) . await ;
2025-09-12 13:07:10 -07:00
}
2025-09-26 16:24:27 +02:00
Ok ( ResponseEvent ::RateLimits ( snapshot ) ) = > {
2025-10-21 08:04:16 -07:00
sess . update_rate_limits ( turn_context , snapshot ) . await ;
2025-09-26 16:24:27 +02:00
}
Ok ( ResponseEvent ::Completed { token_usage , .. } ) = > {
2025-10-21 08:04:16 -07:00
sess . update_token_usage_info ( turn_context , token_usage . as_ref ( ) )
2025-09-26 16:24:27 +02:00
. await ;
2025-09-12 13:07:10 -07:00
return Ok ( ( ) ) ;
}
Ok ( _ ) = > continue ,
Err ( e ) = > return Err ( e ) ,
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use pretty_assertions::assert_eq;

    /// Builds a `ResponseItem::Message` holding a single content item.
    fn message(id: Option<&str>, role: &str, content: ContentItem) -> ResponseItem {
        ResponseItem::Message {
            id: id.map(str::to_string),
            role: role.to_string(),
            content: vec![content],
        }
    }

    /// Shorthand for an `InputText` content item.
    fn input_text(text: &str) -> ContentItem {
        ContentItem::InputText {
            text: text.to_string(),
        }
    }

    #[test]
    fn content_items_to_text_joins_non_empty_segments() {
        let segments = [
            input_text("hello"),
            ContentItem::OutputText {
                text: String::new(),
            },
            ContentItem::OutputText {
                text: "world".to_string(),
            },
        ];
        assert_eq!(
            Some("hello\nworld".to_string()),
            content_items_to_text(&segments)
        );
    }

    #[test]
    fn content_items_to_text_ignores_image_only_content() {
        let image_only = [ContentItem::InputImage {
            image_url: "file://image.png".to_string(),
        }];
        assert_eq!(None, content_items_to_text(&image_only));
    }

    #[test]
    fn collect_user_messages_extracts_user_text_only() {
        let items = vec![
            message(
                Some("assistant"),
                "assistant",
                ContentItem::OutputText {
                    text: "ignored".to_string(),
                },
            ),
            message(Some("user"), "user", input_text("first")),
            ResponseItem::Other,
        ];
        assert_eq!(vec!["first".to_string()], collect_user_messages(&items));
    }

    #[test]
    fn collect_user_messages_filters_session_prefix_entries() {
        let items: Vec<ResponseItem> = [
            "# AGENTS.md instructions for project\n\n<INSTRUCTIONS>\ndo things\n</INSTRUCTIONS>",
            "<ENVIRONMENT_CONTEXT>cwd=/tmp</ENVIRONMENT_CONTEXT>",
            "real user message",
        ]
        .iter()
        .map(|text| message(None, "user", input_text(text)))
        .collect();
        assert_eq!(
            vec!["real user message".to_string()],
            collect_user_messages(&items)
        );
    }

    #[test]
    fn build_compacted_history_truncates_overlong_user_messages() {
        // A small limit keeps the test fast while still forcing truncation.
        let max_bytes = 128;
        let oversized = "X".repeat(max_bytes + 50);
        let history = super::build_compacted_history_with_limit(
            Vec::new(),
            std::slice::from_ref(&oversized),
            "SUMMARY",
            max_bytes,
        );

        // No initial context was supplied, so only the bridge message is present.
        assert_eq!(history.len(), 1);
        let bridge_text = match &history[0] {
            ResponseItem::Message { role, content, .. } if role == "user" => {
                content_items_to_text(content).unwrap_or_default()
            }
            other => panic!("unexpected item in history: {other:?}"),
        };

        assert!(
            bridge_text.contains("tokens truncated"),
            "expected truncation marker in bridge message"
        );
        assert!(
            !bridge_text.contains(&oversized),
            "bridge should not include the full oversized user text"
        );
        assert!(
            bridge_text.contains("SUMMARY"),
            "bridge should include the provided summary text"
        );
    }
}