Pass TurnContext around instead of sub_id (#5421)
Today `sub_id` is an ID of a single incoming Codex Op submition. We then associate all events triggered by this operation using the same `sub_id`. At the same time we are also creating a TurnContext per submission and we'd like to start associating some events (item added/item completed) with an entire turn instead of just the operation that started it. Using turn context when sending events give us flexibility to change notification scheme.
This commit is contained in:
@@ -255,6 +255,7 @@ pub(crate) struct Session {
|
||||
/// The context needed for a single turn of the conversation.
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct TurnContext {
|
||||
pub(crate) sub_id: String,
|
||||
pub(crate) client: ModelClient,
|
||||
/// The session's current working directory. All relative paths provided by
|
||||
/// the model as well as sandbox policies are resolved against this path
|
||||
@@ -359,6 +360,7 @@ impl Session {
|
||||
session_configuration: &SessionConfiguration,
|
||||
conversation_id: ConversationId,
|
||||
tx_event: Sender<Event>,
|
||||
sub_id: String,
|
||||
) -> TurnContext {
|
||||
let config = session_configuration.original_config_do_not_use.clone();
|
||||
let model_family = find_family_for_model(&session_configuration.model)
|
||||
@@ -392,7 +394,10 @@ impl Session {
|
||||
features: &config.features,
|
||||
});
|
||||
|
||||
let item_collector = ItemCollector::new(tx_event, conversation_id, sub_id.clone());
|
||||
|
||||
TurnContext {
|
||||
sub_id,
|
||||
client,
|
||||
cwd: session_configuration.cwd.clone(),
|
||||
base_instructions: session_configuration.base_instructions.clone(),
|
||||
@@ -404,7 +409,7 @@ impl Session {
|
||||
is_review_mode: false,
|
||||
final_output_json_schema: None,
|
||||
codex_linux_sandbox_exe: config.codex_linux_sandbox_exe.clone(),
|
||||
item_collector: ItemCollector::new(tx_event, conversation_id, "turn_id".to_string()),
|
||||
item_collector,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -587,7 +592,7 @@ impl Session {
|
||||
})
|
||||
.chain(post_session_configured_error_events.into_iter());
|
||||
for event in events {
|
||||
sess.send_event(event).await;
|
||||
sess.send_event_raw(event).await;
|
||||
}
|
||||
|
||||
Ok(sess)
|
||||
@@ -638,6 +643,15 @@ impl Session {
|
||||
}
|
||||
|
||||
pub(crate) async fn new_turn(&self, updates: SessionSettingsUpdate) -> Arc<TurnContext> {
|
||||
let sub_id = self.next_internal_sub_id();
|
||||
self.new_turn_with_sub_id(sub_id, updates).await
|
||||
}
|
||||
|
||||
pub(crate) async fn new_turn_with_sub_id(
|
||||
&self,
|
||||
sub_id: String,
|
||||
updates: SessionSettingsUpdate,
|
||||
) -> Arc<TurnContext> {
|
||||
let session_configuration = {
|
||||
let mut state = self.state.lock().await;
|
||||
let session_configuration = state.session_configuration.clone().apply(&updates);
|
||||
@@ -652,6 +666,7 @@ impl Session {
|
||||
&session_configuration,
|
||||
self.conversation_id,
|
||||
self.get_tx_event(),
|
||||
sub_id,
|
||||
);
|
||||
if let Some(final_schema) = updates.final_output_json_schema {
|
||||
turn_context.final_output_json_schema = final_schema;
|
||||
@@ -678,7 +693,15 @@ impl Session {
|
||||
}
|
||||
|
||||
/// Persist the event to rollout and send it to clients.
|
||||
pub(crate) async fn send_event(&self, event: Event) {
|
||||
pub(crate) async fn send_event(&self, turn_context: &TurnContext, msg: EventMsg) {
|
||||
let event = Event {
|
||||
id: turn_context.sub_id.clone(),
|
||||
msg,
|
||||
};
|
||||
self.send_event_raw(event).await;
|
||||
}
|
||||
|
||||
pub(crate) async fn send_event_raw(&self, event: Event) {
|
||||
// Persist the event into rollout (recorder filters as needed)
|
||||
let rollout_items = vec![RolloutItem::EventMsg(event.msg.clone())];
|
||||
self.persist_rollout_items(&rollout_items).await;
|
||||
@@ -694,12 +717,13 @@ impl Session {
|
||||
/// default `ReviewDecision` (`Denied`).
|
||||
pub async fn request_command_approval(
|
||||
&self,
|
||||
sub_id: String,
|
||||
turn_context: &TurnContext,
|
||||
call_id: String,
|
||||
command: Vec<String>,
|
||||
cwd: PathBuf,
|
||||
reason: Option<String>,
|
||||
) -> ReviewDecision {
|
||||
let sub_id = turn_context.sub_id.clone();
|
||||
// Add the tx_approve callback to the map before sending the request.
|
||||
let (tx_approve, rx_approve) = oneshot::channel();
|
||||
let event_id = sub_id.clone();
|
||||
@@ -718,28 +742,26 @@ impl Session {
|
||||
}
|
||||
|
||||
let parsed_cmd = parse_command(&command);
|
||||
let event = Event {
|
||||
id: event_id,
|
||||
msg: EventMsg::ExecApprovalRequest(ExecApprovalRequestEvent {
|
||||
call_id,
|
||||
command,
|
||||
cwd,
|
||||
reason,
|
||||
parsed_cmd,
|
||||
}),
|
||||
};
|
||||
self.send_event(event).await;
|
||||
let event = EventMsg::ExecApprovalRequest(ExecApprovalRequestEvent {
|
||||
call_id,
|
||||
command,
|
||||
cwd,
|
||||
reason,
|
||||
parsed_cmd,
|
||||
});
|
||||
self.send_event(turn_context, event).await;
|
||||
rx_approve.await.unwrap_or_default()
|
||||
}
|
||||
|
||||
pub async fn request_patch_approval(
|
||||
&self,
|
||||
sub_id: String,
|
||||
turn_context: &TurnContext,
|
||||
call_id: String,
|
||||
action: &ApplyPatchAction,
|
||||
reason: Option<String>,
|
||||
grant_root: Option<PathBuf>,
|
||||
) -> oneshot::Receiver<ReviewDecision> {
|
||||
let sub_id = turn_context.sub_id.clone();
|
||||
// Add the tx_approve callback to the map before sending the request.
|
||||
let (tx_approve, rx_approve) = oneshot::channel();
|
||||
let event_id = sub_id.clone();
|
||||
@@ -757,16 +779,13 @@ impl Session {
|
||||
warn!("Overwriting existing pending approval for sub_id: {event_id}");
|
||||
}
|
||||
|
||||
let event = Event {
|
||||
id: event_id,
|
||||
msg: EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
|
||||
call_id,
|
||||
changes: convert_apply_patch_to_protocol(action),
|
||||
reason,
|
||||
grant_root,
|
||||
}),
|
||||
};
|
||||
self.send_event(event).await;
|
||||
let event = EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
|
||||
call_id,
|
||||
changes: convert_apply_patch_to_protocol(action),
|
||||
reason,
|
||||
grant_root,
|
||||
});
|
||||
self.send_event(turn_context, event).await;
|
||||
rx_approve
|
||||
}
|
||||
|
||||
@@ -878,7 +897,6 @@ impl Session {
|
||||
|
||||
async fn update_token_usage_info(
|
||||
&self,
|
||||
sub_id: &str,
|
||||
turn_context: &TurnContext,
|
||||
token_usage: Option<&TokenUsage>,
|
||||
) {
|
||||
@@ -891,37 +909,38 @@ impl Session {
|
||||
);
|
||||
}
|
||||
}
|
||||
self.send_token_count_event(sub_id).await;
|
||||
self.send_token_count_event(turn_context).await;
|
||||
}
|
||||
|
||||
async fn update_rate_limits(&self, sub_id: &str, new_rate_limits: RateLimitSnapshot) {
|
||||
async fn update_rate_limits(
|
||||
&self,
|
||||
turn_context: &TurnContext,
|
||||
new_rate_limits: RateLimitSnapshot,
|
||||
) {
|
||||
{
|
||||
let mut state = self.state.lock().await;
|
||||
state.set_rate_limits(new_rate_limits);
|
||||
}
|
||||
self.send_token_count_event(sub_id).await;
|
||||
self.send_token_count_event(turn_context).await;
|
||||
}
|
||||
|
||||
async fn send_token_count_event(&self, sub_id: &str) {
|
||||
async fn send_token_count_event(&self, turn_context: &TurnContext) {
|
||||
let (info, rate_limits) = {
|
||||
let state = self.state.lock().await;
|
||||
state.token_info_and_rate_limits()
|
||||
};
|
||||
let event = Event {
|
||||
id: sub_id.to_string(),
|
||||
msg: EventMsg::TokenCount(TokenCountEvent { info, rate_limits }),
|
||||
};
|
||||
self.send_event(event).await;
|
||||
let event = EventMsg::TokenCount(TokenCountEvent { info, rate_limits });
|
||||
self.send_event(turn_context, event).await;
|
||||
}
|
||||
|
||||
async fn set_total_tokens_full(&self, sub_id: &str, turn_context: &TurnContext) {
|
||||
async fn set_total_tokens_full(&self, turn_context: &TurnContext) {
|
||||
let context_window = turn_context.client.get_model_context_window();
|
||||
if let Some(context_window) = context_window {
|
||||
{
|
||||
let mut state = self.state.lock().await;
|
||||
state.set_token_usage_full(context_window);
|
||||
}
|
||||
self.send_token_count_event(sub_id).await;
|
||||
self.send_token_count_event(turn_context).await;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -951,24 +970,22 @@ impl Session {
|
||||
/// Helper that emits a BackgroundEvent with the given message. This keeps
|
||||
/// the call‑sites terse so adding more diagnostics does not clutter the
|
||||
/// core agent logic.
|
||||
pub(crate) async fn notify_background_event(&self, sub_id: &str, message: impl Into<String>) {
|
||||
let event = Event {
|
||||
id: sub_id.to_string(),
|
||||
msg: EventMsg::BackgroundEvent(BackgroundEventEvent {
|
||||
message: message.into(),
|
||||
}),
|
||||
};
|
||||
self.send_event(event).await;
|
||||
pub(crate) async fn notify_background_event(
|
||||
&self,
|
||||
turn_context: &TurnContext,
|
||||
message: impl Into<String>,
|
||||
) {
|
||||
let event = EventMsg::BackgroundEvent(BackgroundEventEvent {
|
||||
message: message.into(),
|
||||
});
|
||||
self.send_event(turn_context, event).await;
|
||||
}
|
||||
|
||||
async fn notify_stream_error(&self, sub_id: &str, message: impl Into<String>) {
|
||||
let event = Event {
|
||||
id: sub_id.to_string(),
|
||||
msg: EventMsg::StreamError(StreamErrorEvent {
|
||||
message: message.into(),
|
||||
}),
|
||||
};
|
||||
self.send_event(event).await;
|
||||
async fn notify_stream_error(&self, turn_context: &TurnContext, message: impl Into<String>) {
|
||||
let event = EventMsg::StreamError(StreamErrorEvent {
|
||||
message: message.into(),
|
||||
});
|
||||
self.send_event(turn_context, event).await;
|
||||
}
|
||||
|
||||
/// Build the full turn input by concatenating the current conversation
|
||||
@@ -1129,7 +1146,7 @@ async fn submission_loop(sess: Arc<Session>, config: Arc<Config>, rx_sub: Receiv
|
||||
Op::UserInput { items } => (items, SessionSettingsUpdate::default()),
|
||||
_ => unreachable!(),
|
||||
};
|
||||
let current_context = sess.new_turn(updates).await;
|
||||
let current_context = sess.new_turn_with_sub_id(sub.id.clone(), updates).await;
|
||||
current_context
|
||||
.client
|
||||
.get_otel_event_manager()
|
||||
@@ -1145,11 +1162,7 @@ async fn submission_loop(sess: Arc<Session>, config: Arc<Config>, rx_sub: Receiv
|
||||
&env_item,
|
||||
sess.show_raw_agent_reasoning(),
|
||||
) {
|
||||
let event = Event {
|
||||
id: sub.id.clone(),
|
||||
msg,
|
||||
};
|
||||
sess.send_event(event).await;
|
||||
sess.send_event(¤t_context, msg).await;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1158,7 +1171,7 @@ async fn submission_loop(sess: Arc<Session>, config: Arc<Config>, rx_sub: Receiv
|
||||
.started_completed(TurnItem::UserMessage(UserMessageItem::new(&items)))
|
||||
.await;
|
||||
|
||||
sess.spawn_task(Arc::clone(¤t_context), sub.id, items, RegularTask)
|
||||
sess.spawn_task(Arc::clone(¤t_context), items, RegularTask)
|
||||
.await;
|
||||
previous_context = Some(current_context);
|
||||
}
|
||||
@@ -1216,7 +1229,7 @@ async fn submission_loop(sess: Arc<Session>, config: Arc<Config>, rx_sub: Receiv
|
||||
),
|
||||
};
|
||||
|
||||
sess_clone.send_event(event).await;
|
||||
sess_clone.send_event_raw(event).await;
|
||||
});
|
||||
}
|
||||
Op::ListMcpTools => {
|
||||
@@ -1249,7 +1262,7 @@ async fn submission_loop(sess: Arc<Session>, config: Arc<Config>, rx_sub: Receiv
|
||||
},
|
||||
),
|
||||
};
|
||||
sess.send_event(event).await;
|
||||
sess.send_event_raw(event).await;
|
||||
}
|
||||
Op::ListCustomPrompts => {
|
||||
let sub_id = sub.id.clone();
|
||||
@@ -1267,10 +1280,12 @@ async fn submission_loop(sess: Arc<Session>, config: Arc<Config>, rx_sub: Receiv
|
||||
custom_prompts,
|
||||
}),
|
||||
};
|
||||
sess.send_event(event).await;
|
||||
sess.send_event_raw(event).await;
|
||||
}
|
||||
Op::Compact => {
|
||||
let turn_context = sess.new_turn(SessionSettingsUpdate::default()).await;
|
||||
let turn_context = sess
|
||||
.new_turn_with_sub_id(sub.id.clone(), SessionSettingsUpdate::default())
|
||||
.await;
|
||||
// Attempt to inject input into current task
|
||||
if let Err(items) = sess
|
||||
.inject_input(vec![UserInput::Text {
|
||||
@@ -1278,7 +1293,7 @@ async fn submission_loop(sess: Arc<Session>, config: Arc<Config>, rx_sub: Receiv
|
||||
}])
|
||||
.await
|
||||
{
|
||||
sess.spawn_task(Arc::clone(&turn_context), sub.id, items, CompactTask)
|
||||
sess.spawn_task(Arc::clone(&turn_context), items, CompactTask)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
@@ -1302,14 +1317,14 @@ async fn submission_loop(sess: Arc<Session>, config: Arc<Config>, rx_sub: Receiv
|
||||
message: "Failed to shutdown rollout recorder".to_string(),
|
||||
}),
|
||||
};
|
||||
sess.send_event(event).await;
|
||||
sess.send_event_raw(event).await;
|
||||
}
|
||||
|
||||
let event = Event {
|
||||
id: sub.id.clone(),
|
||||
msg: EventMsg::ShutdownComplete,
|
||||
};
|
||||
sess.send_event(event).await;
|
||||
sess.send_event_raw(event).await;
|
||||
break;
|
||||
}
|
||||
Op::GetPath => {
|
||||
@@ -1337,10 +1352,12 @@ async fn submission_loop(sess: Arc<Session>, config: Arc<Config>, rx_sub: Receiv
|
||||
path,
|
||||
}),
|
||||
};
|
||||
sess.send_event(event).await;
|
||||
sess.send_event_raw(event).await;
|
||||
}
|
||||
Op::Review { review_request } => {
|
||||
let turn_context = sess.new_turn(SessionSettingsUpdate::default()).await;
|
||||
let turn_context = sess
|
||||
.new_turn_with_sub_id(sub.id.clone(), SessionSettingsUpdate::default())
|
||||
.await;
|
||||
spawn_review_thread(
|
||||
sess.clone(),
|
||||
config.clone(),
|
||||
@@ -1416,6 +1433,7 @@ async fn spawn_review_thread(
|
||||
);
|
||||
|
||||
let review_turn_context = TurnContext {
|
||||
sub_id: sub_id.to_string(),
|
||||
client,
|
||||
tools_config,
|
||||
user_instructions: None,
|
||||
@@ -1439,17 +1457,11 @@ async fn spawn_review_thread(
|
||||
text: review_prompt,
|
||||
}];
|
||||
let tc = Arc::new(review_turn_context);
|
||||
|
||||
// Clone sub_id for the upcoming announcement before moving it into the task.
|
||||
let sub_id_for_event = sub_id.clone();
|
||||
sess.spawn_task(tc.clone(), sub_id, input, ReviewTask).await;
|
||||
sess.spawn_task(tc.clone(), input, ReviewTask).await;
|
||||
|
||||
// Announce entering review mode so UIs can switch modes.
|
||||
sess.send_event(Event {
|
||||
id: sub_id_for_event,
|
||||
msg: EventMsg::EnteredReviewMode(review_request),
|
||||
})
|
||||
.await;
|
||||
sess.send_event(&tc, EventMsg::EnteredReviewMode(review_request))
|
||||
.await;
|
||||
}
|
||||
|
||||
/// Takes a user message as input and runs a loop where, at each turn, the model
|
||||
@@ -1472,7 +1484,6 @@ async fn spawn_review_thread(
|
||||
pub(crate) async fn run_task(
|
||||
sess: Arc<Session>,
|
||||
turn_context: Arc<TurnContext>,
|
||||
sub_id: String,
|
||||
input: Vec<UserInput>,
|
||||
task_kind: TaskKind,
|
||||
cancellation_token: CancellationToken,
|
||||
@@ -1480,13 +1491,10 @@ pub(crate) async fn run_task(
|
||||
if input.is_empty() {
|
||||
return None;
|
||||
}
|
||||
let event = Event {
|
||||
id: sub_id.clone(),
|
||||
msg: EventMsg::TaskStarted(TaskStartedEvent {
|
||||
model_context_window: turn_context.client.get_model_context_window(),
|
||||
}),
|
||||
};
|
||||
sess.send_event(event).await;
|
||||
let event = EventMsg::TaskStarted(TaskStartedEvent {
|
||||
model_context_window: turn_context.client.get_model_context_window(),
|
||||
});
|
||||
sess.send_event(&turn_context, event).await;
|
||||
|
||||
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input);
|
||||
// For review threads, keep an isolated in-memory history so the
|
||||
@@ -1557,7 +1565,6 @@ pub(crate) async fn run_task(
|
||||
Arc::clone(&sess),
|
||||
Arc::clone(&turn_context),
|
||||
Arc::clone(&turn_diff_tracker),
|
||||
sub_id.clone(),
|
||||
turn_input,
|
||||
task_kind,
|
||||
cancellation_token.child_token(),
|
||||
@@ -1689,15 +1696,12 @@ pub(crate) async fn run_task(
|
||||
let current_tokens = total_usage_tokens
|
||||
.map(|tokens| tokens.to_string())
|
||||
.unwrap_or_else(|| "unknown".to_string());
|
||||
let event = Event {
|
||||
id: sub_id.clone(),
|
||||
msg: EventMsg::Error(ErrorEvent {
|
||||
message: format!(
|
||||
"Conversation is still above the token limit after automatic summarization (limit {limit_str}, current {current_tokens}). Please start a new session or trim your input."
|
||||
),
|
||||
}),
|
||||
};
|
||||
sess.send_event(event).await;
|
||||
let event = EventMsg::Error(ErrorEvent {
|
||||
message: format!(
|
||||
"Conversation is still above the token limit after automatic summarization (limit {limit_str}, current {current_tokens}). Please start a new session or trim your input."
|
||||
),
|
||||
});
|
||||
sess.send_event(&turn_context, event).await;
|
||||
break;
|
||||
}
|
||||
auto_compact_recently_attempted = true;
|
||||
@@ -1714,7 +1718,7 @@ pub(crate) async fn run_task(
|
||||
sess.notifier()
|
||||
.notify(&UserNotification::AgentTurnComplete {
|
||||
thread_id: sess.conversation_id.to_string(),
|
||||
turn_id: sub_id.clone(),
|
||||
turn_id: turn_context.sub_id.clone(),
|
||||
cwd: turn_context.cwd.display().to_string(),
|
||||
input_messages: turn_input_messages,
|
||||
last_assistant_message: last_agent_message.clone(),
|
||||
@@ -1729,13 +1733,10 @@ pub(crate) async fn run_task(
|
||||
}
|
||||
Err(e) => {
|
||||
info!("Turn error: {e:#}");
|
||||
let event = Event {
|
||||
id: sub_id.clone(),
|
||||
msg: EventMsg::Error(ErrorEvent {
|
||||
message: e.to_string(),
|
||||
}),
|
||||
};
|
||||
sess.send_event(event).await;
|
||||
let event = EventMsg::Error(ErrorEvent {
|
||||
message: e.to_string(),
|
||||
});
|
||||
sess.send_event(&turn_context, event).await;
|
||||
// let the user continue the conversation
|
||||
break;
|
||||
}
|
||||
@@ -1752,7 +1753,7 @@ pub(crate) async fn run_task(
|
||||
if turn_context.is_review_mode {
|
||||
exit_review_mode(
|
||||
sess.clone(),
|
||||
sub_id.clone(),
|
||||
Arc::clone(&turn_context),
|
||||
last_agent_message.as_deref().map(parse_review_output_event),
|
||||
)
|
||||
.await;
|
||||
@@ -1790,7 +1791,6 @@ async fn run_turn(
|
||||
sess: Arc<Session>,
|
||||
turn_context: Arc<TurnContext>,
|
||||
turn_diff_tracker: SharedTurnDiffTracker,
|
||||
sub_id: String,
|
||||
input: Vec<ResponseItem>,
|
||||
task_kind: TaskKind,
|
||||
cancellation_token: CancellationToken,
|
||||
@@ -1821,7 +1821,6 @@ async fn run_turn(
|
||||
Arc::clone(&sess),
|
||||
Arc::clone(&turn_context),
|
||||
Arc::clone(&turn_diff_tracker),
|
||||
&sub_id,
|
||||
&prompt,
|
||||
task_kind,
|
||||
cancellation_token.child_token(),
|
||||
@@ -1834,13 +1833,14 @@ async fn run_turn(
|
||||
Err(CodexErr::EnvVar(var)) => return Err(CodexErr::EnvVar(var)),
|
||||
Err(e @ CodexErr::Fatal(_)) => return Err(e),
|
||||
Err(e @ CodexErr::ContextWindowExceeded) => {
|
||||
sess.set_total_tokens_full(&sub_id, &turn_context).await;
|
||||
sess.set_total_tokens_full(turn_context.as_ref()).await;
|
||||
return Err(e);
|
||||
}
|
||||
Err(CodexErr::UsageLimitReached(e)) => {
|
||||
let rate_limits = e.rate_limits.clone();
|
||||
if let Some(rate_limits) = rate_limits {
|
||||
sess.update_rate_limits(&sub_id, rate_limits).await;
|
||||
sess.update_rate_limits(turn_context.as_ref(), rate_limits)
|
||||
.await;
|
||||
}
|
||||
return Err(CodexErr::UsageLimitReached(e));
|
||||
}
|
||||
@@ -1862,7 +1862,7 @@ async fn run_turn(
|
||||
// user understands what is happening instead of staring
|
||||
// at a seemingly frozen screen.
|
||||
sess.notify_stream_error(
|
||||
&sub_id,
|
||||
turn_context.as_ref(),
|
||||
format!("Re-connecting... {retries}/{max_retries}"),
|
||||
)
|
||||
.await;
|
||||
@@ -1898,7 +1898,6 @@ async fn try_run_turn(
|
||||
sess: Arc<Session>,
|
||||
turn_context: Arc<TurnContext>,
|
||||
turn_diff_tracker: SharedTurnDiffTracker,
|
||||
sub_id: &str,
|
||||
prompt: &Prompt,
|
||||
task_kind: TaskKind,
|
||||
cancellation_token: CancellationToken,
|
||||
@@ -1979,7 +1978,6 @@ async fn try_run_turn(
|
||||
Arc::clone(&sess),
|
||||
Arc::clone(&turn_context),
|
||||
Arc::clone(&turn_diff_tracker),
|
||||
sub_id.to_string(),
|
||||
);
|
||||
let mut output: FuturesOrdered<BoxFuture<CodexResult<ProcessedResponseItem>>> =
|
||||
FuturesOrdered::new();
|
||||
@@ -2028,7 +2026,6 @@ async fn try_run_turn(
|
||||
let response = handle_non_tool_response_item(
|
||||
Arc::clone(&sess),
|
||||
Arc::clone(&turn_context),
|
||||
sub_id,
|
||||
item.clone(),
|
||||
)
|
||||
.await?;
|
||||
@@ -2077,7 +2074,7 @@ async fn try_run_turn(
|
||||
let _ = sess
|
||||
.tx_event
|
||||
.send(Event {
|
||||
id: sub_id.to_string(),
|
||||
id: turn_context.sub_id.clone(),
|
||||
msg: EventMsg::WebSearchBegin(WebSearchBeginEvent { call_id }),
|
||||
})
|
||||
.await;
|
||||
@@ -2085,13 +2082,14 @@ async fn try_run_turn(
|
||||
ResponseEvent::RateLimits(snapshot) => {
|
||||
// Update internal state with latest rate limits, but defer sending until
|
||||
// token usage is available to avoid duplicate TokenCount events.
|
||||
sess.update_rate_limits(sub_id, snapshot).await;
|
||||
sess.update_rate_limits(turn_context.as_ref(), snapshot)
|
||||
.await;
|
||||
}
|
||||
ResponseEvent::Completed {
|
||||
response_id: _,
|
||||
token_usage,
|
||||
} => {
|
||||
sess.update_token_usage_info(sub_id, turn_context.as_ref(), token_usage.as_ref())
|
||||
sess.update_token_usage_info(turn_context.as_ref(), token_usage.as_ref())
|
||||
.await;
|
||||
|
||||
let processed_items = output
|
||||
@@ -2105,11 +2103,7 @@ async fn try_run_turn(
|
||||
};
|
||||
if let Ok(Some(unified_diff)) = unified_diff {
|
||||
let msg = EventMsg::TurnDiff(TurnDiffEvent { unified_diff });
|
||||
let event = Event {
|
||||
id: sub_id.to_string(),
|
||||
msg,
|
||||
};
|
||||
sess.send_event(event).await;
|
||||
sess.send_event(&turn_context, msg).await;
|
||||
}
|
||||
|
||||
let result = TurnRunResult {
|
||||
@@ -2123,38 +2117,27 @@ async fn try_run_turn(
|
||||
// In review child threads, suppress assistant text deltas; the
|
||||
// UI will show a selection popup from the final ReviewOutput.
|
||||
if !turn_context.is_review_mode {
|
||||
let event = Event {
|
||||
id: sub_id.to_string(),
|
||||
msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta }),
|
||||
};
|
||||
sess.send_event(event).await;
|
||||
let event = EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta });
|
||||
sess.send_event(&turn_context, event).await;
|
||||
} else {
|
||||
trace!("suppressing OutputTextDelta in review mode");
|
||||
}
|
||||
}
|
||||
ResponseEvent::ReasoningSummaryDelta(delta) => {
|
||||
let event = Event {
|
||||
id: sub_id.to_string(),
|
||||
msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta }),
|
||||
};
|
||||
sess.send_event(event).await;
|
||||
let event = EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta });
|
||||
sess.send_event(&turn_context, event).await;
|
||||
}
|
||||
ResponseEvent::ReasoningSummaryPartAdded => {
|
||||
let event = Event {
|
||||
id: sub_id.to_string(),
|
||||
msg: EventMsg::AgentReasoningSectionBreak(AgentReasoningSectionBreakEvent {}),
|
||||
};
|
||||
sess.send_event(event).await;
|
||||
let event =
|
||||
EventMsg::AgentReasoningSectionBreak(AgentReasoningSectionBreakEvent {});
|
||||
sess.send_event(&turn_context, event).await;
|
||||
}
|
||||
ResponseEvent::ReasoningContentDelta(delta) => {
|
||||
if sess.show_raw_agent_reasoning() {
|
||||
let event = Event {
|
||||
id: sub_id.to_string(),
|
||||
msg: EventMsg::AgentReasoningRawContentDelta(
|
||||
AgentReasoningRawContentDeltaEvent { delta },
|
||||
),
|
||||
};
|
||||
sess.send_event(event).await;
|
||||
let event = EventMsg::AgentReasoningRawContentDelta(
|
||||
AgentReasoningRawContentDeltaEvent { delta },
|
||||
);
|
||||
sess.send_event(&turn_context, event).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2164,7 +2147,6 @@ async fn try_run_turn(
|
||||
async fn handle_non_tool_response_item(
|
||||
sess: Arc<Session>,
|
||||
turn_context: Arc<TurnContext>,
|
||||
sub_id: &str,
|
||||
item: ResponseItem,
|
||||
) -> CodexResult<Option<ResponseInputItem>> {
|
||||
debug!(?item, "Output item");
|
||||
@@ -2181,11 +2163,7 @@ async fn handle_non_tool_response_item(
|
||||
_ => map_response_item_to_event_messages(&item, sess.show_raw_agent_reasoning()),
|
||||
};
|
||||
for msg in msgs {
|
||||
let event = Event {
|
||||
id: sub_id.to_string(),
|
||||
msg,
|
||||
};
|
||||
sess.send_event(event).await;
|
||||
sess.send_event(&turn_context, msg).await;
|
||||
}
|
||||
}
|
||||
ResponseItem::FunctionCallOutput { .. } | ResponseItem::CustomToolCallOutput { .. } => {
|
||||
@@ -2255,16 +2233,13 @@ fn convert_call_tool_result_to_function_call_output_payload(
|
||||
/// and records a developer message with the review output.
|
||||
pub(crate) async fn exit_review_mode(
|
||||
session: Arc<Session>,
|
||||
task_sub_id: String,
|
||||
turn_context: Arc<TurnContext>,
|
||||
review_output: Option<ReviewOutputEvent>,
|
||||
) {
|
||||
let event = Event {
|
||||
id: task_sub_id,
|
||||
msg: EventMsg::ExitedReviewMode(ExitedReviewModeEvent {
|
||||
review_output: review_output.clone(),
|
||||
}),
|
||||
};
|
||||
session.send_event(event).await;
|
||||
let event = EventMsg::ExitedReviewMode(ExitedReviewModeEvent {
|
||||
review_output: review_output.clone(),
|
||||
});
|
||||
session.send_event(turn_context.as_ref(), event).await;
|
||||
|
||||
let mut user_message = String::new();
|
||||
if let Some(out) = review_output {
|
||||
@@ -2676,6 +2651,7 @@ mod tests {
|
||||
&session_configuration,
|
||||
conversation_id,
|
||||
tx_event.clone(),
|
||||
"turn_id".to_string(),
|
||||
);
|
||||
|
||||
let session = Session {
|
||||
@@ -2744,6 +2720,7 @@ mod tests {
|
||||
&session_configuration,
|
||||
conversation_id,
|
||||
tx_event.clone(),
|
||||
"turn_id".to_string(),
|
||||
));
|
||||
|
||||
let session = Arc::new(Session {
|
||||
@@ -2774,7 +2751,6 @@ mod tests {
|
||||
self: Arc<Self>,
|
||||
_session: Arc<SessionTaskContext>,
|
||||
_ctx: Arc<TurnContext>,
|
||||
_sub_id: String,
|
||||
_input: Vec<UserInput>,
|
||||
cancellation_token: CancellationToken,
|
||||
) -> Option<String> {
|
||||
@@ -2787,9 +2763,9 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
async fn abort(&self, session: Arc<SessionTaskContext>, sub_id: &str) {
|
||||
async fn abort(&self, session: Arc<SessionTaskContext>, ctx: Arc<TurnContext>) {
|
||||
if let TaskKind::Review = self.kind {
|
||||
exit_review_mode(session.clone_session(), sub_id.to_string(), None).await;
|
||||
exit_review_mode(session.clone_session(), ctx, None).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2798,13 +2774,11 @@ mod tests {
|
||||
#[test_log::test]
|
||||
async fn abort_regular_task_emits_turn_aborted_only() {
|
||||
let (sess, tc, rx) = make_session_and_context_with_rx();
|
||||
let sub_id = "sub-regular".to_string();
|
||||
let input = vec![UserInput::Text {
|
||||
text: "hello".to_string(),
|
||||
}];
|
||||
sess.spawn_task(
|
||||
Arc::clone(&tc),
|
||||
sub_id.clone(),
|
||||
input,
|
||||
NeverEndingTask {
|
||||
kind: TaskKind::Regular,
|
||||
@@ -2829,13 +2803,11 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn abort_gracefuly_emits_turn_aborted_only() {
|
||||
let (sess, tc, rx) = make_session_and_context_with_rx();
|
||||
let sub_id = "sub-regular".to_string();
|
||||
let input = vec![UserInput::Text {
|
||||
text: "hello".to_string(),
|
||||
}];
|
||||
sess.spawn_task(
|
||||
Arc::clone(&tc),
|
||||
sub_id.clone(),
|
||||
input,
|
||||
NeverEndingTask {
|
||||
kind: TaskKind::Regular,
|
||||
@@ -2857,13 +2829,11 @@ mod tests {
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn abort_review_task_emits_exited_then_aborted_and_records_history() {
|
||||
let (sess, tc, rx) = make_session_and_context_with_rx();
|
||||
let sub_id = "sub-review".to_string();
|
||||
let input = vec![UserInput::Text {
|
||||
text: "start review".to_string(),
|
||||
}];
|
||||
sess.spawn_task(
|
||||
Arc::clone(&tc),
|
||||
sub_id.clone(),
|
||||
input,
|
||||
NeverEndingTask {
|
||||
kind: TaskKind::Review,
|
||||
@@ -2935,7 +2905,6 @@ mod tests {
|
||||
Arc::clone(&session),
|
||||
Arc::clone(&turn_context),
|
||||
tracker,
|
||||
"sub-id".to_string(),
|
||||
call,
|
||||
)
|
||||
.await
|
||||
@@ -3095,7 +3064,6 @@ mod tests {
|
||||
let turn_diff_tracker = Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::new()));
|
||||
|
||||
let tool_name = "shell";
|
||||
let sub_id = "test-sub".to_string();
|
||||
let call_id = "test-call".to_string();
|
||||
|
||||
let resp = handle_container_exec_with_params(
|
||||
@@ -3104,7 +3072,6 @@ mod tests {
|
||||
Arc::clone(&session),
|
||||
Arc::clone(&turn_context),
|
||||
Arc::clone(&turn_diff_tracker),
|
||||
sub_id,
|
||||
call_id,
|
||||
)
|
||||
.await;
|
||||
@@ -3132,7 +3099,6 @@ mod tests {
|
||||
Arc::clone(&session),
|
||||
Arc::clone(&turn_context),
|
||||
Arc::clone(&turn_diff_tracker),
|
||||
"test-sub".to_string(),
|
||||
"test-call-2".to_string(),
|
||||
)
|
||||
.await;
|
||||
|
||||
Reference in New Issue
Block a user