Add ItemStarted/ItemCompleted events for UserInputItem (#5306)

Adds a new ItemStarted event and delivers UserMessage as the first item
type (more to come).


Renames `InputItem` to `UserInput` considering we're using the `Item`
suffix for actual items.
This commit is contained in:
pakrym-oai
2025-10-20 13:34:44 -07:00
committed by GitHub
parent 5e4f3bbb0b
commit 9c903c4716
49 changed files with 435 additions and 202 deletions

View File

@@ -10,12 +10,15 @@ use crate::event_mapping::map_response_item_to_event_messages;
use crate::function_tool::FunctionCallError;
use crate::parse_command::parse_command;
use crate::review_format::format_review_findings_block;
use crate::state::ItemCollector;
use crate::terminal;
use crate::user_notification::UserNotifier;
use async_channel::Receiver;
use async_channel::Sender;
use codex_apply_patch::ApplyPatchAction;
use codex_protocol::ConversationId;
use codex_protocol::items::TurnItem;
use codex_protocol::items::UserMessageItem;
use codex_protocol::protocol::ConversationPathResponseEvent;
use codex_protocol::protocol::ExitedReviewModeEvent;
use codex_protocol::protocol::McpAuthStatus;
@@ -77,7 +80,6 @@ use crate::protocol::ErrorEvent;
use crate::protocol::Event;
use crate::protocol::EventMsg;
use crate::protocol::ExecApprovalRequestEvent;
use crate::protocol::InputItem;
use crate::protocol::ListCustomPromptsResponseEvent;
use crate::protocol::Op;
use crate::protocol::RateLimitSnapshot;
@@ -122,6 +124,7 @@ use codex_protocol::models::FunctionCallOutputPayload;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::InitialHistory;
use codex_protocol::user_input::UserInput;
pub mod compact;
use self::compact::build_compacted_history;
@@ -264,6 +267,7 @@ pub(crate) struct TurnContext {
pub(crate) is_review_mode: bool,
pub(crate) final_output_json_schema: Option<Value>,
pub(crate) codex_linux_sandbox_exe: Option<PathBuf>,
pub(crate) item_collector: ItemCollector,
}
impl TurnContext {
@@ -352,6 +356,7 @@ impl Session {
provider: ModelProviderInfo,
session_configuration: &SessionConfiguration,
conversation_id: ConversationId,
tx_event: Sender<Event>,
) -> TurnContext {
let config = session_configuration.original_config_do_not_use.clone();
let model_family = find_family_for_model(&session_configuration.model)
@@ -397,6 +402,7 @@ impl Session {
is_review_mode: false,
final_output_json_schema: None,
codex_linux_sandbox_exe: config.codex_linux_sandbox_exe.clone(),
item_collector: ItemCollector::new(tx_event, conversation_id, "turn_id".to_string()),
}
}
@@ -656,6 +662,7 @@ impl Session {
session_configuration.provider.clone(),
&session_configuration,
self.conversation_id,
self.get_tx_event(),
);
if let Some(final_schema) = updates.final_output_json_schema {
turn_context.final_output_json_schema = final_schema;
@@ -986,7 +993,7 @@ impl Session {
}
/// Returns the input if there was no task running to inject into
pub async fn inject_input(&self, input: Vec<InputItem>) -> Result<(), Vec<InputItem>> {
pub async fn inject_input(&self, input: Vec<UserInput>) -> Result<(), Vec<UserInput>> {
let mut active = self.active_turn.lock().await;
match active.as_mut() {
Some(at) => {
@@ -1157,6 +1164,11 @@ async fn submission_loop(sess: Arc<Session>, config: Arc<Config>, rx_sub: Receiv
}
}
current_context
.item_collector
.started_completed(TurnItem::UserMessage(UserMessageItem::new(&items)))
.await;
sess.spawn_task(Arc::clone(&current_context), sub.id, items, RegularTask)
.await;
previous_context = Some(current_context);
@@ -1268,7 +1280,7 @@ async fn submission_loop(sess: Arc<Session>, config: Arc<Config>, rx_sub: Receiv
let turn_context = sess.new_turn(SessionSettingsUpdate::default()).await;
// Attempt to inject input into current task
if let Err(items) = sess
.inject_input(vec![InputItem::Text {
.inject_input(vec![UserInput::Text {
text: compact::SUMMARIZATION_PROMPT.to_string(),
}])
.await
@@ -1422,10 +1434,15 @@ async fn spawn_review_thread(
is_review_mode: true,
final_output_json_schema: None,
codex_linux_sandbox_exe: parent_turn_context.codex_linux_sandbox_exe.clone(),
item_collector: ItemCollector::new(
sess.get_tx_event(),
sess.conversation_id,
sub_id.to_string(),
),
};
// Seed the child task with the review prompt as the initial user message.
let input: Vec<InputItem> = vec![InputItem::Text {
let input: Vec<UserInput> = vec![UserInput::Text {
text: review_prompt,
}];
let tc = Arc::new(review_turn_context);
@@ -1463,7 +1480,7 @@ pub(crate) async fn run_task(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
sub_id: String,
input: Vec<InputItem>,
input: Vec<UserInput>,
task_kind: TaskKind,
cancellation_token: CancellationToken,
) -> Option<String> {
@@ -2624,6 +2641,15 @@ mod tests {
tool_approvals: Mutex::new(ApprovalStore::default()),
};
let turn_context = Session::make_turn_context(
Some(Arc::clone(&auth_manager)),
&otel_event_manager,
session_configuration.provider.clone(),
&session_configuration,
conversation_id,
tx_event.clone(),
);
let session = Session {
conversation_id,
tx_event,
@@ -2633,13 +2659,6 @@ mod tests {
next_internal_sub_id: AtomicU64::new(0),
};
let turn_context = Session::make_turn_context(
Some(Arc::clone(&auth_manager)),
&otel_event_manager,
session_configuration.provider.clone(),
&session_configuration,
conversation_id,
);
(session, turn_context)
}
@@ -2690,6 +2709,15 @@ mod tests {
tool_approvals: Mutex::new(ApprovalStore::default()),
};
let turn_context = Arc::new(Session::make_turn_context(
Some(Arc::clone(&auth_manager)),
&otel_event_manager,
session_configuration.provider.clone(),
&session_configuration,
conversation_id,
tx_event.clone(),
));
let session = Arc::new(Session {
conversation_id,
tx_event,
@@ -2699,13 +2727,6 @@ mod tests {
next_internal_sub_id: AtomicU64::new(0),
});
let turn_context = Arc::new(Session::make_turn_context(
Some(Arc::clone(&auth_manager)),
&otel_event_manager,
session_configuration.provider.clone(),
&session_configuration,
conversation_id,
));
(session, turn_context, rx_event)
}
@@ -2726,7 +2747,7 @@ mod tests {
_session: Arc<SessionTaskContext>,
_ctx: Arc<TurnContext>,
_sub_id: String,
_input: Vec<InputItem>,
_input: Vec<UserInput>,
cancellation_token: CancellationToken,
) -> Option<String> {
if self.listen_to_cancellation_token {
@@ -2750,7 +2771,7 @@ mod tests {
async fn abort_regular_task_emits_turn_aborted_only() {
let (sess, tc, rx) = make_session_and_context_with_rx();
let sub_id = "sub-regular".to_string();
let input = vec![InputItem::Text {
let input = vec![UserInput::Text {
text: "hello".to_string(),
}];
sess.spawn_task(
@@ -2781,7 +2802,7 @@ mod tests {
async fn abort_gracefuly_emits_turn_aborted_only() {
let (sess, tc, rx) = make_session_and_context_with_rx();
let sub_id = "sub-regular".to_string();
let input = vec![InputItem::Text {
let input = vec![UserInput::Text {
text: "hello".to_string(),
}];
sess.spawn_task(
@@ -2809,7 +2830,7 @@ mod tests {
async fn abort_review_task_emits_exited_then_aborted_and_records_history() {
let (sess, tc, rx) = make_session_and_context_with_rx();
let sub_id = "sub-review".to_string();
let input = vec![InputItem::Text {
let input = vec![UserInput::Text {
text: "start review".to_string(),
}];
sess.spawn_task(

View File

@@ -12,7 +12,6 @@ use crate::protocol::CompactedItem;
use crate::protocol::ErrorEvent;
use crate::protocol::Event;
use crate::protocol::EventMsg;
use crate::protocol::InputItem;
use crate::protocol::InputMessageKind;
use crate::protocol::TaskStartedEvent;
use crate::protocol::TurnContextItem;
@@ -24,6 +23,7 @@ use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::user_input::UserInput;
use futures::prelude::*;
pub const SUMMARIZATION_PROMPT: &str = include_str!("../../templates/compact/prompt.md");
@@ -41,7 +41,7 @@ pub(crate) async fn run_inline_auto_compact_task(
turn_context: Arc<TurnContext>,
) {
let sub_id = sess.next_internal_sub_id();
let input = vec![InputItem::Text {
let input = vec![UserInput::Text {
text: SUMMARIZATION_PROMPT.to_string(),
}];
run_compact_task_inner(sess, turn_context, sub_id, input).await;
@@ -51,7 +51,7 @@ pub(crate) async fn run_compact_task(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
sub_id: String,
input: Vec<InputItem>,
input: Vec<UserInput>,
) -> Option<String> {
let start_event = Event {
id: sub_id.clone(),
@@ -68,7 +68,7 @@ async fn run_compact_task_inner(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
sub_id: String,
input: Vec<InputItem>,
input: Vec<UserInput>,
) {
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input);
let mut turn_input = sess

View File

@@ -71,6 +71,8 @@ pub(crate) fn should_persist_event_msg(ev: &EventMsg) -> bool {
| EventMsg::PlanUpdate(_)
| EventMsg::ShutdownComplete
| EventMsg::ViewImageToolCall(_)
| EventMsg::ConversationPath(_) => false,
| EventMsg::ConversationPath(_)
| EventMsg::ItemStarted(_)
| EventMsg::ItemCompleted(_) => false,
}
}

View File

@@ -0,0 +1,68 @@
use async_channel::Sender;
use codex_protocol::ConversationId;
use codex_protocol::items::TurnItem;
use codex_protocol::protocol::Event;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::ItemCompletedEvent;
use codex_protocol::protocol::ItemStartedEvent;
use tracing::error;
#[derive(Debug)]
pub(crate) struct ItemCollector {
thread_id: ConversationId,
turn_id: String,
tx_event: Sender<Event>,
}
impl ItemCollector {
pub fn new(
tx_event: Sender<Event>,
thread_id: ConversationId,
turn_id: String,
) -> ItemCollector {
ItemCollector {
tx_event,
thread_id,
turn_id,
}
}
pub async fn started(&self, item: TurnItem) {
let err = self
.tx_event
.send(Event {
id: self.turn_id.clone(),
msg: EventMsg::ItemStarted(ItemStartedEvent {
thread_id: self.thread_id,
turn_id: self.turn_id.clone(),
item,
}),
})
.await;
if let Err(e) = err {
error!("failed to send item started event: {e}");
}
}
pub async fn completed(&self, item: TurnItem) {
let err = self
.tx_event
.send(Event {
id: self.turn_id.clone(),
msg: EventMsg::ItemCompleted(ItemCompletedEvent {
thread_id: self.thread_id,
turn_id: self.turn_id.clone(),
item,
}),
})
.await;
if let Err(e) = err {
error!("failed to send item completed event: {e}");
}
}
pub async fn started_completed(&self, item: TurnItem) {
self.started(item.clone()).await;
self.completed(item).await;
}
}

View File

@@ -1,7 +1,9 @@
mod item_collector;
mod service;
mod session;
mod turn;
pub(crate) use item_collector::ItemCollector;
pub(crate) use service::SessionServices;
pub(crate) use session::SessionState;
pub(crate) use turn::ActiveTurn;

View File

@@ -5,8 +5,8 @@ use tokio_util::sync::CancellationToken;
use crate::codex::TurnContext;
use crate::codex::compact;
use crate::protocol::InputItem;
use crate::state::TaskKind;
use codex_protocol::user_input::UserInput;
use super::SessionTask;
use super::SessionTaskContext;
@@ -25,7 +25,7 @@ impl SessionTask for CompactTask {
session: Arc<SessionTaskContext>,
ctx: Arc<TurnContext>,
sub_id: String,
input: Vec<InputItem>,
input: Vec<UserInput>,
_cancellation_token: CancellationToken,
) -> Option<String> {
compact::run_compact_task(session.clone_session(), ctx, sub_id, input).await

View File

@@ -17,13 +17,13 @@ use crate::codex::Session;
use crate::codex::TurnContext;
use crate::protocol::Event;
use crate::protocol::EventMsg;
use crate::protocol::InputItem;
use crate::protocol::TaskCompleteEvent;
use crate::protocol::TurnAbortReason;
use crate::protocol::TurnAbortedEvent;
use crate::state::ActiveTurn;
use crate::state::RunningTask;
use crate::state::TaskKind;
use codex_protocol::user_input::UserInput;
pub(crate) use compact::CompactTask;
pub(crate) use regular::RegularTask;
@@ -56,7 +56,7 @@ pub(crate) trait SessionTask: Send + Sync + 'static {
session: Arc<SessionTaskContext>,
ctx: Arc<TurnContext>,
sub_id: String,
input: Vec<InputItem>,
input: Vec<UserInput>,
cancellation_token: CancellationToken,
) -> Option<String>;
@@ -70,7 +70,7 @@ impl Session {
self: &Arc<Self>,
turn_context: Arc<TurnContext>,
sub_id: String,
input: Vec<InputItem>,
input: Vec<UserInput>,
task: T,
) {
self.abort_all_tasks(TurnAbortReason::Replaced).await;

View File

@@ -5,8 +5,8 @@ use tokio_util::sync::CancellationToken;
use crate::codex::TurnContext;
use crate::codex::run_task;
use crate::protocol::InputItem;
use crate::state::TaskKind;
use codex_protocol::user_input::UserInput;
use super::SessionTask;
use super::SessionTaskContext;
@@ -25,7 +25,7 @@ impl SessionTask for RegularTask {
session: Arc<SessionTaskContext>,
ctx: Arc<TurnContext>,
sub_id: String,
input: Vec<InputItem>,
input: Vec<UserInput>,
cancellation_token: CancellationToken,
) -> Option<String> {
let sess = session.clone_session();

View File

@@ -6,8 +6,8 @@ use tokio_util::sync::CancellationToken;
use crate::codex::TurnContext;
use crate::codex::exit_review_mode;
use crate::codex::run_task;
use crate::protocol::InputItem;
use crate::state::TaskKind;
use codex_protocol::user_input::UserInput;
use super::SessionTask;
use super::SessionTaskContext;
@@ -26,7 +26,7 @@ impl SessionTask for ReviewTask {
session: Arc<SessionTaskContext>,
ctx: Arc<TurnContext>,
sub_id: String,
input: Vec<InputItem>,
input: Vec<UserInput>,
cancellation_token: CancellationToken,
) -> Option<String> {
let sess = session.clone_session();

View File

@@ -5,13 +5,13 @@ use tokio::fs;
use crate::function_tool::FunctionCallError;
use crate::protocol::Event;
use crate::protocol::EventMsg;
use crate::protocol::InputItem;
use crate::protocol::ViewImageToolCallEvent;
use crate::tools::context::ToolInvocation;
use crate::tools::context::ToolOutput;
use crate::tools::context::ToolPayload;
use crate::tools::registry::ToolHandler;
use crate::tools::registry::ToolKind;
use codex_protocol::user_input::UserInput;
pub struct ViewImageHandler;
@@ -67,7 +67,7 @@ impl ToolHandler for ViewImageHandler {
let event_path = abs_path.clone();
session
.inject_input(vec![InputItem::LocalImage { path: abs_path }])
.inject_input(vec![UserInput::LocalImage { path: abs_path }])
.await
.map_err(|_| {
FunctionCallError::RespondToModel(