Create independent TurnContexts (#5308)
The goal of this change: 1. Unify user input and user turn implementation. 2. Have a single place where turn/session setting overrides are applied. 3. Have a single place where turn context is created. 4. Create TurnContext only for actual turn and have a separate structure for current session settings (reuse ConfigureSession)
This commit is contained in:
@@ -107,6 +107,7 @@ use crate::rollout::RolloutRecorderParams;
|
||||
use crate::shell;
|
||||
use crate::state::ActiveTurn;
|
||||
use crate::state::SessionServices;
|
||||
use crate::state::SessionState;
|
||||
use crate::state::TaskKind;
|
||||
use crate::tasks::CompactTask;
|
||||
use crate::tasks::RegularTask;
|
||||
@@ -169,7 +170,7 @@ impl Codex {
|
||||
|
||||
let config = Arc::new(config);
|
||||
|
||||
let configure_session = ConfigureSession {
|
||||
let session_configuration = SessionConfiguration {
|
||||
provider: config.model_provider.clone(),
|
||||
model: config.model.clone(),
|
||||
model_reasoning_effort: config.model_reasoning_effort,
|
||||
@@ -178,13 +179,13 @@ impl Codex {
|
||||
base_instructions: config.base_instructions.clone(),
|
||||
approval_policy: config.approval_policy,
|
||||
sandbox_policy: config.sandbox_policy.clone(),
|
||||
notify: UserNotifier::new(config.notify.clone()),
|
||||
cwd: config.cwd.clone(),
|
||||
original_config_do_not_use: Arc::clone(&config),
|
||||
};
|
||||
|
||||
// Generate a unique ID for the lifetime of this Codex session.
|
||||
let (session, turn_context) = Session::new(
|
||||
configure_session,
|
||||
let session = Session::new(
|
||||
session_configuration,
|
||||
config.clone(),
|
||||
auth_manager.clone(),
|
||||
tx_event.clone(),
|
||||
@@ -199,7 +200,7 @@ impl Codex {
|
||||
let conversation_id = session.conversation_id;
|
||||
|
||||
// This task will run until Op::Shutdown is received.
|
||||
tokio::spawn(submission_loop(session, turn_context, config, rx_sub));
|
||||
tokio::spawn(submission_loop(session, config, rx_sub));
|
||||
let codex = Codex {
|
||||
next_id: AtomicU64::new(0),
|
||||
tx_sub,
|
||||
@@ -243,8 +244,6 @@ impl Codex {
|
||||
}
|
||||
}
|
||||
|
||||
use crate::state::SessionState;
|
||||
|
||||
/// Context for an initialized model agent
|
||||
///
|
||||
/// A session has at most 1 running task at a time, and can be interrupted by user input.
|
||||
@@ -283,8 +282,8 @@ impl TurnContext {
|
||||
}
|
||||
}
|
||||
|
||||
/// Configure the model session.
|
||||
struct ConfigureSession {
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct SessionConfiguration {
|
||||
/// Provider identifier ("openai", "openrouter", ...).
|
||||
provider: ModelProviderInfo,
|
||||
|
||||
@@ -305,8 +304,6 @@ struct ConfigureSession {
|
||||
/// How to sandbox commands executed in the system
|
||||
sandbox_policy: SandboxPolicy,
|
||||
|
||||
notify: UserNotifier,
|
||||
|
||||
/// Working directory that should be treated as the *root* of the
|
||||
/// session. All relative paths supplied by the model as well as the
|
||||
/// execution sandbox are resolved against this directory **instead**
|
||||
@@ -315,32 +312,118 @@ struct ConfigureSession {
|
||||
/// `ConfigureSession` operation so that the business-logic layer can
|
||||
/// operate deterministically.
|
||||
cwd: PathBuf,
|
||||
|
||||
// TODO(pakrym): Remove config from here
|
||||
original_config_do_not_use: Arc<Config>,
|
||||
}
|
||||
|
||||
impl SessionConfiguration {
|
||||
pub(crate) fn apply(&self, updates: &SessionSettingsUpdate) -> Self {
|
||||
let mut next_configuration = self.clone();
|
||||
if let Some(model) = updates.model.clone() {
|
||||
next_configuration.model = model;
|
||||
}
|
||||
if let Some(effort) = updates.reasoning_effort {
|
||||
next_configuration.model_reasoning_effort = effort;
|
||||
}
|
||||
if let Some(summary) = updates.reasoning_summary {
|
||||
next_configuration.model_reasoning_summary = summary;
|
||||
}
|
||||
if let Some(approval_policy) = updates.approval_policy {
|
||||
next_configuration.approval_policy = approval_policy;
|
||||
}
|
||||
if let Some(sandbox_policy) = updates.sandbox_policy.clone() {
|
||||
next_configuration.sandbox_policy = sandbox_policy;
|
||||
}
|
||||
if let Some(cwd) = updates.cwd.clone() {
|
||||
next_configuration.cwd = cwd;
|
||||
}
|
||||
next_configuration
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default, Clone)]
|
||||
pub(crate) struct SessionSettingsUpdate {
|
||||
pub(crate) cwd: Option<PathBuf>,
|
||||
pub(crate) approval_policy: Option<AskForApproval>,
|
||||
pub(crate) sandbox_policy: Option<SandboxPolicy>,
|
||||
pub(crate) model: Option<String>,
|
||||
pub(crate) reasoning_effort: Option<Option<ReasoningEffortConfig>>,
|
||||
pub(crate) reasoning_summary: Option<ReasoningSummaryConfig>,
|
||||
pub(crate) final_output_json_schema: Option<Option<Value>>,
|
||||
}
|
||||
|
||||
impl Session {
|
||||
fn make_turn_context(
|
||||
auth_manager: Option<Arc<AuthManager>>,
|
||||
otel_event_manager: &OtelEventManager,
|
||||
provider: ModelProviderInfo,
|
||||
session_configuration: &SessionConfiguration,
|
||||
conversation_id: ConversationId,
|
||||
) -> TurnContext {
|
||||
let config = session_configuration.original_config_do_not_use.clone();
|
||||
let model_family = find_family_for_model(&session_configuration.model)
|
||||
.unwrap_or_else(|| config.model_family.clone());
|
||||
let mut per_turn_config = (*config).clone();
|
||||
per_turn_config.model = session_configuration.model.clone();
|
||||
per_turn_config.model_family = model_family.clone();
|
||||
per_turn_config.model_reasoning_effort = session_configuration.model_reasoning_effort;
|
||||
per_turn_config.model_reasoning_summary = session_configuration.model_reasoning_summary;
|
||||
if let Some(model_info) = get_model_info(&model_family) {
|
||||
per_turn_config.model_context_window = Some(model_info.context_window);
|
||||
}
|
||||
|
||||
let otel_event_manager = otel_event_manager.clone().with_model(
|
||||
session_configuration.model.as_str(),
|
||||
session_configuration.model.as_str(),
|
||||
);
|
||||
|
||||
let client = ModelClient::new(
|
||||
Arc::new(per_turn_config),
|
||||
auth_manager,
|
||||
otel_event_manager,
|
||||
provider,
|
||||
session_configuration.model_reasoning_effort,
|
||||
session_configuration.model_reasoning_summary,
|
||||
conversation_id,
|
||||
);
|
||||
|
||||
let tools_config = ToolsConfig::new(&ToolsConfigParams {
|
||||
model_family: &model_family,
|
||||
features: &config.features,
|
||||
});
|
||||
|
||||
TurnContext {
|
||||
client,
|
||||
cwd: session_configuration.cwd.clone(),
|
||||
base_instructions: session_configuration.base_instructions.clone(),
|
||||
user_instructions: session_configuration.user_instructions.clone(),
|
||||
approval_policy: session_configuration.approval_policy,
|
||||
sandbox_policy: session_configuration.sandbox_policy.clone(),
|
||||
shell_environment_policy: config.shell_environment_policy.clone(),
|
||||
tools_config,
|
||||
is_review_mode: false,
|
||||
final_output_json_schema: None,
|
||||
}
|
||||
}
|
||||
|
||||
async fn new(
|
||||
configure_session: ConfigureSession,
|
||||
session_configuration: SessionConfiguration,
|
||||
config: Arc<Config>,
|
||||
auth_manager: Arc<AuthManager>,
|
||||
tx_event: Sender<Event>,
|
||||
initial_history: InitialHistory,
|
||||
session_source: SessionSource,
|
||||
) -> anyhow::Result<(Arc<Self>, TurnContext)> {
|
||||
let ConfigureSession {
|
||||
provider,
|
||||
model,
|
||||
model_reasoning_effort,
|
||||
model_reasoning_summary,
|
||||
user_instructions,
|
||||
base_instructions,
|
||||
approval_policy,
|
||||
sandbox_policy,
|
||||
notify,
|
||||
cwd,
|
||||
} = configure_session;
|
||||
debug!("Configuring session: model={model}; provider={provider:?}");
|
||||
if !cwd.is_absolute() {
|
||||
return Err(anyhow::anyhow!("cwd is not absolute: {cwd:?}"));
|
||||
) -> anyhow::Result<Arc<Self>> {
|
||||
debug!(
|
||||
"Configuring session: model={}; provider={:?}",
|
||||
session_configuration.model, session_configuration.provider
|
||||
);
|
||||
if !session_configuration.cwd.is_absolute() {
|
||||
return Err(anyhow::anyhow!(
|
||||
"cwd is not absolute: {:?}",
|
||||
session_configuration.cwd
|
||||
));
|
||||
}
|
||||
|
||||
let (conversation_id, rollout_params) = match &initial_history {
|
||||
@@ -350,7 +433,7 @@ impl Session {
|
||||
conversation_id,
|
||||
RolloutRecorderParams::new(
|
||||
conversation_id,
|
||||
user_instructions.clone(),
|
||||
session_configuration.user_instructions.clone(),
|
||||
session_source,
|
||||
),
|
||||
)
|
||||
@@ -406,8 +489,6 @@ impl Session {
|
||||
anyhow::anyhow!("failed to initialize rollout recorder: {e:#}")
|
||||
})?;
|
||||
let rollout_path = rollout_recorder.rollout_path.clone();
|
||||
// Create the mutable state for the Session.
|
||||
let state = SessionState::new();
|
||||
|
||||
// Handle MCP manager result and record any startup failures.
|
||||
let (mcp_connection_manager, failed_clients) = match mcp_res {
|
||||
@@ -475,45 +556,24 @@ impl Session {
|
||||
config.active_profile.clone(),
|
||||
);
|
||||
|
||||
// Now that the conversation id is final (may have been updated by resume),
|
||||
// construct the model client.
|
||||
let client = ModelClient::new(
|
||||
config.clone(),
|
||||
Some(auth_manager.clone()),
|
||||
otel_event_manager,
|
||||
provider.clone(),
|
||||
model_reasoning_effort,
|
||||
model_reasoning_summary,
|
||||
conversation_id,
|
||||
);
|
||||
let turn_context = TurnContext {
|
||||
client,
|
||||
tools_config: ToolsConfig::new(&ToolsConfigParams {
|
||||
model_family: &config.model_family,
|
||||
features: &config.features,
|
||||
}),
|
||||
user_instructions,
|
||||
base_instructions,
|
||||
approval_policy,
|
||||
sandbox_policy,
|
||||
shell_environment_policy: config.shell_environment_policy.clone(),
|
||||
cwd,
|
||||
is_review_mode: false,
|
||||
final_output_json_schema: None,
|
||||
};
|
||||
// Create the mutable state for the Session.
|
||||
let state = SessionState::new(session_configuration.clone());
|
||||
|
||||
let services = SessionServices {
|
||||
mcp_connection_manager,
|
||||
session_manager: ExecSessionManager::default(),
|
||||
unified_exec_manager: UnifiedExecSessionManager::default(),
|
||||
notifier: notify,
|
||||
notifier: UserNotifier::new(config.notify.clone()),
|
||||
rollout: Mutex::new(Some(rollout_recorder)),
|
||||
user_shell: default_shell,
|
||||
show_raw_agent_reasoning: config.show_raw_agent_reasoning,
|
||||
executor: Executor::new(ExecutorConfig::new(
|
||||
turn_context.sandbox_policy.clone(),
|
||||
turn_context.cwd.clone(),
|
||||
session_configuration.sandbox_policy.clone(),
|
||||
session_configuration.cwd.clone(),
|
||||
config.codex_linux_sandbox_exe.clone(),
|
||||
)),
|
||||
auth_manager: Arc::clone(&auth_manager),
|
||||
otel_event_manager,
|
||||
};
|
||||
|
||||
let sess = Arc::new(Session {
|
||||
@@ -528,15 +588,14 @@ impl Session {
|
||||
// Dispatch the SessionConfiguredEvent first and then report any errors.
|
||||
// If resuming, include converted initial messages in the payload so UIs can render them immediately.
|
||||
let initial_messages = initial_history.get_event_msgs();
|
||||
sess.record_initial_history(&turn_context, initial_history)
|
||||
.await;
|
||||
sess.record_initial_history(initial_history).await;
|
||||
|
||||
let events = std::iter::once(Event {
|
||||
id: INITIAL_SUBMIT_ID.to_owned(),
|
||||
msg: EventMsg::SessionConfigured(SessionConfiguredEvent {
|
||||
session_id: conversation_id,
|
||||
model,
|
||||
reasoning_effort: model_reasoning_effort,
|
||||
model: session_configuration.model.clone(),
|
||||
reasoning_effort: session_configuration.model_reasoning_effort,
|
||||
history_log_id,
|
||||
history_entry_count,
|
||||
initial_messages,
|
||||
@@ -548,7 +607,7 @@ impl Session {
|
||||
sess.send_event(event).await;
|
||||
}
|
||||
|
||||
Ok((sess, turn_context))
|
||||
Ok(sess)
|
||||
}
|
||||
|
||||
pub(crate) fn get_tx_event(&self) -> Sender<Event> {
|
||||
@@ -562,15 +621,12 @@ impl Session {
|
||||
format!("auto-compact-{id}")
|
||||
}
|
||||
|
||||
async fn record_initial_history(
|
||||
&self,
|
||||
turn_context: &TurnContext,
|
||||
conversation_history: InitialHistory,
|
||||
) {
|
||||
async fn record_initial_history(&self, conversation_history: InitialHistory) {
|
||||
let turn_context = self.new_turn(SessionSettingsUpdate::default()).await;
|
||||
match conversation_history {
|
||||
InitialHistory::New => {
|
||||
// Build and record initial items (user instructions + environment context)
|
||||
let items = self.build_initial_context(turn_context);
|
||||
let items = self.build_initial_context(&turn_context);
|
||||
self.record_conversation_items(&items).await;
|
||||
}
|
||||
InitialHistory::Resumed(_) | InitialHistory::Forked(_) => {
|
||||
@@ -579,7 +635,7 @@ impl Session {
|
||||
|
||||
// Always add response items to conversation history
|
||||
let reconstructed_history =
|
||||
self.reconstruct_history_from_rollout(turn_context, &rollout_items);
|
||||
self.reconstruct_history_from_rollout(&turn_context, &rollout_items);
|
||||
if !reconstructed_history.is_empty() {
|
||||
self.record_into_history(&reconstructed_history).await;
|
||||
}
|
||||
@@ -592,6 +648,52 @@ impl Session {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn update_settings(&self, updates: SessionSettingsUpdate) {
|
||||
let mut state = self.state.lock().await;
|
||||
|
||||
state.session_configuration = state.session_configuration.apply(&updates);
|
||||
}
|
||||
|
||||
pub(crate) async fn new_turn(&self, updates: SessionSettingsUpdate) -> Arc<TurnContext> {
|
||||
let current_configuration = self.state.lock().await.session_configuration.clone();
|
||||
let session_configuration = current_configuration.apply(&updates);
|
||||
|
||||
self.services.executor.update_environment(
|
||||
session_configuration.sandbox_policy.clone(),
|
||||
session_configuration.cwd.clone(),
|
||||
);
|
||||
|
||||
let mut turn_context: TurnContext = Self::make_turn_context(
|
||||
Some(Arc::clone(&self.services.auth_manager)),
|
||||
&self.services.otel_event_manager,
|
||||
session_configuration.provider.clone(),
|
||||
&session_configuration,
|
||||
self.conversation_id,
|
||||
);
|
||||
if let Some(final_schema) = updates.final_output_json_schema {
|
||||
turn_context.final_output_json_schema = final_schema;
|
||||
}
|
||||
Arc::new(turn_context)
|
||||
}
|
||||
|
||||
fn build_environment_update_item(
|
||||
&self,
|
||||
previous: Option<&Arc<TurnContext>>,
|
||||
next: &TurnContext,
|
||||
) -> Option<ResponseItem> {
|
||||
let prev = previous?;
|
||||
|
||||
let prev_context = EnvironmentContext::from(prev.as_ref());
|
||||
let next_context = EnvironmentContext::from(next);
|
||||
if prev_context.equals_except_shell(&next_context) {
|
||||
return None;
|
||||
}
|
||||
Some(ResponseItem::from(EnvironmentContext::diff(
|
||||
prev.as_ref(),
|
||||
next,
|
||||
)))
|
||||
}
|
||||
|
||||
/// Persist the event to rollout and send it to clients.
|
||||
pub(crate) async fn send_event(&self, event: Event) {
|
||||
// Persist the event into rollout (recorder filters as needed)
|
||||
@@ -1188,14 +1290,8 @@ impl Session {
|
||||
}
|
||||
}
|
||||
|
||||
async fn submission_loop(
|
||||
sess: Arc<Session>,
|
||||
turn_context: TurnContext,
|
||||
config: Arc<Config>,
|
||||
rx_sub: Receiver<Submission>,
|
||||
) {
|
||||
// Wrap once to avoid cloning TurnContext for each task.
|
||||
let mut turn_context = Arc::new(turn_context);
|
||||
async fn submission_loop(sess: Arc<Session>, config: Arc<Config>, rx_sub: Receiver<Submission>) {
|
||||
let mut previous_context: Option<Arc<TurnContext>> = None;
|
||||
// To break out of this loop, send Op::Shutdown.
|
||||
while let Ok(sub) = rx_sub.recv().await {
|
||||
debug!(?sub, "Submission");
|
||||
@@ -1211,174 +1307,58 @@ async fn submission_loop(
|
||||
effort,
|
||||
summary,
|
||||
} => {
|
||||
// Recalculate the persistent turn context with provided overrides.
|
||||
let prev = Arc::clone(&turn_context);
|
||||
let provider = prev.client.get_provider();
|
||||
|
||||
// Effective model + family
|
||||
let (effective_model, effective_family) = if let Some(ref m) = model {
|
||||
let fam =
|
||||
find_family_for_model(m).unwrap_or_else(|| config.model_family.clone());
|
||||
(m.clone(), fam)
|
||||
} else {
|
||||
(prev.client.get_model(), prev.client.get_model_family())
|
||||
let updates = SessionSettingsUpdate {
|
||||
cwd,
|
||||
approval_policy,
|
||||
sandbox_policy,
|
||||
model,
|
||||
reasoning_effort: effort,
|
||||
reasoning_summary: summary,
|
||||
..Default::default()
|
||||
};
|
||||
sess.update_settings(updates).await;
|
||||
}
|
||||
|
||||
// Effective reasoning settings
|
||||
let effective_effort = effort.unwrap_or(prev.client.get_reasoning_effort());
|
||||
let effective_summary = summary.unwrap_or(prev.client.get_reasoning_summary());
|
||||
|
||||
let auth_manager = prev.client.get_auth_manager();
|
||||
|
||||
// Build updated config for the client
|
||||
let mut updated_config = (*config).clone();
|
||||
updated_config.model = effective_model.clone();
|
||||
updated_config.model_family = effective_family.clone();
|
||||
if let Some(model_info) = get_model_info(&effective_family) {
|
||||
updated_config.model_context_window = Some(model_info.context_window);
|
||||
}
|
||||
|
||||
let otel_event_manager = prev.client.get_otel_event_manager().with_model(
|
||||
updated_config.model.as_str(),
|
||||
updated_config.model_family.slug.as_str(),
|
||||
);
|
||||
|
||||
let client = ModelClient::new(
|
||||
Arc::new(updated_config),
|
||||
auth_manager,
|
||||
otel_event_manager,
|
||||
provider,
|
||||
effective_effort,
|
||||
effective_summary,
|
||||
sess.conversation_id,
|
||||
);
|
||||
|
||||
let new_approval_policy = approval_policy.unwrap_or(prev.approval_policy);
|
||||
let new_sandbox_policy = sandbox_policy
|
||||
.clone()
|
||||
.unwrap_or(prev.sandbox_policy.clone());
|
||||
let new_cwd = cwd.clone().unwrap_or_else(|| prev.cwd.clone());
|
||||
|
||||
let tools_config = ToolsConfig::new(&ToolsConfigParams {
|
||||
model_family: &effective_family,
|
||||
features: &config.features,
|
||||
});
|
||||
|
||||
let new_turn_context = TurnContext {
|
||||
client,
|
||||
tools_config,
|
||||
user_instructions: prev.user_instructions.clone(),
|
||||
base_instructions: prev.base_instructions.clone(),
|
||||
approval_policy: new_approval_policy,
|
||||
sandbox_policy: new_sandbox_policy.clone(),
|
||||
shell_environment_policy: prev.shell_environment_policy.clone(),
|
||||
cwd: new_cwd.clone(),
|
||||
is_review_mode: false,
|
||||
final_output_json_schema: None,
|
||||
};
|
||||
|
||||
// Install the new persistent context for subsequent tasks/turns.
|
||||
turn_context = Arc::new(new_turn_context);
|
||||
|
||||
// Optionally persist changes to model / effort
|
||||
if cwd.is_some() || approval_policy.is_some() || sandbox_policy.is_some() {
|
||||
sess.record_conversation_items(&[ResponseItem::from(EnvironmentContext::new(
|
||||
Op::UserInput { .. } | Op::UserTurn { .. } => {
|
||||
let (items, updates) = match sub.op {
|
||||
Op::UserTurn {
|
||||
cwd,
|
||||
approval_policy,
|
||||
sandbox_policy,
|
||||
// Shell is not configurable from turn to turn
|
||||
None,
|
||||
))])
|
||||
.await;
|
||||
}
|
||||
}
|
||||
Op::UserInput { items } => {
|
||||
turn_context
|
||||
.client
|
||||
.get_otel_event_manager()
|
||||
.user_prompt(&items);
|
||||
// attempt to inject input into current task
|
||||
if let Err(items) = sess.inject_input(items).await {
|
||||
// no current task, spawn a new one
|
||||
sess.spawn_task(Arc::clone(&turn_context), sub.id, items, RegularTask)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
Op::UserTurn {
|
||||
items,
|
||||
cwd,
|
||||
approval_policy,
|
||||
sandbox_policy,
|
||||
model,
|
||||
effort,
|
||||
summary,
|
||||
final_output_json_schema,
|
||||
} => {
|
||||
turn_context
|
||||
.client
|
||||
.get_otel_event_manager()
|
||||
.user_prompt(&items);
|
||||
// attempt to inject input into current task
|
||||
if let Err(items) = sess.inject_input(items).await {
|
||||
// Derive a fresh TurnContext for this turn using the provided overrides.
|
||||
let provider = turn_context.client.get_provider();
|
||||
let auth_manager = turn_context.client.get_auth_manager();
|
||||
|
||||
// Derive a model family for the requested model; fall back to the session's.
|
||||
let model_family = find_family_for_model(&model)
|
||||
.unwrap_or_else(|| config.model_family.clone());
|
||||
|
||||
// Create a per‑turn Config clone with the requested model/family.
|
||||
let mut per_turn_config = (*config).clone();
|
||||
per_turn_config.model = model.clone();
|
||||
per_turn_config.model_family = model_family.clone();
|
||||
if let Some(model_info) = get_model_info(&model_family) {
|
||||
per_turn_config.model_context_window = Some(model_info.context_window);
|
||||
}
|
||||
|
||||
let otel_event_manager =
|
||||
turn_context.client.get_otel_event_manager().with_model(
|
||||
per_turn_config.model.as_str(),
|
||||
per_turn_config.model_family.slug.as_str(),
|
||||
);
|
||||
|
||||
// Build a new client with per‑turn reasoning settings.
|
||||
// Reuse the same provider and session id; auth defaults to env/API key.
|
||||
let client = ModelClient::new(
|
||||
Arc::new(per_turn_config),
|
||||
auth_manager,
|
||||
otel_event_manager,
|
||||
provider,
|
||||
model,
|
||||
effort,
|
||||
summary,
|
||||
sess.conversation_id,
|
||||
);
|
||||
|
||||
let fresh_turn_context = TurnContext {
|
||||
client,
|
||||
tools_config: ToolsConfig::new(&ToolsConfigParams {
|
||||
model_family: &model_family,
|
||||
features: &config.features,
|
||||
}),
|
||||
user_instructions: turn_context.user_instructions.clone(),
|
||||
base_instructions: turn_context.base_instructions.clone(),
|
||||
approval_policy,
|
||||
sandbox_policy,
|
||||
shell_environment_policy: turn_context.shell_environment_policy.clone(),
|
||||
cwd,
|
||||
is_review_mode: false,
|
||||
final_output_json_schema,
|
||||
};
|
||||
|
||||
// if the environment context has changed, record it in the conversation history
|
||||
let previous_env_context = EnvironmentContext::from(turn_context.as_ref());
|
||||
let new_env_context = EnvironmentContext::from(&fresh_turn_context);
|
||||
if !new_env_context.equals_except_shell(&previous_env_context) {
|
||||
let env_response_item = ResponseItem::from(new_env_context);
|
||||
sess.record_conversation_items(std::slice::from_ref(&env_response_item))
|
||||
items,
|
||||
} => (
|
||||
items,
|
||||
SessionSettingsUpdate {
|
||||
cwd: Some(cwd),
|
||||
approval_policy: Some(approval_policy),
|
||||
sandbox_policy: Some(sandbox_policy),
|
||||
model: Some(model),
|
||||
reasoning_effort: Some(effort),
|
||||
reasoning_summary: Some(summary),
|
||||
final_output_json_schema: Some(final_output_json_schema),
|
||||
},
|
||||
),
|
||||
Op::UserInput { items } => (items, SessionSettingsUpdate::default()),
|
||||
_ => unreachable!(),
|
||||
};
|
||||
let current_context = sess.new_turn(updates).await;
|
||||
current_context
|
||||
.client
|
||||
.get_otel_event_manager()
|
||||
.user_prompt(&items);
|
||||
// attempt to inject input into current task
|
||||
if let Err(items) = sess.inject_input(items).await {
|
||||
if let Some(env_item) = sess
|
||||
.build_environment_update_item(previous_context.as_ref(), ¤t_context)
|
||||
{
|
||||
sess.record_conversation_items(std::slice::from_ref(&env_item))
|
||||
.await;
|
||||
for msg in map_response_item_to_event_messages(
|
||||
&env_response_item,
|
||||
&env_item,
|
||||
sess.show_raw_agent_reasoning(),
|
||||
) {
|
||||
let event = Event {
|
||||
@@ -1389,12 +1369,9 @@ async fn submission_loop(
|
||||
}
|
||||
}
|
||||
|
||||
// Install the new persistent context for subsequent tasks/turns.
|
||||
turn_context = Arc::new(fresh_turn_context);
|
||||
|
||||
// no current task, spawn a new one with the per-turn context
|
||||
sess.spawn_task(Arc::clone(&turn_context), sub.id, items, RegularTask)
|
||||
sess.spawn_task(Arc::clone(¤t_context), sub.id, items, RegularTask)
|
||||
.await;
|
||||
previous_context = Some(current_context);
|
||||
}
|
||||
}
|
||||
Op::ExecApproval { id, decision } => match decision {
|
||||
@@ -1500,6 +1477,7 @@ async fn submission_loop(
|
||||
sess.send_event(event).await;
|
||||
}
|
||||
Op::Compact => {
|
||||
let turn_context = sess.new_turn(SessionSettingsUpdate::default()).await;
|
||||
// Attempt to inject input into current task
|
||||
if let Err(items) = sess
|
||||
.inject_input(vec![InputItem::Text {
|
||||
@@ -1569,6 +1547,7 @@ async fn submission_loop(
|
||||
sess.send_event(event).await;
|
||||
}
|
||||
Op::Review { review_request } => {
|
||||
let turn_context = sess.new_turn(SessionSettingsUpdate::default()).await;
|
||||
spawn_review_thread(
|
||||
sess.clone(),
|
||||
config.clone(),
|
||||
@@ -2588,14 +2567,13 @@ mod tests {
|
||||
let (session, turn_context) = make_session_and_context();
|
||||
let (rollout_items, expected) = sample_rollout(&session, &turn_context);
|
||||
|
||||
tokio_test::block_on(session.record_initial_history(
|
||||
&turn_context,
|
||||
InitialHistory::Resumed(ResumedHistory {
|
||||
tokio_test::block_on(session.record_initial_history(InitialHistory::Resumed(
|
||||
ResumedHistory {
|
||||
conversation_id: ConversationId::default(),
|
||||
history: rollout_items,
|
||||
rollout_path: PathBuf::from("/tmp/resume.jsonl"),
|
||||
}),
|
||||
));
|
||||
},
|
||||
)));
|
||||
|
||||
let actual = tokio_test::block_on(async { session.state.lock().await.history_snapshot() });
|
||||
assert_eq!(expected, actual);
|
||||
@@ -2606,9 +2584,7 @@ mod tests {
|
||||
let (session, turn_context) = make_session_and_context();
|
||||
let (rollout_items, expected) = sample_rollout(&session, &turn_context);
|
||||
|
||||
tokio_test::block_on(
|
||||
session.record_initial_history(&turn_context, InitialHistory::Forked(rollout_items)),
|
||||
);
|
||||
tokio_test::block_on(session.record_initial_history(InitialHistory::Forked(rollout_items)));
|
||||
|
||||
let actual = tokio_test::block_on(async { session.state.lock().await.history_snapshot() });
|
||||
assert_eq!(expected, actual);
|
||||
@@ -2832,53 +2808,56 @@ mod tests {
|
||||
let config = Arc::new(config);
|
||||
let conversation_id = ConversationId::default();
|
||||
let otel_event_manager = otel_event_manager(conversation_id, config.as_ref());
|
||||
let client = ModelClient::new(
|
||||
config.clone(),
|
||||
None,
|
||||
otel_event_manager,
|
||||
config.model_provider.clone(),
|
||||
config.model_reasoning_effort,
|
||||
config.model_reasoning_summary,
|
||||
conversation_id,
|
||||
);
|
||||
let tools_config = ToolsConfig::new(&ToolsConfigParams {
|
||||
model_family: &config.model_family,
|
||||
features: &config.features,
|
||||
});
|
||||
let turn_context = TurnContext {
|
||||
client,
|
||||
cwd: config.cwd.clone(),
|
||||
base_instructions: config.base_instructions.clone(),
|
||||
let auth_manager = AuthManager::shared(config.cwd.clone(), false);
|
||||
|
||||
let session_configuration = SessionConfiguration {
|
||||
provider: config.model_provider.clone(),
|
||||
model: config.model.clone(),
|
||||
model_reasoning_effort: config.model_reasoning_effort,
|
||||
model_reasoning_summary: config.model_reasoning_summary,
|
||||
user_instructions: config.user_instructions.clone(),
|
||||
base_instructions: config.base_instructions.clone(),
|
||||
approval_policy: config.approval_policy,
|
||||
sandbox_policy: config.sandbox_policy.clone(),
|
||||
shell_environment_policy: config.shell_environment_policy.clone(),
|
||||
tools_config,
|
||||
is_review_mode: false,
|
||||
final_output_json_schema: None,
|
||||
cwd: config.cwd.clone(),
|
||||
original_config_do_not_use: Arc::clone(&config),
|
||||
};
|
||||
|
||||
let state = SessionState::new(session_configuration.clone());
|
||||
|
||||
let services = SessionServices {
|
||||
mcp_connection_manager: McpConnectionManager::default(),
|
||||
session_manager: ExecSessionManager::default(),
|
||||
unified_exec_manager: UnifiedExecSessionManager::default(),
|
||||
notifier: UserNotifier::default(),
|
||||
notifier: UserNotifier::new(None),
|
||||
rollout: Mutex::new(None),
|
||||
user_shell: shell::Shell::Unknown,
|
||||
show_raw_agent_reasoning: config.show_raw_agent_reasoning,
|
||||
executor: Executor::new(ExecutorConfig::new(
|
||||
turn_context.sandbox_policy.clone(),
|
||||
turn_context.cwd.clone(),
|
||||
session_configuration.sandbox_policy.clone(),
|
||||
session_configuration.cwd.clone(),
|
||||
None,
|
||||
)),
|
||||
auth_manager: Arc::clone(&auth_manager),
|
||||
otel_event_manager: otel_event_manager.clone(),
|
||||
};
|
||||
|
||||
let session = Session {
|
||||
conversation_id,
|
||||
tx_event,
|
||||
state: Mutex::new(SessionState::new()),
|
||||
state: Mutex::new(state),
|
||||
active_turn: Mutex::new(None),
|
||||
services,
|
||||
next_internal_sub_id: AtomicU64::new(0),
|
||||
};
|
||||
|
||||
let turn_context = Session::make_turn_context(
|
||||
Some(Arc::clone(&auth_manager)),
|
||||
&otel_event_manager,
|
||||
session_configuration.provider.clone(),
|
||||
&session_configuration,
|
||||
conversation_id,
|
||||
);
|
||||
(session, turn_context)
|
||||
}
|
||||
|
||||
@@ -2900,53 +2879,56 @@ mod tests {
|
||||
let config = Arc::new(config);
|
||||
let conversation_id = ConversationId::default();
|
||||
let otel_event_manager = otel_event_manager(conversation_id, config.as_ref());
|
||||
let client = ModelClient::new(
|
||||
config.clone(),
|
||||
None,
|
||||
otel_event_manager,
|
||||
config.model_provider.clone(),
|
||||
config.model_reasoning_effort,
|
||||
config.model_reasoning_summary,
|
||||
conversation_id,
|
||||
);
|
||||
let tools_config = ToolsConfig::new(&ToolsConfigParams {
|
||||
model_family: &config.model_family,
|
||||
features: &config.features,
|
||||
});
|
||||
let turn_context = Arc::new(TurnContext {
|
||||
client,
|
||||
cwd: config.cwd.clone(),
|
||||
base_instructions: config.base_instructions.clone(),
|
||||
let auth_manager = AuthManager::shared(config.cwd.clone(), false);
|
||||
|
||||
let session_configuration = SessionConfiguration {
|
||||
provider: config.model_provider.clone(),
|
||||
model: config.model.clone(),
|
||||
model_reasoning_effort: config.model_reasoning_effort,
|
||||
model_reasoning_summary: config.model_reasoning_summary,
|
||||
user_instructions: config.user_instructions.clone(),
|
||||
base_instructions: config.base_instructions.clone(),
|
||||
approval_policy: config.approval_policy,
|
||||
sandbox_policy: config.sandbox_policy.clone(),
|
||||
shell_environment_policy: config.shell_environment_policy.clone(),
|
||||
tools_config,
|
||||
is_review_mode: false,
|
||||
final_output_json_schema: None,
|
||||
});
|
||||
cwd: config.cwd.clone(),
|
||||
original_config_do_not_use: Arc::clone(&config),
|
||||
};
|
||||
|
||||
let state = SessionState::new(session_configuration.clone());
|
||||
|
||||
let services = SessionServices {
|
||||
mcp_connection_manager: McpConnectionManager::default(),
|
||||
session_manager: ExecSessionManager::default(),
|
||||
unified_exec_manager: UnifiedExecSessionManager::default(),
|
||||
notifier: UserNotifier::default(),
|
||||
notifier: UserNotifier::new(None),
|
||||
rollout: Mutex::new(None),
|
||||
user_shell: shell::Shell::Unknown,
|
||||
show_raw_agent_reasoning: config.show_raw_agent_reasoning,
|
||||
executor: Executor::new(ExecutorConfig::new(
|
||||
config.sandbox_policy.clone(),
|
||||
config.cwd.clone(),
|
||||
session_configuration.sandbox_policy.clone(),
|
||||
session_configuration.cwd.clone(),
|
||||
None,
|
||||
)),
|
||||
auth_manager: Arc::clone(&auth_manager),
|
||||
otel_event_manager: otel_event_manager.clone(),
|
||||
};
|
||||
|
||||
let session = Arc::new(Session {
|
||||
conversation_id,
|
||||
tx_event,
|
||||
state: Mutex::new(SessionState::new()),
|
||||
state: Mutex::new(state),
|
||||
active_turn: Mutex::new(None),
|
||||
services,
|
||||
next_internal_sub_id: AtomicU64::new(0),
|
||||
});
|
||||
|
||||
let turn_context = Arc::new(Session::make_turn_context(
|
||||
Some(Arc::clone(&auth_manager)),
|
||||
&otel_event_manager,
|
||||
session_configuration.provider.clone(),
|
||||
&session_configuration,
|
||||
conversation_id,
|
||||
));
|
||||
(session, turn_context, rx_event)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user