Replay EventMsgs from Response Items when resuming a session with history. (#3123)

### Overview

This PR introduces the following changes:
	1.	Adds a unified mechanism to convert ResponseItem into EventMsg.
2. Ensures that when a session is initialized with initial history, a
vector of EventMsg is sent along with the session configuration. This
allows clients to re-render the UI accordingly.
	3. 	Added integration testing

### Caveats

This implementation does not send every EventMsg that was previously
dispatched to clients. The excluded events fall into two categories:
	•	“Arguably” rolled-out events
Examples include tool calls and apply-patch calls. While these events
are conceptually rolled out, we currently only roll out ResponseItems.
These events are already being handled elsewhere and transformed into
EventMsg before being sent.
	•	Non-rolled-out events
Certain events such as TurnDiff, Error, and TokenCount are not rolled
out at all.

### Future Directions

At present, resuming a session involves maintaining two states:
	•	UI State
Clients can replay most of the important UI from the provided EventMsg
history.
	•	Model State
The model receives the complete session history to reconstruct its
internal state.

This design provides a solid foundation. If, in the future, more precise
UI reconstruction is needed, we have two potential paths:
1. Introduce a third data structure that allows us to derive both
ResponseItems and EventMsgs.
2. Clearly divide responsibilities: the core system ensures the
integrity of the model state, while clients are responsible for
reconstructing the UI.
This commit is contained in:
Ahmed Ibrahim
2025-09-03 21:47:00 -07:00
committed by GitHub
parent bea64569c1
commit f2036572b6
8 changed files with 213 additions and 57 deletions

View File

@@ -9,6 +9,7 @@ use std::sync::atomic::AtomicU64;
use std::time::Duration;
use crate::AuthManager;
use crate::event_mapping::map_response_item_to_event_messages;
use async_channel::Receiver;
use async_channel::Sender;
use codex_apply_patch::ApplyPatchAction;
@@ -75,9 +76,7 @@ use crate::project_doc::get_user_instructions;
use crate::protocol::AgentMessageDeltaEvent;
use crate::protocol::AgentMessageEvent;
use crate::protocol::AgentReasoningDeltaEvent;
use crate::protocol::AgentReasoningEvent;
use crate::protocol::AgentReasoningRawContentDeltaEvent;
use crate::protocol::AgentReasoningRawContentEvent;
use crate::protocol::AgentReasoningSectionBreakEvent;
use crate::protocol::ApplyPatchApprovalRequestEvent;
use crate::protocol::AskForApproval;
@@ -102,7 +101,6 @@ use crate::protocol::Submission;
use crate::protocol::TaskCompleteEvent;
use crate::protocol::TurnDiffEvent;
use crate::protocol::WebSearchBeginEvent;
use crate::protocol::WebSearchEndEvent;
use crate::rollout::RolloutRecorder;
use crate::safety::SafetyCheck;
use crate::safety::assess_command_safety;
@@ -117,12 +115,9 @@ use codex_protocol::custom_prompts::CustomPrompt;
use codex_protocol::models::ContentItem;
use codex_protocol::models::FunctionCallOutputPayload;
use codex_protocol::models::LocalShellAction;
use codex_protocol::models::ReasoningItemContent;
use codex_protocol::models::ReasoningItemReasoningSummary;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::models::ShellToolCallParams;
use codex_protocol::models::WebSearchAction;
// A convenience extension trait for acquiring mutex locks where poisoning is
// unrecoverable and should abort the program. This avoids scattered `.unwrap()`
@@ -199,6 +194,7 @@ impl Codex {
config.clone(),
auth_manager.clone(),
tx_event.clone(),
conversation_history.clone(),
)
.await
.map_err(|e| {
@@ -361,6 +357,7 @@ impl Session {
config: Arc<Config>,
auth_manager: Arc<AuthManager>,
tx_event: Sender<Event>,
initial_history: InitialHistory,
) -> anyhow::Result<(Arc<Self>, TurnContext)> {
let session_id = Uuid::new_v4();
let ConfigureSession {
@@ -480,6 +477,11 @@ impl Session {
});
// Dispatch the SessionConfiguredEvent first and then report any errors.
// If resuming, include converted initial messages in the payload so UIs can render them immediately.
let initial_messages = match &initial_history {
InitialHistory::New => None,
InitialHistory::Resumed(items) => Some(sess.build_initial_messages(items)),
};
let events = std::iter::once(Event {
id: INITIAL_SUBMIT_ID.to_owned(),
msg: EventMsg::SessionConfigured(SessionConfiguredEvent {
@@ -487,6 +489,7 @@ impl Session {
model,
history_log_id,
history_entry_count,
initial_messages,
}),
})
.chain(post_session_configured_error_events.into_iter());
@@ -552,6 +555,17 @@ impl Session {
self.record_conversation_items(&items).await;
}
/// build the initial messages vector for SessionConfigured by converting
/// ResponseItems into EventMsg.
fn build_initial_messages(&self, items: &[ResponseItem]) -> Vec<EventMsg> {
items
.iter()
.flat_map(|item| {
map_response_item_to_event_messages(item, self.show_raw_agent_reasoning)
})
.collect()
}
/// Sends the given event to the client and swallows the send event, if
/// any, logging it as an error.
pub(crate) async fn send_event(&self, event: Event) {
@@ -1903,53 +1917,6 @@ async fn handle_response_item(
) -> CodexResult<Option<ResponseInputItem>> {
debug!(?item, "Output item");
let output = match item {
ResponseItem::Message { content, .. } => {
for item in content {
if let ContentItem::OutputText { text } = item {
let event = Event {
id: sub_id.to_string(),
msg: EventMsg::AgentMessage(AgentMessageEvent { message: text }),
};
sess.tx_event.send(event).await.ok();
}
}
None
}
ResponseItem::Reasoning {
id: _,
summary,
content,
encrypted_content: _,
} => {
for item in summary {
let text = match item {
ReasoningItemReasoningSummary::SummaryText { text } => text,
};
let event = Event {
id: sub_id.to_string(),
msg: EventMsg::AgentReasoning(AgentReasoningEvent { text }),
};
sess.tx_event.send(event).await.ok();
}
if sess.show_raw_agent_reasoning
&& let Some(content) = content
{
for item in content {
let text = match item {
ReasoningItemContent::ReasoningText { text } => text,
ReasoningItemContent::Text { text } => text,
};
let event = Event {
id: sub_id.to_string(),
msg: EventMsg::AgentReasoningRawContent(AgentReasoningRawContentEvent {
text,
}),
};
sess.tx_event.send(event).await.ok();
}
}
None
}
ResponseItem::FunctionCall {
name,
arguments,
@@ -2039,12 +2006,14 @@ async fn handle_response_item(
debug!("unexpected CustomToolCallOutput from stream");
None
}
ResponseItem::WebSearchCall { id, action, .. } => {
if let WebSearchAction::Search { query } = action {
let call_id = id.unwrap_or_else(|| "".to_string());
ResponseItem::Message { .. }
| ResponseItem::Reasoning { .. }
| ResponseItem::WebSearchCall { .. } => {
let msgs = map_response_item_to_event_messages(&item, sess.show_raw_agent_reasoning);
for msg in msgs {
let event = Event {
id: sub_id.to_string(),
msg: EventMsg::WebSearchEnd(WebSearchEndEvent { call_id, query }),
msg,
};
sess.tx_event.send(event).await.ok();
}

View File

@@ -0,0 +1,74 @@
use crate::protocol::AgentMessageEvent;
use crate::protocol::AgentReasoningEvent;
use crate::protocol::AgentReasoningRawContentEvent;
use crate::protocol::EventMsg;
use crate::protocol::WebSearchEndEvent;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ReasoningItemContent;
use codex_protocol::models::ReasoningItemReasoningSummary;
use codex_protocol::models::ResponseItem;
use codex_protocol::models::WebSearchAction;
/// Convert a `ResponseItem` into zero or more `EventMsg` values that the UI can render.
///
/// When `show_raw_agent_reasoning` is false, raw reasoning content events are omitted.
pub(crate) fn map_response_item_to_event_messages(
item: &ResponseItem,
show_raw_agent_reasoning: bool,
) -> Vec<EventMsg> {
match item {
ResponseItem::Message { content, .. } => {
let mut events = Vec::new();
for content_item in content {
if let ContentItem::OutputText { text } = content_item {
events.push(EventMsg::AgentMessage(AgentMessageEvent {
message: text.clone(),
}));
}
}
events
}
ResponseItem::Reasoning {
summary, content, ..
} => {
let mut events = Vec::new();
for ReasoningItemReasoningSummary::SummaryText { text } in summary {
events.push(EventMsg::AgentReasoning(AgentReasoningEvent {
text: text.clone(),
}));
}
if let Some(items) = content.as_ref().filter(|_| show_raw_agent_reasoning) {
for c in items {
let text = match c {
ReasoningItemContent::ReasoningText { text }
| ReasoningItemContent::Text { text } => text,
};
events.push(EventMsg::AgentReasoningRawContent(
AgentReasoningRawContentEvent { text: text.clone() },
));
}
}
events
}
ResponseItem::WebSearchCall { id, action, .. } => match action {
WebSearchAction::Search { query } => {
let call_id = id.clone().unwrap_or_else(|| "".to_string());
vec![EventMsg::WebSearchEnd(WebSearchEndEvent {
call_id,
query: query.clone(),
})]
}
WebSearchAction::Other => Vec::new(),
},
// Variants that require side effects are handled by higher layers and do not emit events here.
ResponseItem::FunctionCall { .. }
| ResponseItem::FunctionCallOutput { .. }
| ResponseItem::LocalShellCall { .. }
| ResponseItem::CustomToolCall { .. }
| ResponseItem::CustomToolCallOutput { .. }
| ResponseItem::Other => Vec::new(),
}
}

View File

@@ -40,6 +40,7 @@ pub use model_provider_info::WireApi;
pub use model_provider_info::built_in_model_providers;
pub use model_provider_info::create_oss_provider_with_base_url;
mod conversation_manager;
mod event_mapping;
pub use conversation_manager::ConversationManager;
pub use conversation_manager::NewConversation;
// Re-export common auth types for workspace consumers

View File

@@ -13,6 +13,7 @@ use core_test_support::load_default_config_for_test;
use core_test_support::load_sse_fixture_with_id;
use core_test_support::wait_for_event;
use serde_json::json;
use std::io::Write;
use tempfile::TempDir;
use wiremock::Mock;
use wiremock::MockServer;
@@ -108,6 +109,107 @@ fn write_auth_json(
fake_jwt
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn resume_includes_initial_messages_and_sends_prior_items() {
if std::env::var(CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR).is_ok() {
println!(
"Skipping test because it cannot execute when network is disabled in a Codex sandbox."
);
return;
}
// Create a fake rollout session file with one prior assistant message.
let tmpdir = TempDir::new().unwrap();
let session_path = tmpdir.path().join("resume-session.jsonl");
let mut f = std::fs::File::create(&session_path).unwrap();
// First line: meta (content not used by reader other than non-empty)
writeln!(f, "{}", serde_json::json!({"meta":"test"})).unwrap();
// Prior item: assistant message
let prior_item = codex_protocol::models::ResponseItem::Message {
id: None,
role: "assistant".to_string(),
content: vec![codex_protocol::models::ContentItem::OutputText {
text: "resumed assistant message".to_string(),
}],
};
writeln!(f, "{}", serde_json::to_string(&prior_item).unwrap()).unwrap();
drop(f);
// Mock server that will receive the resumed request
let server = MockServer::start().await;
let first = ResponseTemplate::new(200)
.insert_header("content-type", "text/event-stream")
.set_body_raw(sse_completed("resp1"), "text/event-stream");
Mock::given(method("POST"))
.and(path("/v1/responses"))
.respond_with(first)
.expect(1)
.mount(&server)
.await;
// Configure Codex to resume from our file
let model_provider = ModelProviderInfo {
base_url: Some(format!("{}/v1", server.uri())),
..built_in_model_providers()["openai"].clone()
};
let codex_home = TempDir::new().unwrap();
let mut config = load_default_config_for_test(&codex_home);
config.model_provider = model_provider;
config.experimental_resume = Some(session_path.clone());
let conversation_manager =
ConversationManager::with_auth(CodexAuth::from_api_key("Test API Key"));
let NewConversation {
conversation: codex,
session_configured,
..
} = conversation_manager
.new_conversation(config)
.await
.expect("create new conversation");
// 1) Assert initial_messages contains the prior assistant message as an EventMsg
let initial_msgs = session_configured
.initial_messages
.clone()
.expect("expected initial messages for resumed session");
let initial_json = serde_json::to_value(&initial_msgs).unwrap();
let expected_initial_json = serde_json::json!([
{ "type": "agent_message", "message": "resumed assistant message" }
]);
assert_eq!(initial_json, expected_initial_json);
// 2) Submit new input; the request body must include the prior item followed by the new user input.
codex
.submit(Op::UserInput {
items: vec![InputItem::Text {
text: "hello".into(),
}],
})
.await
.unwrap();
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
let request = &server.received_requests().await.unwrap()[0];
let request_body = request.body_json::<serde_json::Value>().unwrap();
let expected_input = serde_json::json!([
{
"type": "message",
"id": null,
"role": "assistant",
"content": [{ "type": "output_text", "text": "resumed assistant message" }]
},
{
"type": "message",
"id": null,
"role": "user",
"content": [{ "type": "input_text", "text": "hello" }]
}
]);
assert_eq!(request_body["input"], expected_input);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn includes_session_id_and_model_headers_in_request() {
if std::env::var(CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR).is_ok() {

View File

@@ -515,6 +515,7 @@ impl EventProcessor for EventProcessorWithHumanOutput {
model,
history_log_id: _,
history_entry_count: _,
initial_messages: _,
} = session_configured_event;
ts_println!(

View File

@@ -260,6 +260,7 @@ mod tests {
model: "gpt-4o".to_string(),
history_log_id: 1,
history_entry_count: 1000,
initial_messages: None,
}),
};
@@ -289,6 +290,7 @@ mod tests {
model: "gpt-4o".to_string(),
history_log_id: 1,
history_entry_count: 1000,
initial_messages: None,
};
let event = Event {
id: "1".to_string(),

View File

@@ -839,6 +839,11 @@ pub struct SessionConfiguredEvent {
/// Current number of entries in the history log.
pub history_entry_count: usize,
/// Optional initial messages (as events) for resumed sessions.
/// When present, UIs can use these to seed the history.
#[serde(skip_serializing_if = "Option::is_none")]
pub initial_messages: Option<Vec<EventMsg>>,
}
/// User's decision in response to an ExecApprovalRequest.
@@ -914,6 +919,7 @@ mod tests {
model: "codex-mini-latest".to_string(),
history_log_id: 0,
history_entry_count: 0,
initial_messages: None,
}),
};
let serialized = serde_json::to_string(&event).unwrap();

View File

@@ -646,6 +646,7 @@ pub(crate) fn new_session_info(
session_id: _,
history_log_id: _,
history_entry_count: _,
initial_messages: _,
} = event;
if is_first_event {
let cwd_str = match relativize_to_home(&config.cwd) {