chore: use tokio mutex and async function to prevent blocking a worker (#3850)
### Why Use `tokio::sync::Mutex` `std::sync::Mutex` are not _async-aware_. As a result, they will block the entire thread instead of just yielding the task. Furthermore they can be poisoned which is not the case of `tokio` Mutex. This allows the Tokio runtime to continue running other tasks while waiting for the lock, preventing deadlocks and performance bottlenecks. In general, this is preferred in async environment
This commit is contained in:
@@ -3,8 +3,6 @@ use std::collections::HashMap;
|
|||||||
use std::collections::HashSet;
|
use std::collections::HashSet;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::sync::Mutex;
|
|
||||||
use std::sync::MutexGuard;
|
|
||||||
use std::sync::atomic::AtomicU64;
|
use std::sync::atomic::AtomicU64;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
@@ -31,6 +29,7 @@ use mcp_types::CallToolResult;
|
|||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
use serde_json;
|
use serde_json;
|
||||||
|
use tokio::sync::Mutex;
|
||||||
use tokio::sync::oneshot;
|
use tokio::sync::oneshot;
|
||||||
use tokio::task::AbortHandle;
|
use tokio::task::AbortHandle;
|
||||||
use tracing::debug;
|
use tracing::debug;
|
||||||
@@ -135,21 +134,6 @@ mod compact;
|
|||||||
use self::compact::build_compacted_history;
|
use self::compact::build_compacted_history;
|
||||||
use self::compact::collect_user_messages;
|
use self::compact::collect_user_messages;
|
||||||
|
|
||||||
// A convenience extension trait for acquiring mutex locks where poisoning is
|
|
||||||
// unrecoverable and should abort the program. This avoids scattered `.unwrap()`
|
|
||||||
// calls on `lock()` while still surfacing a clear panic message when a lock is
|
|
||||||
// poisoned.
|
|
||||||
trait MutexExt<T> {
|
|
||||||
fn lock_unchecked(&self) -> MutexGuard<'_, T>;
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T> MutexExt<T> for Mutex<T> {
|
|
||||||
fn lock_unchecked(&self) -> MutexGuard<'_, T> {
|
|
||||||
#[expect(clippy::expect_used)]
|
|
||||||
self.lock().expect("poisoned lock")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// The high-level interface to the Codex system.
|
/// The high-level interface to the Codex system.
|
||||||
/// It operates as a queue pair where you send submissions and receive events.
|
/// It operates as a queue pair where you send submissions and receive events.
|
||||||
pub struct Codex {
|
pub struct Codex {
|
||||||
@@ -272,7 +256,6 @@ struct State {
|
|||||||
pending_input: Vec<ResponseInputItem>,
|
pending_input: Vec<ResponseInputItem>,
|
||||||
history: ConversationHistory,
|
history: ConversationHistory,
|
||||||
token_info: Option<TokenUsageInfo>,
|
token_info: Option<TokenUsageInfo>,
|
||||||
next_internal_sub_id: u64,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Context for an initialized model agent
|
/// Context for an initialized model agent
|
||||||
@@ -298,6 +281,7 @@ pub(crate) struct Session {
|
|||||||
codex_linux_sandbox_exe: Option<PathBuf>,
|
codex_linux_sandbox_exe: Option<PathBuf>,
|
||||||
user_shell: shell::Shell,
|
user_shell: shell::Shell,
|
||||||
show_raw_agent_reasoning: bool,
|
show_raw_agent_reasoning: bool,
|
||||||
|
next_internal_sub_id: AtomicU64,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// The context needed for a single turn of the conversation.
|
/// The context needed for a single turn of the conversation.
|
||||||
@@ -500,6 +484,7 @@ impl Session {
|
|||||||
codex_linux_sandbox_exe: config.codex_linux_sandbox_exe.clone(),
|
codex_linux_sandbox_exe: config.codex_linux_sandbox_exe.clone(),
|
||||||
user_shell: default_shell,
|
user_shell: default_shell,
|
||||||
show_raw_agent_reasoning: config.show_raw_agent_reasoning,
|
show_raw_agent_reasoning: config.show_raw_agent_reasoning,
|
||||||
|
next_internal_sub_id: AtomicU64::new(0),
|
||||||
});
|
});
|
||||||
|
|
||||||
// Dispatch the SessionConfiguredEvent first and then report any errors.
|
// Dispatch the SessionConfiguredEvent first and then report any errors.
|
||||||
@@ -528,16 +513,16 @@ impl Session {
|
|||||||
Ok((sess, turn_context))
|
Ok((sess, turn_context))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn set_task(&self, task: AgentTask) {
|
pub async fn set_task(&self, task: AgentTask) {
|
||||||
let mut state = self.state.lock_unchecked();
|
let mut state = self.state.lock().await;
|
||||||
if let Some(current_task) = state.current_task.take() {
|
if let Some(current_task) = state.current_task.take() {
|
||||||
current_task.abort(TurnAbortReason::Replaced);
|
current_task.abort(TurnAbortReason::Replaced);
|
||||||
}
|
}
|
||||||
state.current_task = Some(task);
|
state.current_task = Some(task);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn remove_task(&self, sub_id: &str) {
|
pub async fn remove_task(&self, sub_id: &str) {
|
||||||
let mut state = self.state.lock_unchecked();
|
let mut state = self.state.lock().await;
|
||||||
if let Some(task) = &state.current_task
|
if let Some(task) = &state.current_task
|
||||||
&& task.sub_id == sub_id
|
&& task.sub_id == sub_id
|
||||||
{
|
{
|
||||||
@@ -546,9 +531,9 @@ impl Session {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn next_internal_sub_id(&self) -> String {
|
fn next_internal_sub_id(&self) -> String {
|
||||||
let mut state = self.state.lock_unchecked();
|
let id = self
|
||||||
let id = state.next_internal_sub_id;
|
.next_internal_sub_id
|
||||||
state.next_internal_sub_id += 1;
|
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
|
||||||
format!("auto-compact-{id}")
|
format!("auto-compact-{id}")
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -571,7 +556,7 @@ impl Session {
|
|||||||
let reconstructed_history =
|
let reconstructed_history =
|
||||||
self.reconstruct_history_from_rollout(turn_context, &rollout_items);
|
self.reconstruct_history_from_rollout(turn_context, &rollout_items);
|
||||||
if !reconstructed_history.is_empty() {
|
if !reconstructed_history.is_empty() {
|
||||||
self.record_into_history(&reconstructed_history);
|
self.record_into_history(&reconstructed_history).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
// If persisting, persist all rollout items as-is (recorder filters)
|
// If persisting, persist all rollout items as-is (recorder filters)
|
||||||
@@ -604,7 +589,7 @@ impl Session {
|
|||||||
let (tx_approve, rx_approve) = oneshot::channel();
|
let (tx_approve, rx_approve) = oneshot::channel();
|
||||||
let event_id = sub_id.clone();
|
let event_id = sub_id.clone();
|
||||||
let prev_entry = {
|
let prev_entry = {
|
||||||
let mut state = self.state.lock_unchecked();
|
let mut state = self.state.lock().await;
|
||||||
state.pending_approvals.insert(sub_id, tx_approve)
|
state.pending_approvals.insert(sub_id, tx_approve)
|
||||||
};
|
};
|
||||||
if prev_entry.is_some() {
|
if prev_entry.is_some() {
|
||||||
@@ -636,7 +621,7 @@ impl Session {
|
|||||||
let (tx_approve, rx_approve) = oneshot::channel();
|
let (tx_approve, rx_approve) = oneshot::channel();
|
||||||
let event_id = sub_id.clone();
|
let event_id = sub_id.clone();
|
||||||
let prev_entry = {
|
let prev_entry = {
|
||||||
let mut state = self.state.lock_unchecked();
|
let mut state = self.state.lock().await;
|
||||||
state.pending_approvals.insert(sub_id, tx_approve)
|
state.pending_approvals.insert(sub_id, tx_approve)
|
||||||
};
|
};
|
||||||
if prev_entry.is_some() {
|
if prev_entry.is_some() {
|
||||||
@@ -656,9 +641,9 @@ impl Session {
|
|||||||
rx_approve
|
rx_approve
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn notify_approval(&self, sub_id: &str, decision: ReviewDecision) {
|
pub async fn notify_approval(&self, sub_id: &str, decision: ReviewDecision) {
|
||||||
let entry = {
|
let entry = {
|
||||||
let mut state = self.state.lock_unchecked();
|
let mut state = self.state.lock().await;
|
||||||
state.pending_approvals.remove(sub_id)
|
state.pending_approvals.remove(sub_id)
|
||||||
};
|
};
|
||||||
match entry {
|
match entry {
|
||||||
@@ -671,15 +656,15 @@ impl Session {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn add_approved_command(&self, cmd: Vec<String>) {
|
pub async fn add_approved_command(&self, cmd: Vec<String>) {
|
||||||
let mut state = self.state.lock_unchecked();
|
let mut state = self.state.lock().await;
|
||||||
state.approved_commands.insert(cmd);
|
state.approved_commands.insert(cmd);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Records input items: always append to conversation history and
|
/// Records input items: always append to conversation history and
|
||||||
/// persist these response items to rollout.
|
/// persist these response items to rollout.
|
||||||
async fn record_conversation_items(&self, items: &[ResponseItem]) {
|
async fn record_conversation_items(&self, items: &[ResponseItem]) {
|
||||||
self.record_into_history(items);
|
self.record_into_history(items).await;
|
||||||
self.persist_rollout_response_items(items).await;
|
self.persist_rollout_response_items(items).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -711,11 +696,9 @@ impl Session {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Append ResponseItems to the in-memory conversation history only.
|
/// Append ResponseItems to the in-memory conversation history only.
|
||||||
fn record_into_history(&self, items: &[ResponseItem]) {
|
async fn record_into_history(&self, items: &[ResponseItem]) {
|
||||||
self.state
|
let mut state = self.state.lock().await;
|
||||||
.lock_unchecked()
|
state.history.record_items(items.iter());
|
||||||
.history
|
|
||||||
.record_items(items.iter());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn persist_rollout_response_items(&self, items: &[ResponseItem]) {
|
async fn persist_rollout_response_items(&self, items: &[ResponseItem]) {
|
||||||
@@ -743,8 +726,8 @@ impl Session {
|
|||||||
|
|
||||||
async fn persist_rollout_items(&self, items: &[RolloutItem]) {
|
async fn persist_rollout_items(&self, items: &[RolloutItem]) {
|
||||||
let recorder = {
|
let recorder = {
|
||||||
let guard = self.rollout.lock_unchecked();
|
let guard = self.rollout.lock().await;
|
||||||
guard.as_ref().cloned()
|
guard.clone()
|
||||||
};
|
};
|
||||||
if let Some(rec) = recorder
|
if let Some(rec) = recorder
|
||||||
&& let Err(e) = rec.record_items(items).await
|
&& let Err(e) = rec.record_items(items).await
|
||||||
@@ -753,12 +736,12 @@ impl Session {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn update_token_usage_info(
|
async fn update_token_usage_info(
|
||||||
&self,
|
&self,
|
||||||
turn_context: &TurnContext,
|
turn_context: &TurnContext,
|
||||||
token_usage: &Option<TokenUsage>,
|
token_usage: &Option<TokenUsage>,
|
||||||
) -> Option<TokenUsageInfo> {
|
) -> Option<TokenUsageInfo> {
|
||||||
let mut state = self.state.lock_unchecked();
|
let mut state = self.state.lock().await;
|
||||||
let info = TokenUsageInfo::new_or_append(
|
let info = TokenUsageInfo::new_or_append(
|
||||||
&state.token_info,
|
&state.token_info,
|
||||||
token_usage,
|
token_usage,
|
||||||
@@ -973,13 +956,17 @@ impl Session {
|
|||||||
|
|
||||||
/// Build the full turn input by concatenating the current conversation
|
/// Build the full turn input by concatenating the current conversation
|
||||||
/// history with additional items for this turn.
|
/// history with additional items for this turn.
|
||||||
pub fn turn_input_with_history(&self, extra: Vec<ResponseItem>) -> Vec<ResponseItem> {
|
pub async fn turn_input_with_history(&self, extra: Vec<ResponseItem>) -> Vec<ResponseItem> {
|
||||||
[self.state.lock_unchecked().history.contents(), extra].concat()
|
let history = {
|
||||||
|
let state = self.state.lock().await;
|
||||||
|
state.history.contents()
|
||||||
|
};
|
||||||
|
[history, extra].concat()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the input if there was no task running to inject into
|
/// Returns the input if there was no task running to inject into
|
||||||
pub fn inject_input(&self, input: Vec<InputItem>) -> Result<(), Vec<InputItem>> {
|
pub async fn inject_input(&self, input: Vec<InputItem>) -> Result<(), Vec<InputItem>> {
|
||||||
let mut state = self.state.lock_unchecked();
|
let mut state = self.state.lock().await;
|
||||||
if state.current_task.is_some() {
|
if state.current_task.is_some() {
|
||||||
state.pending_input.push(input.into());
|
state.pending_input.push(input.into());
|
||||||
Ok(())
|
Ok(())
|
||||||
@@ -988,8 +975,8 @@ impl Session {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_pending_input(&self) -> Vec<ResponseInputItem> {
|
pub async fn get_pending_input(&self) -> Vec<ResponseInputItem> {
|
||||||
let mut state = self.state.lock_unchecked();
|
let mut state = self.state.lock().await;
|
||||||
if state.pending_input.is_empty() {
|
if state.pending_input.is_empty() {
|
||||||
Vec::with_capacity(0)
|
Vec::with_capacity(0)
|
||||||
} else {
|
} else {
|
||||||
@@ -1011,9 +998,9 @@ impl Session {
|
|||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
fn interrupt_task(&self) {
|
pub async fn interrupt_task(&self) {
|
||||||
info!("interrupt received: abort current task, if any");
|
info!("interrupt received: abort current task, if any");
|
||||||
let mut state = self.state.lock_unchecked();
|
let mut state = self.state.lock().await;
|
||||||
state.pending_approvals.clear();
|
state.pending_approvals.clear();
|
||||||
state.pending_input.clear();
|
state.pending_input.clear();
|
||||||
if let Some(task) = state.current_task.take() {
|
if let Some(task) = state.current_task.take() {
|
||||||
@@ -1021,6 +1008,16 @@ impl Session {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn interrupt_task_sync(&self) {
|
||||||
|
if let Ok(mut state) = self.state.try_lock() {
|
||||||
|
state.pending_approvals.clear();
|
||||||
|
state.pending_input.clear();
|
||||||
|
if let Some(task) = state.current_task.take() {
|
||||||
|
task.abort(TurnAbortReason::Interrupted);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Spawn the configured notifier (if any) with the given JSON payload as
|
/// Spawn the configured notifier (if any) with the given JSON payload as
|
||||||
/// the last argument. Failures are logged but otherwise ignored so that
|
/// the last argument. Failures are logged but otherwise ignored so that
|
||||||
/// notification issues do not interfere with the main workflow.
|
/// notification issues do not interfere with the main workflow.
|
||||||
@@ -1053,7 +1050,7 @@ impl Session {
|
|||||||
|
|
||||||
impl Drop for Session {
|
impl Drop for Session {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
self.interrupt_task();
|
self.interrupt_task_sync();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1184,7 +1181,7 @@ async fn submission_loop(
|
|||||||
debug!(?sub, "Submission");
|
debug!(?sub, "Submission");
|
||||||
match sub.op {
|
match sub.op {
|
||||||
Op::Interrupt => {
|
Op::Interrupt => {
|
||||||
sess.interrupt_task();
|
sess.interrupt_task().await;
|
||||||
}
|
}
|
||||||
Op::OverrideTurnContext {
|
Op::OverrideTurnContext {
|
||||||
cwd,
|
cwd,
|
||||||
@@ -1277,11 +1274,11 @@ async fn submission_loop(
|
|||||||
}
|
}
|
||||||
Op::UserInput { items } => {
|
Op::UserInput { items } => {
|
||||||
// attempt to inject input into current task
|
// attempt to inject input into current task
|
||||||
if let Err(items) = sess.inject_input(items) {
|
if let Err(items) = sess.inject_input(items).await {
|
||||||
// no current task, spawn a new one
|
// no current task, spawn a new one
|
||||||
let task =
|
let task =
|
||||||
AgentTask::spawn(sess.clone(), Arc::clone(&turn_context), sub.id, items);
|
AgentTask::spawn(sess.clone(), Arc::clone(&turn_context), sub.id, items);
|
||||||
sess.set_task(task);
|
sess.set_task(task).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Op::UserTurn {
|
Op::UserTurn {
|
||||||
@@ -1294,7 +1291,7 @@ async fn submission_loop(
|
|||||||
summary,
|
summary,
|
||||||
} => {
|
} => {
|
||||||
// attempt to inject input into current task
|
// attempt to inject input into current task
|
||||||
if let Err(items) = sess.inject_input(items) {
|
if let Err(items) = sess.inject_input(items).await {
|
||||||
// Derive a fresh TurnContext for this turn using the provided overrides.
|
// Derive a fresh TurnContext for this turn using the provided overrides.
|
||||||
let provider = turn_context.client.get_provider();
|
let provider = turn_context.client.get_provider();
|
||||||
let auth_manager = turn_context.client.get_auth_manager();
|
let auth_manager = turn_context.client.get_auth_manager();
|
||||||
@@ -1360,20 +1357,20 @@ async fn submission_loop(
|
|||||||
// no current task, spawn a new one with the per‑turn context
|
// no current task, spawn a new one with the per‑turn context
|
||||||
let task =
|
let task =
|
||||||
AgentTask::spawn(sess.clone(), Arc::clone(&turn_context), sub.id, items);
|
AgentTask::spawn(sess.clone(), Arc::clone(&turn_context), sub.id, items);
|
||||||
sess.set_task(task);
|
sess.set_task(task).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Op::ExecApproval { id, decision } => match decision {
|
Op::ExecApproval { id, decision } => match decision {
|
||||||
ReviewDecision::Abort => {
|
ReviewDecision::Abort => {
|
||||||
sess.interrupt_task();
|
sess.interrupt_task().await;
|
||||||
}
|
}
|
||||||
other => sess.notify_approval(&id, other),
|
other => sess.notify_approval(&id, other).await,
|
||||||
},
|
},
|
||||||
Op::PatchApproval { id, decision } => match decision {
|
Op::PatchApproval { id, decision } => match decision {
|
||||||
ReviewDecision::Abort => {
|
ReviewDecision::Abort => {
|
||||||
sess.interrupt_task();
|
sess.interrupt_task().await;
|
||||||
}
|
}
|
||||||
other => sess.notify_approval(&id, other),
|
other => sess.notify_approval(&id, other).await,
|
||||||
},
|
},
|
||||||
Op::AddToHistory { text } => {
|
Op::AddToHistory { text } => {
|
||||||
let id = sess.conversation_id;
|
let id = sess.conversation_id;
|
||||||
@@ -1452,15 +1449,19 @@ async fn submission_loop(
|
|||||||
}
|
}
|
||||||
Op::Compact => {
|
Op::Compact => {
|
||||||
// Attempt to inject input into current task
|
// Attempt to inject input into current task
|
||||||
if let Err(items) = sess.inject_input(vec![InputItem::Text {
|
if let Err(items) = sess
|
||||||
text: compact::COMPACT_TRIGGER_TEXT.to_string(),
|
.inject_input(vec![InputItem::Text {
|
||||||
}]) {
|
text: compact::COMPACT_TRIGGER_TEXT.to_string(),
|
||||||
|
}])
|
||||||
|
.await
|
||||||
|
{
|
||||||
compact::spawn_compact_task(
|
compact::spawn_compact_task(
|
||||||
sess.clone(),
|
sess.clone(),
|
||||||
Arc::clone(&turn_context),
|
Arc::clone(&turn_context),
|
||||||
sub.id,
|
sub.id,
|
||||||
items,
|
items,
|
||||||
);
|
)
|
||||||
|
.await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Op::Shutdown => {
|
Op::Shutdown => {
|
||||||
@@ -1468,7 +1469,10 @@ async fn submission_loop(
|
|||||||
|
|
||||||
// Gracefully flush and shutdown rollout recorder on session end so tests
|
// Gracefully flush and shutdown rollout recorder on session end so tests
|
||||||
// that inspect the rollout file do not race with the background writer.
|
// that inspect the rollout file do not race with the background writer.
|
||||||
let recorder_opt = sess.rollout.lock_unchecked().take();
|
let recorder_opt = {
|
||||||
|
let mut guard = sess.rollout.lock().await;
|
||||||
|
guard.take()
|
||||||
|
};
|
||||||
if let Some(rec) = recorder_opt
|
if let Some(rec) = recorder_opt
|
||||||
&& let Err(e) = rec.shutdown().await
|
&& let Err(e) = rec.shutdown().await
|
||||||
{
|
{
|
||||||
@@ -1493,7 +1497,7 @@ async fn submission_loop(
|
|||||||
let sub_id = sub.id.clone();
|
let sub_id = sub.id.clone();
|
||||||
// Flush rollout writes before returning the path so readers observe a consistent file.
|
// Flush rollout writes before returning the path so readers observe a consistent file.
|
||||||
let (path, rec_opt) = {
|
let (path, rec_opt) = {
|
||||||
let guard = sess.rollout.lock_unchecked();
|
let guard = sess.rollout.lock().await;
|
||||||
match guard.as_ref() {
|
match guard.as_ref() {
|
||||||
Some(rec) => (rec.get_rollout_path(), Some(rec.clone())),
|
Some(rec) => (rec.get_rollout_path(), Some(rec.clone())),
|
||||||
None => {
|
None => {
|
||||||
@@ -1604,7 +1608,7 @@ async fn spawn_review_thread(
|
|||||||
// Clone sub_id for the upcoming announcement before moving it into the task.
|
// Clone sub_id for the upcoming announcement before moving it into the task.
|
||||||
let sub_id_for_event = sub_id.clone();
|
let sub_id_for_event = sub_id.clone();
|
||||||
let task = AgentTask::review(sess.clone(), tc.clone(), sub_id, input);
|
let task = AgentTask::review(sess.clone(), tc.clone(), sub_id, input);
|
||||||
sess.set_task(task);
|
sess.set_task(task).await;
|
||||||
|
|
||||||
// Announce entering review mode so UIs can switch modes.
|
// Announce entering review mode so UIs can switch modes.
|
||||||
sess.send_event(Event {
|
sess.send_event(Event {
|
||||||
@@ -1675,6 +1679,7 @@ async fn run_task(
|
|||||||
// may support this, the model might not.
|
// may support this, the model might not.
|
||||||
let pending_input = sess
|
let pending_input = sess
|
||||||
.get_pending_input()
|
.get_pending_input()
|
||||||
|
.await
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(ResponseItem::from)
|
.map(ResponseItem::from)
|
||||||
.collect::<Vec<ResponseItem>>();
|
.collect::<Vec<ResponseItem>>();
|
||||||
@@ -1696,7 +1701,7 @@ async fn run_task(
|
|||||||
review_thread_history.clone()
|
review_thread_history.clone()
|
||||||
} else {
|
} else {
|
||||||
sess.record_conversation_items(&pending_input).await;
|
sess.record_conversation_items(&pending_input).await;
|
||||||
sess.turn_input_with_history(pending_input)
|
sess.turn_input_with_history(pending_input).await
|
||||||
};
|
};
|
||||||
|
|
||||||
let turn_input_messages: Vec<String> = turn_input
|
let turn_input_messages: Vec<String> = turn_input
|
||||||
@@ -1908,7 +1913,7 @@ async fn run_task(
|
|||||||
.await;
|
.await;
|
||||||
}
|
}
|
||||||
|
|
||||||
sess.remove_task(&sub_id);
|
sess.remove_task(&sub_id).await;
|
||||||
let event = Event {
|
let event = Event {
|
||||||
id: sub_id,
|
id: sub_id,
|
||||||
msg: EventMsg::TaskComplete(TaskCompleteEvent { last_agent_message }),
|
msg: EventMsg::TaskComplete(TaskCompleteEvent { last_agent_message }),
|
||||||
@@ -2141,7 +2146,9 @@ async fn try_run_turn(
|
|||||||
response_id: _,
|
response_id: _,
|
||||||
token_usage,
|
token_usage,
|
||||||
} => {
|
} => {
|
||||||
let info = sess.update_token_usage_info(turn_context, &token_usage);
|
let info = sess
|
||||||
|
.update_token_usage_info(turn_context, &token_usage)
|
||||||
|
.await;
|
||||||
let _ = sess
|
let _ = sess
|
||||||
.send_event(Event {
|
.send_event(Event {
|
||||||
id: sub_id.to_string(),
|
id: sub_id.to_string(),
|
||||||
@@ -2475,7 +2482,10 @@ async fn handle_function_call(
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
let abs = turn_context.resolve_path(Some(args.path));
|
let abs = turn_context.resolve_path(Some(args.path));
|
||||||
let output = match sess.inject_input(vec![InputItem::LocalImage { path: abs }]) {
|
let output = match sess
|
||||||
|
.inject_input(vec![InputItem::LocalImage { path: abs }])
|
||||||
|
.await
|
||||||
|
{
|
||||||
Ok(()) => FunctionCallOutputPayload {
|
Ok(()) => FunctionCallOutputPayload {
|
||||||
content: "attached local image path".to_string(),
|
content: "attached local image path".to_string(),
|
||||||
success: Some(true),
|
success: Some(true),
|
||||||
@@ -2789,7 +2799,7 @@ async fn handle_container_exec_with_params(
|
|||||||
}
|
}
|
||||||
None => {
|
None => {
|
||||||
let safety = {
|
let safety = {
|
||||||
let state = sess.state.lock_unchecked();
|
let state = sess.state.lock().await;
|
||||||
assess_command_safety(
|
assess_command_safety(
|
||||||
¶ms.command,
|
¶ms.command,
|
||||||
turn_context.approval_policy,
|
turn_context.approval_policy,
|
||||||
@@ -2818,7 +2828,7 @@ async fn handle_container_exec_with_params(
|
|||||||
match rx_approve.await.unwrap_or_default() {
|
match rx_approve.await.unwrap_or_default() {
|
||||||
ReviewDecision::Approved => (),
|
ReviewDecision::Approved => (),
|
||||||
ReviewDecision::ApprovedForSession => {
|
ReviewDecision::ApprovedForSession => {
|
||||||
sess.add_approved_command(params.command.clone());
|
sess.add_approved_command(params.command.clone()).await;
|
||||||
}
|
}
|
||||||
ReviewDecision::Denied | ReviewDecision::Abort => {
|
ReviewDecision::Denied | ReviewDecision::Abort => {
|
||||||
return ResponseInputItem::FunctionCallOutput {
|
return ResponseInputItem::FunctionCallOutput {
|
||||||
@@ -2991,7 +3001,7 @@ async fn handle_sandbox_error(
|
|||||||
// remainder of the session so future
|
// remainder of the session so future
|
||||||
// executions skip the sandbox directly.
|
// executions skip the sandbox directly.
|
||||||
// TODO(ragona): Isn't this a bug? It always saves the command in an | fork?
|
// TODO(ragona): Isn't this a bug? It always saves the command in an | fork?
|
||||||
sess.add_approved_command(params.command.clone());
|
sess.add_approved_command(params.command.clone()).await;
|
||||||
// Inform UI we are retrying without sandbox.
|
// Inform UI we are retrying without sandbox.
|
||||||
sess.notify_background_event(&sub_id, "retrying command without sandbox")
|
sess.notify_background_event(&sub_id, "retrying command without sandbox")
|
||||||
.await;
|
.await;
|
||||||
@@ -3356,7 +3366,7 @@ mod tests {
|
|||||||
}),
|
}),
|
||||||
));
|
));
|
||||||
|
|
||||||
let actual = session.state.lock_unchecked().history.contents();
|
let actual = tokio_test::block_on(async { session.state.lock().await.history.contents() });
|
||||||
assert_eq!(expected, actual);
|
assert_eq!(expected, actual);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -3369,7 +3379,7 @@ mod tests {
|
|||||||
session.record_initial_history(&turn_context, InitialHistory::Forked(rollout_items)),
|
session.record_initial_history(&turn_context, InitialHistory::Forked(rollout_items)),
|
||||||
);
|
);
|
||||||
|
|
||||||
let actual = session.state.lock_unchecked().history.contents();
|
let actual = tokio_test::block_on(async { session.state.lock().await.history.contents() });
|
||||||
assert_eq!(expected, actual);
|
assert_eq!(expected, actual);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -3611,6 +3621,7 @@ mod tests {
|
|||||||
codex_linux_sandbox_exe: None,
|
codex_linux_sandbox_exe: None,
|
||||||
user_shell: shell::Shell::Unknown,
|
user_shell: shell::Shell::Unknown,
|
||||||
show_raw_agent_reasoning: config.show_raw_agent_reasoning,
|
show_raw_agent_reasoning: config.show_raw_agent_reasoning,
|
||||||
|
next_internal_sub_id: AtomicU64::new(0),
|
||||||
};
|
};
|
||||||
(session, turn_context)
|
(session, turn_context)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,7 +1,6 @@
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use super::AgentTask;
|
use super::AgentTask;
|
||||||
use super::MutexExt;
|
|
||||||
use super::Session;
|
use super::Session;
|
||||||
use super::TurnContext;
|
use super::TurnContext;
|
||||||
use super::get_last_assistant_message_from_turn;
|
use super::get_last_assistant_message_from_turn;
|
||||||
@@ -37,7 +36,7 @@ struct HistoryBridgeTemplate<'a> {
|
|||||||
summary_text: &'a str,
|
summary_text: &'a str,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(super) fn spawn_compact_task(
|
pub(super) async fn spawn_compact_task(
|
||||||
sess: Arc<Session>,
|
sess: Arc<Session>,
|
||||||
turn_context: Arc<TurnContext>,
|
turn_context: Arc<TurnContext>,
|
||||||
sub_id: String,
|
sub_id: String,
|
||||||
@@ -50,7 +49,7 @@ pub(super) fn spawn_compact_task(
|
|||||||
input,
|
input,
|
||||||
SUMMARIZATION_PROMPT.to_string(),
|
SUMMARIZATION_PROMPT.to_string(),
|
||||||
);
|
);
|
||||||
sess.set_task(task);
|
sess.set_task(task).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(super) async fn run_inline_auto_compact_task(
|
pub(super) async fn run_inline_auto_compact_task(
|
||||||
@@ -114,7 +113,9 @@ async fn run_compact_task_inner(
|
|||||||
) {
|
) {
|
||||||
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input);
|
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input);
|
||||||
let instructions_override = compact_instructions;
|
let instructions_override = compact_instructions;
|
||||||
let turn_input = sess.turn_input_with_history(vec![initial_input_for_turn.clone().into()]);
|
let turn_input = sess
|
||||||
|
.turn_input_with_history(vec![initial_input_for_turn.clone().into()])
|
||||||
|
.await;
|
||||||
|
|
||||||
let prompt = Prompt {
|
let prompt = Prompt {
|
||||||
input: turn_input,
|
input: turn_input,
|
||||||
@@ -173,10 +174,10 @@ async fn run_compact_task_inner(
|
|||||||
}
|
}
|
||||||
|
|
||||||
if remove_task_on_completion {
|
if remove_task_on_completion {
|
||||||
sess.remove_task(&sub_id);
|
sess.remove_task(&sub_id).await;
|
||||||
}
|
}
|
||||||
let history_snapshot = {
|
let history_snapshot = {
|
||||||
let state = sess.state.lock_unchecked();
|
let state = sess.state.lock().await;
|
||||||
state.history.contents()
|
state.history.contents()
|
||||||
};
|
};
|
||||||
let summary_text = get_last_assistant_message_from_turn(&history_snapshot).unwrap_or_default();
|
let summary_text = get_last_assistant_message_from_turn(&history_snapshot).unwrap_or_default();
|
||||||
@@ -184,7 +185,7 @@ async fn run_compact_task_inner(
|
|||||||
let initial_context = sess.build_initial_context(turn_context.as_ref());
|
let initial_context = sess.build_initial_context(turn_context.as_ref());
|
||||||
let new_history = build_compacted_history(initial_context, &user_messages, &summary_text);
|
let new_history = build_compacted_history(initial_context, &user_messages, &summary_text);
|
||||||
{
|
{
|
||||||
let mut state = sess.state.lock_unchecked();
|
let mut state = sess.state.lock().await;
|
||||||
state.history.replace(new_history);
|
state.history.replace(new_history);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -288,7 +289,7 @@ async fn drain_to_completed(
|
|||||||
};
|
};
|
||||||
match event {
|
match event {
|
||||||
Ok(ResponseEvent::OutputItemDone(item)) => {
|
Ok(ResponseEvent::OutputItemDone(item)) => {
|
||||||
let mut state = sess.state.lock_unchecked();
|
let mut state = sess.state.lock().await;
|
||||||
state.history.record_items(std::slice::from_ref(&item));
|
state.history.record_items(std::slice::from_ref(&item));
|
||||||
}
|
}
|
||||||
Ok(ResponseEvent::Completed { .. }) => {
|
Ok(ResponseEvent::Completed { .. }) => {
|
||||||
|
|||||||
Reference in New Issue
Block a user