diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 3b3d43bf..1843ffd5 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -9,6 +9,7 @@ use std::sync::atomic::AtomicU64; use std::time::Duration; use crate::AuthManager; +use crate::event_mapping::map_response_item_to_event_messages; use async_channel::Receiver; use async_channel::Sender; use codex_apply_patch::ApplyPatchAction; @@ -75,9 +76,7 @@ use crate::project_doc::get_user_instructions; use crate::protocol::AgentMessageDeltaEvent; use crate::protocol::AgentMessageEvent; use crate::protocol::AgentReasoningDeltaEvent; -use crate::protocol::AgentReasoningEvent; use crate::protocol::AgentReasoningRawContentDeltaEvent; -use crate::protocol::AgentReasoningRawContentEvent; use crate::protocol::AgentReasoningSectionBreakEvent; use crate::protocol::ApplyPatchApprovalRequestEvent; use crate::protocol::AskForApproval; @@ -102,7 +101,6 @@ use crate::protocol::Submission; use crate::protocol::TaskCompleteEvent; use crate::protocol::TurnDiffEvent; use crate::protocol::WebSearchBeginEvent; -use crate::protocol::WebSearchEndEvent; use crate::rollout::RolloutRecorder; use crate::safety::SafetyCheck; use crate::safety::assess_command_safety; @@ -117,12 +115,9 @@ use codex_protocol::custom_prompts::CustomPrompt; use codex_protocol::models::ContentItem; use codex_protocol::models::FunctionCallOutputPayload; use codex_protocol::models::LocalShellAction; -use codex_protocol::models::ReasoningItemContent; -use codex_protocol::models::ReasoningItemReasoningSummary; use codex_protocol::models::ResponseInputItem; use codex_protocol::models::ResponseItem; use codex_protocol::models::ShellToolCallParams; -use codex_protocol::models::WebSearchAction; // A convenience extension trait for acquiring mutex locks where poisoning is // unrecoverable and should abort the program. This avoids scattered `.unwrap()` @@ -199,6 +194,7 @@ impl Codex { config.clone(), auth_manager.clone(), tx_event.clone(), + conversation_history.clone(), ) .await .map_err(|e| { @@ -361,6 +357,7 @@ impl Session { config: Arc, auth_manager: Arc, tx_event: Sender, + initial_history: InitialHistory, ) -> anyhow::Result<(Arc, TurnContext)> { let session_id = Uuid::new_v4(); let ConfigureSession { @@ -480,6 +477,11 @@ impl Session { }); // Dispatch the SessionConfiguredEvent first and then report any errors. + // If resuming, include converted initial messages in the payload so UIs can render them immediately. + let initial_messages = match &initial_history { + InitialHistory::New => None, + InitialHistory::Resumed(items) => Some(sess.build_initial_messages(items)), + }; let events = std::iter::once(Event { id: INITIAL_SUBMIT_ID.to_owned(), msg: EventMsg::SessionConfigured(SessionConfiguredEvent { @@ -487,6 +489,7 @@ impl Session { model, history_log_id, history_entry_count, + initial_messages, }), }) .chain(post_session_configured_error_events.into_iter()); @@ -552,6 +555,17 @@ impl Session { self.record_conversation_items(&items).await; } + /// build the initial messages vector for SessionConfigured by converting + /// ResponseItems into EventMsg. + fn build_initial_messages(&self, items: &[ResponseItem]) -> Vec { + items + .iter() + .flat_map(|item| { + map_response_item_to_event_messages(item, self.show_raw_agent_reasoning) + }) + .collect() + } + /// Sends the given event to the client and swallows the send event, if /// any, logging it as an error. pub(crate) async fn send_event(&self, event: Event) { @@ -1903,53 +1917,6 @@ async fn handle_response_item( ) -> CodexResult> { debug!(?item, "Output item"); let output = match item { - ResponseItem::Message { content, .. } => { - for item in content { - if let ContentItem::OutputText { text } = item { - let event = Event { - id: sub_id.to_string(), - msg: EventMsg::AgentMessage(AgentMessageEvent { message: text }), - }; - sess.tx_event.send(event).await.ok(); - } - } - None - } - ResponseItem::Reasoning { - id: _, - summary, - content, - encrypted_content: _, - } => { - for item in summary { - let text = match item { - ReasoningItemReasoningSummary::SummaryText { text } => text, - }; - let event = Event { - id: sub_id.to_string(), - msg: EventMsg::AgentReasoning(AgentReasoningEvent { text }), - }; - sess.tx_event.send(event).await.ok(); - } - if sess.show_raw_agent_reasoning - && let Some(content) = content - { - for item in content { - let text = match item { - ReasoningItemContent::ReasoningText { text } => text, - ReasoningItemContent::Text { text } => text, - }; - let event = Event { - id: sub_id.to_string(), - msg: EventMsg::AgentReasoningRawContent(AgentReasoningRawContentEvent { - text, - }), - }; - sess.tx_event.send(event).await.ok(); - } - } - None - } ResponseItem::FunctionCall { name, arguments, @@ -2039,12 +2006,14 @@ async fn handle_response_item( debug!("unexpected CustomToolCallOutput from stream"); None } - ResponseItem::WebSearchCall { id, action, .. } => { - if let WebSearchAction::Search { query } = action { - let call_id = id.unwrap_or_else(|| "".to_string()); + ResponseItem::Message { .. } + | ResponseItem::Reasoning { .. } + | ResponseItem::WebSearchCall { .. } => { + let msgs = map_response_item_to_event_messages(&item, sess.show_raw_agent_reasoning); + for msg in msgs { let event = Event { id: sub_id.to_string(), - msg: EventMsg::WebSearchEnd(WebSearchEndEvent { call_id, query }), + msg, }; sess.tx_event.send(event).await.ok(); } diff --git a/codex-rs/core/src/event_mapping.rs b/codex-rs/core/src/event_mapping.rs new file mode 100644 index 00000000..ab2d2031 --- /dev/null +++ b/codex-rs/core/src/event_mapping.rs @@ -0,0 +1,74 @@ +use crate::protocol::AgentMessageEvent; +use crate::protocol::AgentReasoningEvent; +use crate::protocol::AgentReasoningRawContentEvent; +use crate::protocol::EventMsg; +use crate::protocol::WebSearchEndEvent; +use codex_protocol::models::ContentItem; +use codex_protocol::models::ReasoningItemContent; +use codex_protocol::models::ReasoningItemReasoningSummary; +use codex_protocol::models::ResponseItem; +use codex_protocol::models::WebSearchAction; + +/// Convert a `ResponseItem` into zero or more `EventMsg` values that the UI can render. +/// +/// When `show_raw_agent_reasoning` is false, raw reasoning content events are omitted. +pub(crate) fn map_response_item_to_event_messages( + item: &ResponseItem, + show_raw_agent_reasoning: bool, +) -> Vec { + match item { + ResponseItem::Message { content, .. } => { + let mut events = Vec::new(); + for content_item in content { + if let ContentItem::OutputText { text } = content_item { + events.push(EventMsg::AgentMessage(AgentMessageEvent { + message: text.clone(), + })); + } + } + events + } + + ResponseItem::Reasoning { + summary, content, .. + } => { + let mut events = Vec::new(); + for ReasoningItemReasoningSummary::SummaryText { text } in summary { + events.push(EventMsg::AgentReasoning(AgentReasoningEvent { + text: text.clone(), + })); + } + if let Some(items) = content.as_ref().filter(|_| show_raw_agent_reasoning) { + for c in items { + let text = match c { + ReasoningItemContent::ReasoningText { text } + | ReasoningItemContent::Text { text } => text, + }; + events.push(EventMsg::AgentReasoningRawContent( + AgentReasoningRawContentEvent { text: text.clone() }, + )); + } + } + events + } + + ResponseItem::WebSearchCall { id, action, .. } => match action { + WebSearchAction::Search { query } => { + let call_id = id.clone().unwrap_or_else(|| "".to_string()); + vec![EventMsg::WebSearchEnd(WebSearchEndEvent { + call_id, + query: query.clone(), + })] + } + WebSearchAction::Other => Vec::new(), + }, + + // Variants that require side effects are handled by higher layers and do not emit events here. + ResponseItem::FunctionCall { .. } + | ResponseItem::FunctionCallOutput { .. } + | ResponseItem::LocalShellCall { .. } + | ResponseItem::CustomToolCall { .. } + | ResponseItem::CustomToolCallOutput { .. } + | ResponseItem::Other => Vec::new(), + } +} diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs index 9f731298..38ac4846 100644 --- a/codex-rs/core/src/lib.rs +++ b/codex-rs/core/src/lib.rs @@ -40,6 +40,7 @@ pub use model_provider_info::WireApi; pub use model_provider_info::built_in_model_providers; pub use model_provider_info::create_oss_provider_with_base_url; mod conversation_manager; +mod event_mapping; pub use conversation_manager::ConversationManager; pub use conversation_manager::NewConversation; // Re-export common auth types for workspace consumers diff --git a/codex-rs/core/tests/suite/client.rs b/codex-rs/core/tests/suite/client.rs index dd152dcb..f69637fc 100644 --- a/codex-rs/core/tests/suite/client.rs +++ b/codex-rs/core/tests/suite/client.rs @@ -13,6 +13,7 @@ use core_test_support::load_default_config_for_test; use core_test_support::load_sse_fixture_with_id; use core_test_support::wait_for_event; use serde_json::json; +use std::io::Write; use tempfile::TempDir; use wiremock::Mock; use wiremock::MockServer; @@ -108,6 +109,107 @@ fn write_auth_json( fake_jwt } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn resume_includes_initial_messages_and_sends_prior_items() { + if std::env::var(CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR).is_ok() { + println!( + "Skipping test because it cannot execute when network is disabled in a Codex sandbox." + ); + return; + } + + // Create a fake rollout session file with one prior assistant message. + let tmpdir = TempDir::new().unwrap(); + let session_path = tmpdir.path().join("resume-session.jsonl"); + let mut f = std::fs::File::create(&session_path).unwrap(); + // First line: meta (content not used by reader other than non-empty) + writeln!(f, "{}", serde_json::json!({"meta":"test"})).unwrap(); + + // Prior item: assistant message + let prior_item = codex_protocol::models::ResponseItem::Message { + id: None, + role: "assistant".to_string(), + content: vec![codex_protocol::models::ContentItem::OutputText { + text: "resumed assistant message".to_string(), + }], + }; + writeln!(f, "{}", serde_json::to_string(&prior_item).unwrap()).unwrap(); + drop(f); + + // Mock server that will receive the resumed request + let server = MockServer::start().await; + let first = ResponseTemplate::new(200) + .insert_header("content-type", "text/event-stream") + .set_body_raw(sse_completed("resp1"), "text/event-stream"); + Mock::given(method("POST")) + .and(path("/v1/responses")) + .respond_with(first) + .expect(1) + .mount(&server) + .await; + + // Configure Codex to resume from our file + let model_provider = ModelProviderInfo { + base_url: Some(format!("{}/v1", server.uri())), + ..built_in_model_providers()["openai"].clone() + }; + let codex_home = TempDir::new().unwrap(); + let mut config = load_default_config_for_test(&codex_home); + config.model_provider = model_provider; + config.experimental_resume = Some(session_path.clone()); + + let conversation_manager = + ConversationManager::with_auth(CodexAuth::from_api_key("Test API Key")); + let NewConversation { + conversation: codex, + session_configured, + .. + } = conversation_manager + .new_conversation(config) + .await + .expect("create new conversation"); + + // 1) Assert initial_messages contains the prior assistant message as an EventMsg + let initial_msgs = session_configured + .initial_messages + .clone() + .expect("expected initial messages for resumed session"); + let initial_json = serde_json::to_value(&initial_msgs).unwrap(); + let expected_initial_json = serde_json::json!([ + { "type": "agent_message", "message": "resumed assistant message" } + ]); + assert_eq!(initial_json, expected_initial_json); + + // 2) Submit new input; the request body must include the prior item followed by the new user input. + codex + .submit(Op::UserInput { + items: vec![InputItem::Text { + text: "hello".into(), + }], + }) + .await + .unwrap(); + wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; + + let request = &server.received_requests().await.unwrap()[0]; + let request_body = request.body_json::().unwrap(); + let expected_input = serde_json::json!([ + { + "type": "message", + "id": null, + "role": "assistant", + "content": [{ "type": "output_text", "text": "resumed assistant message" }] + }, + { + "type": "message", + "id": null, + "role": "user", + "content": [{ "type": "input_text", "text": "hello" }] + } + ]); + assert_eq!(request_body["input"], expected_input); +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn includes_session_id_and_model_headers_in_request() { if std::env::var(CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR).is_ok() { diff --git a/codex-rs/exec/src/event_processor_with_human_output.rs b/codex-rs/exec/src/event_processor_with_human_output.rs index cc633277..ae0cf9ef 100644 --- a/codex-rs/exec/src/event_processor_with_human_output.rs +++ b/codex-rs/exec/src/event_processor_with_human_output.rs @@ -515,6 +515,7 @@ impl EventProcessor for EventProcessorWithHumanOutput { model, history_log_id: _, history_entry_count: _, + initial_messages: _, } = session_configured_event; ts_println!( diff --git a/codex-rs/mcp-server/src/outgoing_message.rs b/codex-rs/mcp-server/src/outgoing_message.rs index abfc542e..c26c8e3d 100644 --- a/codex-rs/mcp-server/src/outgoing_message.rs +++ b/codex-rs/mcp-server/src/outgoing_message.rs @@ -260,6 +260,7 @@ mod tests { model: "gpt-4o".to_string(), history_log_id: 1, history_entry_count: 1000, + initial_messages: None, }), }; @@ -289,6 +290,7 @@ mod tests { model: "gpt-4o".to_string(), history_log_id: 1, history_entry_count: 1000, + initial_messages: None, }; let event = Event { id: "1".to_string(), diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index 315ec09f..2e58f778 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -839,6 +839,11 @@ pub struct SessionConfiguredEvent { /// Current number of entries in the history log. pub history_entry_count: usize, + + /// Optional initial messages (as events) for resumed sessions. + /// When present, UIs can use these to seed the history. + #[serde(skip_serializing_if = "Option::is_none")] + pub initial_messages: Option>, } /// User's decision in response to an ExecApprovalRequest. @@ -914,6 +919,7 @@ mod tests { model: "codex-mini-latest".to_string(), history_log_id: 0, history_entry_count: 0, + initial_messages: None, }), }; let serialized = serde_json::to_string(&event).unwrap(); diff --git a/codex-rs/tui/src/history_cell.rs b/codex-rs/tui/src/history_cell.rs index 50f3e315..7295c8af 100644 --- a/codex-rs/tui/src/history_cell.rs +++ b/codex-rs/tui/src/history_cell.rs @@ -646,6 +646,7 @@ pub(crate) fn new_session_info( session_id: _, history_log_id: _, history_entry_count: _, + initial_messages: _, } = event; if is_first_event { let cwd_str = match relativize_to_home(&config.cwd) {