diff --git a/codex-rs/core/src/apply_patch.rs b/codex-rs/core/src/apply_patch.rs index a37b60c0..dffe94be 100644 --- a/codex-rs/core/src/apply_patch.rs +++ b/codex-rs/core/src/apply_patch.rs @@ -61,7 +61,13 @@ pub(crate) async fn apply_patch( // that similar patches can be auto-approved in the future during // this session. let rx_approve = sess - .request_patch_approval(turn_context, call_id.to_owned(), &action, None, None) + .request_patch_approval( + turn_context, + call_id.to_owned(), + convert_apply_patch_to_protocol(&action), + None, + None, + ) .await; match rx_approve.await.unwrap_or_default() { ReviewDecision::Approved | ReviewDecision::ApprovedForSession => { diff --git a/codex-rs/core/src/chat_completions.rs b/codex-rs/core/src/chat_completions.rs index 1fb2230d..6200a587 100644 --- a/codex-rs/core/src/chat_completions.rs +++ b/codex-rs/core/src/chat_completions.rs @@ -20,6 +20,7 @@ use codex_protocol::models::ContentItem; use codex_protocol::models::FunctionCallOutputContentItem; use codex_protocol::models::ReasoningItemContent; use codex_protocol::models::ResponseItem; +use codex_protocol::protocol::SessionSource; use eventsource_stream::Eventsource; use futures::Stream; use futures::StreamExt; @@ -41,6 +42,7 @@ pub(crate) async fn stream_chat_completions( client: &CodexHttpClient, provider: &ModelProviderInfo, otel_event_manager: &OtelEventManager, + session_source: &SessionSource, ) -> Result { if prompt.output_schema.is_some() { return Err(CodexErr::UnsupportedOperation( @@ -343,7 +345,15 @@ pub(crate) async fn stream_chat_completions( loop { attempt += 1; - let req_builder = provider.create_request_builder(client, &None).await?; + let mut req_builder = provider.create_request_builder(client, &None).await?; + + // Include session source for backend telemetry and routing. + let task_type = match serde_json::to_value(session_source) { + Ok(serde_json::Value::String(s)) => s, + Ok(other) => other.to_string(), + Err(_) => "unknown".to_string(), + }; + req_builder = req_builder.header("Codex-Task-Type", task_type); let res = otel_event_manager .log_request(attempt, || { diff --git a/codex-rs/core/src/client.rs b/codex-rs/core/src/client.rs index b38d6de8..5d8da794 100644 --- a/codex-rs/core/src/client.rs +++ b/codex-rs/core/src/client.rs @@ -13,6 +13,7 @@ use codex_protocol::ConversationId; use codex_protocol::config_types::ReasoningEffort as ReasoningEffortConfig; use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig; use codex_protocol::models::ResponseItem; +use codex_protocol::protocol::SessionSource; use eventsource_stream::Eventsource; use futures::prelude::*; use regex_lite::Regex; @@ -56,7 +57,6 @@ use crate::openai_model_info::get_model_info; use crate::protocol::RateLimitSnapshot; use crate::protocol::RateLimitWindow; use crate::protocol::TokenUsage; -use crate::state::TaskKind; use crate::token_data::PlanType; use crate::tools::spec::create_tools_json_for_responses_api; use crate::util::backoff; @@ -87,8 +87,10 @@ pub struct ModelClient { conversation_id: ConversationId, effort: Option, summary: ReasoningSummaryConfig, + session_source: SessionSource, } +#[allow(clippy::too_many_arguments)] impl ModelClient { pub fn new( config: Arc, @@ -98,6 +100,7 @@ impl ModelClient { effort: Option, summary: ReasoningSummaryConfig, conversation_id: ConversationId, + session_source: SessionSource, ) -> Self { let client = create_client(); @@ -110,6 +113,7 @@ impl ModelClient { conversation_id, effort, summary, + session_source, } } @@ -127,13 +131,6 @@ impl ModelClient { }) } - /// Dispatches to either the Responses or Chat implementation depending on - /// the provider config. Public callers always invoke `stream()` – the - /// specialised helpers are private to avoid accidental misuse. - pub async fn stream(&self, prompt: &Prompt) -> Result { - self.stream_with_task_kind(prompt, TaskKind::Regular).await - } - pub fn config(&self) -> Arc { Arc::clone(&self.config) } @@ -142,13 +139,9 @@ impl ModelClient { &self.provider } - pub(crate) async fn stream_with_task_kind( - &self, - prompt: &Prompt, - task_kind: TaskKind, - ) -> Result { + pub async fn stream(&self, prompt: &Prompt) -> Result { match self.provider.wire_api { - WireApi::Responses => self.stream_responses(prompt, task_kind).await, + WireApi::Responses => self.stream_responses(prompt).await, WireApi::Chat => { // Create the raw streaming connection first. let response_stream = stream_chat_completions( @@ -157,6 +150,7 @@ impl ModelClient { &self.client, &self.provider, &self.otel_event_manager, + &self.session_source, ) .await?; @@ -189,11 +183,7 @@ impl ModelClient { } /// Implementation for the OpenAI *Responses* experimental API. - async fn stream_responses( - &self, - prompt: &Prompt, - task_kind: TaskKind, - ) -> Result { + async fn stream_responses(&self, prompt: &Prompt) -> Result { if let Some(path) = &*CODEX_RS_SSE_FIXTURE { // short circuit for tests warn!(path, "Streaming from fixture"); @@ -268,7 +258,7 @@ impl ModelClient { let max_attempts = self.provider.request_max_retries(); for attempt in 0..=max_attempts { match self - .attempt_stream_responses(attempt, &payload_json, &auth_manager, task_kind) + .attempt_stream_responses(attempt, &payload_json, &auth_manager) .await { Ok(stream) => { @@ -296,7 +286,6 @@ impl ModelClient { attempt: u64, payload_json: &Value, auth_manager: &Option>, - task_kind: TaskKind, ) -> std::result::Result { // Always fetch the latest auth in case a prior attempt refreshed the token. let auth = auth_manager.as_ref().and_then(|m| m.auth()); @@ -314,12 +303,19 @@ impl ModelClient { .await .map_err(StreamAttemptError::Fatal)?; + // Include session source for backend telemetry and routing. + let task_type = match serde_json::to_value(&self.session_source) { + Ok(serde_json::Value::String(s)) => s, + Ok(other) => other.to_string(), + Err(_) => "unknown".to_string(), + }; + req_builder = req_builder.header("Codex-Task-Type", task_type); + req_builder = req_builder // Send session_id for compatibility. .header("conversation_id", self.conversation_id.to_string()) .header("session_id", self.conversation_id.to_string()) .header(reqwest::header::ACCEPT, "text/event-stream") - .header("Codex-Task-Type", task_kind.header_value()) .json(payload_json); if let Some(auth) = auth.as_ref() @@ -462,6 +458,10 @@ impl ModelClient { self.otel_event_manager.clone() } + pub fn get_session_source(&self) -> SessionSource { + self.session_source.clone() + } + /// Returns the currently configured model slug. pub fn get_model(&self) -> String { self.config.model.clone() diff --git a/codex-rs/core/src/client_common.rs b/codex-rs/core/src/client_common.rs index 397b09dd..203a6302 100644 --- a/codex-rs/core/src/client_common.rs +++ b/codex-rs/core/src/client_common.rs @@ -23,6 +23,11 @@ use tokio::sync::mpsc; /// Review thread system prompt. Edit `core/src/review_prompt.md` to customize. pub const REVIEW_PROMPT: &str = include_str!("../review_prompt.md"); +// Centralized templates for review-related user messages +pub const REVIEW_EXIT_SUCCESS_TMPL: &str = include_str!("../templates/review/exit_success.xml"); +pub const REVIEW_EXIT_INTERRUPTED_TMPL: &str = + include_str!("../templates/review/exit_interrupted.xml"); + /// API request payload for a single model turn #[derive(Default, Debug, Clone)] pub struct Prompt { diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 8659eb45..8e45f250 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -6,21 +6,20 @@ use std::sync::atomic::AtomicU64; use crate::AuthManager; use crate::client_common::REVIEW_PROMPT; +use crate::features::Feature; use crate::function_tool::FunctionCallError; use crate::mcp::auth::McpAuthStatusEntry; use crate::mcp_connection_manager::DEFAULT_STARTUP_TIMEOUT; use crate::parse_command::parse_command; use crate::parse_turn_item; use crate::response_processing::process_items; -use crate::review_format::format_review_findings_block; use crate::terminal; use crate::user_notification::UserNotifier; use async_channel::Receiver; use async_channel::Sender; -use codex_apply_patch::ApplyPatchAction; use codex_protocol::ConversationId; use codex_protocol::items::TurnItem; -use codex_protocol::protocol::ExitedReviewModeEvent; +use codex_protocol::protocol::FileChange; use codex_protocol::protocol::ItemCompletedEvent; use codex_protocol::protocol::ItemStartedEvent; use codex_protocol::protocol::RawResponseItemEvent; @@ -48,11 +47,9 @@ use tokio_util::sync::CancellationToken; use tracing::debug; use tracing::error; use tracing::info; -use tracing::trace; use tracing::warn; use crate::ModelProviderInfo; -use crate::apply_patch::convert_apply_patch_to_protocol; use crate::client::ModelClient; use crate::client_common::Prompt; use crate::client_common::ResponseEvent; @@ -87,7 +84,6 @@ use crate::protocol::ExecApprovalRequestEvent; use crate::protocol::Op; use crate::protocol::RateLimitSnapshot; use crate::protocol::ReviewDecision; -use crate::protocol::ReviewOutputEvent; use crate::protocol::SandboxCommandAssessment; use crate::protocol::SandboxPolicy; use crate::protocol::SessionConfiguredEvent; @@ -103,7 +99,6 @@ use crate::shell; use crate::state::ActiveTurn; use crate::state::SessionServices; use crate::state::SessionState; -use crate::state::TaskKind; use crate::tasks::GhostSnapshotTask; use crate::tasks::ReviewTask; use crate::tasks::SessionTask; @@ -139,9 +134,9 @@ use self::compact::collect_user_messages; /// The high-level interface to the Codex system. /// It operates as a queue pair where you send submissions and receive events. pub struct Codex { - next_id: AtomicU64, - tx_sub: Sender, - rx_event: Receiver, + pub(crate) next_id: AtomicU64, + pub(crate) tx_sub: Sender, + pub(crate) rx_event: Receiver, } /// Wrapper returned by [`Codex::spawn`] containing the spawned [`Codex`], @@ -182,16 +177,18 @@ impl Codex { cwd: config.cwd.clone(), original_config_do_not_use: Arc::clone(&config), features: config.features.clone(), + session_source, }; // Generate a unique ID for the lifetime of this Codex session. + let session_source_clone = session_configuration.session_source.clone(); let session = Session::new( session_configuration, config.clone(), auth_manager.clone(), tx_event.clone(), conversation_history, - session_source, + session_source_clone, ) .await .map_err(|e| { @@ -272,7 +269,6 @@ pub(crate) struct TurnContext { pub(crate) sandbox_policy: SandboxPolicy, pub(crate) shell_environment_policy: ShellEnvironmentPolicy, pub(crate) tools_config: ToolsConfig, - pub(crate) is_review_mode: bool, pub(crate) final_output_json_schema: Option, pub(crate) codex_linux_sandbox_exe: Option, pub(crate) tool_call_gate: Arc, @@ -286,6 +282,7 @@ impl TurnContext { } } +#[allow(dead_code)] #[derive(Clone)] pub(crate) struct SessionConfiguration { /// Provider identifier ("openai", "openrouter", ...). @@ -322,6 +319,8 @@ pub(crate) struct SessionConfiguration { // TODO(pakrym): Remove config from here original_config_do_not_use: Arc, + /// Source of the session (cli, vscode, exec, mcp, ...) + session_source: SessionSource, } impl SessionConfiguration { @@ -394,6 +393,7 @@ impl Session { session_configuration.model_reasoning_effort, session_configuration.model_reasoning_summary, conversation_id, + session_configuration.session_source.clone(), ); let tools_config = ToolsConfig::new(&ToolsConfigParams { @@ -411,7 +411,6 @@ impl Session { sandbox_policy: session_configuration.sandbox_policy.clone(), shell_environment_policy: config.shell_environment_policy.clone(), tools_config, - is_review_mode: false, final_output_json_schema: None, codex_linux_sandbox_exe: config.codex_linux_sandbox_exe.clone(), tool_call_gate: Arc::new(ReadinessFlag::new()), @@ -816,6 +815,7 @@ impl Session { auth_manager, &otel, self.conversation_id, + turn_context.client.get_session_source(), call_id, command, &turn_context.sandbox_policy, @@ -874,7 +874,7 @@ impl Session { &self, turn_context: &TurnContext, call_id: String, - action: &ApplyPatchAction, + changes: HashMap, reason: Option, grant_root: Option, ) -> oneshot::Receiver { @@ -898,7 +898,7 @@ impl Session { let event = EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent { call_id, - changes: convert_apply_patch_to_protocol(action), + changes, reason, grant_root, }); @@ -1120,18 +1120,16 @@ impl Session { turn_context: Arc, cancellation_token: CancellationToken, ) { - if turn_context.is_review_mode - || !self - .state - .lock() - .await - .session_configuration - .features - .enabled(Feature::GhostCommit) + if !self + .state + .lock() + .await + .session_configuration + .features + .enabled(Feature::GhostCommit) { return; } - let token = match turn_context.tool_call_gate.subscribe().await { Ok(token) => token, Err(err) => { @@ -1633,9 +1631,10 @@ async fn spawn_review_thread( .unwrap_or_else(|| parent_turn_context.client.get_model_family()); // For reviews, disable web_search and view_image regardless of global settings. let mut review_features = config.features.clone(); - review_features.disable(crate::features::Feature::WebSearchRequest); - review_features.disable(crate::features::Feature::ViewImageTool); - review_features.disable(crate::features::Feature::StreamableShell); + review_features + .disable(crate::features::Feature::WebSearchRequest) + .disable(crate::features::Feature::ViewImageTool) + .disable(crate::features::Feature::StreamableShell); let tools_config = ToolsConfig::new(&ToolsConfigParams { model_family: &review_model_family, features: &review_features, @@ -1674,6 +1673,7 @@ async fn spawn_review_thread( per_turn_config.model_reasoning_effort, per_turn_config.model_reasoning_summary, sess.conversation_id, + parent_turn_context.client.get_session_source(), ); let review_turn_context = TurnContext { @@ -1686,7 +1686,6 @@ async fn spawn_review_thread( sandbox_policy: parent_turn_context.sandbox_policy.clone(), shell_environment_policy: parent_turn_context.shell_environment_policy.clone(), cwd: parent_turn_context.cwd.clone(), - is_review_mode: true, final_output_json_schema: None, codex_linux_sandbox_exe: parent_turn_context.codex_linux_sandbox_exe.clone(), tool_call_gate: Arc::new(ReadinessFlag::new()), @@ -1718,14 +1717,10 @@ async fn spawn_review_thread( /// - If the model sends only an assistant message, we record it in the /// conversation history and consider the task complete. /// -/// Review mode: when `turn_context.is_review_mode` is true, the turn runs in an -/// isolated in-memory thread without the parent session's prior history or -/// user_instructions. Emits ExitedReviewMode upon final review message. pub(crate) async fn run_task( sess: Arc, turn_context: Arc, input: Vec, - task_kind: TaskKind, cancellation_token: CancellationToken, ) -> Option { if input.is_empty() { @@ -1737,21 +1732,8 @@ pub(crate) async fn run_task( sess.send_event(&turn_context, event).await; let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input); - // For review threads, keep an isolated in-memory history so the - // model sees a fresh conversation without the parent session's history. - // For normal turns, continue recording to the session history as before. - let is_review_mode = turn_context.is_review_mode; - - let mut review_thread_history: ConversationHistory = ConversationHistory::new(); - if is_review_mode { - // Seed review threads with environment context so the model knows the working directory. - review_thread_history - .record_items(sess.build_initial_context(turn_context.as_ref()).iter()); - review_thread_history.record_items(std::iter::once(&initial_input_for_turn.into())); - } else { - sess.record_input_and_rollout_usermsg(turn_context.as_ref(), &initial_input_for_turn) - .await; - } + sess.record_input_and_rollout_usermsg(turn_context.as_ref(), &initial_input_for_turn) + .await; sess.maybe_start_ghost_snapshot(Arc::clone(&turn_context), cancellation_token.child_token()) .await; @@ -1773,21 +1755,7 @@ pub(crate) async fn run_task( .collect::>(); // Construct the input that we will send to the model. - // - // - For review threads, use the isolated in-memory history so the - // model sees a fresh conversation (no parent history/user_instructions). - // - // - For normal turns, use the session's full history. When using the - // chat completions API (or ZDR clients), the model needs the full - // conversation history on each turn. The rollout file, however, should - // only record the new items that originated in this turn so that it - // represents an append-only log without duplicates. - let turn_input: Vec = if is_review_mode { - if !pending_input.is_empty() { - review_thread_history.record_items(&pending_input); - } - review_thread_history.get_history_for_prompt() - } else { + let turn_input: Vec = { sess.record_conversation_items(&turn_context, &pending_input) .await; sess.clone_history().await.get_history_for_prompt() @@ -1811,7 +1779,6 @@ pub(crate) async fn run_task( Arc::clone(&turn_context), Arc::clone(&turn_diff_tracker), turn_input, - task_kind, cancellation_token.child_token(), ) .await @@ -1831,14 +1798,8 @@ pub(crate) async fn run_task( let token_limit_reached = total_usage_tokens .map(|tokens| tokens >= limit) .unwrap_or(false); - let (responses, items_to_record_in_conversation_history) = process_items( - processed_items, - is_review_mode, - &mut review_thread_history, - &sess, - &turn_context, - ) - .await; + let (responses, items_to_record_in_conversation_history) = + process_items(processed_items, &sess, &turn_context).await; if token_limit_reached { if auto_compact_recently_attempted { @@ -1880,14 +1841,7 @@ pub(crate) async fn run_task( Err(CodexErr::TurnAborted { dangling_artifacts: processed_items, }) => { - let _ = process_items( - processed_items, - is_review_mode, - &mut review_thread_history, - &sess, - &turn_context, - ) - .await; + let _ = process_items(processed_items, &sess, &turn_context).await; // Aborted turn is reported via a different event. break; } @@ -1903,56 +1857,14 @@ pub(crate) async fn run_task( } } - // If this was a review thread and we have a final assistant message, - // try to parse it as a ReviewOutput. - // - // If parsing fails, construct a minimal ReviewOutputEvent using the plain - // text as the overall explanation. Else, just exit review mode with None. - // - // Emits an ExitedReviewMode event with the parsed review output. - if turn_context.is_review_mode { - exit_review_mode( - sess.clone(), - Arc::clone(&turn_context), - last_agent_message.as_deref().map(parse_review_output_event), - ) - .await; - } - last_agent_message } -/// Parse the review output; when not valid JSON, build a structured -/// fallback that carries the plain text as the overall explanation. -/// -/// Returns: a ReviewOutputEvent parsed from JSON or a fallback populated from text. -fn parse_review_output_event(text: &str) -> ReviewOutputEvent { - // Try direct parse first - if let Ok(ev) = serde_json::from_str::(text) { - return ev; - } - // If wrapped in markdown fences or extra prose, attempt to extract the first JSON object - if let (Some(start), Some(end)) = (text.find('{'), text.rfind('}')) - && start < end - && let Some(slice) = text.get(start..=end) - && let Ok(ev) = serde_json::from_str::(slice) - { - return ev; - } - // Not JSON – return a structured ReviewOutputEvent that carries - // the plain text as the overall explanation. - ReviewOutputEvent { - overall_explanation: text.to_string(), - ..Default::default() - } -} - async fn run_turn( sess: Arc, turn_context: Arc, turn_diff_tracker: SharedTurnDiffTracker, input: Vec, - task_kind: TaskKind, cancellation_token: CancellationToken, ) -> CodexResult { let mcp_tools = sess.services.mcp_connection_manager.list_all_tools(); @@ -1982,7 +1894,6 @@ async fn run_turn( Arc::clone(&turn_context), Arc::clone(&turn_diff_tracker), &prompt, - task_kind, cancellation_token.child_token(), ) .await @@ -2065,7 +1976,6 @@ async fn try_run_turn( turn_context: Arc, turn_diff_tracker: SharedTurnDiffTracker, prompt: &Prompt, - task_kind: TaskKind, cancellation_token: CancellationToken, ) -> CodexResult { let rollout_item = RolloutItem::TurnContext(TurnContextItem { @@ -2081,7 +1991,7 @@ async fn try_run_turn( let mut stream = turn_context .client .clone() - .stream_with_task_kind(prompt, task_kind) + .stream(prompt) .or_cancel(&cancellation_token) .await??; @@ -2231,14 +2141,8 @@ async fn try_run_turn( return Ok(result); } ResponseEvent::OutputTextDelta(delta) => { - // In review child threads, suppress assistant text deltas; the - // UI will show a selection popup from the final ReviewOutput. - if !turn_context.is_review_mode { - let event = EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta }); - sess.send_event(&turn_context, event).await; - } else { - trace!("suppressing OutputTextDelta in review mode"); - } + let event = EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta }); + sess.send_event(&turn_context, event).await; } ResponseEvent::ReasoningSummaryDelta(delta) => { let event = EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta }); @@ -2273,13 +2177,7 @@ async fn handle_non_tool_response_item( ResponseItem::Message { .. } | ResponseItem::Reasoning { .. } | ResponseItem::WebSearchCall { .. } => { - let turn_item = match &item { - ResponseItem::Message { .. } if turn_context.is_review_mode => { - trace!("suppressing assistant Message in review mode"); - None - } - _ => parse_turn_item(&item), - }; + let turn_item = parse_turn_item(&item); if let Some(turn_item) = turn_item { sess.emit_turn_item_started_completed( turn_context.as_ref(), @@ -2317,62 +2215,6 @@ pub(super) fn get_last_assistant_message_from_turn(responses: &[ResponseItem]) - } }) } -/// Emits an ExitedReviewMode Event with optional ReviewOutput, -/// and records a developer message with the review output. -pub(crate) async fn exit_review_mode( - session: Arc, - turn_context: Arc, - review_output: Option, -) { - let event = EventMsg::ExitedReviewMode(ExitedReviewModeEvent { - review_output: review_output.clone(), - }); - session.send_event(turn_context.as_ref(), event).await; - - let mut user_message = String::new(); - if let Some(out) = review_output { - let mut findings_str = String::new(); - let text = out.overall_explanation.trim(); - if !text.is_empty() { - findings_str.push_str(text); - } - if !out.findings.is_empty() { - let block = format_review_findings_block(&out.findings, None); - findings_str.push_str(&format!("\n{block}")); - } - user_message.push_str(&format!( - r#" - User initiated a review task. Here's the full review output from reviewer model. User may select one or more comments to resolve. - review - - {findings_str} - - -"#)); - } else { - user_message.push_str(r#" - User initiated a review task, but was interrupted. If user asks about this, tell them to re-initiate a review with `/review` and wait for it to complete. - review - - None. - - -"#); - } - - session - .record_conversation_items( - &turn_context, - &[ResponseItem::Message { - id: None, - role: "user".to_string(), - content: vec![ContentItem::InputText { text: user_message }], - }], - ) - .await; - // Make the recorded review note visible immediately for readers. - session.flush_rollout().await; -} fn mcp_init_error_display( server_name: &str, @@ -2427,7 +2269,6 @@ fn is_mcp_client_startup_timeout_error(error: &anyhow::Error) -> bool { || error_message.contains("timed out handshaking with MCP server") } -use crate::features::Feature; use crate::features::Features; #[cfg(test)] pub(crate) use tests::make_session_and_context; @@ -2665,6 +2506,7 @@ mod tests { cwd: config.cwd.clone(), original_config_do_not_use: Arc::clone(&config), features: Features::default(), + session_source: SessionSource::Exec, }; let state = SessionState::new(session_configuration.clone()); @@ -2738,6 +2580,7 @@ mod tests { cwd: config.cwd.clone(), original_config_do_not_use: Arc::clone(&config), features: Features::default(), + session_source: SessionSource::Exec, }; let state = SessionState::new(session_configuration.clone()); @@ -2802,12 +2645,6 @@ mod tests { sleep(Duration::from_secs(60)).await; } } - - async fn abort(&self, session: Arc, ctx: Arc) { - if let TaskKind::Review = self.kind { - exit_review_mode(session.clone_session(), ctx, None).await; - } - } } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] @@ -2872,25 +2709,25 @@ mod tests { let input = vec![UserInput::Text { text: "start review".to_string(), }]; - sess.spawn_task( - Arc::clone(&tc), - input, - NeverEndingTask { - kind: TaskKind::Review, - listen_to_cancellation_token: false, - }, - ) - .await; + sess.spawn_task(Arc::clone(&tc), input, ReviewTask).await; sess.abort_all_tasks(TurnAbortReason::Interrupted).await; - let first = tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv()) - .await - .expect("timeout waiting for first event") - .expect("first event"); - match first.msg { - EventMsg::ExitedReviewMode(ev) => assert!(ev.review_output.is_none()), - other => panic!("unexpected first event: {other:?}"), + // Drain events until we observe ExitedReviewMode; earlier + // RawResponseItem entries (e.g., environment context) may arrive first. + loop { + let evt = tokio::time::timeout(std::time::Duration::from_secs(1), rx.recv()) + .await + .expect("timeout waiting for first event") + .expect("first event"); + match evt.msg { + EventMsg::ExitedReviewMode(ev) => { + assert!(ev.review_output.is_none()); + break; + } + // Ignore any non-critical events before exit. + _ => continue, + } } loop { let evt = tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv()) diff --git a/codex-rs/core/src/codex/compact.rs b/codex-rs/core/src/codex/compact.rs index a8340cd0..6b07e245 100644 --- a/codex-rs/core/src/codex/compact.rs +++ b/codex-rs/core/src/codex/compact.rs @@ -13,7 +13,6 @@ use crate::protocol::ErrorEvent; use crate::protocol::EventMsg; use crate::protocol::TaskStartedEvent; use crate::protocol::TurnContextItem; -use crate::state::TaskKind; use crate::truncate::truncate_middle; use crate::util::backoff; use askama::Template; @@ -255,11 +254,7 @@ async fn drain_to_completed( turn_context: &TurnContext, prompt: &Prompt, ) -> CodexResult<()> { - let mut stream = turn_context - .client - .clone() - .stream_with_task_kind(prompt, TaskKind::Compact) - .await?; + let mut stream = turn_context.client.clone().stream(prompt).await?; loop { let maybe_event = stream.next().await; let Some(event) = maybe_event else { diff --git a/codex-rs/core/src/codex_delegate.rs b/codex-rs/core/src/codex_delegate.rs new file mode 100644 index 00000000..b123a8a3 --- /dev/null +++ b/codex-rs/core/src/codex_delegate.rs @@ -0,0 +1,292 @@ +use std::sync::Arc; +use std::sync::atomic::AtomicU64; + +use async_channel::Receiver; +use async_channel::Sender; +use codex_async_utils::OrCancelExt; +use codex_protocol::protocol::ApplyPatchApprovalRequestEvent; +use codex_protocol::protocol::Event; +use codex_protocol::protocol::EventMsg; +use codex_protocol::protocol::ExecApprovalRequestEvent; +use codex_protocol::protocol::Op; +use codex_protocol::protocol::SessionSource; +use codex_protocol::protocol::SubAgentSource; +use codex_protocol::protocol::Submission; +use codex_protocol::user_input::UserInput; +use tokio_util::sync::CancellationToken; + +use crate::AuthManager; +use crate::codex::Codex; +use crate::codex::CodexSpawnOk; +use crate::codex::SUBMISSION_CHANNEL_CAPACITY; +use crate::codex::Session; +use crate::codex::TurnContext; +use crate::config::Config; +use crate::error::CodexErr; +use codex_protocol::protocol::InitialHistory; + +/// Start an interactive sub-Codex conversation and return IO channels. +/// +/// The returned `events_rx` yields non-approval events emitted by the sub-agent. +/// Approval requests are handled via `parent_session` and are not surfaced. +/// The returned `ops_tx` allows the caller to submit additional `Op`s to the sub-agent. +pub(crate) async fn run_codex_conversation_interactive( + config: Config, + auth_manager: Arc, + parent_session: Arc, + parent_ctx: Arc, + cancel_token: CancellationToken, +) -> Result { + let (tx_sub, rx_sub) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY); + let (tx_ops, rx_ops) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY); + + let CodexSpawnOk { codex, .. } = Codex::spawn( + config, + auth_manager, + InitialHistory::New, + SessionSource::SubAgent(SubAgentSource::Review), + ) + .await?; + let codex = Arc::new(codex); + + // Use a child token so parent cancel cascades but we can scope it to this task + let cancel_token_events = cancel_token.child_token(); + let cancel_token_ops = cancel_token.child_token(); + + // Forward events from the sub-agent to the consumer, filtering approvals and + // routing them to the parent session for decisions. + let parent_session_clone = Arc::clone(&parent_session); + let parent_ctx_clone = Arc::clone(&parent_ctx); + let codex_for_events = Arc::clone(&codex); + tokio::spawn(async move { + let _ = forward_events( + codex_for_events, + tx_sub, + parent_session_clone, + parent_ctx_clone, + cancel_token_events.clone(), + ) + .or_cancel(&cancel_token_events) + .await; + }); + + // Forward ops from the caller to the sub-agent. + let codex_for_ops = Arc::clone(&codex); + tokio::spawn(async move { + forward_ops(codex_for_ops, rx_ops, cancel_token_ops).await; + }); + + Ok(Codex { + next_id: AtomicU64::new(0), + tx_sub: tx_ops, + rx_event: rx_sub, + }) +} + +/// Convenience wrapper for one-time use with an initial prompt. +/// +/// Internally calls the interactive variant, then immediately submits the provided input. +pub(crate) async fn run_codex_conversation_one_shot( + config: Config, + auth_manager: Arc, + input: Vec, + parent_session: Arc, + parent_ctx: Arc, + cancel_token: CancellationToken, +) -> Result { + // Use a child token so we can stop the delegate after completion without + // requiring the caller to cancel the parent token. + let child_cancel = cancel_token.child_token(); + let io = run_codex_conversation_interactive( + config, + auth_manager, + parent_session, + parent_ctx, + child_cancel.clone(), + ) + .await?; + + // Send the initial input to kick off the one-shot turn. + io.submit(Op::UserInput { items: input }).await?; + + // Bridge events so we can observe completion and shut down automatically. + let (tx_bridge, rx_bridge) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY); + let ops_tx = io.tx_sub.clone(); + let io_for_bridge = io; + tokio::spawn(async move { + while let Ok(event) = io_for_bridge.next_event().await { + let should_shutdown = matches!( + event.msg, + EventMsg::TaskComplete(_) | EventMsg::TurnAborted(_) + ); + let _ = tx_bridge.send(event).await; + if should_shutdown { + let _ = ops_tx + .send(Submission { + id: "shutdown".to_string(), + op: Op::Shutdown {}, + }) + .await; + child_cancel.cancel(); + break; + } + } + }); + + // For one-shot usage, return a closed `tx_sub` so callers cannot submit + // additional ops after the initial request. Create a channel and drop the + // receiver to close it immediately. + let (tx_closed, rx_closed) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY); + drop(rx_closed); + + Ok(Codex { + next_id: AtomicU64::new(0), + rx_event: rx_bridge, + tx_sub: tx_closed, + }) +} + +async fn forward_events( + codex: Arc, + tx_sub: Sender, + parent_session: Arc, + parent_ctx: Arc, + cancel_token: CancellationToken, +) { + while let Ok(event) = codex.next_event().await { + match event { + Event { + id: _, + msg: EventMsg::SessionConfigured(_), + } => continue, + Event { + id, + msg: EventMsg::ExecApprovalRequest(event), + } => { + // Initiate approval via parent session; do not surface to consumer. + handle_exec_approval( + &codex, + id, + &parent_session, + &parent_ctx, + event, + &cancel_token, + ) + .await; + } + Event { + id, + msg: EventMsg::ApplyPatchApprovalRequest(event), + } => { + handle_patch_approval( + &codex, + id, + &parent_session, + &parent_ctx, + event, + &cancel_token, + ) + .await; + } + other => { + let _ = tx_sub.send(other).await; + } + } + } +} + +/// Forward ops from a caller to a sub-agent, respecting cancellation. +async fn forward_ops( + codex: Arc, + rx_ops: Receiver, + cancel_token_ops: CancellationToken, +) { + loop { + let op: Op = match rx_ops.recv().or_cancel(&cancel_token_ops).await { + Ok(Ok(Submission { id: _, op })) => op, + Ok(Err(_)) | Err(_) => break, + }; + let _ = codex.submit(op).await; + } +} + +/// Handle an ExecApprovalRequest by consulting the parent session and replying. +async fn handle_exec_approval( + codex: &Codex, + id: String, + parent_session: &Session, + parent_ctx: &TurnContext, + event: ExecApprovalRequestEvent, + cancel_token: &CancellationToken, +) { + // Race approval with cancellation and timeout to avoid hangs. + let approval_fut = parent_session.request_command_approval( + parent_ctx, + parent_ctx.sub_id.clone(), + event.command, + event.cwd, + event.reason, + event.risk, + ); + let decision = await_approval_with_cancel( + approval_fut, + parent_session, + &parent_ctx.sub_id, + cancel_token, + ) + .await; + + let _ = codex.submit(Op::ExecApproval { id, decision }).await; +} + +/// Handle an ApplyPatchApprovalRequest by consulting the parent session and replying. +async fn handle_patch_approval( + codex: &Codex, + id: String, + parent_session: &Session, + parent_ctx: &TurnContext, + event: ApplyPatchApprovalRequestEvent, + cancel_token: &CancellationToken, +) { + let decision_rx = parent_session + .request_patch_approval( + parent_ctx, + parent_ctx.sub_id.clone(), + event.changes, + event.reason, + event.grant_root, + ) + .await; + let decision = await_approval_with_cancel( + async move { decision_rx.await.unwrap_or_default() }, + parent_session, + &parent_ctx.sub_id, + cancel_token, + ) + .await; + let _ = codex.submit(Op::PatchApproval { id, decision }).await; +} + +/// Await an approval decision, aborting on cancellation. +async fn await_approval_with_cancel( + fut: F, + parent_session: &Session, + sub_id: &str, + cancel_token: &CancellationToken, +) -> codex_protocol::protocol::ReviewDecision +where + F: core::future::Future, +{ + tokio::select! { + biased; + _ = cancel_token.cancelled() => { + parent_session + .notify_approval(sub_id, codex_protocol::protocol::ReviewDecision::Abort) + .await; + codex_protocol::protocol::ReviewDecision::Abort + } + decision = fut => { + decision + } + } +} diff --git a/codex-rs/core/src/conversation_manager.rs b/codex-rs/core/src/conversation_manager.rs index f0aa8b06..8ffefd56 100644 --- a/codex-rs/core/src/conversation_manager.rs +++ b/codex-rs/core/src/conversation_manager.rs @@ -73,7 +73,7 @@ impl ConversationManager { config, auth_manager, InitialHistory::New, - self.session_source, + self.session_source.clone(), ) .await?; self.finalize_spawn(codex, conversation_id).await @@ -145,7 +145,13 @@ impl ConversationManager { let CodexSpawnOk { codex, conversation_id, - } = Codex::spawn(config, auth_manager, initial_history, self.session_source).await?; + } = Codex::spawn( + config, + auth_manager, + initial_history, + self.session_source.clone(), + ) + .await?; self.finalize_spawn(codex, conversation_id).await } @@ -179,7 +185,7 @@ impl ConversationManager { let CodexSpawnOk { codex, conversation_id, - } = Codex::spawn(config, auth_manager, history, self.session_source).await?; + } = Codex::spawn(config, auth_manager, history, self.session_source.clone()).await?; self.finalize_spawn(codex, conversation_id).await } diff --git a/codex-rs/core/src/features.rs b/codex-rs/core/src/features.rs index 77c72a68..24c00aa2 100644 --- a/codex-rs/core/src/features.rs +++ b/codex-rs/core/src/features.rs @@ -122,8 +122,9 @@ impl Features { self.enabled.insert(f); } - pub fn disable(&mut self, f: Feature) { + pub fn disable(&mut self, f: Feature) -> &mut Self { self.enabled.remove(&f); + self } pub fn record_legacy_usage_force(&mut self, alias: &str, feature: Feature) { diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs index f7d86c41..f853ea87 100644 --- a/codex-rs/core/src/lib.rs +++ b/codex-rs/core/src/lib.rs @@ -14,6 +14,7 @@ mod client_common; pub mod codex; mod codex_conversation; pub use codex_conversation::CodexConversation; +mod codex_delegate; mod command_safety; pub mod config; pub mod config_edit; diff --git a/codex-rs/core/src/response_processing.rs b/codex-rs/core/src/response_processing.rs index 5c30c112..d1767b74 100644 --- a/codex-rs/core/src/response_processing.rs +++ b/codex-rs/core/src/response_processing.rs @@ -1,6 +1,5 @@ use crate::codex::Session; use crate::codex::TurnContext; -use crate::conversation_history::ConversationHistory; use codex_protocol::models::FunctionCallOutputPayload; use codex_protocol::models::ResponseInputItem; use codex_protocol::models::ResponseItem; @@ -11,8 +10,6 @@ use tracing::warn; /// - `ResponseInputItem`s to send back to the model on the next turn. pub(crate) async fn process_items( processed_items: Vec, - is_review_mode: bool, - review_thread_history: &mut ConversationHistory, sess: &Session, turn_context: &TurnContext, ) -> (Vec, Vec) { @@ -100,12 +97,8 @@ pub(crate) async fn process_items( // Only attempt to take the lock if there is something to record. if !items_to_record_in_conversation_history.is_empty() { - if is_review_mode { - review_thread_history.record_items(items_to_record_in_conversation_history.iter()); - } else { - sess.record_conversation_items(turn_context, &items_to_record_in_conversation_history) - .await; - } + sess.record_conversation_items(turn_context, &items_to_record_in_conversation_history) + .await; } (responses, items_to_record_in_conversation_history) } diff --git a/codex-rs/core/src/rollout/list.rs b/codex-rs/core/src/rollout/list.rs index 13663beb..d1bc82e0 100644 --- a/codex-rs/core/src/rollout/list.rs +++ b/codex-rs/core/src/rollout/list.rs @@ -409,7 +409,7 @@ async fn read_head_and_tail( match rollout_line.item { RolloutItem::SessionMeta(session_meta_line) => { - summary.source = Some(session_meta_line.meta.source); + summary.source = Some(session_meta_line.meta.source.clone()); summary.model_provider = session_meta_line.meta.model_provider.clone(); summary.created_at = summary .created_at diff --git a/codex-rs/core/src/sandboxing/assessment.rs b/codex-rs/core/src/sandboxing/assessment.rs index f02a90b4..c7310c1f 100644 --- a/codex-rs/core/src/sandboxing/assessment.rs +++ b/codex-rs/core/src/sandboxing/assessment.rs @@ -17,6 +17,7 @@ use codex_protocol::ConversationId; use codex_protocol::models::ContentItem; use codex_protocol::models::ResponseItem; use codex_protocol::protocol::SandboxCommandAssessment; +use codex_protocol::protocol::SessionSource; use futures::StreamExt; use serde_json::json; use tokio::time::timeout; @@ -53,6 +54,7 @@ pub(crate) async fn assess_command( auth_manager: Arc, parent_otel: &OtelEventManager, conversation_id: ConversationId, + session_source: SessionSource, call_id: &str, command: &[String], sandbox_policy: &SandboxPolicy, @@ -141,6 +143,7 @@ pub(crate) async fn assess_command( config.model_reasoning_effort, config.model_reasoning_summary, conversation_id, + session_source, ); let start = Instant::now(); diff --git a/codex-rs/core/src/state/turn.rs b/codex-rs/core/src/state/turn.rs index e2ed6f30..e2fff055 100644 --- a/codex-rs/core/src/state/turn.rs +++ b/codex-rs/core/src/state/turn.rs @@ -37,16 +37,6 @@ pub(crate) enum TaskKind { Compact, } -impl TaskKind { - pub(crate) fn header_value(self) -> &'static str { - match self { - TaskKind::Regular => "standard", - TaskKind::Review => "review", - TaskKind::Compact => "compact", - } - } -} - #[derive(Clone)] pub(crate) struct RunningTask { pub(crate) done: Arc, @@ -123,15 +113,3 @@ impl ActiveTurn { ts.clear_pending(); } } - -#[cfg(test)] -mod tests { - use super::TaskKind; - - #[test] - fn header_value_matches_expected_labels() { - assert_eq!(TaskKind::Regular.header_value(), "standard"); - assert_eq!(TaskKind::Review.header_value(), "review"); - assert_eq!(TaskKind::Compact.header_value(), "compact"); - } -} diff --git a/codex-rs/core/src/tasks/mod.rs b/codex-rs/core/src/tasks/mod.rs index c81fa2c0..9bda02c3 100644 --- a/codex-rs/core/src/tasks/mod.rs +++ b/codex-rs/core/src/tasks/mod.rs @@ -16,6 +16,7 @@ use tokio_util::task::AbortOnDropHandle; use tracing::trace; use tracing::warn; +use crate::AuthManager; use crate::codex::Session; use crate::codex::TurnContext; use crate::protocol::EventMsg; @@ -50,6 +51,10 @@ impl SessionTaskContext { pub(crate) fn clone_session(&self) -> Arc { Arc::clone(&self.session) } + + pub(crate) fn auth_manager(&self) -> Arc { + Arc::clone(&self.session.services.auth_manager) + } } /// Async task that drives a [`Session`] turn. @@ -123,7 +128,7 @@ impl Session { task_cancellation_token.child_token(), ) .await; - + session_ctx.clone_session().flush_rollout().await; if !task_cancellation_token.is_cancelled() { // Emit completion uniformly from spawn site so all tasks share the same lifecycle. let sess = session_ctx.clone_session(); diff --git a/codex-rs/core/src/tasks/regular.rs b/codex-rs/core/src/tasks/regular.rs index 58ecedd4..416dba3f 100644 --- a/codex-rs/core/src/tasks/regular.rs +++ b/codex-rs/core/src/tasks/regular.rs @@ -28,6 +28,6 @@ impl SessionTask for RegularTask { cancellation_token: CancellationToken, ) -> Option { let sess = session.clone_session(); - run_task(sess, ctx, input, TaskKind::Regular, cancellation_token).await + run_task(sess, ctx, input, cancellation_token).await } } diff --git a/codex-rs/core/src/tasks/review.rs b/codex-rs/core/src/tasks/review.rs index fbf553b3..ba622100 100644 --- a/codex-rs/core/src/tasks/review.rs +++ b/codex-rs/core/src/tasks/review.rs @@ -1,11 +1,18 @@ use std::sync::Arc; use async_trait::async_trait; +use codex_protocol::models::ContentItem; +use codex_protocol::models::ResponseItem; +use codex_protocol::protocol::Event; +use codex_protocol::protocol::EventMsg; +use codex_protocol::protocol::ExitedReviewModeEvent; +use codex_protocol::protocol::ReviewOutputEvent; use tokio_util::sync::CancellationToken; +use crate::codex::Session; use crate::codex::TurnContext; -use crate::codex::exit_review_mode; -use crate::codex::run_task; +use crate::codex_delegate::run_codex_conversation_one_shot; +use crate::review_format::format_review_findings_block; use crate::state::TaskKind; use codex_protocol::user_input::UserInput; @@ -28,11 +35,162 @@ impl SessionTask for ReviewTask { input: Vec, cancellation_token: CancellationToken, ) -> Option { - let sess = session.clone_session(); - run_task(sess, ctx, input, TaskKind::Review, cancellation_token).await + // Start sub-codex conversation and get the receiver for events. + let output = match start_review_conversation( + session.clone(), + ctx.clone(), + input, + cancellation_token.clone(), + ) + .await + { + Some(receiver) => process_review_events(session.clone(), ctx.clone(), receiver).await, + None => None, + }; + if !cancellation_token.is_cancelled() { + exit_review_mode(session.clone_session(), output.clone(), ctx.clone()).await; + } + None } async fn abort(&self, session: Arc, ctx: Arc) { - exit_review_mode(session.clone_session(), ctx, None).await; + exit_review_mode(session.clone_session(), None, ctx).await; } } + +async fn start_review_conversation( + session: Arc, + ctx: Arc, + input: Vec, + cancellation_token: CancellationToken, +) -> Option> { + let config = ctx.client.config(); + let mut sub_agent_config = config.as_ref().clone(); + // Run with only reviewer rubric — drop outer user_instructions + sub_agent_config.user_instructions = None; + // Avoid loading project docs; reviewer only needs findings + sub_agent_config.project_doc_max_bytes = 0; + // Carry over review-only feature restrictions so the delegate cannot + // re-enable blocked tools (web search, view image, streamable shell). + sub_agent_config + .features + .disable(crate::features::Feature::WebSearchRequest) + .disable(crate::features::Feature::ViewImageTool) + .disable(crate::features::Feature::StreamableShell); + // Set explicit review rubric for the sub-agent + sub_agent_config.base_instructions = Some(crate::REVIEW_PROMPT.to_string()); + (run_codex_conversation_one_shot( + sub_agent_config, + session.auth_manager(), + input, + session.clone_session(), + ctx.clone(), + cancellation_token, + ) + .await) + .ok() + .map(|io| io.rx_event) +} + +async fn process_review_events( + session: Arc, + ctx: Arc, + receiver: async_channel::Receiver, +) -> Option { + let mut prev_agent_message: Option = None; + while let Ok(event) = receiver.recv().await { + match event.clone().msg { + EventMsg::AgentMessage(_) => { + if let Some(prev) = prev_agent_message.take() { + session + .clone_session() + .send_event(ctx.as_ref(), prev.msg) + .await; + } + prev_agent_message = Some(event); + } + EventMsg::TaskComplete(task_complete) => { + // Parse review output from the last agent message (if present). + let out = task_complete + .last_agent_message + .as_deref() + .map(parse_review_output_event); + return out; + } + EventMsg::TurnAborted(_) => { + // Cancellation or abort: consumer will finalize with None. + return None; + } + other => { + session + .clone_session() + .send_event(ctx.as_ref(), other) + .await; + } + } + } + // Channel closed without TaskComplete: treat as interrupted. + None +} + +/// Parse a ReviewOutputEvent from a text blob returned by the reviewer model. +/// If the text is valid JSON matching ReviewOutputEvent, deserialize it. +/// Otherwise, attempt to extract the first JSON object substring and parse it. +/// If parsing still fails, return a structured fallback carrying the plain text +/// in `overall_explanation`. +fn parse_review_output_event(text: &str) -> ReviewOutputEvent { + if let Ok(ev) = serde_json::from_str::(text) { + return ev; + } + if let (Some(start), Some(end)) = (text.find('{'), text.rfind('}')) + && start < end + && let Some(slice) = text.get(start..=end) + && let Ok(ev) = serde_json::from_str::(slice) + { + return ev; + } + ReviewOutputEvent { + overall_explanation: text.to_string(), + ..Default::default() + } +} + +/// Emits an ExitedReviewMode Event with optional ReviewOutput, +/// and records a developer message with the review output. +pub(crate) async fn exit_review_mode( + session: Arc, + review_output: Option, + ctx: Arc, +) { + let user_message = if let Some(out) = review_output.clone() { + let mut findings_str = String::new(); + let text = out.overall_explanation.trim(); + if !text.is_empty() { + findings_str.push_str(text); + } + if !out.findings.is_empty() { + let block = format_review_findings_block(&out.findings, None); + findings_str.push_str(&format!("\n{block}")); + } + crate::client_common::REVIEW_EXIT_SUCCESS_TMPL.replace("{results}", &findings_str) + } else { + crate::client_common::REVIEW_EXIT_INTERRUPTED_TMPL.to_string() + }; + + session + .record_conversation_items( + &ctx, + &[ResponseItem::Message { + id: None, + role: "user".to_string(), + content: vec![ContentItem::InputText { text: user_message }], + }], + ) + .await; + session + .send_event( + ctx.as_ref(), + EventMsg::ExitedReviewMode(ExitedReviewModeEvent { review_output }), + ) + .await; +} diff --git a/codex-rs/core/templates/review/exit_interrupted.xml b/codex-rs/core/templates/review/exit_interrupted.xml new file mode 100644 index 00000000..1c7bd730 --- /dev/null +++ b/codex-rs/core/templates/review/exit_interrupted.xml @@ -0,0 +1,8 @@ + + User initiated a review task, but was interrupted. If user asks about this, tell them to re-initiate a review with `/review` and wait for it to complete. + review + + None. + + + diff --git a/codex-rs/core/templates/review/exit_success.xml b/codex-rs/core/templates/review/exit_success.xml new file mode 100644 index 00000000..f8e59f4b --- /dev/null +++ b/codex-rs/core/templates/review/exit_success.xml @@ -0,0 +1,8 @@ + + User initiated a review task. Here's the full review output from reviewer model. User may select one or more comments to resolve. + review + + {results} + + + diff --git a/codex-rs/core/templates/review/history_message_completed.md b/codex-rs/core/templates/review/history_message_completed.md new file mode 100644 index 00000000..1f8354f5 --- /dev/null +++ b/codex-rs/core/templates/review/history_message_completed.md @@ -0,0 +1,8 @@ + + User initiated a review task. Here's the full review output from reviewer model. User may select one or more comments to resolve. + review + + {findings} + + + diff --git a/codex-rs/core/templates/review/history_message_interrupted.md b/codex-rs/core/templates/review/history_message_interrupted.md new file mode 100644 index 00000000..1c7bd730 --- /dev/null +++ b/codex-rs/core/templates/review/history_message_interrupted.md @@ -0,0 +1,8 @@ + + User initiated a review task, but was interrupted. If user asks about this, tell them to re-initiate a review with `/review` and wait for it to complete. + review + + None. + + + diff --git a/codex-rs/core/tests/chat_completions_payload.rs b/codex-rs/core/tests/chat_completions_payload.rs index a1e14bcd..cadf6be2 100644 --- a/codex-rs/core/tests/chat_completions_payload.rs +++ b/codex-rs/core/tests/chat_completions_payload.rs @@ -94,6 +94,7 @@ async fn run_request(input: Vec) -> Value { effort, summary, conversation_id, + codex_protocol::protocol::SessionSource::Exec, ); let mut prompt = Prompt::default(); diff --git a/codex-rs/core/tests/chat_completions_sse.rs b/codex-rs/core/tests/chat_completions_sse.rs index 33af9bcc..095b08c1 100644 --- a/codex-rs/core/tests/chat_completions_sse.rs +++ b/codex-rs/core/tests/chat_completions_sse.rs @@ -94,6 +94,7 @@ async fn run_stream_with_bytes(sse_body: &[u8]) -> Vec { effort, summary, conversation_id, + codex_protocol::protocol::SessionSource::Exec, ); let mut prompt = Prompt::default(); diff --git a/codex-rs/core/tests/responses_headers.rs b/codex-rs/core/tests/responses_headers.rs index f5887ebc..66df0aef 100644 --- a/codex-rs/core/tests/responses_headers.rs +++ b/codex-rs/core/tests/responses_headers.rs @@ -10,6 +10,7 @@ use codex_core::ResponseItem; use codex_core::WireApi; use codex_otel::otel_event_manager::OtelEventManager; use codex_protocol::ConversationId; +use codex_protocol::protocol::SessionSource; use core_test_support::load_default_config_for_test; use core_test_support::responses; use futures::StreamExt; @@ -26,12 +27,9 @@ async fn responses_stream_includes_task_type_header() { responses::ev_completed("resp-1"), ]); - let request_recorder = responses::mount_sse_once_match( - &server, - header("Codex-Task-Type", "standard"), - response_body, - ) - .await; + let request_recorder = + responses::mount_sse_once_match(&server, header("Codex-Task-Type", "exec"), response_body) + .await; let provider = ModelProviderInfo { name: "mock".into(), @@ -78,6 +76,7 @@ async fn responses_stream_includes_task_type_header() { effort, summary, conversation_id, + SessionSource::Exec, ); let mut prompt = Prompt::default(); @@ -97,8 +96,5 @@ async fn responses_stream_includes_task_type_header() { } let request = request_recorder.single_request(); - assert_eq!( - request.header("Codex-Task-Type").as_deref(), - Some("standard") - ); + assert_eq!(request.header("Codex-Task-Type").as_deref(), Some("exec")); } diff --git a/codex-rs/core/tests/suite/client.rs b/codex-rs/core/tests/suite/client.rs index 02e9cdc3..75b1ca3c 100644 --- a/codex-rs/core/tests/suite/client.rs +++ b/codex-rs/core/tests/suite/client.rs @@ -675,6 +675,7 @@ async fn azure_responses_request_includes_store_and_reasoning_ids() { effort, summary, conversation_id, + codex_protocol::protocol::SessionSource::Exec, ); let mut prompt = Prompt::default(); diff --git a/codex-rs/core/tests/suite/codex_delegate.rs b/codex-rs/core/tests/suite/codex_delegate.rs new file mode 100644 index 00000000..82a7bd9a --- /dev/null +++ b/codex-rs/core/tests/suite/codex_delegate.rs @@ -0,0 +1,173 @@ +use codex_core::protocol::AskForApproval; +use codex_core::protocol::EventMsg; +use codex_core::protocol::Op; +use codex_core::protocol::ReviewDecision; +use codex_core::protocol::ReviewRequest; +use codex_core::protocol::SandboxPolicy; +use core_test_support::responses::ev_apply_patch_function_call; +use core_test_support::responses::ev_assistant_message; +use core_test_support::responses::ev_completed; +use core_test_support::responses::ev_function_call; +use core_test_support::responses::ev_response_created; +use core_test_support::responses::mount_sse_sequence; +use core_test_support::responses::sse; +use core_test_support::responses::start_mock_server; +use core_test_support::skip_if_no_network; +use core_test_support::test_codex::test_codex; +use core_test_support::wait_for_event; + +/// Delegate should surface ExecApprovalRequest from sub-agent and proceed +/// after parent submits an approval decision. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn codex_delegate_forwards_exec_approval_and_proceeds_on_approval() { + skip_if_no_network!(); + + // Sub-agent turn 1: emit a shell function_call requiring approval, then complete. + let call_id = "call-exec-1"; + let args = serde_json::json!({ + "command": ["bash", "-lc", "rm -rf delegated"], + "timeout_ms": 1000, + "with_escalated_permissions": true, + }) + .to_string(); + let sse1 = sse(vec![ + ev_response_created("resp-1"), + ev_function_call(call_id, "shell", &args), + ev_completed("resp-1"), + ]); + + // Sub-agent turn 2: return structured review output and complete. + let review_json = serde_json::json!({ + "findings": [], + "overall_correctness": "ok", + "overall_explanation": "delegate approved exec", + "overall_confidence_score": 0.5 + }) + .to_string(); + let sse2 = sse(vec![ + ev_response_created("resp-2"), + ev_assistant_message("msg-1", &review_json), + ev_completed("resp-2"), + ]); + + let server = start_mock_server().await; + mount_sse_sequence(&server, vec![sse1, sse2]).await; + + // Build a conversation configured to require approvals so the delegate + // routes ExecApprovalRequest via the parent. + let mut builder = test_codex().with_config(|config| { + config.approval_policy = AskForApproval::OnRequest; + config.sandbox_policy = SandboxPolicy::ReadOnly; + }); + let test = builder.build(&server).await.expect("build test codex"); + + // Kick off review (sub-agent starts internally). + test.codex + .submit(Op::Review { + review_request: ReviewRequest { + prompt: "Please review".to_string(), + user_facing_hint: "review".to_string(), + }, + }) + .await + .expect("submit review"); + + // Lifecycle: Entered -> ExecApprovalRequest -> Exited(Some) -> TaskComplete. + wait_for_event(&test.codex, |ev| { + matches!(ev, EventMsg::EnteredReviewMode(_)) + }) + .await; + + // Expect parent-side approval request (forwarded by delegate). + wait_for_event(&test.codex, |ev| { + matches!(ev, EventMsg::ExecApprovalRequest(_)) + }) + .await; + + // Approve via parent; id "0" is the active sub_id in tests. + test.codex + .submit(Op::ExecApproval { + id: "0".into(), + decision: ReviewDecision::Approved, + }) + .await + .expect("submit exec approval"); + + wait_for_event(&test.codex, |ev| { + matches!(ev, EventMsg::ExitedReviewMode(_)) + }) + .await; + wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; +} + +/// Delegate should surface ApplyPatchApprovalRequest and honor parent decision +/// so the sub-agent can proceed to completion. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn codex_delegate_forwards_patch_approval_and_proceeds_on_decision() { + skip_if_no_network!(); + + let call_id = "call-patch-1"; + let patch = "*** Begin Patch\n*** Add File: delegated.txt\n+hello\n*** End Patch\n"; + let sse1 = sse(vec![ + ev_response_created("resp-1"), + ev_apply_patch_function_call(call_id, patch), + ev_completed("resp-1"), + ]); + let review_json = serde_json::json!({ + "findings": [], + "overall_correctness": "ok", + "overall_explanation": "delegate patch handled", + "overall_confidence_score": 0.5 + }) + .to_string(); + let sse2 = sse(vec![ + ev_response_created("resp-2"), + ev_assistant_message("msg-1", &review_json), + ev_completed("resp-2"), + ]); + + let server = start_mock_server().await; + mount_sse_sequence(&server, vec![sse1, sse2]).await; + + let mut builder = test_codex().with_config(|config| { + config.approval_policy = AskForApproval::OnRequest; + // Use a restricted sandbox so patch approval is required + config.sandbox_policy = SandboxPolicy::ReadOnly; + config.include_apply_patch_tool = true; + }); + let test = builder.build(&server).await.expect("build test codex"); + + test.codex + .submit(Op::Review { + review_request: ReviewRequest { + prompt: "Please review".to_string(), + user_facing_hint: "review".to_string(), + }, + }) + .await + .expect("submit review"); + + wait_for_event(&test.codex, |ev| { + matches!(ev, EventMsg::EnteredReviewMode(_)) + }) + .await; + wait_for_event(&test.codex, |ev| { + matches!(ev, EventMsg::ApplyPatchApprovalRequest(_)) + }) + .await; + + // Deny via parent so delegate can continue; id "0" is the active sub_id in tests. + test.codex + .submit(Op::PatchApproval { + id: "0".into(), + decision: ReviewDecision::Denied, + }) + .await + .expect("submit patch approval"); + + wait_for_event(&test.codex, |ev| { + matches!(ev, EventMsg::ExitedReviewMode(_)) + }) + .await; + wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; +} diff --git a/codex-rs/core/tests/suite/mod.rs b/codex-rs/core/tests/suite/mod.rs index eac65b8f..5eca79b5 100644 --- a/codex-rs/core/tests/suite/mod.rs +++ b/codex-rs/core/tests/suite/mod.rs @@ -8,6 +8,7 @@ mod apply_patch_cli; mod approvals; mod cli_stream; mod client; +mod codex_delegate; mod compact; mod compact_resume_fork; mod deprecation_notice; diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index 718c1798..31506061 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -950,7 +950,7 @@ impl InitialHistory { } } -#[derive(Serialize, Deserialize, Copy, Clone, Debug, PartialEq, Eq, JsonSchema, TS, Default)] +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, JsonSchema, TS, Default)] #[serde(rename_all = "lowercase")] #[ts(rename_all = "lowercase")] pub enum SessionSource { @@ -959,10 +959,18 @@ pub enum SessionSource { VSCode, Exec, Mcp, + SubAgent(SubAgentSource), #[serde(other)] Unknown, } +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, JsonSchema, TS)] +pub enum SubAgentSource { + Review, + Compact, + Other(String), +} + #[derive(Serialize, Deserialize, Clone, Debug, JsonSchema, TS)] #[ts(optional_fields = nullable)] pub struct SessionMeta {