feat: parallel tool calls (#4663)

Add parallel tool calls. This is configurable at model level and tool
level
This commit is contained in:
jif-oai
2025-10-05 17:10:49 +01:00
committed by GitHub
parent 3203862167
commit dc3c6bf62a
23 changed files with 961 additions and 244 deletions

View File

@@ -100,7 +100,9 @@ use crate::tasks::CompactTask;
use crate::tasks::RegularTask;
use crate::tasks::ReviewTask;
use crate::tools::ToolRouter;
use crate::tools::context::SharedTurnDiffTracker;
use crate::tools::format_exec_output_str;
use crate::tools::parallel::ToolCallRuntime;
use crate::turn_diff_tracker::TurnDiffTracker;
use crate::unified_exec::UnifiedExecSessionManager;
use crate::user_instructions::UserInstructions;
@@ -818,7 +820,7 @@ impl Session {
async fn on_exec_command_begin(
&self,
turn_diff_tracker: &mut TurnDiffTracker,
turn_diff_tracker: SharedTurnDiffTracker,
exec_command_context: ExecCommandContext,
) {
let ExecCommandContext {
@@ -834,7 +836,10 @@ impl Session {
user_explicitly_approved_this_action,
changes,
}) => {
turn_diff_tracker.on_patch_begin(&changes);
{
let mut tracker = turn_diff_tracker.lock().await;
tracker.on_patch_begin(&changes);
}
EventMsg::PatchApplyBegin(PatchApplyBeginEvent {
call_id,
@@ -861,7 +866,7 @@ impl Session {
async fn on_exec_command_end(
&self,
turn_diff_tracker: &mut TurnDiffTracker,
turn_diff_tracker: SharedTurnDiffTracker,
sub_id: &str,
call_id: &str,
output: &ExecToolCallOutput,
@@ -909,7 +914,10 @@ impl Session {
// If this is an apply_patch, after we emit the end patch, emit a second event
// with the full turn diff if there is one.
if is_apply_patch {
let unified_diff = turn_diff_tracker.get_unified_diff();
let unified_diff = {
let mut tracker = turn_diff_tracker.lock().await;
tracker.get_unified_diff()
};
if let Ok(Some(unified_diff)) = unified_diff {
let msg = EventMsg::TurnDiff(TurnDiffEvent { unified_diff });
let event = Event {
@@ -926,7 +934,7 @@ impl Session {
/// Returns the output of the exec tool call.
pub(crate) async fn run_exec_with_events(
&self,
turn_diff_tracker: &mut TurnDiffTracker,
turn_diff_tracker: SharedTurnDiffTracker,
prepared: PreparedExec,
approval_policy: AskForApproval,
) -> Result<ExecToolCallOutput, ExecError> {
@@ -935,7 +943,7 @@ impl Session {
let sub_id = context.sub_id.clone();
let call_id = context.call_id.clone();
self.on_exec_command_begin(turn_diff_tracker, context.clone())
self.on_exec_command_begin(turn_diff_tracker.clone(), context.clone())
.await;
let result = self
@@ -1644,7 +1652,7 @@ pub(crate) async fn run_task(
let mut last_agent_message: Option<String> = None;
// Although from the perspective of codex.rs, TurnDiffTracker has the lifecycle of a Task which contains
// many turns, from the perspective of the user, it is a single turn.
let mut turn_diff_tracker = TurnDiffTracker::new();
let turn_diff_tracker = Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::new()));
let mut auto_compact_recently_attempted = false;
loop {
@@ -1692,9 +1700,9 @@ pub(crate) async fn run_task(
})
.collect();
match run_turn(
&sess,
turn_context.as_ref(),
&mut turn_diff_tracker,
Arc::clone(&sess),
Arc::clone(&turn_context),
Arc::clone(&turn_diff_tracker),
sub_id.clone(),
turn_input,
)
@@ -1917,18 +1925,27 @@ fn parse_review_output_event(text: &str) -> ReviewOutputEvent {
}
async fn run_turn(
sess: &Session,
turn_context: &TurnContext,
turn_diff_tracker: &mut TurnDiffTracker,
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
turn_diff_tracker: SharedTurnDiffTracker,
sub_id: String,
input: Vec<ResponseItem>,
) -> CodexResult<TurnRunResult> {
let mcp_tools = sess.services.mcp_connection_manager.list_all_tools();
let router = ToolRouter::from_config(&turn_context.tools_config, Some(mcp_tools));
let router = Arc::new(ToolRouter::from_config(
&turn_context.tools_config,
Some(mcp_tools),
));
let model_supports_parallel = turn_context
.client
.get_model_family()
.supports_parallel_tool_calls;
let parallel_tool_calls = model_supports_parallel;
let prompt = Prompt {
input,
tools: router.specs().to_vec(),
tools: router.specs(),
parallel_tool_calls,
base_instructions_override: turn_context.base_instructions.clone(),
output_schema: turn_context.final_output_json_schema.clone(),
};
@@ -1936,10 +1953,10 @@ async fn run_turn(
let mut retries = 0;
loop {
match try_run_turn(
&router,
sess,
turn_context,
turn_diff_tracker,
Arc::clone(&router),
Arc::clone(&sess),
Arc::clone(&turn_context),
Arc::clone(&turn_diff_tracker),
&sub_id,
&prompt,
)
@@ -1950,7 +1967,7 @@ async fn run_turn(
Err(CodexErr::EnvVar(var)) => return Err(CodexErr::EnvVar(var)),
Err(e @ CodexErr::Fatal(_)) => return Err(e),
Err(e @ CodexErr::ContextWindowExceeded) => {
sess.set_total_tokens_full(&sub_id, turn_context).await;
sess.set_total_tokens_full(&sub_id, &turn_context).await;
return Err(e);
}
Err(CodexErr::UsageLimitReached(e)) => {
@@ -1999,9 +2016,9 @@ async fn run_turn(
/// "handled" such that it produces a `ResponseInputItem` that needs to be
/// sent back to the model on the next turn.
#[derive(Debug)]
struct ProcessedResponseItem {
item: ResponseItem,
response: Option<ResponseInputItem>,
pub(crate) struct ProcessedResponseItem {
pub(crate) item: ResponseItem,
pub(crate) response: Option<ResponseInputItem>,
}
#[derive(Debug)]
@@ -2011,10 +2028,10 @@ struct TurnRunResult {
}
async fn try_run_turn(
router: &crate::tools::ToolRouter,
sess: &Session,
turn_context: &TurnContext,
turn_diff_tracker: &mut TurnDiffTracker,
router: Arc<ToolRouter>,
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
turn_diff_tracker: SharedTurnDiffTracker,
sub_id: &str,
prompt: &Prompt,
) -> CodexResult<TurnRunResult> {
@@ -2085,24 +2102,34 @@ async fn try_run_turn(
let mut stream = turn_context.client.clone().stream(&prompt).await?;
let mut output = Vec::new();
let mut tool_runtime = ToolCallRuntime::new(
Arc::clone(&router),
Arc::clone(&sess),
Arc::clone(&turn_context),
Arc::clone(&turn_diff_tracker),
sub_id.to_string(),
);
loop {
// Poll the next item from the model stream. We must inspect *both* Ok and Err
// cases so that transient stream failures (e.g., dropped SSE connection before
// `response.completed`) bubble up and trigger the caller's retry logic.
let event = stream.next().await;
let Some(event) = event else {
// Channel closed without yielding a final Completed event or explicit error.
// Treat as a disconnected stream so the caller can retry.
return Err(CodexErr::Stream(
"stream closed before response.completed".into(),
None,
));
let event = match event {
Some(event) => event,
None => {
tool_runtime.abort_all();
return Err(CodexErr::Stream(
"stream closed before response.completed".into(),
None,
));
}
};
let event = match event {
Ok(ev) => ev,
Err(e) => {
tool_runtime.abort_all();
// Propagate the underlying stream error to the caller (run_turn), which
// will apply the configured `stream_max_retries` policy.
return Err(e);
@@ -2112,16 +2139,66 @@ async fn try_run_turn(
match event {
ResponseEvent::Created => {}
ResponseEvent::OutputItemDone(item) => {
let response = handle_response_item(
router,
sess,
turn_context,
turn_diff_tracker,
sub_id,
item.clone(),
)
.await?;
output.push(ProcessedResponseItem { item, response });
match ToolRouter::build_tool_call(sess.as_ref(), item.clone()) {
Ok(Some(call)) => {
let payload_preview = call.payload.log_payload().into_owned();
tracing::info!("ToolCall: {} {}", call.tool_name, payload_preview);
let index = output.len();
output.push(ProcessedResponseItem {
item,
response: None,
});
tool_runtime
.handle_tool_call(call, index, output.as_mut_slice())
.await?;
}
Ok(None) => {
let response = handle_non_tool_response_item(
Arc::clone(&sess),
Arc::clone(&turn_context),
sub_id,
item.clone(),
)
.await?;
output.push(ProcessedResponseItem { item, response });
}
Err(FunctionCallError::MissingLocalShellCallId) => {
let msg = "LocalShellCall without call_id or id";
turn_context
.client
.get_otel_event_manager()
.log_tool_failed("local_shell", msg);
error!(msg);
let response = ResponseInputItem::FunctionCallOutput {
call_id: String::new(),
output: FunctionCallOutputPayload {
content: msg.to_string(),
success: None,
},
};
output.push(ProcessedResponseItem {
item,
response: Some(response),
});
}
Err(FunctionCallError::RespondToModel(message)) => {
let response = ResponseInputItem::FunctionCallOutput {
call_id: String::new(),
output: FunctionCallOutputPayload {
content: message,
success: None,
},
};
output.push(ProcessedResponseItem {
item,
response: Some(response),
});
}
Err(FunctionCallError::Fatal(message)) => {
return Err(CodexErr::Fatal(message));
}
}
}
ResponseEvent::WebSearchCallBegin { call_id } => {
let _ = sess
@@ -2141,10 +2218,15 @@ async fn try_run_turn(
response_id: _,
token_usage,
} => {
sess.update_token_usage_info(sub_id, turn_context, token_usage.as_ref())
sess.update_token_usage_info(sub_id, turn_context.as_ref(), token_usage.as_ref())
.await;
let unified_diff = turn_diff_tracker.get_unified_diff();
tool_runtime.resolve_pending(output.as_mut_slice()).await?;
let unified_diff = {
let mut tracker = turn_diff_tracker.lock().await;
tracker.get_unified_diff()
};
if let Ok(Some(unified_diff)) = unified_diff {
let msg = EventMsg::TurnDiff(TurnDiffEvent { unified_diff });
let event = Event {
@@ -2203,88 +2285,40 @@ async fn try_run_turn(
}
}
async fn handle_response_item(
router: &crate::tools::ToolRouter,
sess: &Session,
turn_context: &TurnContext,
turn_diff_tracker: &mut TurnDiffTracker,
async fn handle_non_tool_response_item(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
sub_id: &str,
item: ResponseItem,
) -> CodexResult<Option<ResponseInputItem>> {
debug!(?item, "Output item");
match ToolRouter::build_tool_call(sess, item.clone()) {
Ok(Some(call)) => {
let payload_preview = call.payload.log_payload().into_owned();
tracing::info!("ToolCall: {} {}", call.tool_name, payload_preview);
match router
.dispatch_tool_call(sess, turn_context, turn_diff_tracker, sub_id, call)
.await
{
Ok(response) => Ok(Some(response)),
Err(FunctionCallError::Fatal(message)) => Err(CodexErr::Fatal(message)),
Err(other) => unreachable!("non-fatal tool error returned: {other:?}"),
match &item {
ResponseItem::Message { .. }
| ResponseItem::Reasoning { .. }
| ResponseItem::WebSearchCall { .. } => {
let msgs = match &item {
ResponseItem::Message { .. } if turn_context.is_review_mode => {
trace!("suppressing assistant Message in review mode");
Vec::new()
}
_ => map_response_item_to_event_messages(&item, sess.show_raw_agent_reasoning()),
};
for msg in msgs {
let event = Event {
id: sub_id.to_string(),
msg,
};
sess.send_event(event).await;
}
}
Ok(None) => {
match &item {
ResponseItem::Message { .. }
| ResponseItem::Reasoning { .. }
| ResponseItem::WebSearchCall { .. } => {
let msgs = match &item {
ResponseItem::Message { .. } if turn_context.is_review_mode => {
trace!("suppressing assistant Message in review mode");
Vec::new()
}
_ => map_response_item_to_event_messages(
&item,
sess.show_raw_agent_reasoning(),
),
};
for msg in msgs {
let event = Event {
id: sub_id.to_string(),
msg,
};
sess.send_event(event).await;
}
}
ResponseItem::FunctionCallOutput { .. }
| ResponseItem::CustomToolCallOutput { .. } => {
debug!("unexpected tool output from stream");
}
_ => {}
}
Ok(None)
ResponseItem::FunctionCallOutput { .. } | ResponseItem::CustomToolCallOutput { .. } => {
debug!("unexpected tool output from stream");
}
Err(FunctionCallError::MissingLocalShellCallId) => {
let msg = "LocalShellCall without call_id or id";
turn_context
.client
.get_otel_event_manager()
.log_tool_failed("local_shell", msg);
error!(msg);
Ok(Some(ResponseInputItem::FunctionCallOutput {
call_id: String::new(),
output: FunctionCallOutputPayload {
content: msg.to_string(),
success: None,
},
}))
}
Err(FunctionCallError::RespondToModel(msg)) => {
Ok(Some(ResponseInputItem::FunctionCallOutput {
call_id: String::new(),
output: FunctionCallOutputPayload {
content: msg,
success: None,
},
}))
}
Err(FunctionCallError::Fatal(message)) => Err(CodexErr::Fatal(message)),
_ => {}
}
Ok(None)
}
pub(super) fn get_last_assistant_message_from_turn(responses: &[ResponseItem]) -> Option<String> {
@@ -2927,13 +2961,10 @@ mod tests {
#[tokio::test]
async fn fatal_tool_error_stops_turn_and_reports_error() {
let (session, turn_context, _rx) = make_session_and_context_with_rx();
let session_ref = session.as_ref();
let turn_context_ref = turn_context.as_ref();
let router = ToolRouter::from_config(
&turn_context_ref.tools_config,
Some(session_ref.services.mcp_connection_manager.list_all_tools()),
&turn_context.tools_config,
Some(session.services.mcp_connection_manager.list_all_tools()),
);
let mut tracker = TurnDiffTracker::new();
let item = ResponseItem::CustomToolCall {
id: None,
status: None,
@@ -2942,22 +2973,26 @@ mod tests {
input: "{}".to_string(),
};
let err = handle_response_item(
&router,
session_ref,
turn_context_ref,
&mut tracker,
"sub-id",
item,
)
.await
.expect_err("expected fatal error");
let call = ToolRouter::build_tool_call(session.as_ref(), item.clone())
.expect("build tool call")
.expect("tool call present");
let tracker = Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::new()));
let err = router
.dispatch_tool_call(
Arc::clone(&session),
Arc::clone(&turn_context),
tracker,
"sub-id".to_string(),
call,
)
.await
.expect_err("expected fatal error");
match err {
CodexErr::Fatal(message) => {
FunctionCallError::Fatal(message) => {
assert_eq!(message, "tool shell invoked with incompatible payload");
}
other => panic!("expected CodexErr::Fatal, got {other:?}"),
other => panic!("expected FunctionCallError::Fatal, got {other:?}"),
}
}
@@ -3071,9 +3106,11 @@ mod tests {
use crate::turn_diff_tracker::TurnDiffTracker;
use std::collections::HashMap;
let (session, mut turn_context) = make_session_and_context();
let (session, mut turn_context_raw) = make_session_and_context();
// Ensure policy is NOT OnRequest so the early rejection path triggers
turn_context.approval_policy = AskForApproval::OnFailure;
turn_context_raw.approval_policy = AskForApproval::OnFailure;
let session = Arc::new(session);
let mut turn_context = Arc::new(turn_context_raw);
let params = ExecParams {
command: if cfg!(windows) {
@@ -3101,7 +3138,7 @@ mod tests {
..params.clone()
};
let mut turn_diff_tracker = TurnDiffTracker::new();
let turn_diff_tracker = Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::new()));
let tool_name = "shell";
let sub_id = "test-sub".to_string();
@@ -3110,9 +3147,9 @@ mod tests {
let resp = handle_container_exec_with_params(
tool_name,
params,
&session,
&turn_context,
&mut turn_diff_tracker,
Arc::clone(&session),
Arc::clone(&turn_context),
Arc::clone(&turn_diff_tracker),
sub_id,
call_id,
)
@@ -3131,14 +3168,16 @@ mod tests {
// Now retry the same command WITHOUT escalated permissions; should succeed.
// Force DangerFullAccess to avoid platform sandbox dependencies in tests.
turn_context.sandbox_policy = SandboxPolicy::DangerFullAccess;
Arc::get_mut(&mut turn_context)
.expect("unique turn context Arc")
.sandbox_policy = SandboxPolicy::DangerFullAccess;
let resp2 = handle_container_exec_with_params(
tool_name,
params2,
&session,
&turn_context,
&mut turn_diff_tracker,
Arc::clone(&session),
Arc::clone(&turn_context),
Arc::clone(&turn_diff_tracker),
"test-sub".to_string(),
"test-call-2".to_string(),
)