diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 59d7332e..47d80c5c 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -82,7 +82,6 @@ use crate::protocol::ErrorEvent; use crate::protocol::Event; use crate::protocol::EventMsg; use crate::protocol::ExecApprovalRequestEvent; -use crate::protocol::ListCustomPromptsResponseEvent; use crate::protocol::Op; use crate::protocol::RateLimitSnapshot; use crate::protocol::ReviewDecision; @@ -103,13 +102,10 @@ use crate::state::ActiveTurn; use crate::state::SessionServices; use crate::state::SessionState; use crate::state::TaskKind; -use crate::tasks::CompactTask; use crate::tasks::GhostSnapshotTask; -use crate::tasks::RegularTask; use crate::tasks::ReviewTask; use crate::tasks::SessionTask; use crate::tasks::SessionTaskContext; -use crate::tasks::UndoTask; use crate::tools::ToolRouter; use crate::tools::context::SharedTurnDiffTracker; use crate::tools::parallel::ToolCallRuntime; @@ -125,7 +121,6 @@ use codex_async_utils::OrCancelExt; use codex_otel::otel_event_manager::OtelEventManager; use codex_protocol::config_types::ReasoningEffort as ReasoningEffortConfig; use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig; -use codex_protocol::custom_prompts::CustomPrompt; use codex_protocol::models::ContentItem; use codex_protocol::models::FunctionCallOutputPayload; use codex_protocol::models::ResponseInputItem; @@ -1243,9 +1238,9 @@ async fn submission_loop(sess: Arc, config: Arc, rx_sub: Receiv // To break out of this loop, send Op::Shutdown. while let Ok(sub) = rx_sub.recv().await { debug!(?sub, "Submission"); - match sub.op { + match sub.op.clone() { Op::Interrupt => { - sess.interrupt_task().await; + handlers::interrupt(&sess).await; } Op::OverrideTurnContext { cwd, @@ -1255,246 +1250,336 @@ async fn submission_loop(sess: Arc, config: Arc, rx_sub: Receiv effort, summary, } => { - let updates = SessionSettingsUpdate { - cwd, - approval_policy, - sandbox_policy, - model, - reasoning_effort: effort, - reasoning_summary: summary, - ..Default::default() - }; - sess.update_settings(updates).await; - } - - Op::UserInput { .. } | Op::UserTurn { .. } => { - let (items, updates) = match sub.op { - Op::UserTurn { + handlers::override_turn_context( + &sess, + SessionSettingsUpdate { cwd, approval_policy, sandbox_policy, model, - effort, - summary, - final_output_json_schema, - items, - } => ( - items, - SessionSettingsUpdate { - cwd: Some(cwd), - approval_policy: Some(approval_policy), - sandbox_policy: Some(sandbox_policy), - model: Some(model), - reasoning_effort: Some(effort), - reasoning_summary: Some(summary), - final_output_json_schema: Some(final_output_json_schema), - }, - ), - Op::UserInput { items } => (items, SessionSettingsUpdate::default()), - _ => unreachable!(), - }; - let current_context = sess.new_turn_with_sub_id(sub.id.clone(), updates).await; - current_context - .client - .get_otel_event_manager() - .user_prompt(&items); - // attempt to inject input into current task - if let Err(items) = sess.inject_input(items).await { - if let Some(env_item) = sess - .build_environment_update_item(previous_context.as_ref(), ¤t_context) - { - sess.record_conversation_items( - ¤t_context, - std::slice::from_ref(&env_item), - ) - .await; - } - - sess.spawn_task(Arc::clone(¤t_context), items, RegularTask) - .await; - previous_context = Some(current_context); - } - } - Op::ExecApproval { id, decision } => match decision { - ReviewDecision::Abort => { - sess.interrupt_task().await; - } - other => sess.notify_approval(&id, other).await, - }, - Op::PatchApproval { id, decision } => match decision { - ReviewDecision::Abort => { - sess.interrupt_task().await; - } - other => sess.notify_approval(&id, other).await, - }, - Op::AddToHistory { text } => { - let id = sess.conversation_id; - let config = config.clone(); - tokio::spawn(async move { - if let Err(e) = crate::message_history::append_entry(&text, &id, &config).await - { - warn!("failed to append to message history: {e}"); - } - }); - } - - Op::GetHistoryEntryRequest { offset, log_id } => { - let config = config.clone(); - let sess_clone = sess.clone(); - let sub_id = sub.id.clone(); - - tokio::spawn(async move { - // Run lookup in blocking thread because it does file IO + locking. - let entry_opt = tokio::task::spawn_blocking(move || { - crate::message_history::lookup(log_id, offset, &config) - }) - .await - .unwrap_or(None); - - let event = Event { - id: sub_id, - msg: EventMsg::GetHistoryEntryResponse( - crate::protocol::GetHistoryEntryResponseEvent { - offset, - log_id, - entry: entry_opt.map(|e| { - codex_protocol::message_history::HistoryEntry { - conversation_id: e.session_id, - ts: e.ts, - text: e.text, - } - }), - }, - ), - }; - - sess_clone.send_event_raw(event).await; - }); - } - Op::ListMcpTools => { - let sub_id = sub.id.clone(); - - // This is a cheap lookup from the connection manager's cache. - let tools = sess.services.mcp_connection_manager.list_all_tools(); - let (auth_status_entries, resources, resource_templates) = tokio::join!( - compute_auth_statuses( - config.mcp_servers.iter(), - config.mcp_oauth_credentials_store_mode, - ), - sess.services.mcp_connection_manager.list_all_resources(), - sess.services - .mcp_connection_manager - .list_all_resource_templates() - ); - let auth_statuses = auth_status_entries - .iter() - .map(|(name, entry)| (name.clone(), entry.auth_status)) - .collect(); - let event = Event { - id: sub_id, - msg: EventMsg::McpListToolsResponse( - crate::protocol::McpListToolsResponseEvent { - tools, - resources, - resource_templates, - auth_statuses, - }, - ), - }; - sess.send_event_raw(event).await; - } - Op::ListCustomPrompts => { - let sub_id = sub.id.clone(); - - let custom_prompts: Vec = - if let Some(dir) = crate::custom_prompts::default_prompts_dir() { - crate::custom_prompts::discover_prompts_in(&dir).await - } else { - Vec::new() - }; - - let event = Event { - id: sub_id, - msg: EventMsg::ListCustomPromptsResponse(ListCustomPromptsResponseEvent { - custom_prompts, - }), - }; - sess.send_event_raw(event).await; - } - Op::Undo => { - let turn_context = sess - .new_turn_with_sub_id(sub.id.clone(), SessionSettingsUpdate::default()) - .await; - sess.spawn_task(turn_context, Vec::new(), UndoTask::new()) - .await; - } - Op::Compact => { - let turn_context = sess - .new_turn_with_sub_id(sub.id.clone(), SessionSettingsUpdate::default()) - .await; - // Attempt to inject input into current task - if let Err(items) = sess - .inject_input(vec![UserInput::Text { - text: compact::SUMMARIZATION_PROMPT.to_string(), - }]) - .await - { - sess.spawn_task(Arc::clone(&turn_context), items, CompactTask) - .await; - } - } - Op::Shutdown => { - sess.abort_all_tasks(TurnAbortReason::Interrupted).await; - info!("Shutting down Codex instance"); - - // Gracefully flush and shutdown rollout recorder on session end so tests - // that inspect the rollout file do not race with the background writer. - let recorder_opt = { - let mut guard = sess.services.rollout.lock().await; - guard.take() - }; - if let Some(rec) = recorder_opt - && let Err(e) = rec.shutdown().await - { - warn!("failed to shutdown rollout recorder: {e}"); - let event = Event { - id: sub.id.clone(), - msg: EventMsg::Error(ErrorEvent { - message: "Failed to shutdown rollout recorder".to_string(), - }), - }; - sess.send_event_raw(event).await; - } - - let event = Event { - id: sub.id.clone(), - msg: EventMsg::ShutdownComplete, - }; - sess.send_event_raw(event).await; - break; - } - - Op::Review { review_request } => { - let turn_context = sess - .new_turn_with_sub_id(sub.id.clone(), SessionSettingsUpdate::default()) - .await; - spawn_review_thread( - sess.clone(), - config.clone(), - turn_context.clone(), - sub.id, - review_request, + reasoning_effort: effort, + reasoning_summary: summary, + ..Default::default() + }, ) .await; } - _ => { - // Ignore unknown ops; enum is non_exhaustive to allow extensions. + Op::UserInput { .. } | Op::UserTurn { .. } => { + handlers::user_input_or_turn(&sess, sub.id.clone(), sub.op, &mut previous_context) + .await; } + Op::ExecApproval { id, decision } => { + handlers::exec_approval(&sess, id, decision).await; + } + Op::PatchApproval { id, decision } => { + handlers::patch_approval(&sess, id, decision).await; + } + Op::AddToHistory { text } => { + handlers::add_to_history(&sess, &config, text).await; + } + Op::GetHistoryEntryRequest { offset, log_id } => { + handlers::get_history_entry_request(&sess, &config, sub.id.clone(), offset, log_id) + .await; + } + Op::ListMcpTools => { + handlers::list_mcp_tools(&sess, &config, sub.id.clone()).await; + } + Op::ListCustomPrompts => { + handlers::list_custom_prompts(&sess, sub.id.clone()).await; + } + Op::Undo => { + handlers::undo(&sess, sub.id.clone()).await; + } + Op::Compact => { + handlers::compact(&sess, sub.id.clone()).await; + } + Op::Shutdown => { + if handlers::shutdown(&sess, sub.id.clone()).await { + break; + } + } + Op::Review { review_request } => { + handlers::review(&sess, &config, sub.id.clone(), review_request).await; + } + _ => {} // Ignore unknown ops; enum is non_exhaustive to allow extensions. } } debug!("Agent loop exited"); } +/// Operation handlers +mod handlers { + use crate::codex::Session; + use crate::codex::SessionSettingsUpdate; + use crate::codex::TurnContext; + use crate::codex::compact; + use crate::codex::spawn_review_thread; + use crate::config::Config; + use crate::mcp::auth::compute_auth_statuses; + use crate::tasks::CompactTask; + use crate::tasks::RegularTask; + use crate::tasks::UndoTask; + use codex_protocol::custom_prompts::CustomPrompt; + use codex_protocol::protocol::ErrorEvent; + use codex_protocol::protocol::Event; + use codex_protocol::protocol::EventMsg; + use codex_protocol::protocol::ListCustomPromptsResponseEvent; + use codex_protocol::protocol::Op; + use codex_protocol::protocol::ReviewDecision; + use codex_protocol::protocol::ReviewRequest; + use codex_protocol::protocol::TurnAbortReason; + use codex_protocol::user_input::UserInput; + use std::sync::Arc; + use tracing::info; + use tracing::warn; + + pub async fn interrupt(sess: &Arc) { + sess.interrupt_task().await; + } + + pub async fn override_turn_context(sess: &Session, updates: SessionSettingsUpdate) { + sess.update_settings(updates).await; + } + + pub async fn user_input_or_turn( + sess: &Arc, + sub_id: String, + op: Op, + previous_context: &mut Option>, + ) { + let (items, updates) = match op { + Op::UserTurn { + cwd, + approval_policy, + sandbox_policy, + model, + effort, + summary, + final_output_json_schema, + items, + } => ( + items, + SessionSettingsUpdate { + cwd: Some(cwd), + approval_policy: Some(approval_policy), + sandbox_policy: Some(sandbox_policy), + model: Some(model), + reasoning_effort: Some(effort), + reasoning_summary: Some(summary), + final_output_json_schema: Some(final_output_json_schema), + }, + ), + Op::UserInput { items } => (items, SessionSettingsUpdate::default()), + _ => unreachable!(), + }; + + let current_context = sess.new_turn_with_sub_id(sub_id, updates).await; + current_context + .client + .get_otel_event_manager() + .user_prompt(&items); + + // Attempt to inject input into current task + if let Err(items) = sess.inject_input(items).await { + if let Some(env_item) = + sess.build_environment_update_item(previous_context.as_ref(), ¤t_context) + { + sess.record_conversation_items(¤t_context, std::slice::from_ref(&env_item)) + .await; + } + + sess.spawn_task(Arc::clone(¤t_context), items, RegularTask) + .await; + *previous_context = Some(current_context); + } + } + + pub async fn exec_approval(sess: &Arc, id: String, decision: ReviewDecision) { + match decision { + ReviewDecision::Abort => { + sess.interrupt_task().await; + } + other => sess.notify_approval(&id, other).await, + } + } + + pub async fn patch_approval(sess: &Arc, id: String, decision: ReviewDecision) { + match decision { + ReviewDecision::Abort => { + sess.interrupt_task().await; + } + other => sess.notify_approval(&id, other).await, + } + } + + pub async fn add_to_history(sess: &Arc, config: &Arc, text: String) { + let id = sess.conversation_id; + let config = Arc::clone(config); + tokio::spawn(async move { + if let Err(e) = crate::message_history::append_entry(&text, &id, &config).await { + warn!("failed to append to message history: {e}"); + } + }); + } + + pub async fn get_history_entry_request( + sess: &Arc, + config: &Arc, + sub_id: String, + offset: usize, + log_id: u64, + ) { + let config = Arc::clone(config); + let sess_clone = Arc::clone(sess); + + tokio::spawn(async move { + // Run lookup in blocking thread because it does file IO + locking. + let entry_opt = tokio::task::spawn_blocking(move || { + crate::message_history::lookup(log_id, offset, &config) + }) + .await + .unwrap_or(None); + + let event = Event { + id: sub_id, + msg: EventMsg::GetHistoryEntryResponse( + crate::protocol::GetHistoryEntryResponseEvent { + offset, + log_id, + entry: entry_opt.map(|e| codex_protocol::message_history::HistoryEntry { + conversation_id: e.session_id, + ts: e.ts, + text: e.text, + }), + }, + ), + }; + + sess_clone.send_event_raw(event).await; + }); + } + + pub async fn list_mcp_tools(sess: &Session, config: &Arc, sub_id: String) { + // This is a cheap lookup from the connection manager's cache. + let tools = sess.services.mcp_connection_manager.list_all_tools(); + let (auth_status_entries, resources, resource_templates) = tokio::join!( + compute_auth_statuses( + config.mcp_servers.iter(), + config.mcp_oauth_credentials_store_mode, + ), + sess.services.mcp_connection_manager.list_all_resources(), + sess.services + .mcp_connection_manager + .list_all_resource_templates() + ); + let auth_statuses = auth_status_entries + .iter() + .map(|(name, entry)| (name.clone(), entry.auth_status)) + .collect(); + let event = Event { + id: sub_id, + msg: EventMsg::McpListToolsResponse(crate::protocol::McpListToolsResponseEvent { + tools, + resources, + resource_templates, + auth_statuses, + }), + }; + sess.send_event_raw(event).await; + } + + pub async fn list_custom_prompts(sess: &Session, sub_id: String) { + let custom_prompts: Vec = + if let Some(dir) = crate::custom_prompts::default_prompts_dir() { + crate::custom_prompts::discover_prompts_in(&dir).await + } else { + Vec::new() + }; + + let event = Event { + id: sub_id, + msg: EventMsg::ListCustomPromptsResponse(ListCustomPromptsResponseEvent { + custom_prompts, + }), + }; + sess.send_event_raw(event).await; + } + + pub async fn undo(sess: &Arc, sub_id: String) { + let turn_context = sess + .new_turn_with_sub_id(sub_id, SessionSettingsUpdate::default()) + .await; + sess.spawn_task(turn_context, Vec::new(), UndoTask::new()) + .await; + } + + pub async fn compact(sess: &Arc, sub_id: String) { + let turn_context = sess + .new_turn_with_sub_id(sub_id, SessionSettingsUpdate::default()) + .await; + // Attempt to inject input into current task + if let Err(items) = sess + .inject_input(vec![UserInput::Text { + text: compact::SUMMARIZATION_PROMPT.to_string(), + }]) + .await + { + sess.spawn_task(Arc::clone(&turn_context), items, CompactTask) + .await; + } + } + + pub async fn shutdown(sess: &Arc, sub_id: String) -> bool { + sess.abort_all_tasks(TurnAbortReason::Interrupted).await; + info!("Shutting down Codex instance"); + + // Gracefully flush and shutdown rollout recorder on session end so tests + // that inspect the rollout file do not race with the background writer. + let recorder_opt = { + let mut guard = sess.services.rollout.lock().await; + guard.take() + }; + if let Some(rec) = recorder_opt + && let Err(e) = rec.shutdown().await + { + warn!("failed to shutdown rollout recorder: {e}"); + let event = Event { + id: sub_id.clone(), + msg: EventMsg::Error(ErrorEvent { + message: "Failed to shutdown rollout recorder".to_string(), + }), + }; + sess.send_event_raw(event).await; + } + + let event = Event { + id: sub_id, + msg: EventMsg::ShutdownComplete, + }; + sess.send_event_raw(event).await; + true + } + + pub async fn review( + sess: &Arc, + config: &Arc, + sub_id: String, + review_request: ReviewRequest, + ) { + let turn_context = sess + .new_turn_with_sub_id(sub_id.clone(), SessionSettingsUpdate::default()) + .await; + spawn_review_thread( + Arc::clone(sess), + Arc::clone(config), + turn_context.clone(), + sub_id, + review_request, + ) + .await; + } +} + /// Spawn a review thread using the given prompt. async fn spawn_review_thread( sess: Arc,