Review Mode (Core) (#3401)
## 📝 Review Mode -- Core
This PR introduces the Core implementation for Review mode:
- New op `Op::Review { review_request: ReviewRequest }`: spawns a child
review task with an isolated context, a review-specific system prompt,
and the model configured in `Config.review_model`.
- `EnteredReviewMode`: emitted when the child review session starts.
Every event from this point onwards reflects the review session.
- `ExitedReviewMode(Option<ReviewOutputEvent>)`: emitted when the review
finishes or is interrupted, with optional structured findings:
```json
{
"findings": [
{
"title": "<≤ 80 chars, imperative>",
"body": "<valid Markdown explaining *why* this is a problem; cite files/lines/functions>",
"confidence_score": <float 0.0-1.0>,
"priority": <int 0-3>,
"code_location": {
"absolute_file_path": "<file path>",
"line_range": {"start": <int>, "end": <int>}
}
}
],
"overall_correctness": "patch is correct" | "patch is incorrect",
"overall_explanation": "<1-3 sentence explanation justifying the overall_correctness verdict>",
"overall_confidence_score": <float 0.0-1.0>
}
```
## Questions
### Why separate out its own message history?
We want the review thread to match the training of our review models as
much as possible -- that means using a custom prompt, removing user
instructions, and starting a clean chat history.
We also want to make sure the review thread doesn't leak into the parent
thread.
### Why do this as a mode, vs. sub-agents?
1. We want review to be a synchronous task, so it's fine for now to do a
bespoke implementation.
2. We're still unclear about the final structure for sub-agents. We'd
prefer to land this quickly and then refactor into sub-agents without
rushing that implementation.
This commit is contained in:
@@ -9,6 +9,7 @@ use std::sync::atomic::AtomicU64;
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::AuthManager;
|
||||
use crate::client_common::REVIEW_PROMPT;
|
||||
use crate::event_mapping::map_response_item_to_event_messages;
|
||||
use async_channel::Receiver;
|
||||
use async_channel::Sender;
|
||||
@@ -17,6 +18,7 @@ use codex_apply_patch::MaybeApplyPatchVerified;
|
||||
use codex_apply_patch::maybe_parse_apply_patch_verified;
|
||||
use codex_protocol::mcp_protocol::ConversationId;
|
||||
use codex_protocol::protocol::ConversationPathResponseEvent;
|
||||
use codex_protocol::protocol::ReviewRequest;
|
||||
use codex_protocol::protocol::RolloutItem;
|
||||
use codex_protocol::protocol::TaskStartedEvent;
|
||||
use codex_protocol::protocol::TurnAbortReason;
|
||||
@@ -95,6 +97,7 @@ use crate::protocol::Op;
|
||||
use crate::protocol::PatchApplyBeginEvent;
|
||||
use crate::protocol::PatchApplyEndEvent;
|
||||
use crate::protocol::ReviewDecision;
|
||||
use crate::protocol::ReviewOutputEvent;
|
||||
use crate::protocol::SandboxPolicy;
|
||||
use crate::protocol::SessionConfiguredEvent;
|
||||
use crate::protocol::StreamErrorEvent;
|
||||
@@ -307,6 +310,7 @@ pub(crate) struct TurnContext {
|
||||
pub(crate) sandbox_policy: SandboxPolicy,
|
||||
pub(crate) shell_environment_policy: ShellEnvironmentPolicy,
|
||||
pub(crate) tools_config: ToolsConfig,
|
||||
pub(crate) is_review_mode: bool,
|
||||
}
|
||||
|
||||
impl TurnContext {
|
||||
@@ -478,6 +482,7 @@ impl Session {
|
||||
sandbox_policy,
|
||||
shell_environment_policy: config.shell_environment_policy.clone(),
|
||||
cwd,
|
||||
is_review_mode: false,
|
||||
};
|
||||
let sess = Arc::new(Session {
|
||||
conversation_id,
|
||||
@@ -1032,11 +1037,19 @@ pub(crate) struct ApplyPatchCommandContext {
|
||||
pub(crate) changes: HashMap<PathBuf, FileChange>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Eq, PartialEq)]
|
||||
enum AgentTaskKind {
|
||||
Regular,
|
||||
Review,
|
||||
Compact,
|
||||
}
|
||||
|
||||
/// A series of Turns in response to user input.
|
||||
pub(crate) struct AgentTask {
|
||||
sess: Arc<Session>,
|
||||
sub_id: String,
|
||||
handle: AbortHandle,
|
||||
kind: AgentTaskKind,
|
||||
}
|
||||
|
||||
impl AgentTask {
|
||||
@@ -1056,6 +1069,27 @@ impl AgentTask {
|
||||
sess,
|
||||
sub_id,
|
||||
handle,
|
||||
kind: AgentTaskKind::Regular,
|
||||
}
|
||||
}
|
||||
|
||||
fn review(
|
||||
sess: Arc<Session>,
|
||||
turn_context: Arc<TurnContext>,
|
||||
sub_id: String,
|
||||
input: Vec<InputItem>,
|
||||
) -> Self {
|
||||
let handle = {
|
||||
let sess = sess.clone();
|
||||
let sub_id = sub_id.clone();
|
||||
let tc = Arc::clone(&turn_context);
|
||||
tokio::spawn(async move { run_task(sess, tc, sub_id, input).await }).abort_handle()
|
||||
};
|
||||
Self {
|
||||
sess,
|
||||
sub_id,
|
||||
handle,
|
||||
kind: AgentTaskKind::Review,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1079,12 +1113,20 @@ impl AgentTask {
|
||||
sess,
|
||||
sub_id,
|
||||
handle,
|
||||
kind: AgentTaskKind::Compact,
|
||||
}
|
||||
}
|
||||
|
||||
fn abort(self, reason: TurnAbortReason) {
|
||||
// TOCTOU?
|
||||
if !self.handle.is_finished() {
|
||||
if self.kind == AgentTaskKind::Review {
|
||||
let sess = self.sess.clone();
|
||||
let sub_id = self.sub_id.clone();
|
||||
tokio::spawn(async move {
|
||||
exit_review_mode(sess, sub_id, None).await;
|
||||
});
|
||||
}
|
||||
self.handle.abort();
|
||||
let event = Event {
|
||||
id: self.sub_id,
|
||||
@@ -1184,6 +1226,7 @@ async fn submission_loop(
|
||||
sandbox_policy: new_sandbox_policy.clone(),
|
||||
shell_environment_policy: prev.shell_environment_policy.clone(),
|
||||
cwd: new_cwd.clone(),
|
||||
is_review_mode: false,
|
||||
};
|
||||
|
||||
// Install the new persistent context for subsequent tasks/turns.
|
||||
@@ -1269,6 +1312,7 @@ async fn submission_loop(
|
||||
sandbox_policy,
|
||||
shell_environment_policy: turn_context.shell_environment_policy.clone(),
|
||||
cwd,
|
||||
is_review_mode: false,
|
||||
};
|
||||
// TODO: record the new environment context in the conversation history
|
||||
// no current task, spawn a new one with the per‑turn context
|
||||
@@ -1430,6 +1474,16 @@ async fn submission_loop(
|
||||
};
|
||||
sess.send_event(event).await;
|
||||
}
|
||||
Op::Review { review_request } => {
|
||||
spawn_review_thread(
|
||||
sess.clone(),
|
||||
config.clone(),
|
||||
turn_context.clone(),
|
||||
sub.id,
|
||||
review_request,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
_ => {
|
||||
// Ignore unknown ops; enum is non_exhaustive to allow extensions.
|
||||
}
|
||||
@@ -1438,6 +1492,82 @@ async fn submission_loop(
|
||||
debug!("Agent loop exited");
|
||||
}
|
||||
|
||||
/// Spawn a review thread using the given prompt.
|
||||
async fn spawn_review_thread(
|
||||
sess: Arc<Session>,
|
||||
config: Arc<Config>,
|
||||
parent_turn_context: Arc<TurnContext>,
|
||||
sub_id: String,
|
||||
review_request: ReviewRequest,
|
||||
) {
|
||||
let model = config.review_model.clone();
|
||||
let review_model_family = find_family_for_model(&model)
|
||||
.unwrap_or_else(|| parent_turn_context.client.get_model_family());
|
||||
let tools_config = ToolsConfig::new(&ToolsConfigParams {
|
||||
model_family: &review_model_family,
|
||||
approval_policy: parent_turn_context.approval_policy,
|
||||
sandbox_policy: parent_turn_context.sandbox_policy.clone(),
|
||||
include_plan_tool: false,
|
||||
include_apply_patch_tool: config.include_apply_patch_tool,
|
||||
include_web_search_request: false,
|
||||
use_streamable_shell_tool: false,
|
||||
include_view_image_tool: false,
|
||||
experimental_unified_exec_tool: config.use_experimental_unified_exec_tool,
|
||||
});
|
||||
|
||||
let base_instructions = Some(REVIEW_PROMPT.to_string());
|
||||
let provider = parent_turn_context.client.get_provider();
|
||||
let auth_manager = parent_turn_context.client.get_auth_manager();
|
||||
let model_family = review_model_family.clone();
|
||||
|
||||
// Build per‑turn client with the requested model/family.
|
||||
let mut per_turn_config = (*config).clone();
|
||||
per_turn_config.model = model.clone();
|
||||
per_turn_config.model_family = model_family.clone();
|
||||
if let Some(model_info) = get_model_info(&model_family) {
|
||||
per_turn_config.model_context_window = Some(model_info.context_window);
|
||||
}
|
||||
|
||||
let client = ModelClient::new(
|
||||
Arc::new(per_turn_config),
|
||||
auth_manager,
|
||||
provider,
|
||||
parent_turn_context.client.get_reasoning_effort(),
|
||||
parent_turn_context.client.get_reasoning_summary(),
|
||||
sess.conversation_id,
|
||||
);
|
||||
|
||||
let review_turn_context = TurnContext {
|
||||
client,
|
||||
tools_config,
|
||||
user_instructions: None,
|
||||
base_instructions,
|
||||
approval_policy: parent_turn_context.approval_policy,
|
||||
sandbox_policy: parent_turn_context.sandbox_policy.clone(),
|
||||
shell_environment_policy: parent_turn_context.shell_environment_policy.clone(),
|
||||
cwd: parent_turn_context.cwd.clone(),
|
||||
is_review_mode: true,
|
||||
};
|
||||
|
||||
// Seed the child task with the review prompt as the initial user message.
|
||||
let input: Vec<InputItem> = vec![InputItem::Text {
|
||||
text: review_request.prompt.clone(),
|
||||
}];
|
||||
let tc = Arc::new(review_turn_context);
|
||||
|
||||
// Clone sub_id for the upcoming announcement before moving it into the task.
|
||||
let sub_id_for_event = sub_id.clone();
|
||||
let task = AgentTask::review(sess.clone(), tc.clone(), sub_id, input);
|
||||
sess.set_task(task);
|
||||
|
||||
// Announce entering review mode so UIs can switch modes.
|
||||
sess.send_event(Event {
|
||||
id: sub_id_for_event,
|
||||
msg: EventMsg::EnteredReviewMode(review_request),
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
||||
/// Takes a user message as input and runs a loop where, at each turn, the model
|
||||
/// replies with either:
|
||||
///
|
||||
@@ -1451,6 +1581,10 @@ async fn submission_loop(
|
||||
/// back to the model in the next turn.
|
||||
/// - If the model sends only an assistant message, we record it in the
|
||||
/// conversation history and consider the task complete.
|
||||
///
|
||||
/// Review mode: when `turn_context.is_review_mode` is true, the turn runs in an
|
||||
/// isolated in-memory thread without the parent session's prior history or
|
||||
/// user_instructions. Emits ExitedReviewMode upon final review message.
|
||||
async fn run_task(
|
||||
sess: Arc<Session>,
|
||||
turn_context: Arc<TurnContext>,
|
||||
@@ -1469,8 +1603,17 @@ async fn run_task(
|
||||
sess.send_event(event).await;
|
||||
|
||||
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input);
|
||||
sess.record_input_and_rollout_usermsg(&initial_input_for_turn)
|
||||
.await;
|
||||
// For review threads, keep an isolated in-memory history so the
|
||||
// model sees a fresh conversation without the parent session's history.
|
||||
// For normal turns, continue recording to the session history as before.
|
||||
let is_review_mode = turn_context.is_review_mode;
|
||||
let mut review_thread_history: Vec<ResponseItem> = Vec::new();
|
||||
if is_review_mode {
|
||||
review_thread_history.push(initial_input_for_turn.into());
|
||||
} else {
|
||||
sess.record_input_and_rollout_usermsg(&initial_input_for_turn)
|
||||
.await;
|
||||
}
|
||||
|
||||
let mut last_agent_message: Option<String> = None;
|
||||
// Although from the perspective of codex.rs, TurnDiffTracker has the lifecycle of a Task which contains
|
||||
@@ -1487,14 +1630,26 @@ async fn run_task(
|
||||
.into_iter()
|
||||
.map(ResponseItem::from)
|
||||
.collect::<Vec<ResponseItem>>();
|
||||
sess.record_conversation_items(&pending_input).await;
|
||||
|
||||
// Construct the input that we will send to the model. When using the
|
||||
// Chat completions API (or ZDR clients), the model needs the full
|
||||
// conversation history on each turn. The rollout file, however, should
|
||||
// only record the new items that originated in this turn so that it
|
||||
// represents an append-only log without duplicates.
|
||||
let turn_input: Vec<ResponseItem> = sess.turn_input_with_history(pending_input);
|
||||
// Construct the input that we will send to the model.
|
||||
//
|
||||
// - For review threads, use the isolated in-memory history so the
|
||||
// model sees a fresh conversation (no parent history/user_instructions).
|
||||
//
|
||||
// - For normal turns, use the session's full history. When using the
|
||||
// chat completions API (or ZDR clients), the model needs the full
|
||||
// conversation history on each turn. The rollout file, however, should
|
||||
// only record the new items that originated in this turn so that it
|
||||
// represents an append-only log without duplicates.
|
||||
let turn_input: Vec<ResponseItem> = if is_review_mode {
|
||||
if !pending_input.is_empty() {
|
||||
review_thread_history.extend(pending_input);
|
||||
}
|
||||
review_thread_history.clone()
|
||||
} else {
|
||||
sess.record_conversation_items(&pending_input).await;
|
||||
sess.turn_input_with_history(pending_input)
|
||||
};
|
||||
|
||||
let turn_input_messages: Vec<String> = turn_input
|
||||
.iter()
|
||||
@@ -1628,8 +1783,13 @@ async fn run_task(
|
||||
|
||||
// Only attempt to take the lock if there is something to record.
|
||||
if !items_to_record_in_conversation_history.is_empty() {
|
||||
sess.record_conversation_items(&items_to_record_in_conversation_history)
|
||||
.await;
|
||||
if is_review_mode {
|
||||
review_thread_history
|
||||
.extend(items_to_record_in_conversation_history.clone());
|
||||
} else {
|
||||
sess.record_conversation_items(&items_to_record_in_conversation_history)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
if token_limit_reached {
|
||||
@@ -1683,6 +1843,23 @@ async fn run_task(
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If this was a review thread and we have a final assistant message,
|
||||
// try to parse it as a ReviewOutput.
|
||||
//
|
||||
// If parsing fails, construct a minimal ReviewOutputEvent using the plain
|
||||
// text as the overall explanation. Else, just exit review mode with None.
|
||||
//
|
||||
// Emits an ExitedReviewMode event with the parsed review output.
|
||||
if turn_context.is_review_mode {
|
||||
exit_review_mode(
|
||||
sess.clone(),
|
||||
sub_id.clone(),
|
||||
last_agent_message.as_deref().map(parse_review_output_event),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
sess.remove_task(&sub_id);
|
||||
let event = Event {
|
||||
id: sub_id,
|
||||
@@ -1691,6 +1868,31 @@ async fn run_task(
|
||||
sess.send_event(event).await;
|
||||
}
|
||||
|
||||
/// Parse the review output; when not valid JSON, build a structured
|
||||
/// fallback that carries the plain text as the overall explanation.
|
||||
///
|
||||
/// Returns: a ReviewOutputEvent parsed from JSON or a fallback populated from text.
|
||||
fn parse_review_output_event(text: &str) -> ReviewOutputEvent {
|
||||
// Try direct parse first
|
||||
if let Ok(ev) = serde_json::from_str::<ReviewOutputEvent>(text) {
|
||||
return ev;
|
||||
}
|
||||
// If wrapped in markdown fences or extra prose, attempt to extract the first JSON object
|
||||
if let (Some(start), Some(end)) = (text.find('{'), text.rfind('}'))
|
||||
&& start < end
|
||||
&& let Some(slice) = text.get(start..=end)
|
||||
&& let Ok(ev) = serde_json::from_str::<ReviewOutputEvent>(slice)
|
||||
{
|
||||
return ev;
|
||||
}
|
||||
// Not JSON – return a structured ReviewOutputEvent that carries
|
||||
// the plain text as the overall explanation.
|
||||
ReviewOutputEvent {
|
||||
overall_explanation: text.to_string(),
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
async fn run_turn(
|
||||
sess: &Session,
|
||||
turn_context: &TurnContext,
|
||||
@@ -1917,11 +2119,17 @@ async fn try_run_turn(
|
||||
return Ok(result);
|
||||
}
|
||||
ResponseEvent::OutputTextDelta(delta) => {
|
||||
let event = Event {
|
||||
id: sub_id.to_string(),
|
||||
msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta }),
|
||||
};
|
||||
sess.send_event(event).await;
|
||||
// In review child threads, suppress assistant text deltas; the
|
||||
// UI will show a selection popup from the final ReviewOutput.
|
||||
if !turn_context.is_review_mode {
|
||||
let event = Event {
|
||||
id: sub_id.to_string(),
|
||||
msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta }),
|
||||
};
|
||||
sess.send_event(event).await;
|
||||
} else {
|
||||
trace!("suppressing OutputTextDelta in review mode");
|
||||
}
|
||||
}
|
||||
ResponseEvent::ReasoningSummaryDelta(delta) => {
|
||||
let event = Event {
|
||||
@@ -2053,7 +2261,15 @@ async fn handle_response_item(
|
||||
ResponseItem::Message { .. }
|
||||
| ResponseItem::Reasoning { .. }
|
||||
| ResponseItem::WebSearchCall { .. } => {
|
||||
let msgs = map_response_item_to_event_messages(&item, sess.show_raw_agent_reasoning);
|
||||
// In review child threads, suppress assistant message events but
|
||||
// keep reasoning/web search.
|
||||
let msgs = match &item {
|
||||
ResponseItem::Message { .. } if turn_context.is_review_mode => {
|
||||
trace!("suppressing assistant Message in review mode");
|
||||
Vec::new()
|
||||
}
|
||||
_ => map_response_item_to_event_messages(&item, sess.show_raw_agent_reasoning),
|
||||
};
|
||||
for msg in msgs {
|
||||
let event = Event {
|
||||
id: sub_id.to_string(),
|
||||
@@ -2988,6 +3204,19 @@ fn convert_call_tool_result_to_function_call_output_payload(
|
||||
}
|
||||
}
|
||||
|
||||
/// Emits an ExitedReviewMode Event with optional ReviewOutput.
|
||||
async fn exit_review_mode(
|
||||
session: Arc<Session>,
|
||||
task_sub_id: String,
|
||||
res: Option<ReviewOutputEvent>,
|
||||
) {
|
||||
let event = Event {
|
||||
id: task_sub_id,
|
||||
msg: EventMsg::ExitedReviewMode(res),
|
||||
};
|
||||
session.send_event(event).await;
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
Reference in New Issue
Block a user