diff --git a/codex-rs/core/src/client.rs b/codex-rs/core/src/client.rs index 51e8b8e8..10ec0b97 100644 --- a/codex-rs/core/src/client.rs +++ b/codex-rs/core/src/client.rs @@ -28,15 +28,20 @@ use crate::flags::CODEX_RS_SSE_FIXTURE; use crate::flags::OPENAI_API_BASE; use crate::flags::OPENAI_REQUEST_MAX_RETRIES; use crate::flags::OPENAI_STREAM_IDLE_TIMEOUT_MS; -use crate::models::ResponseInputItem; use crate::models::ResponseItem; use crate::util::backoff; +/// API request payload for a single model turn. #[derive(Default, Debug, Clone)] pub struct Prompt { - pub input: Vec, + /// Conversation context input items. + pub input: Vec, + /// Optional previous response ID (when storage is enabled). pub prev_id: Option, + /// Optional initial instructions (only sent on first turn). pub instructions: Option, + /// Whether to store response on server side (disable_response_storage = !store). + pub store: bool, } #[derive(Debug)] @@ -50,13 +55,18 @@ struct Payload<'a> { model: &'a str, #[serde(skip_serializing_if = "Option::is_none")] instructions: Option<&'a String>, - input: &'a Vec, + // TODO(mbolin): ResponseItem::Other should not be serialized. Currently, + // we code defensively to avoid this case, but perhaps we should use a + // separate enum for serialization. + input: &'a Vec, tools: &'a [Tool], tool_choice: &'static str, parallel_tool_calls: bool, reasoning: Option, #[serde(skip_serializing_if = "Option::is_none")] previous_response_id: Option, + /// true when using the Responses API. + store: bool, stream: bool, } @@ -151,6 +161,7 @@ impl ModelClient { generate_summary: None, }), previous_response_id: prompt.prev_id.clone(), + store: prompt.store, stream: true, }; diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index e57d3bbf..0d17c8e4 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -55,6 +55,7 @@ use crate::safety::assess_command_safety; use crate::safety::assess_patch_safety; use crate::safety::SafetyCheck; use crate::util::backoff; +use crate::zdr_transcript::ZdrTranscript; /// The high-level interface to the Codex system. /// It operates as a queue pair where you send submissions and receive events. @@ -214,6 +215,7 @@ struct State { previous_response_id: Option, pending_approvals: HashMap>, pending_input: Vec, + zdr_transcript: Option, } impl Session { @@ -399,6 +401,7 @@ impl State { Self { approved_commands: self.approved_commands.clone(), previous_response_id: self.previous_response_id.clone(), + zdr_transcript: self.zdr_transcript.clone(), ..Default::default() } } @@ -489,6 +492,7 @@ async fn submission_loop( instructions, approval_policy, sandbox_policy, + disable_response_storage, } => { let model = model.unwrap_or_else(|| OPENAI_DEFAULT_MODEL.to_string()); info!(model, "Configuring session"); @@ -500,7 +504,14 @@ async fn submission_loop( sess.abort(); sess.state.lock().unwrap().partial_clone() } - None => State::default(), + None => State { + zdr_transcript: if disable_response_storage { + Some(ZdrTranscript::new()) + } else { + None + }, + ..Default::default() + }, }; // update session @@ -587,18 +598,54 @@ async fn run_task(sess: Arc, sub_id: String, input: Vec) { return; } - let mut turn_input = vec![ResponseInputItem::from(input)]; + let mut pending_response_input: Vec = vec![ResponseInputItem::from(input)]; loop { - let pending_input = sess.get_pending_input(); - turn_input.splice(0..0, pending_input); + let mut net_new_turn_input = pending_response_input + .drain(..) + .map(ResponseItem::from) + .collect::>(); + + // Note that pending_input would be something like a message the user + // submitted through the UI while the model was running. Though the UI + // may support this, the model might not. + let pending_input = sess.get_pending_input().into_iter().map(ResponseItem::from); + net_new_turn_input.extend(pending_input); + + let turn_input: Vec = + if let Some(transcript) = sess.state.lock().unwrap().zdr_transcript.as_mut() { + // If we are using ZDR, we need to send the transcript with every turn. + let mut full_transcript = transcript.contents(); + full_transcript.extend(net_new_turn_input.clone()); + transcript.record_items(net_new_turn_input); + full_transcript + } else { + net_new_turn_input + }; match run_turn(&sess, sub_id.clone(), turn_input).await { Ok(turn_output) => { - if turn_output.is_empty() { + let (items, responses): (Vec<_>, Vec<_>) = turn_output + .into_iter() + .map(|p| (p.item, p.response)) + .unzip(); + let responses = responses + .into_iter() + .flatten() + .collect::>(); + + // Only attempt to take the lock if there is something to record. + if !items.is_empty() { + if let Some(transcript) = sess.state.lock().unwrap().zdr_transcript.as_mut() { + transcript.record_items(items); + } + } + + if responses.is_empty() { debug!("Turn completed"); break; } - turn_input = turn_output; + + pending_response_input = responses; } Err(e) => { info!("Turn error: {e:#}"); @@ -624,21 +671,31 @@ async fn run_task(sess: Arc, sub_id: String, input: Vec) { async fn run_turn( sess: &Session, sub_id: String, - input: Vec, -) -> CodexResult> { - let prev_id = { + input: Vec, +) -> CodexResult> { + // Decide whether to use server-side storage (previous_response_id) or disable it + let (prev_id, store, is_first_turn) = { let state = sess.state.lock().unwrap(); - state.previous_response_id.clone() + let is_first_turn = state.previous_response_id.is_none(); + if state.zdr_transcript.is_some() { + // When using ZDR, the Reponses API may send previous_response_id + // back, but trying to use it results in a 400. + (None, true, is_first_turn) + } else { + (state.previous_response_id.clone(), false, is_first_turn) + } }; - let instructions = match prev_id { - Some(_) => None, - None => sess.instructions.clone(), + let instructions = if is_first_turn { + sess.instructions.clone() + } else { + None }; let prompt = Prompt { input, prev_id, instructions, + store, }; let mut retries = 0; @@ -676,11 +733,20 @@ async fn run_turn( } } +/// When the model is prompted, it returns a stream of events. Some of these +/// events map to a `ResponseItem`. A `ResponseItem` may need to be +/// "handled" such that it produces a `ResponseInputItem` that needs to be +/// sent back to the model on the next turn. +struct ProcessedResponseItem { + item: ResponseItem, + response: Option, +} + async fn try_run_turn( sess: &Session, sub_id: &str, prompt: &Prompt, -) -> CodexResult> { +) -> CodexResult> { let mut stream = sess.client.clone().stream(prompt).await?; // Buffer all the incoming messages from the stream first, then execute them. @@ -694,9 +760,8 @@ async fn try_run_turn( for event in input { match event { ResponseEvent::OutputItemDone(item) => { - if let Some(item) = handle_response_item(sess, sub_id, item).await? { - output.push(item); - } + let response = handle_response_item(sess, sub_id, item.clone()).await?; + output.push(ProcessedResponseItem { item, response }); } ResponseEvent::Completed { response_id } => { let mut state = sess.state.lock().unwrap(); diff --git a/codex-rs/core/src/codex_wrapper.rs b/codex-rs/core/src/codex_wrapper.rs index 426b5373..8d19683f 100644 --- a/codex-rs/core/src/codex_wrapper.rs +++ b/codex-rs/core/src/codex_wrapper.rs @@ -21,6 +21,7 @@ use tracing::debug; pub async fn init_codex( approval_policy: AskForApproval, sandbox_policy: SandboxPolicy, + disable_response_storage: bool, model_override: Option, ) -> anyhow::Result<(CodexWrapper, Event, Arc)> { let ctrl_c = notify_on_sigint(); @@ -33,6 +34,7 @@ pub async fn init_codex( instructions: config.instructions, approval_policy, sandbox_policy, + disable_response_storage, }) .await?; diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs index 7d330915..d517e688 100644 --- a/codex-rs/core/src/lib.rs +++ b/codex-rs/core/src/lib.rs @@ -19,6 +19,7 @@ mod models; pub mod protocol; mod safety; pub mod util; +mod zdr_transcript; pub use codex::Codex; diff --git a/codex-rs/core/src/models.rs b/codex-rs/core/src/models.rs index 551ac318..2665e8c1 100644 --- a/codex-rs/core/src/models.rs +++ b/codex-rs/core/src/models.rs @@ -56,6 +56,17 @@ pub enum ResponseItem { Other, } +impl From for ResponseItem { + fn from(item: ResponseInputItem) -> Self { + match item { + ResponseInputItem::Message { role, content } => Self::Message { role, content }, + ResponseInputItem::FunctionCallOutput { call_id, output } => { + Self::FunctionCallOutput { call_id, output } + } + } + } +} + impl From> for ResponseInputItem { fn from(items: Vec) -> Self { Self::Message { diff --git a/codex-rs/core/src/protocol.rs b/codex-rs/core/src/protocol.rs index 42c8478e..96c4ea48 100644 --- a/codex-rs/core/src/protocol.rs +++ b/codex-rs/core/src/protocol.rs @@ -33,6 +33,9 @@ pub enum Op { approval_policy: AskForApproval, /// How to sandbox commands executed in the system sandbox_policy: SandboxPolicy, + /// Disable server-side response storage (send full context each request) + #[serde(default)] + disable_response_storage: bool, }, /// Abort current task. diff --git a/codex-rs/core/src/zdr_transcript.rs b/codex-rs/core/src/zdr_transcript.rs new file mode 100644 index 00000000..25fdc5a6 --- /dev/null +++ b/codex-rs/core/src/zdr_transcript.rs @@ -0,0 +1,46 @@ +use crate::models::ResponseItem; + +/// Transcript that needs to be maintained for ZDR clients for which +/// previous_response_id is not available, so we must include the transcript +/// with every API call. This must include each `function_call` and its +/// corresponding `function_call_output`. +#[derive(Debug, Clone)] +pub(crate) struct ZdrTranscript { + /// The oldest items are at the beginning of the vector. + items: Vec, +} + +impl ZdrTranscript { + pub(crate) fn new() -> Self { + Self { items: Vec::new() } + } + + /// Returns a clone of the contents in the transcript. + pub(crate) fn contents(&self) -> Vec { + self.items.clone() + } + + /// `items` is ordered from oldest to newest. + pub(crate) fn record_items(&mut self, items: I) + where + I: IntoIterator, + { + for item in items { + if is_api_message(&item) { + // Note agent-loop.ts also does filtering on some of the fields. + self.items.push(item.clone()); + } + } + } +} + +/// Anything that is not a system message or "reasoning" message is considered +/// an API message. +fn is_api_message(message: &ResponseItem) -> bool { + match message { + ResponseItem::Message { role, .. } => role.as_str() != "system", + ResponseItem::FunctionCall { .. } => true, + ResponseItem::FunctionCallOutput { .. } => true, + _ => false, + } +} diff --git a/codex-rs/core/tests/live_agent.rs b/codex-rs/core/tests/live_agent.rs index 6562654c..823cd73a 100644 --- a/codex-rs/core/tests/live_agent.rs +++ b/codex-rs/core/tests/live_agent.rs @@ -55,6 +55,7 @@ async fn spawn_codex() -> Codex { instructions: None, approval_policy: AskForApproval::OnFailure, sandbox_policy: SandboxPolicy::NetworkAndFileWriteRestricted, + disable_response_storage: false, }, }) .await diff --git a/codex-rs/core/tests/previous_response_id.rs b/codex-rs/core/tests/previous_response_id.rs index 56fa9a6c..de1309e8 100644 --- a/codex-rs/core/tests/previous_response_id.rs +++ b/codex-rs/core/tests/previous_response_id.rs @@ -95,6 +95,7 @@ async fn keeps_previous_response_id_between_tasks() { instructions: None, approval_policy: AskForApproval::OnFailure, sandbox_policy: SandboxPolicy::NetworkAndFileWriteRestricted, + disable_response_storage: false, }, }) .await diff --git a/codex-rs/core/tests/stream_no_completed.rs b/codex-rs/core/tests/stream_no_completed.rs index da0cfb27..c732a5fd 100644 --- a/codex-rs/core/tests/stream_no_completed.rs +++ b/codex-rs/core/tests/stream_no_completed.rs @@ -78,6 +78,7 @@ async fn retries_on_early_close() { instructions: None, approval_policy: AskForApproval::OnFailure, sandbox_policy: SandboxPolicy::NetworkAndFileWriteRestricted, + disable_response_storage: false, }, }) .await diff --git a/codex-rs/exec/src/cli.rs b/codex-rs/exec/src/cli.rs index a934aba0..299e8587 100644 --- a/codex-rs/exec/src/cli.rs +++ b/codex-rs/exec/src/cli.rs @@ -16,6 +16,10 @@ pub struct Cli { #[arg(long = "skip-git-repo-check", default_value_t = false)] pub skip_git_repo_check: bool, + /// Disable server‑side response storage (sends the full conversation context with every request) + #[arg(long = "disable-response-storage", default_value_t = false)] + pub disable_response_storage: bool, + /// Initial instructions for the agent. pub prompt: Option, } diff --git a/codex-rs/exec/src/lib.rs b/codex-rs/exec/src/lib.rs index c22b6bd6..ab7d735e 100644 --- a/codex-rs/exec/src/lib.rs +++ b/codex-rs/exec/src/lib.rs @@ -31,9 +31,10 @@ pub async fn run_main(cli: Cli) -> anyhow::Result<()> { .try_init(); let Cli { - skip_git_repo_check, - model, images, + model, + skip_git_repo_check, + disable_response_storage, prompt, .. } = cli; @@ -50,8 +51,13 @@ pub async fn run_main(cli: Cli) -> anyhow::Result<()> { // likely come from a new --execution-policy arg. let approval_policy = AskForApproval::Never; let sandbox_policy = SandboxPolicy::NetworkAndFileWriteRestricted; - let (codex_wrapper, event, ctrl_c) = - codex_wrapper::init_codex(approval_policy, sandbox_policy, model).await?; + let (codex_wrapper, event, ctrl_c) = codex_wrapper::init_codex( + approval_policy, + sandbox_policy, + disable_response_storage, + model, + ) + .await?; let codex = Arc::new(codex_wrapper); info!("Codex initialized with event: {event:?}"); diff --git a/codex-rs/repl/src/cli.rs b/codex-rs/repl/src/cli.rs index bb83046d..4de42a76 100644 --- a/codex-rs/repl/src/cli.rs +++ b/codex-rs/repl/src/cli.rs @@ -50,6 +50,10 @@ pub struct Cli { #[arg(long, action = ArgAction::SetTrue, default_value_t = false)] pub allow_no_git_exec: bool, + /// Disable server‑side response storage (sends the full conversation context with every request) + #[arg(long = "disable-response-storage", default_value_t = false)] + pub disable_response_storage: bool, + /// Record submissions into file as JSON #[arg(short = 'S', long)] pub record_submissions: Option, diff --git a/codex-rs/repl/src/lib.rs b/codex-rs/repl/src/lib.rs index 2266718e..0f9c47e4 100644 --- a/codex-rs/repl/src/lib.rs +++ b/codex-rs/repl/src/lib.rs @@ -97,6 +97,7 @@ async fn codex_main(mut cli: Cli, cfg: Config, ctrl_c: Arc) -> anyhow::R instructions: cfg.instructions, approval_policy: cli.approval_policy.into(), sandbox_policy: cli.sandbox_policy.into(), + disable_response_storage: cli.disable_response_storage, }, }; diff --git a/codex-rs/tui/src/app.rs b/codex-rs/tui/src/app.rs index 26a7074b..8f27ce6e 100644 --- a/codex-rs/tui/src/app.rs +++ b/codex-rs/tui/src/app.rs @@ -40,6 +40,7 @@ impl App<'_> { show_git_warning: bool, initial_images: Vec, model: Option, + disable_response_storage: bool, ) -> Self { let (app_event_tx, app_event_rx) = channel(); let scroll_event_helper = ScrollEventHelper::new(app_event_tx.clone()); @@ -85,6 +86,7 @@ impl App<'_> { initial_prompt.clone(), initial_images, model, + disable_response_storage, ); let app_state = if show_git_warning { diff --git a/codex-rs/tui/src/chatwidget.rs b/codex-rs/tui/src/chatwidget.rs index 64cb896d..b852638c 100644 --- a/codex-rs/tui/src/chatwidget.rs +++ b/codex-rs/tui/src/chatwidget.rs @@ -52,6 +52,7 @@ impl ChatWidget<'_> { initial_prompt: Option, initial_images: Vec, model: Option, + disable_response_storage: bool, ) -> Self { let (codex_op_tx, mut codex_op_rx) = unbounded_channel::(); @@ -63,15 +64,22 @@ impl ChatWidget<'_> { let app_event_tx_clone = app_event_tx.clone(); // Create the Codex asynchronously so the UI loads as quickly as possible. tokio::spawn(async move { - let (codex, session_event, _ctrl_c) = - match init_codex(approval_policy, sandbox_policy, model).await { - Ok(vals) => vals, - Err(e) => { - // TODO(mbolin): This error needs to be surfaced to the user. - tracing::error!("failed to initialize codex: {e}"); - return; - } - }; + // Initialize session; storage enabled by default + let (codex, session_event, _ctrl_c) = match init_codex( + approval_policy, + sandbox_policy, + disable_response_storage, + model, + ) + .await + { + Ok(vals) => vals, + Err(e) => { + // TODO(mbolin): This error needs to be surfaced to the user. + tracing::error!("failed to initialize codex: {e}"); + return; + } + }; // Forward the captured `SessionInitialized` event that was consumed // inside `init_codex()` so it can be rendered in the UI. diff --git a/codex-rs/tui/src/cli.rs b/codex-rs/tui/src/cli.rs index fa764d1a..db25ad2b 100644 --- a/codex-rs/tui/src/cli.rs +++ b/codex-rs/tui/src/cli.rs @@ -31,6 +31,10 @@ pub struct Cli { #[arg(long = "skip-git-repo-check", default_value_t = false)] pub skip_git_repo_check: bool, + /// Disable server‑side response storage (sends the full conversation context with every request) + #[arg(long = "disable-response-storage", default_value_t = false)] + pub disable_response_storage: bool, + /// Convenience alias for low-friction sandboxed automatic execution (-a on-failure, -s network-and-file-write-restricted) #[arg(long = "full-auto", default_value_t = true)] pub full_auto: bool, diff --git a/codex-rs/tui/src/lib.rs b/codex-rs/tui/src/lib.rs index 7361663b..4a063de0 100644 --- a/codex-rs/tui/src/lib.rs +++ b/codex-rs/tui/src/lib.rs @@ -107,6 +107,7 @@ fn run_ratatui_app( approval_policy, sandbox_policy: sandbox, model, + disable_response_storage, .. } = cli; @@ -120,6 +121,7 @@ fn run_ratatui_app( show_git_warning, images, model, + disable_response_storage, ); // Bridge log receiver into the AppEvent channel so latest log lines update the UI.