chore: decompose submission loop (#5854)

This commit is contained in:
jif-oai
2025-10-28 15:23:46 +00:00
committed by GitHub
parent 266419217e
commit 5ba2a17576

View File

@@ -82,7 +82,6 @@ use crate::protocol::ErrorEvent;
use crate::protocol::Event;
use crate::protocol::EventMsg;
use crate::protocol::ExecApprovalRequestEvent;
use crate::protocol::ListCustomPromptsResponseEvent;
use crate::protocol::Op;
use crate::protocol::RateLimitSnapshot;
use crate::protocol::ReviewDecision;
@@ -103,13 +102,10 @@ use crate::state::ActiveTurn;
use crate::state::SessionServices;
use crate::state::SessionState;
use crate::state::TaskKind;
use crate::tasks::CompactTask;
use crate::tasks::GhostSnapshotTask;
use crate::tasks::RegularTask;
use crate::tasks::ReviewTask;
use crate::tasks::SessionTask;
use crate::tasks::SessionTaskContext;
use crate::tasks::UndoTask;
use crate::tools::ToolRouter;
use crate::tools::context::SharedTurnDiffTracker;
use crate::tools::parallel::ToolCallRuntime;
@@ -125,7 +121,6 @@ use codex_async_utils::OrCancelExt;
use codex_otel::otel_event_manager::OtelEventManager;
use codex_protocol::config_types::ReasoningEffort as ReasoningEffortConfig;
use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig;
use codex_protocol::custom_prompts::CustomPrompt;
use codex_protocol::models::ContentItem;
use codex_protocol::models::FunctionCallOutputPayload;
use codex_protocol::models::ResponseInputItem;
@@ -1243,9 +1238,9 @@ async fn submission_loop(sess: Arc<Session>, config: Arc<Config>, rx_sub: Receiv
// To break out of this loop, send Op::Shutdown.
while let Ok(sub) = rx_sub.recv().await {
debug!(?sub, "Submission");
match sub.op {
match sub.op.clone() {
Op::Interrupt => {
sess.interrupt_task().await;
handlers::interrupt(&sess).await;
}
Op::OverrideTurnContext {
cwd,
@@ -1255,246 +1250,336 @@ async fn submission_loop(sess: Arc<Session>, config: Arc<Config>, rx_sub: Receiv
effort,
summary,
} => {
let updates = SessionSettingsUpdate {
cwd,
approval_policy,
sandbox_policy,
model,
reasoning_effort: effort,
reasoning_summary: summary,
..Default::default()
};
sess.update_settings(updates).await;
}
Op::UserInput { .. } | Op::UserTurn { .. } => {
let (items, updates) = match sub.op {
Op::UserTurn {
handlers::override_turn_context(
&sess,
SessionSettingsUpdate {
cwd,
approval_policy,
sandbox_policy,
model,
effort,
summary,
final_output_json_schema,
items,
} => (
items,
SessionSettingsUpdate {
cwd: Some(cwd),
approval_policy: Some(approval_policy),
sandbox_policy: Some(sandbox_policy),
model: Some(model),
reasoning_effort: Some(effort),
reasoning_summary: Some(summary),
final_output_json_schema: Some(final_output_json_schema),
},
),
Op::UserInput { items } => (items, SessionSettingsUpdate::default()),
_ => unreachable!(),
};
let current_context = sess.new_turn_with_sub_id(sub.id.clone(), updates).await;
current_context
.client
.get_otel_event_manager()
.user_prompt(&items);
// attempt to inject input into current task
if let Err(items) = sess.inject_input(items).await {
if let Some(env_item) = sess
.build_environment_update_item(previous_context.as_ref(), &current_context)
{
sess.record_conversation_items(
&current_context,
std::slice::from_ref(&env_item),
)
.await;
}
sess.spawn_task(Arc::clone(&current_context), items, RegularTask)
.await;
previous_context = Some(current_context);
}
}
Op::ExecApproval { id, decision } => match decision {
ReviewDecision::Abort => {
sess.interrupt_task().await;
}
other => sess.notify_approval(&id, other).await,
},
Op::PatchApproval { id, decision } => match decision {
ReviewDecision::Abort => {
sess.interrupt_task().await;
}
other => sess.notify_approval(&id, other).await,
},
Op::AddToHistory { text } => {
let id = sess.conversation_id;
let config = config.clone();
tokio::spawn(async move {
if let Err(e) = crate::message_history::append_entry(&text, &id, &config).await
{
warn!("failed to append to message history: {e}");
}
});
}
Op::GetHistoryEntryRequest { offset, log_id } => {
let config = config.clone();
let sess_clone = sess.clone();
let sub_id = sub.id.clone();
tokio::spawn(async move {
// Run lookup in blocking thread because it does file IO + locking.
let entry_opt = tokio::task::spawn_blocking(move || {
crate::message_history::lookup(log_id, offset, &config)
})
.await
.unwrap_or(None);
let event = Event {
id: sub_id,
msg: EventMsg::GetHistoryEntryResponse(
crate::protocol::GetHistoryEntryResponseEvent {
offset,
log_id,
entry: entry_opt.map(|e| {
codex_protocol::message_history::HistoryEntry {
conversation_id: e.session_id,
ts: e.ts,
text: e.text,
}
}),
},
),
};
sess_clone.send_event_raw(event).await;
});
}
Op::ListMcpTools => {
let sub_id = sub.id.clone();
// This is a cheap lookup from the connection manager's cache.
let tools = sess.services.mcp_connection_manager.list_all_tools();
let (auth_status_entries, resources, resource_templates) = tokio::join!(
compute_auth_statuses(
config.mcp_servers.iter(),
config.mcp_oauth_credentials_store_mode,
),
sess.services.mcp_connection_manager.list_all_resources(),
sess.services
.mcp_connection_manager
.list_all_resource_templates()
);
let auth_statuses = auth_status_entries
.iter()
.map(|(name, entry)| (name.clone(), entry.auth_status))
.collect();
let event = Event {
id: sub_id,
msg: EventMsg::McpListToolsResponse(
crate::protocol::McpListToolsResponseEvent {
tools,
resources,
resource_templates,
auth_statuses,
},
),
};
sess.send_event_raw(event).await;
}
Op::ListCustomPrompts => {
let sub_id = sub.id.clone();
let custom_prompts: Vec<CustomPrompt> =
if let Some(dir) = crate::custom_prompts::default_prompts_dir() {
crate::custom_prompts::discover_prompts_in(&dir).await
} else {
Vec::new()
};
let event = Event {
id: sub_id,
msg: EventMsg::ListCustomPromptsResponse(ListCustomPromptsResponseEvent {
custom_prompts,
}),
};
sess.send_event_raw(event).await;
}
Op::Undo => {
let turn_context = sess
.new_turn_with_sub_id(sub.id.clone(), SessionSettingsUpdate::default())
.await;
sess.spawn_task(turn_context, Vec::new(), UndoTask::new())
.await;
}
Op::Compact => {
let turn_context = sess
.new_turn_with_sub_id(sub.id.clone(), SessionSettingsUpdate::default())
.await;
// Attempt to inject input into current task
if let Err(items) = sess
.inject_input(vec![UserInput::Text {
text: compact::SUMMARIZATION_PROMPT.to_string(),
}])
.await
{
sess.spawn_task(Arc::clone(&turn_context), items, CompactTask)
.await;
}
}
Op::Shutdown => {
sess.abort_all_tasks(TurnAbortReason::Interrupted).await;
info!("Shutting down Codex instance");
// Gracefully flush and shutdown rollout recorder on session end so tests
// that inspect the rollout file do not race with the background writer.
let recorder_opt = {
let mut guard = sess.services.rollout.lock().await;
guard.take()
};
if let Some(rec) = recorder_opt
&& let Err(e) = rec.shutdown().await
{
warn!("failed to shutdown rollout recorder: {e}");
let event = Event {
id: sub.id.clone(),
msg: EventMsg::Error(ErrorEvent {
message: "Failed to shutdown rollout recorder".to_string(),
}),
};
sess.send_event_raw(event).await;
}
let event = Event {
id: sub.id.clone(),
msg: EventMsg::ShutdownComplete,
};
sess.send_event_raw(event).await;
break;
}
Op::Review { review_request } => {
let turn_context = sess
.new_turn_with_sub_id(sub.id.clone(), SessionSettingsUpdate::default())
.await;
spawn_review_thread(
sess.clone(),
config.clone(),
turn_context.clone(),
sub.id,
review_request,
reasoning_effort: effort,
reasoning_summary: summary,
..Default::default()
},
)
.await;
}
_ => {
// Ignore unknown ops; enum is non_exhaustive to allow extensions.
Op::UserInput { .. } | Op::UserTurn { .. } => {
handlers::user_input_or_turn(&sess, sub.id.clone(), sub.op, &mut previous_context)
.await;
}
Op::ExecApproval { id, decision } => {
handlers::exec_approval(&sess, id, decision).await;
}
Op::PatchApproval { id, decision } => {
handlers::patch_approval(&sess, id, decision).await;
}
Op::AddToHistory { text } => {
handlers::add_to_history(&sess, &config, text).await;
}
Op::GetHistoryEntryRequest { offset, log_id } => {
handlers::get_history_entry_request(&sess, &config, sub.id.clone(), offset, log_id)
.await;
}
Op::ListMcpTools => {
handlers::list_mcp_tools(&sess, &config, sub.id.clone()).await;
}
Op::ListCustomPrompts => {
handlers::list_custom_prompts(&sess, sub.id.clone()).await;
}
Op::Undo => {
handlers::undo(&sess, sub.id.clone()).await;
}
Op::Compact => {
handlers::compact(&sess, sub.id.clone()).await;
}
Op::Shutdown => {
if handlers::shutdown(&sess, sub.id.clone()).await {
break;
}
}
Op::Review { review_request } => {
handlers::review(&sess, &config, sub.id.clone(), review_request).await;
}
_ => {} // Ignore unknown ops; enum is non_exhaustive to allow extensions.
}
}
debug!("Agent loop exited");
}
/// Operation handlers
mod handlers {
use crate::codex::Session;
use crate::codex::SessionSettingsUpdate;
use crate::codex::TurnContext;
use crate::codex::compact;
use crate::codex::spawn_review_thread;
use crate::config::Config;
use crate::mcp::auth::compute_auth_statuses;
use crate::tasks::CompactTask;
use crate::tasks::RegularTask;
use crate::tasks::UndoTask;
use codex_protocol::custom_prompts::CustomPrompt;
use codex_protocol::protocol::ErrorEvent;
use codex_protocol::protocol::Event;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::ListCustomPromptsResponseEvent;
use codex_protocol::protocol::Op;
use codex_protocol::protocol::ReviewDecision;
use codex_protocol::protocol::ReviewRequest;
use codex_protocol::protocol::TurnAbortReason;
use codex_protocol::user_input::UserInput;
use std::sync::Arc;
use tracing::info;
use tracing::warn;
pub async fn interrupt(sess: &Arc<Session>) {
sess.interrupt_task().await;
}
pub async fn override_turn_context(sess: &Session, updates: SessionSettingsUpdate) {
sess.update_settings(updates).await;
}
pub async fn user_input_or_turn(
sess: &Arc<Session>,
sub_id: String,
op: Op,
previous_context: &mut Option<Arc<TurnContext>>,
) {
let (items, updates) = match op {
Op::UserTurn {
cwd,
approval_policy,
sandbox_policy,
model,
effort,
summary,
final_output_json_schema,
items,
} => (
items,
SessionSettingsUpdate {
cwd: Some(cwd),
approval_policy: Some(approval_policy),
sandbox_policy: Some(sandbox_policy),
model: Some(model),
reasoning_effort: Some(effort),
reasoning_summary: Some(summary),
final_output_json_schema: Some(final_output_json_schema),
},
),
Op::UserInput { items } => (items, SessionSettingsUpdate::default()),
_ => unreachable!(),
};
let current_context = sess.new_turn_with_sub_id(sub_id, updates).await;
current_context
.client
.get_otel_event_manager()
.user_prompt(&items);
// Attempt to inject input into current task
if let Err(items) = sess.inject_input(items).await {
if let Some(env_item) =
sess.build_environment_update_item(previous_context.as_ref(), &current_context)
{
sess.record_conversation_items(&current_context, std::slice::from_ref(&env_item))
.await;
}
sess.spawn_task(Arc::clone(&current_context), items, RegularTask)
.await;
*previous_context = Some(current_context);
}
}
pub async fn exec_approval(sess: &Arc<Session>, id: String, decision: ReviewDecision) {
match decision {
ReviewDecision::Abort => {
sess.interrupt_task().await;
}
other => sess.notify_approval(&id, other).await,
}
}
pub async fn patch_approval(sess: &Arc<Session>, id: String, decision: ReviewDecision) {
match decision {
ReviewDecision::Abort => {
sess.interrupt_task().await;
}
other => sess.notify_approval(&id, other).await,
}
}
pub async fn add_to_history(sess: &Arc<Session>, config: &Arc<Config>, text: String) {
let id = sess.conversation_id;
let config = Arc::clone(config);
tokio::spawn(async move {
if let Err(e) = crate::message_history::append_entry(&text, &id, &config).await {
warn!("failed to append to message history: {e}");
}
});
}
pub async fn get_history_entry_request(
sess: &Arc<Session>,
config: &Arc<Config>,
sub_id: String,
offset: usize,
log_id: u64,
) {
let config = Arc::clone(config);
let sess_clone = Arc::clone(sess);
tokio::spawn(async move {
// Run lookup in blocking thread because it does file IO + locking.
let entry_opt = tokio::task::spawn_blocking(move || {
crate::message_history::lookup(log_id, offset, &config)
})
.await
.unwrap_or(None);
let event = Event {
id: sub_id,
msg: EventMsg::GetHistoryEntryResponse(
crate::protocol::GetHistoryEntryResponseEvent {
offset,
log_id,
entry: entry_opt.map(|e| codex_protocol::message_history::HistoryEntry {
conversation_id: e.session_id,
ts: e.ts,
text: e.text,
}),
},
),
};
sess_clone.send_event_raw(event).await;
});
}
pub async fn list_mcp_tools(sess: &Session, config: &Arc<Config>, sub_id: String) {
// This is a cheap lookup from the connection manager's cache.
let tools = sess.services.mcp_connection_manager.list_all_tools();
let (auth_status_entries, resources, resource_templates) = tokio::join!(
compute_auth_statuses(
config.mcp_servers.iter(),
config.mcp_oauth_credentials_store_mode,
),
sess.services.mcp_connection_manager.list_all_resources(),
sess.services
.mcp_connection_manager
.list_all_resource_templates()
);
let auth_statuses = auth_status_entries
.iter()
.map(|(name, entry)| (name.clone(), entry.auth_status))
.collect();
let event = Event {
id: sub_id,
msg: EventMsg::McpListToolsResponse(crate::protocol::McpListToolsResponseEvent {
tools,
resources,
resource_templates,
auth_statuses,
}),
};
sess.send_event_raw(event).await;
}
pub async fn list_custom_prompts(sess: &Session, sub_id: String) {
let custom_prompts: Vec<CustomPrompt> =
if let Some(dir) = crate::custom_prompts::default_prompts_dir() {
crate::custom_prompts::discover_prompts_in(&dir).await
} else {
Vec::new()
};
let event = Event {
id: sub_id,
msg: EventMsg::ListCustomPromptsResponse(ListCustomPromptsResponseEvent {
custom_prompts,
}),
};
sess.send_event_raw(event).await;
}
pub async fn undo(sess: &Arc<Session>, sub_id: String) {
let turn_context = sess
.new_turn_with_sub_id(sub_id, SessionSettingsUpdate::default())
.await;
sess.spawn_task(turn_context, Vec::new(), UndoTask::new())
.await;
}
pub async fn compact(sess: &Arc<Session>, sub_id: String) {
let turn_context = sess
.new_turn_with_sub_id(sub_id, SessionSettingsUpdate::default())
.await;
// Attempt to inject input into current task
if let Err(items) = sess
.inject_input(vec![UserInput::Text {
text: compact::SUMMARIZATION_PROMPT.to_string(),
}])
.await
{
sess.spawn_task(Arc::clone(&turn_context), items, CompactTask)
.await;
}
}
pub async fn shutdown(sess: &Arc<Session>, sub_id: String) -> bool {
sess.abort_all_tasks(TurnAbortReason::Interrupted).await;
info!("Shutting down Codex instance");
// Gracefully flush and shutdown rollout recorder on session end so tests
// that inspect the rollout file do not race with the background writer.
let recorder_opt = {
let mut guard = sess.services.rollout.lock().await;
guard.take()
};
if let Some(rec) = recorder_opt
&& let Err(e) = rec.shutdown().await
{
warn!("failed to shutdown rollout recorder: {e}");
let event = Event {
id: sub_id.clone(),
msg: EventMsg::Error(ErrorEvent {
message: "Failed to shutdown rollout recorder".to_string(),
}),
};
sess.send_event_raw(event).await;
}
let event = Event {
id: sub_id,
msg: EventMsg::ShutdownComplete,
};
sess.send_event_raw(event).await;
true
}
pub async fn review(
sess: &Arc<Session>,
config: &Arc<Config>,
sub_id: String,
review_request: ReviewRequest,
) {
let turn_context = sess
.new_turn_with_sub_id(sub_id.clone(), SessionSettingsUpdate::default())
.await;
spawn_review_thread(
Arc::clone(sess),
Arc::clone(config),
turn_context.clone(),
sub_id,
review_request,
)
.await;
}
}
/// Spawn a review thread using the given prompt.
async fn spawn_review_thread(
sess: Arc<Session>,