Delegate review to codex instance (#5572)

In this PR, I am exploring migrating task kind to an invocation of
Codex. The main reason would be getting rid off multiple
`ConversationHistory` state and streamlining our context/history
management.

This approach depends on opening a channel between the sub-codex and
codex. This channel is responsible for forwarding `interactive`
(`approvals`) and `non-interactive` events. The `task` is responsible
for handling those events.

This opens the door for implementing `codex as a tool`, replacing
`compact` and `review`, and potentially subagents.

One consideration is this code is very similar to `app-server` specially
in the approval part. If in the future we wanted an interactive
`sub-codex` we should consider using `codex-mcp`
This commit is contained in:
Ahmed Ibrahim
2025-10-29 14:04:25 -07:00
committed by GitHub
parent db31f6966d
commit 13e1d0362d
28 changed files with 805 additions and 302 deletions

View File

@@ -6,21 +6,20 @@ use std::sync::atomic::AtomicU64;
use crate::AuthManager;
use crate::client_common::REVIEW_PROMPT;
use crate::features::Feature;
use crate::function_tool::FunctionCallError;
use crate::mcp::auth::McpAuthStatusEntry;
use crate::mcp_connection_manager::DEFAULT_STARTUP_TIMEOUT;
use crate::parse_command::parse_command;
use crate::parse_turn_item;
use crate::response_processing::process_items;
use crate::review_format::format_review_findings_block;
use crate::terminal;
use crate::user_notification::UserNotifier;
use async_channel::Receiver;
use async_channel::Sender;
use codex_apply_patch::ApplyPatchAction;
use codex_protocol::ConversationId;
use codex_protocol::items::TurnItem;
use codex_protocol::protocol::ExitedReviewModeEvent;
use codex_protocol::protocol::FileChange;
use codex_protocol::protocol::ItemCompletedEvent;
use codex_protocol::protocol::ItemStartedEvent;
use codex_protocol::protocol::RawResponseItemEvent;
@@ -48,11 +47,9 @@ use tokio_util::sync::CancellationToken;
use tracing::debug;
use tracing::error;
use tracing::info;
use tracing::trace;
use tracing::warn;
use crate::ModelProviderInfo;
use crate::apply_patch::convert_apply_patch_to_protocol;
use crate::client::ModelClient;
use crate::client_common::Prompt;
use crate::client_common::ResponseEvent;
@@ -87,7 +84,6 @@ use crate::protocol::ExecApprovalRequestEvent;
use crate::protocol::Op;
use crate::protocol::RateLimitSnapshot;
use crate::protocol::ReviewDecision;
use crate::protocol::ReviewOutputEvent;
use crate::protocol::SandboxCommandAssessment;
use crate::protocol::SandboxPolicy;
use crate::protocol::SessionConfiguredEvent;
@@ -103,7 +99,6 @@ use crate::shell;
use crate::state::ActiveTurn;
use crate::state::SessionServices;
use crate::state::SessionState;
use crate::state::TaskKind;
use crate::tasks::GhostSnapshotTask;
use crate::tasks::ReviewTask;
use crate::tasks::SessionTask;
@@ -139,9 +134,9 @@ use self::compact::collect_user_messages;
/// The high-level interface to the Codex system.
/// It operates as a queue pair where you send submissions and receive events.
pub struct Codex {
next_id: AtomicU64,
tx_sub: Sender<Submission>,
rx_event: Receiver<Event>,
pub(crate) next_id: AtomicU64,
pub(crate) tx_sub: Sender<Submission>,
pub(crate) rx_event: Receiver<Event>,
}
/// Wrapper returned by [`Codex::spawn`] containing the spawned [`Codex`],
@@ -182,16 +177,18 @@ impl Codex {
cwd: config.cwd.clone(),
original_config_do_not_use: Arc::clone(&config),
features: config.features.clone(),
session_source,
};
// Generate a unique ID for the lifetime of this Codex session.
let session_source_clone = session_configuration.session_source.clone();
let session = Session::new(
session_configuration,
config.clone(),
auth_manager.clone(),
tx_event.clone(),
conversation_history,
session_source,
session_source_clone,
)
.await
.map_err(|e| {
@@ -272,7 +269,6 @@ pub(crate) struct TurnContext {
pub(crate) sandbox_policy: SandboxPolicy,
pub(crate) shell_environment_policy: ShellEnvironmentPolicy,
pub(crate) tools_config: ToolsConfig,
pub(crate) is_review_mode: bool,
pub(crate) final_output_json_schema: Option<Value>,
pub(crate) codex_linux_sandbox_exe: Option<PathBuf>,
pub(crate) tool_call_gate: Arc<ReadinessFlag>,
@@ -286,6 +282,7 @@ impl TurnContext {
}
}
#[allow(dead_code)]
#[derive(Clone)]
pub(crate) struct SessionConfiguration {
/// Provider identifier ("openai", "openrouter", ...).
@@ -322,6 +319,8 @@ pub(crate) struct SessionConfiguration {
// TODO(pakrym): Remove config from here
original_config_do_not_use: Arc<Config>,
/// Source of the session (cli, vscode, exec, mcp, ...)
session_source: SessionSource,
}
impl SessionConfiguration {
@@ -394,6 +393,7 @@ impl Session {
session_configuration.model_reasoning_effort,
session_configuration.model_reasoning_summary,
conversation_id,
session_configuration.session_source.clone(),
);
let tools_config = ToolsConfig::new(&ToolsConfigParams {
@@ -411,7 +411,6 @@ impl Session {
sandbox_policy: session_configuration.sandbox_policy.clone(),
shell_environment_policy: config.shell_environment_policy.clone(),
tools_config,
is_review_mode: false,
final_output_json_schema: None,
codex_linux_sandbox_exe: config.codex_linux_sandbox_exe.clone(),
tool_call_gate: Arc::new(ReadinessFlag::new()),
@@ -816,6 +815,7 @@ impl Session {
auth_manager,
&otel,
self.conversation_id,
turn_context.client.get_session_source(),
call_id,
command,
&turn_context.sandbox_policy,
@@ -874,7 +874,7 @@ impl Session {
&self,
turn_context: &TurnContext,
call_id: String,
action: &ApplyPatchAction,
changes: HashMap<PathBuf, FileChange>,
reason: Option<String>,
grant_root: Option<PathBuf>,
) -> oneshot::Receiver<ReviewDecision> {
@@ -898,7 +898,7 @@ impl Session {
let event = EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
call_id,
changes: convert_apply_patch_to_protocol(action),
changes,
reason,
grant_root,
});
@@ -1120,18 +1120,16 @@ impl Session {
turn_context: Arc<TurnContext>,
cancellation_token: CancellationToken,
) {
if turn_context.is_review_mode
|| !self
.state
.lock()
.await
.session_configuration
.features
.enabled(Feature::GhostCommit)
if !self
.state
.lock()
.await
.session_configuration
.features
.enabled(Feature::GhostCommit)
{
return;
}
let token = match turn_context.tool_call_gate.subscribe().await {
Ok(token) => token,
Err(err) => {
@@ -1633,9 +1631,10 @@ async fn spawn_review_thread(
.unwrap_or_else(|| parent_turn_context.client.get_model_family());
// For reviews, disable web_search and view_image regardless of global settings.
let mut review_features = config.features.clone();
review_features.disable(crate::features::Feature::WebSearchRequest);
review_features.disable(crate::features::Feature::ViewImageTool);
review_features.disable(crate::features::Feature::StreamableShell);
review_features
.disable(crate::features::Feature::WebSearchRequest)
.disable(crate::features::Feature::ViewImageTool)
.disable(crate::features::Feature::StreamableShell);
let tools_config = ToolsConfig::new(&ToolsConfigParams {
model_family: &review_model_family,
features: &review_features,
@@ -1674,6 +1673,7 @@ async fn spawn_review_thread(
per_turn_config.model_reasoning_effort,
per_turn_config.model_reasoning_summary,
sess.conversation_id,
parent_turn_context.client.get_session_source(),
);
let review_turn_context = TurnContext {
@@ -1686,7 +1686,6 @@ async fn spawn_review_thread(
sandbox_policy: parent_turn_context.sandbox_policy.clone(),
shell_environment_policy: parent_turn_context.shell_environment_policy.clone(),
cwd: parent_turn_context.cwd.clone(),
is_review_mode: true,
final_output_json_schema: None,
codex_linux_sandbox_exe: parent_turn_context.codex_linux_sandbox_exe.clone(),
tool_call_gate: Arc::new(ReadinessFlag::new()),
@@ -1718,14 +1717,10 @@ async fn spawn_review_thread(
/// - If the model sends only an assistant message, we record it in the
/// conversation history and consider the task complete.
///
/// Review mode: when `turn_context.is_review_mode` is true, the turn runs in an
/// isolated in-memory thread without the parent session's prior history or
/// user_instructions. Emits ExitedReviewMode upon final review message.
pub(crate) async fn run_task(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
input: Vec<UserInput>,
task_kind: TaskKind,
cancellation_token: CancellationToken,
) -> Option<String> {
if input.is_empty() {
@@ -1737,21 +1732,8 @@ pub(crate) async fn run_task(
sess.send_event(&turn_context, event).await;
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input);
// For review threads, keep an isolated in-memory history so the
// model sees a fresh conversation without the parent session's history.
// For normal turns, continue recording to the session history as before.
let is_review_mode = turn_context.is_review_mode;
let mut review_thread_history: ConversationHistory = ConversationHistory::new();
if is_review_mode {
// Seed review threads with environment context so the model knows the working directory.
review_thread_history
.record_items(sess.build_initial_context(turn_context.as_ref()).iter());
review_thread_history.record_items(std::iter::once(&initial_input_for_turn.into()));
} else {
sess.record_input_and_rollout_usermsg(turn_context.as_ref(), &initial_input_for_turn)
.await;
}
sess.record_input_and_rollout_usermsg(turn_context.as_ref(), &initial_input_for_turn)
.await;
sess.maybe_start_ghost_snapshot(Arc::clone(&turn_context), cancellation_token.child_token())
.await;
@@ -1773,21 +1755,7 @@ pub(crate) async fn run_task(
.collect::<Vec<ResponseItem>>();
// Construct the input that we will send to the model.
//
// - For review threads, use the isolated in-memory history so the
// model sees a fresh conversation (no parent history/user_instructions).
//
// - For normal turns, use the session's full history. When using the
// chat completions API (or ZDR clients), the model needs the full
// conversation history on each turn. The rollout file, however, should
// only record the new items that originated in this turn so that it
// represents an append-only log without duplicates.
let turn_input: Vec<ResponseItem> = if is_review_mode {
if !pending_input.is_empty() {
review_thread_history.record_items(&pending_input);
}
review_thread_history.get_history_for_prompt()
} else {
let turn_input: Vec<ResponseItem> = {
sess.record_conversation_items(&turn_context, &pending_input)
.await;
sess.clone_history().await.get_history_for_prompt()
@@ -1811,7 +1779,6 @@ pub(crate) async fn run_task(
Arc::clone(&turn_context),
Arc::clone(&turn_diff_tracker),
turn_input,
task_kind,
cancellation_token.child_token(),
)
.await
@@ -1831,14 +1798,8 @@ pub(crate) async fn run_task(
let token_limit_reached = total_usage_tokens
.map(|tokens| tokens >= limit)
.unwrap_or(false);
let (responses, items_to_record_in_conversation_history) = process_items(
processed_items,
is_review_mode,
&mut review_thread_history,
&sess,
&turn_context,
)
.await;
let (responses, items_to_record_in_conversation_history) =
process_items(processed_items, &sess, &turn_context).await;
if token_limit_reached {
if auto_compact_recently_attempted {
@@ -1880,14 +1841,7 @@ pub(crate) async fn run_task(
Err(CodexErr::TurnAborted {
dangling_artifacts: processed_items,
}) => {
let _ = process_items(
processed_items,
is_review_mode,
&mut review_thread_history,
&sess,
&turn_context,
)
.await;
let _ = process_items(processed_items, &sess, &turn_context).await;
// Aborted turn is reported via a different event.
break;
}
@@ -1903,56 +1857,14 @@ pub(crate) async fn run_task(
}
}
// If this was a review thread and we have a final assistant message,
// try to parse it as a ReviewOutput.
//
// If parsing fails, construct a minimal ReviewOutputEvent using the plain
// text as the overall explanation. Else, just exit review mode with None.
//
// Emits an ExitedReviewMode event with the parsed review output.
if turn_context.is_review_mode {
exit_review_mode(
sess.clone(),
Arc::clone(&turn_context),
last_agent_message.as_deref().map(parse_review_output_event),
)
.await;
}
last_agent_message
}
/// Parse the review output; when not valid JSON, build a structured
/// fallback that carries the plain text as the overall explanation.
///
/// Returns: a ReviewOutputEvent parsed from JSON or a fallback populated from text.
fn parse_review_output_event(text: &str) -> ReviewOutputEvent {
// Try direct parse first
if let Ok(ev) = serde_json::from_str::<ReviewOutputEvent>(text) {
return ev;
}
// If wrapped in markdown fences or extra prose, attempt to extract the first JSON object
if let (Some(start), Some(end)) = (text.find('{'), text.rfind('}'))
&& start < end
&& let Some(slice) = text.get(start..=end)
&& let Ok(ev) = serde_json::from_str::<ReviewOutputEvent>(slice)
{
return ev;
}
// Not JSON return a structured ReviewOutputEvent that carries
// the plain text as the overall explanation.
ReviewOutputEvent {
overall_explanation: text.to_string(),
..Default::default()
}
}
async fn run_turn(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
turn_diff_tracker: SharedTurnDiffTracker,
input: Vec<ResponseItem>,
task_kind: TaskKind,
cancellation_token: CancellationToken,
) -> CodexResult<TurnRunResult> {
let mcp_tools = sess.services.mcp_connection_manager.list_all_tools();
@@ -1982,7 +1894,6 @@ async fn run_turn(
Arc::clone(&turn_context),
Arc::clone(&turn_diff_tracker),
&prompt,
task_kind,
cancellation_token.child_token(),
)
.await
@@ -2065,7 +1976,6 @@ async fn try_run_turn(
turn_context: Arc<TurnContext>,
turn_diff_tracker: SharedTurnDiffTracker,
prompt: &Prompt,
task_kind: TaskKind,
cancellation_token: CancellationToken,
) -> CodexResult<TurnRunResult> {
let rollout_item = RolloutItem::TurnContext(TurnContextItem {
@@ -2081,7 +1991,7 @@ async fn try_run_turn(
let mut stream = turn_context
.client
.clone()
.stream_with_task_kind(prompt, task_kind)
.stream(prompt)
.or_cancel(&cancellation_token)
.await??;
@@ -2231,14 +2141,8 @@ async fn try_run_turn(
return Ok(result);
}
ResponseEvent::OutputTextDelta(delta) => {
// In review child threads, suppress assistant text deltas; the
// UI will show a selection popup from the final ReviewOutput.
if !turn_context.is_review_mode {
let event = EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta });
sess.send_event(&turn_context, event).await;
} else {
trace!("suppressing OutputTextDelta in review mode");
}
let event = EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta });
sess.send_event(&turn_context, event).await;
}
ResponseEvent::ReasoningSummaryDelta(delta) => {
let event = EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta });
@@ -2273,13 +2177,7 @@ async fn handle_non_tool_response_item(
ResponseItem::Message { .. }
| ResponseItem::Reasoning { .. }
| ResponseItem::WebSearchCall { .. } => {
let turn_item = match &item {
ResponseItem::Message { .. } if turn_context.is_review_mode => {
trace!("suppressing assistant Message in review mode");
None
}
_ => parse_turn_item(&item),
};
let turn_item = parse_turn_item(&item);
if let Some(turn_item) = turn_item {
sess.emit_turn_item_started_completed(
turn_context.as_ref(),
@@ -2317,62 +2215,6 @@ pub(super) fn get_last_assistant_message_from_turn(responses: &[ResponseItem]) -
}
})
}
/// Emits an ExitedReviewMode Event with optional ReviewOutput,
/// and records a developer message with the review output.
pub(crate) async fn exit_review_mode(
session: Arc<Session>,
turn_context: Arc<TurnContext>,
review_output: Option<ReviewOutputEvent>,
) {
let event = EventMsg::ExitedReviewMode(ExitedReviewModeEvent {
review_output: review_output.clone(),
});
session.send_event(turn_context.as_ref(), event).await;
let mut user_message = String::new();
if let Some(out) = review_output {
let mut findings_str = String::new();
let text = out.overall_explanation.trim();
if !text.is_empty() {
findings_str.push_str(text);
}
if !out.findings.is_empty() {
let block = format_review_findings_block(&out.findings, None);
findings_str.push_str(&format!("\n{block}"));
}
user_message.push_str(&format!(
r#"<user_action>
<context>User initiated a review task. Here's the full review output from reviewer model. User may select one or more comments to resolve.</context>
<action>review</action>
<results>
{findings_str}
</results>
</user_action>
"#));
} else {
user_message.push_str(r#"<user_action>
<context>User initiated a review task, but was interrupted. If user asks about this, tell them to re-initiate a review with `/review` and wait for it to complete.</context>
<action>review</action>
<results>
None.
</results>
</user_action>
"#);
}
session
.record_conversation_items(
&turn_context,
&[ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText { text: user_message }],
}],
)
.await;
// Make the recorded review note visible immediately for readers.
session.flush_rollout().await;
}
fn mcp_init_error_display(
server_name: &str,
@@ -2427,7 +2269,6 @@ fn is_mcp_client_startup_timeout_error(error: &anyhow::Error) -> bool {
|| error_message.contains("timed out handshaking with MCP server")
}
use crate::features::Feature;
use crate::features::Features;
#[cfg(test)]
pub(crate) use tests::make_session_and_context;
@@ -2665,6 +2506,7 @@ mod tests {
cwd: config.cwd.clone(),
original_config_do_not_use: Arc::clone(&config),
features: Features::default(),
session_source: SessionSource::Exec,
};
let state = SessionState::new(session_configuration.clone());
@@ -2738,6 +2580,7 @@ mod tests {
cwd: config.cwd.clone(),
original_config_do_not_use: Arc::clone(&config),
features: Features::default(),
session_source: SessionSource::Exec,
};
let state = SessionState::new(session_configuration.clone());
@@ -2802,12 +2645,6 @@ mod tests {
sleep(Duration::from_secs(60)).await;
}
}
async fn abort(&self, session: Arc<SessionTaskContext>, ctx: Arc<TurnContext>) {
if let TaskKind::Review = self.kind {
exit_review_mode(session.clone_session(), ctx, None).await;
}
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
@@ -2872,25 +2709,25 @@ mod tests {
let input = vec![UserInput::Text {
text: "start review".to_string(),
}];
sess.spawn_task(
Arc::clone(&tc),
input,
NeverEndingTask {
kind: TaskKind::Review,
listen_to_cancellation_token: false,
},
)
.await;
sess.spawn_task(Arc::clone(&tc), input, ReviewTask).await;
sess.abort_all_tasks(TurnAbortReason::Interrupted).await;
let first = tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv())
.await
.expect("timeout waiting for first event")
.expect("first event");
match first.msg {
EventMsg::ExitedReviewMode(ev) => assert!(ev.review_output.is_none()),
other => panic!("unexpected first event: {other:?}"),
// Drain events until we observe ExitedReviewMode; earlier
// RawResponseItem entries (e.g., environment context) may arrive first.
loop {
let evt = tokio::time::timeout(std::time::Duration::from_secs(1), rx.recv())
.await
.expect("timeout waiting for first event")
.expect("first event");
match evt.msg {
EventMsg::ExitedReviewMode(ev) => {
assert!(ev.review_output.is_none());
break;
}
// Ignore any non-critical events before exit.
_ => continue,
}
}
loop {
let evt = tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv())