From 2b96f9f5693e16006056a2a2b9048a9758dcbeff Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Wed, 3 Sep 2025 22:34:50 -0700 Subject: [PATCH] Dividing UserMsgs into categories to send it back to the tui (#3127) This PR does the following: - Divides user messages into 3 categories: plain, user instructions, and environment context - Centralizes adding user instructions and environment context to a degree - Improves the integration testing. Builds on top of #3123, specifically this [comment](https://github.com/openai/codex/pull/3123#discussion_r2319885089). We need to send the user message back to the TUI while ignoring the user instructions and environment context we attach. --- codex-rs/core/src/client_common.rs | 16 ------ codex-rs/core/src/codex.rs | 4 +- codex-rs/core/src/environment_context.rs | 10 ++-- codex-rs/core/src/event_mapping.rs | 40 ++++++++++++--- codex-rs/core/src/lib.rs | 1 + codex-rs/core/src/user_instructions.rs | 42 +++++++++++++++ codex-rs/core/tests/suite/client.rs | 40 +++++++++++++-- .../src/event_processor_with_human_output.rs | 1 + codex-rs/mcp-server/src/codex_tool_runner.rs | 1 + codex-rs/protocol/src/models.rs | 2 + codex-rs/protocol/src/protocol.rs | 51 +++++++++++++++++++ codex-rs/tui/src/chatwidget.rs | 1 + 12 files changed, 175 insertions(+), 34 deletions(-) create mode 100644 codex-rs/core/src/user_instructions.rs diff --git a/codex-rs/core/src/client_common.rs b/codex-rs/core/src/client_common.rs index e326ea33..d561d369 100644 --- a/codex-rs/core/src/client_common.rs +++ b/codex-rs/core/src/client_common.rs @@ -6,7 +6,6 @@ use codex_apply_patch::APPLY_PATCH_TOOL_INSTRUCTIONS; use codex_protocol::config_types::ReasoningEffort as ReasoningEffortConfig; use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig; use codex_protocol::config_types::Verbosity as VerbosityConfig; -use codex_protocol::models::ContentItem; use codex_protocol::models::ResponseItem; use futures::Stream; use serde::Serialize; @@ -20,10 +19,6 @@ use 
tokio::sync::mpsc; /// with this content. const BASE_INSTRUCTIONS: &str = include_str!("../prompt.md"); -/// wraps user instructions message in a tag for the model to parse more easily. -const USER_INSTRUCTIONS_START: &str = "\n\n"; -const USER_INSTRUCTIONS_END: &str = "\n\n"; - /// API request payload for a single model turn #[derive(Default, Debug, Clone)] pub struct Prompt { @@ -68,17 +63,6 @@ impl Prompt { pub(crate) fn get_formatted_input(&self) -> Vec { self.input.clone() } - - /// Creates a formatted user instructions message from a string - pub(crate) fn format_user_instructions_message(ui: &str) -> ResponseItem { - ResponseItem::Message { - id: None, - role: "user".to_string(), - content: vec![ContentItem::InputText { - text: format!("{USER_INSTRUCTIONS_START}{ui}{USER_INSTRUCTIONS_END}"), - }], - } - } } #[derive(Debug)] diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 1843ffd5..287bfe8f 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -107,6 +107,7 @@ use crate::safety::assess_command_safety; use crate::safety::assess_safety_for_untrusted_command; use crate::shell; use crate::turn_diff_tracker::TurnDiffTracker; +use crate::user_instructions::UserInstructions; use crate::user_notification::UserNotification; use crate::util::backoff; use codex_protocol::config_types::ReasoningEffort as ReasoningEffortConfig; @@ -482,6 +483,7 @@ impl Session { InitialHistory::New => None, InitialHistory::Resumed(items) => Some(sess.build_initial_messages(items)), }; + let events = std::iter::once(Event { id: INITIAL_SUBMIT_ID.to_owned(), msg: EventMsg::SessionConfigured(SessionConfiguredEvent { @@ -540,7 +542,7 @@ impl Session { // TODO: Those items shouldn't be "user messages" IMO. Maybe developer messages. 
let mut conversation_items = Vec::::with_capacity(2); if let Some(user_instructions) = turn_context.user_instructions.as_deref() { - conversation_items.push(Prompt::format_user_instructions_message(user_instructions)); + conversation_items.push(UserInstructions::new(user_instructions.to_string()).into()); } conversation_items.push(ResponseItem::from(EnvironmentContext::new( Some(turn_context.cwd.clone()), diff --git a/codex-rs/core/src/environment_context.rs b/codex-rs/core/src/environment_context.rs index b7ee8625..0a8b09d1 100644 --- a/codex-rs/core/src/environment_context.rs +++ b/codex-rs/core/src/environment_context.rs @@ -8,12 +8,10 @@ use crate::shell::Shell; use codex_protocol::config_types::SandboxMode; use codex_protocol::models::ContentItem; use codex_protocol::models::ResponseItem; +use codex_protocol::protocol::ENVIRONMENT_CONTEXT_CLOSE_TAG; +use codex_protocol::protocol::ENVIRONMENT_CONTEXT_OPEN_TAG; use std::path::PathBuf; -/// wraps environment context message in a tag for the model to parse more easily. 
-pub(crate) const ENVIRONMENT_CONTEXT_START: &str = ""; -pub(crate) const ENVIRONMENT_CONTEXT_END: &str = ""; - #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, DeriveDisplay)] #[serde(rename_all = "kebab-case")] #[strum(serialize_all = "kebab-case")] @@ -79,7 +77,7 @@ impl EnvironmentContext { /// /// ``` pub fn serialize_to_xml(self) -> String { - let mut lines = vec![ENVIRONMENT_CONTEXT_START.to_string()]; + let mut lines = vec![ENVIRONMENT_CONTEXT_OPEN_TAG.to_string()]; if let Some(cwd) = self.cwd { lines.push(format!(" {}", cwd.to_string_lossy())); } @@ -101,7 +99,7 @@ impl EnvironmentContext { { lines.push(format!(" {shell_name}")); } - lines.push(ENVIRONMENT_CONTEXT_END.to_string()); + lines.push(ENVIRONMENT_CONTEXT_CLOSE_TAG.to_string()); lines.join("\n") } } diff --git a/codex-rs/core/src/event_mapping.rs b/codex-rs/core/src/event_mapping.rs index ab2d2031..c9ad8a16 100644 --- a/codex-rs/core/src/event_mapping.rs +++ b/codex-rs/core/src/event_mapping.rs @@ -2,6 +2,8 @@ use crate::protocol::AgentMessageEvent; use crate::protocol::AgentReasoningEvent; use crate::protocol::AgentReasoningRawContentEvent; use crate::protocol::EventMsg; +use crate::protocol::InputMessageKind; +use crate::protocol::UserMessageEvent; use crate::protocol::WebSearchEndEvent; use codex_protocol::models::ContentItem; use codex_protocol::models::ReasoningItemContent; @@ -17,15 +19,37 @@ pub(crate) fn map_response_item_to_event_messages( show_raw_agent_reasoning: bool, ) -> Vec { match item { - ResponseItem::Message { content, .. } => { - let mut events = Vec::new(); - for content_item in content { - if let ContentItem::OutputText { text } = content_item { - events.push(EventMsg::AgentMessage(AgentMessageEvent { - message: text.clone(), - })); - } + ResponseItem::Message { role, content, .. } => { + // Do not surface system messages as user events. 
+ if role == "system" { + return Vec::new(); } + + let events: Vec = content + .iter() + .filter_map(|content_item| match content_item { + ContentItem::OutputText { text } => { + Some(EventMsg::AgentMessage(AgentMessageEvent { + message: text.clone(), + })) + } + ContentItem::InputText { text } => { + let trimmed = text.trim_start(); + let kind = if trimmed.starts_with("") { + Some(InputMessageKind::EnvironmentContext) + } else if trimmed.starts_with("") { + Some(InputMessageKind::UserInstructions) + } else { + Some(InputMessageKind::Plain) + }; + Some(EventMsg::UserMessage(UserMessageEvent { + message: text.clone(), + kind, + })) + } + _ => None, + }) + .collect(); events } diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs index 38ac4846..f24fa0cb 100644 --- a/codex-rs/core/src/lib.rs +++ b/codex-rs/core/src/lib.rs @@ -34,6 +34,7 @@ mod mcp_tool_call; mod message_history; mod model_provider_info; pub mod parse_command; +mod user_instructions; pub use model_provider_info::BUILT_IN_OSS_MODEL_PROVIDER_ID; pub use model_provider_info::ModelProviderInfo; pub use model_provider_info::WireApi; diff --git a/codex-rs/core/src/user_instructions.rs b/codex-rs/core/src/user_instructions.rs new file mode 100644 index 00000000..76054edc --- /dev/null +++ b/codex-rs/core/src/user_instructions.rs @@ -0,0 +1,42 @@ +use serde::Deserialize; +use serde::Serialize; + +use codex_protocol::models::ContentItem; +use codex_protocol::models::ResponseItem; +use codex_protocol::protocol::USER_INSTRUCTIONS_CLOSE_TAG; +use codex_protocol::protocol::USER_INSTRUCTIONS_OPEN_TAG; + +/// Wraps user instructions in a tag so the model can classify them easily. 
+ +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +#[serde(rename = "user_instructions", rename_all = "snake_case")] +pub(crate) struct UserInstructions { + text: String, +} + +impl UserInstructions { + pub fn new>(text: T) -> Self { + Self { text: text.into() } + } + + /// Serializes the user instructions to an XML-like tagged block that starts + /// with so clients can classify it. + pub fn serialize_to_xml(self) -> String { + format!( + "{USER_INSTRUCTIONS_OPEN_TAG}\n\n{}\n\n{USER_INSTRUCTIONS_CLOSE_TAG}", + self.text + ) + } +} + +impl From for ResponseItem { + fn from(ui: UserInstructions) -> Self { + ResponseItem::Message { + id: None, + role: "user".to_string(), + content: vec![ContentItem::InputText { + text: ui.serialize_to_xml(), + }], + } + } +} diff --git a/codex-rs/core/tests/suite/client.rs b/codex-rs/core/tests/suite/client.rs index f69637fc..1bf7ef0c 100644 --- a/codex-rs/core/tests/suite/client.rs +++ b/codex-rs/core/tests/suite/client.rs @@ -118,12 +118,37 @@ async fn resume_includes_initial_messages_and_sends_prior_items() { return; } - // Create a fake rollout session file with one prior assistant message. + // Create a fake rollout session file with prior user + system + assistant messages. 
let tmpdir = TempDir::new().unwrap(); let session_path = tmpdir.path().join("resume-session.jsonl"); let mut f = std::fs::File::create(&session_path).unwrap(); // First line: meta (content not used by reader other than non-empty) - writeln!(f, "{}", serde_json::json!({"meta":"test"})).unwrap(); + writeln!( + f, + "{}", + serde_json::json!({"meta":"test","instructions":"be nice"}) + ) + .unwrap(); + + // Prior item: user message (should be delivered) + let prior_user = codex_protocol::models::ResponseItem::Message { + id: None, + role: "user".to_string(), + content: vec![codex_protocol::models::ContentItem::InputText { + text: "resumed user message".to_string(), + }], + }; + writeln!(f, "{}", serde_json::to_string(&prior_user).unwrap()).unwrap(); + + // Prior item: system message (excluded from API history) + let prior_system = codex_protocol::models::ResponseItem::Message { + id: None, + role: "system".to_string(), + content: vec![codex_protocol::models::ContentItem::OutputText { + text: "resumed system instruction".to_string(), + }], + }; + writeln!(f, "{}", serde_json::to_string(&prior_system).unwrap()).unwrap(); // Prior item: assistant message let prior_item = codex_protocol::models::ResponseItem::Message { @@ -157,6 +182,8 @@ async fn resume_includes_initial_messages_and_sends_prior_items() { let mut config = load_default_config_for_test(&codex_home); config.model_provider = model_provider; config.experimental_resume = Some(session_path.clone()); + // Also configure user instructions to ensure they are NOT delivered on resume. 
+ config.user_instructions = Some("be nice".to_string()); let conversation_manager = ConversationManager::with_auth(CodexAuth::from_api_key("Test API Key")); @@ -169,13 +196,14 @@ async fn resume_includes_initial_messages_and_sends_prior_items() { .await .expect("create new conversation"); - // 1) Assert initial_messages contains the prior assistant message as an EventMsg + // 1) Assert initial_messages contains the prior user + assistant messages as EventMsg entries let initial_msgs = session_configured .initial_messages .clone() .expect("expected initial messages for resumed session"); let initial_json = serde_json::to_value(&initial_msgs).unwrap(); let expected_initial_json = serde_json::json!([ + { "type": "user_message", "message": "resumed user message", "kind": "plain" }, { "type": "agent_message", "message": "resumed assistant message" } ]); assert_eq!(initial_json, expected_initial_json); @@ -194,6 +222,12 @@ async fn resume_includes_initial_messages_and_sends_prior_items() { let request = &server.received_requests().await.unwrap()[0]; let request_body = request.body_json::().unwrap(); let expected_input = serde_json::json!([ + { + "type": "message", + "id": null, + "role": "user", + "content": [{ "type": "input_text", "text": "resumed user message" }] + }, { "type": "message", "id": null, diff --git a/codex-rs/exec/src/event_processor_with_human_output.rs b/codex-rs/exec/src/event_processor_with_human_output.rs index ae0cf9ef..0ee60b93 100644 --- a/codex-rs/exec/src/event_processor_with_human_output.rs +++ b/codex-rs/exec/src/event_processor_with_human_output.rs @@ -552,6 +552,7 @@ impl EventProcessor for EventProcessorWithHumanOutput { }, EventMsg::ShutdownComplete => return CodexStatus::Shutdown, EventMsg::ConversationHistory(_) => {} + EventMsg::UserMessage(_) => {} } CodexStatus::Running } diff --git a/codex-rs/mcp-server/src/codex_tool_runner.rs b/codex-rs/mcp-server/src/codex_tool_runner.rs index b297dadd..48d520b5 100644 --- 
a/codex-rs/mcp-server/src/codex_tool_runner.rs +++ b/codex-rs/mcp-server/src/codex_tool_runner.rs @@ -279,6 +279,7 @@ async fn run_codex_tool_session_inner( | EventMsg::PlanUpdate(_) | EventMsg::TurnAborted(_) | EventMsg::ConversationHistory(_) + | EventMsg::UserMessage(_) | EventMsg::ShutdownComplete => { // For now, we do not do anything extra for these // events. Note that diff --git a/codex-rs/protocol/src/models.rs b/codex-rs/protocol/src/models.rs index 6cdfd933..71b51709 100644 --- a/codex-rs/protocol/src/models.rs +++ b/codex-rs/protocol/src/models.rs @@ -309,6 +309,8 @@ impl std::ops::Deref for FunctionCallOutputPayload { } } +// (Moved event mapping logic into codex-core to avoid coupling protocol to UI-facing events.) + #[cfg(test)] mod tests { use super::*; diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index 2e58f778..7bcd818a 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -27,6 +27,13 @@ use crate::models::ResponseItem; use crate::parse_command::ParsedCommand; use crate::plan_tool::UpdatePlanArgs; +/// Open/close tags for special user-input blocks. Used across crates to avoid +/// duplicated hardcoded strings. 
+pub const USER_INSTRUCTIONS_OPEN_TAG: &str = ""; +pub const USER_INSTRUCTIONS_CLOSE_TAG: &str = ""; +pub const ENVIRONMENT_CONTEXT_OPEN_TAG: &str = ""; +pub const ENVIRONMENT_CONTEXT_CLOSE_TAG: &str = ""; + /// Submission Queue Entry - requests from user #[derive(Debug, Clone, Deserialize, Serialize)] pub struct Submission { @@ -417,6 +424,9 @@ pub enum EventMsg { /// Agent text output message AgentMessage(AgentMessageEvent), + /// User/system input message (what was sent to the model) + UserMessage(UserMessageEvent), + /// Agent text output delta message AgentMessageDelta(AgentMessageDeltaEvent), @@ -610,6 +620,47 @@ pub struct AgentMessageEvent { pub message: String, } +#[derive(Debug, Clone, Deserialize, Serialize)] +#[serde(rename_all = "snake_case")] +pub enum InputMessageKind { + /// Plain user text (default) + Plain, + /// XML-wrapped user instructions (...) + UserInstructions, + /// XML-wrapped environment context (...) + EnvironmentContext, +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct UserMessageEvent { + pub message: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub kind: Option, +} + +impl From<(T, U)> for InputMessageKind +where + T: AsRef, + U: AsRef, +{ + fn from(value: (T, U)) -> Self { + let (_role, message) = value; + let message = message.as_ref(); + let trimmed = message.trim(); + if trimmed.starts_with(ENVIRONMENT_CONTEXT_OPEN_TAG) + && trimmed.ends_with(ENVIRONMENT_CONTEXT_CLOSE_TAG) + { + InputMessageKind::EnvironmentContext + } else if trimmed.starts_with(USER_INSTRUCTIONS_OPEN_TAG) + && trimmed.ends_with(USER_INSTRUCTIONS_CLOSE_TAG) + { + InputMessageKind::UserInstructions + } else { + InputMessageKind::Plain + } + } +} + #[derive(Debug, Clone, Deserialize, Serialize)] pub struct AgentMessageDeltaEvent { pub delta: String, diff --git a/codex-rs/tui/src/chatwidget.rs b/codex-rs/tui/src/chatwidget.rs index f86658a8..0fe11ca5 100644 --- a/codex-rs/tui/src/chatwidget.rs +++ b/codex-rs/tui/src/chatwidget.rs 
@@ -1010,6 +1010,7 @@ impl ChatWidget { self.on_background_event(message) } EventMsg::StreamError(StreamErrorEvent { message }) => self.on_stream_error(message), + EventMsg::UserMessage(..) => {} EventMsg::ConversationHistory(ev) => { // Forward to App so it can process backtrack flows. self.app_event_tx