Add new thread items and rewire event parsing to use them (#5418)

1. Adds AgentMessage,  Reasoning,  WebSearch items.
2. Switches the ResponseItem parsing to use new items and then also emit
3. Removes user-item kind and filters out "special" (environment) user
items when returning to clients.
This commit is contained in:
pakrym-oai
2025-10-22 10:14:50 -07:00
committed by GitHub
parent 34c5a9eaa9
commit 3c90728a29
23 changed files with 775 additions and 476 deletions

View File

@@ -7,12 +7,11 @@ use std::sync::atomic::AtomicU64;
use crate::AuthManager;
use crate::client_common::REVIEW_PROMPT;
use crate::event_mapping::map_response_item_to_event_messages;
use crate::function_tool::FunctionCallError;
use crate::mcp::auth::McpAuthStatusEntry;
use crate::parse_command::parse_command;
use crate::parse_turn_item;
use crate::review_format::format_review_findings_block;
use crate::state::ItemCollector;
use crate::terminal;
use crate::user_notification::UserNotifier;
use async_channel::Receiver;
@@ -20,9 +19,10 @@ use async_channel::Sender;
use codex_apply_patch::ApplyPatchAction;
use codex_protocol::ConversationId;
use codex_protocol::items::TurnItem;
use codex_protocol::items::UserMessageItem;
use codex_protocol::protocol::ConversationPathResponseEvent;
use codex_protocol::protocol::ExitedReviewModeEvent;
use codex_protocol::protocol::ItemCompletedEvent;
use codex_protocol::protocol::ItemStartedEvent;
use codex_protocol::protocol::ReviewRequest;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::SessionSource;
@@ -270,7 +270,6 @@ pub(crate) struct TurnContext {
pub(crate) is_review_mode: bool,
pub(crate) final_output_json_schema: Option<Value>,
pub(crate) codex_linux_sandbox_exe: Option<PathBuf>,
pub(crate) item_collector: ItemCollector,
}
impl TurnContext {
@@ -359,7 +358,6 @@ impl Session {
provider: ModelProviderInfo,
session_configuration: &SessionConfiguration,
conversation_id: ConversationId,
tx_event: Sender<Event>,
sub_id: String,
) -> TurnContext {
let config = session_configuration.original_config_do_not_use.clone();
@@ -394,8 +392,6 @@ impl Session {
features: &config.features,
});
let item_collector = ItemCollector::new(tx_event, conversation_id, sub_id.clone());
TurnContext {
sub_id,
client,
@@ -409,7 +405,6 @@ impl Session {
is_review_mode: false,
final_output_json_schema: None,
codex_linux_sandbox_exe: config.codex_linux_sandbox_exe.clone(),
item_collector,
}
}
@@ -665,7 +660,6 @@ impl Session {
session_configuration.provider.clone(),
&session_configuration,
self.conversation_id,
self.get_tx_event(),
sub_id,
);
if let Some(final_schema) = updates.final_output_json_schema {
@@ -710,6 +704,59 @@ impl Session {
}
}
async fn emit_turn_item_started(&self, turn_context: &TurnContext, item: &TurnItem) {
self.send_event(
turn_context,
EventMsg::ItemStarted(ItemStartedEvent {
thread_id: self.conversation_id,
turn_id: turn_context.sub_id.clone(),
item: item.clone(),
}),
)
.await;
}
async fn emit_turn_item_completed(
&self,
turn_context: &TurnContext,
item: TurnItem,
emit_raw_agent_reasoning: bool,
) {
self.send_event(
turn_context,
EventMsg::ItemCompleted(ItemCompletedEvent {
thread_id: self.conversation_id,
turn_id: turn_context.sub_id.clone(),
item: item.clone(),
}),
)
.await;
self.emit_turn_item_legacy_events(turn_context, &item, emit_raw_agent_reasoning)
.await;
}
async fn emit_turn_item_started_completed(
&self,
turn_context: &TurnContext,
item: TurnItem,
emit_raw_agent_reasoning: bool,
) {
self.emit_turn_item_started(turn_context, &item).await;
self.emit_turn_item_completed(turn_context, item, emit_raw_agent_reasoning)
.await;
}
async fn emit_turn_item_legacy_events(
&self,
turn_context: &TurnContext,
item: &TurnItem,
emit_raw_agent_reasoning: bool,
) {
for event in item.as_legacy_events(emit_raw_agent_reasoning) {
self.send_event(turn_context, event).await;
}
}
/// Emit an exec approval request event and await the user's decision.
///
/// The request is keyed by `sub_id`/`call_id` so matching responses are delivered
@@ -946,24 +993,22 @@ impl Session {
/// Record a user input item to conversation history and also persist a
/// corresponding UserMessage EventMsg to rollout.
async fn record_input_and_rollout_usermsg(&self, response_input: &ResponseInputItem) {
async fn record_input_and_rollout_usermsg(
&self,
turn_context: &TurnContext,
response_input: &ResponseInputItem,
) {
let response_item: ResponseItem = response_input.clone().into();
// Add to conversation history and persist response item to rollout
self.record_conversation_items(std::slice::from_ref(&response_item))
.await;
// Derive user message events and persist only UserMessage to rollout
let msgs =
map_response_item_to_event_messages(&response_item, self.show_raw_agent_reasoning());
let user_msgs: Vec<RolloutItem> = msgs
.into_iter()
.filter_map(|m| match m {
EventMsg::UserMessage(ev) => Some(RolloutItem::EventMsg(EventMsg::UserMessage(ev))),
_ => None,
})
.collect();
if !user_msgs.is_empty() {
self.persist_rollout_items(&user_msgs).await;
let turn_item = parse_turn_item(&response_item);
if let Some(item @ TurnItem::UserMessage(_)) = turn_item {
self.emit_turn_item_started_completed(turn_context, item, false)
.await;
}
}
@@ -1158,19 +1203,8 @@ async fn submission_loop(sess: Arc<Session>, config: Arc<Config>, rx_sub: Receiv
{
sess.record_conversation_items(std::slice::from_ref(&env_item))
.await;
for msg in map_response_item_to_event_messages(
&env_item,
sess.show_raw_agent_reasoning(),
) {
sess.send_event(&current_context, msg).await;
}
}
current_context
.item_collector
.started_completed(TurnItem::UserMessage(UserMessageItem::new(&items)))
.await;
sess.spawn_task(Arc::clone(&current_context), items, RegularTask)
.await;
previous_context = Some(current_context);
@@ -1444,11 +1478,6 @@ async fn spawn_review_thread(
is_review_mode: true,
final_output_json_schema: None,
codex_linux_sandbox_exe: parent_turn_context.codex_linux_sandbox_exe.clone(),
item_collector: ItemCollector::new(
sess.get_tx_event(),
sess.conversation_id,
sub_id.to_string(),
),
};
// Seed the child task with the review prompt as the initial user message.
@@ -1506,7 +1535,7 @@ pub(crate) async fn run_task(
review_thread_history.extend(sess.build_initial_context(turn_context.as_ref()));
review_thread_history.push(initial_input_for_turn.into());
} else {
sess.record_input_and_rollout_usermsg(&initial_input_for_turn)
sess.record_input_and_rollout_usermsg(turn_context.as_ref(), &initial_input_for_turn)
.await;
}
@@ -2023,9 +2052,10 @@ async fn try_run_turn(
}
Ok(None) => {
let response = handle_non_tool_response_item(
Arc::clone(&sess),
sess.as_ref(),
Arc::clone(&turn_context),
item.clone(),
sess.show_raw_agent_reasoning(),
)
.await?;
add_completed(ProcessedResponseItem { item, response });
@@ -2144,9 +2174,10 @@ async fn try_run_turn(
}
async fn handle_non_tool_response_item(
sess: Arc<Session>,
sess: &Session,
turn_context: Arc<TurnContext>,
item: ResponseItem,
show_raw_agent_reasoning: bool,
) -> CodexResult<Option<ResponseInputItem>> {
debug!(?item, "Output item");
@@ -2154,15 +2185,20 @@ async fn handle_non_tool_response_item(
ResponseItem::Message { .. }
| ResponseItem::Reasoning { .. }
| ResponseItem::WebSearchCall { .. } => {
let msgs = match &item {
let turn_item = match &item {
ResponseItem::Message { .. } if turn_context.is_review_mode => {
trace!("suppressing assistant Message in review mode");
Vec::new()
None
}
_ => map_response_item_to_event_messages(&item, sess.show_raw_agent_reasoning()),
_ => parse_turn_item(&item),
};
for msg in msgs {
sess.send_event(&turn_context, msg).await;
if let Some(turn_item) = turn_item {
sess.emit_turn_item_started_completed(
turn_context.as_ref(),
turn_item,
show_raw_agent_reasoning,
)
.await;
}
}
ResponseItem::FunctionCallOutput { .. } | ResponseItem::CustomToolCallOutput { .. } => {
@@ -2649,7 +2685,6 @@ mod tests {
session_configuration.provider.clone(),
&session_configuration,
conversation_id,
tx_event.clone(),
"turn_id".to_string(),
);
@@ -2718,7 +2753,6 @@ mod tests {
session_configuration.provider.clone(),
&session_configuration,
conversation_id,
tx_event.clone(),
"turn_id".to_string(),
));