Always send entire request context (#1641)

Always store the entire conversation history.
Request encrypted COT when not storing Responses.
Send entire input context instead of sending previous_response_id
This commit is contained in:
pakrym-oai
2025-07-23 10:37:45 -07:00
committed by GitHub
parent d6c4083f98
commit 591cb6149a
8 changed files with 101 additions and 325 deletions

View File

@@ -34,7 +34,6 @@ use tracing::trace;
use tracing::warn;
use uuid::Uuid;
use crate::WireApi;
use crate::client::ModelClient;
use crate::client_common::Prompt;
use crate::client_common::ResponseEvent;
@@ -191,6 +190,7 @@ pub(crate) struct Session {
sandbox_policy: SandboxPolicy,
shell_environment_policy: ShellEnvironmentPolicy,
writable_roots: Mutex<Vec<PathBuf>>,
disable_response_storage: bool,
/// Manager for external MCP servers/tools.
mcp_connection_manager: McpConnectionManager,
@@ -219,13 +219,9 @@ impl Session {
struct State {
approved_commands: HashSet<Vec<String>>,
current_task: Option<AgentTask>,
/// Call IDs that have been sent from the Responses API but have not been sent back yet.
/// You CANNOT send a Responses API follow-up message unless you have sent back the output for all pending calls or else it will 400.
pending_call_ids: HashSet<String>,
previous_response_id: Option<String>,
pending_approvals: HashMap<String, oneshot::Sender<ReviewDecision>>,
pending_input: Vec<ResponseInputItem>,
zdr_transcript: Option<ConversationHistory>,
history: ConversationHistory,
}
impl Session {
@@ -320,18 +316,11 @@ impl Session {
debug!("Recording items for conversation: {items:?}");
self.record_state_snapshot(items).await;
if let Some(transcript) = self.state.lock().unwrap().zdr_transcript.as_mut() {
transcript.record_items(items);
}
self.state.lock().unwrap().history.record_items(items);
}
async fn record_state_snapshot(&self, items: &[ResponseItem]) {
let snapshot = {
let state = self.state.lock().unwrap();
crate::rollout::SessionStateSnapshot {
previous_response_id: state.previous_response_id.clone(),
}
};
let snapshot = { crate::rollout::SessionStateSnapshot {} };
let recorder = {
let guard = self.rollout.lock().unwrap();
@@ -433,8 +422,6 @@ impl Session {
pub fn abort(&self) {
info!("Aborting existing session");
let mut state = self.state.lock().unwrap();
// Don't clear pending_call_ids because we need to keep track of them to ensure we don't 400 on the next turn.
// We will generate a synthetic aborted response for each pending call id.
state.pending_approvals.clear();
state.pending_input.clear();
if let Some(task) = state.current_task.take() {
@@ -479,15 +466,10 @@ impl Drop for Session {
}
impl State {
pub fn partial_clone(&self, retain_zdr_transcript: bool) -> Self {
pub fn partial_clone(&self) -> Self {
Self {
approved_commands: self.approved_commands.clone(),
previous_response_id: self.previous_response_id.clone(),
zdr_transcript: if retain_zdr_transcript {
self.zdr_transcript.clone()
} else {
None
},
history: self.history.clone(),
..Default::default()
}
}
@@ -606,13 +588,11 @@ async fn submission_loop(
}
// Optionally resume an existing rollout.
let mut restored_items: Option<Vec<ResponseItem>> = None;
let mut restored_prev_id: Option<String> = None;
let rollout_recorder: Option<RolloutRecorder> =
if let Some(path) = resume_path.as_ref() {
match RolloutRecorder::resume(path).await {
Ok((rec, saved)) => {
session_id = saved.session_id;
restored_prev_id = saved.state.previous_response_id;
if !saved.items.is_empty() {
restored_items = Some(saved.items);
}
@@ -651,22 +631,13 @@ async fn submission_loop(
);
// abort any current running session and clone its state
let retain_zdr_transcript =
record_conversation_history(disable_response_storage, provider.wire_api);
let state = match sess.take() {
Some(sess) => {
sess.abort();
sess.state
.lock()
.unwrap()
.partial_clone(retain_zdr_transcript)
sess.state.lock().unwrap().partial_clone()
}
None => State {
zdr_transcript: if retain_zdr_transcript {
Some(ConversationHistory::new())
} else {
None
},
history: ConversationHistory::new(),
..Default::default()
},
};
@@ -717,18 +688,14 @@ async fn submission_loop(
state: Mutex::new(state),
rollout: Mutex::new(rollout_recorder),
codex_linux_sandbox_exe: config.codex_linux_sandbox_exe.clone(),
disable_response_storage,
}));
// Patch restored state into the newly created session.
if let Some(sess_arc) = &sess {
if restored_prev_id.is_some() || restored_items.is_some() {
if restored_items.is_some() {
let mut st = sess_arc.state.lock().unwrap();
st.previous_response_id = restored_prev_id;
if let (Some(hist), Some(items)) =
(st.zdr_transcript.as_mut(), restored_items.as_ref())
{
hist.record_items(items.iter());
}
st.history.record_items(restored_items.unwrap().iter());
}
}
@@ -875,14 +842,8 @@ async fn run_task(sess: Arc<Session>, sub_id: String, input: Vec<InputItem>) {
sess.record_conversation_items(&[initial_input_for_turn.clone().into()])
.await;
let mut input_for_next_turn: Vec<ResponseInputItem> = vec![initial_input_for_turn];
let last_agent_message: Option<String>;
loop {
let mut net_new_turn_input = input_for_next_turn
.drain(..)
.map(ResponseItem::from)
.collect::<Vec<_>>();
// Note that pending_input would be something like a message the user
// submitted through the UI while the model was running. Though the UI
// may support this, the model might not.
@@ -899,29 +860,7 @@ async fn run_task(sess: Arc<Session>, sub_id: String, input: Vec<InputItem>) {
// only record the new items that originated in this turn so that it
// represents an append-only log without duplicates.
let turn_input: Vec<ResponseItem> =
if let Some(transcript) = sess.state.lock().unwrap().zdr_transcript.as_mut() {
// If we are using Chat/ZDR, we need to send the transcript with
// every turn. By induction, `transcript` already contains:
// - The `input` that kicked off this task.
// - Each `ResponseItem` that was recorded in the previous turn.
// - Each response to a `ResponseItem` (in practice, the only
// response type we seem to have is `FunctionCallOutput`).
//
// The only thing the `transcript` does not contain is the
// `pending_input` that was injected while the model was
// running. We need to add that to the conversation history
// so that the model can see it in the next turn.
[transcript.contents(), pending_input].concat()
} else {
// In practice, net_new_turn_input should contain only:
// - User messages
// - Outputs for function calls requested by the model
net_new_turn_input.extend(pending_input);
// Responses API path we can just send the new items and
// record the same.
net_new_turn_input
};
[sess.state.lock().unwrap().history.contents(), pending_input].concat();
let turn_input_messages: Vec<String> = turn_input
.iter()
@@ -997,8 +936,19 @@ async fn run_task(sess: Arc<Session>, sub_id: String, input: Vec<InputItem>) {
},
);
}
(ResponseItem::Reasoning { .. }, None) => {
// Omit from conversation history.
(
ResponseItem::Reasoning {
id,
summary,
encrypted_content,
},
None,
) => {
items_to_record_in_conversation_history.push(ResponseItem::Reasoning {
id: id.clone(),
summary: summary.clone(),
encrypted_content: encrypted_content.clone(),
});
}
_ => {
warn!("Unexpected response item: {item:?} with response: {response:?}");
@@ -1027,8 +977,6 @@ async fn run_task(sess: Arc<Session>, sub_id: String, input: Vec<InputItem>) {
});
break;
}
input_for_next_turn = responses;
}
Err(e) => {
info!("Turn error: {e:#}");
@@ -1056,26 +1004,11 @@ async fn run_turn(
sub_id: String,
input: Vec<ResponseItem>,
) -> CodexResult<Vec<ProcessedResponseItem>> {
// Decide whether to use server-side storage (previous_response_id) or disable it
let (prev_id, store) = {
let state = sess.state.lock().unwrap();
let store = state.zdr_transcript.is_none();
let prev_id = if store {
state.previous_response_id.clone()
} else {
// When using ZDR, the Responses API may send previous_response_id
// back, but trying to use it results in a 400.
None
};
(prev_id, store)
};
let extra_tools = sess.mcp_connection_manager.list_all_tools();
let prompt = Prompt {
input,
prev_id,
user_instructions: sess.user_instructions.clone(),
store,
store: !sess.disable_response_storage,
extra_tools,
base_instructions_override: sess.base_instructions.clone(),
};
@@ -1149,11 +1082,17 @@ async fn try_run_turn(
// This usually happens because the user interrupted the model before we responded to one of its tool calls
// and then the user sent a follow-up message.
let missing_calls = {
sess.state
.lock()
.unwrap()
.pending_call_ids
prompt
.input
.iter()
.filter_map(|ri| match ri {
ResponseItem::FunctionCall { call_id, .. } => Some(call_id),
ResponseItem::LocalShellCall {
call_id: Some(call_id),
..
} => Some(call_id),
_ => None,
})
.filter_map(|call_id| {
if completed_call_ids.contains(&call_id) {
None
@@ -1207,31 +1146,14 @@ async fn try_run_turn(
};
match event {
ResponseEvent::Created => {
let mut state = sess.state.lock().unwrap();
// We successfully created a new response and ensured that all pending calls were included so we can clear the pending call ids.
state.pending_call_ids.clear();
}
ResponseEvent::Created => {}
ResponseEvent::OutputItemDone(item) => {
let call_id = match &item {
ResponseItem::LocalShellCall {
call_id: Some(call_id),
..
} => Some(call_id),
ResponseItem::FunctionCall { call_id, .. } => Some(call_id),
_ => None,
};
if let Some(call_id) = call_id {
// We just got a new call id so we need to make sure to respond to it in the next turn.
let mut state = sess.state.lock().unwrap();
state.pending_call_ids.insert(call_id.clone());
}
let response = handle_response_item(sess, sub_id, item.clone()).await?;
output.push(ProcessedResponseItem { item, response });
}
ResponseEvent::Completed {
response_id,
response_id: _,
token_usage,
} => {
if let Some(token_usage) = token_usage {
@@ -1244,8 +1166,6 @@ async fn try_run_turn(
.ok();
}
let mut state = sess.state.lock().unwrap();
state.previous_response_id = Some(response_id);
return Ok(output);
}
ResponseEvent::OutputTextDelta(delta) => {
@@ -1285,7 +1205,7 @@ async fn handle_response_item(
}
None
}
ResponseItem::Reasoning { id: _, summary } => {
ResponseItem::Reasoning { summary, .. } => {
for item in summary {
let text = match item {
ReasoningItemReasoningSummary::SummaryText { text } => text,
@@ -1302,6 +1222,7 @@ async fn handle_response_item(
name,
arguments,
call_id,
..
} => {
info!("FunctionCall: {arguments}");
Some(handle_function_call(sess, sub_id.to_string(), name, arguments, call_id).await)
@@ -2092,7 +2013,7 @@ fn format_exec_output(output: &str, exit_code: i32, duration: Duration) -> Strin
fn get_last_assistant_message_from_turn(responses: &[ResponseItem]) -> Option<String> {
responses.iter().rev().find_map(|item| {
if let ResponseItem::Message { role, content } = item {
if let ResponseItem::Message { role, content, .. } = item {
if role == "assistant" {
content.iter().rev().find_map(|ci| {
if let ContentItem::OutputText { text } = ci {
@@ -2109,15 +2030,3 @@ fn get_last_assistant_message_from_turn(responses: &[ResponseItem]) -> Option<St
}
})
}
/// See [`ConversationHistory`] for details.
fn record_conversation_history(disable_response_storage: bool, wire_api: WireApi) -> bool {
if disable_response_storage {
return true;
}
match wire_api {
WireApi::Responses => false,
WireApi::Chat => true,
}
}