diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index 5ad73350..874a5ab3 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -739,12 +739,15 @@ dependencies = [ "libc", "owo-colors", "predicates", + "pretty_assertions", + "serde", "serde_json", "shlex", "tempfile", "tokio", "tracing", "tracing-subscriber", + "ts-rs", "uuid", "walkdir", "wiremock", diff --git a/codex-rs/exec/Cargo.toml b/codex-rs/exec/Cargo.toml index 44281c73..8603e8fd 100644 --- a/codex-rs/exec/Cargo.toml +++ b/codex-rs/exec/Cargo.toml @@ -28,6 +28,7 @@ codex-core = { workspace = true } codex-ollama = { workspace = true } codex-protocol = { workspace = true } owo-colors = { workspace = true } +serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } shlex = { workspace = true } tokio = { workspace = true, features = [ @@ -39,12 +40,18 @@ tokio = { workspace = true, features = [ ] } tracing = { workspace = true, features = ["log"] } tracing-subscriber = { workspace = true, features = ["env-filter"] } +ts-rs = { workspace = true, features = [ + "uuid-impl", + "serde-json-impl", + "no-serde-warnings", +] } [dev-dependencies] assert_cmd = { workspace = true } core_test_support = { workspace = true } libc = { workspace = true } predicates = { workspace = true } +pretty_assertions = { workspace = true } tempfile = { workspace = true } uuid = { workspace = true } walkdir = { workspace = true } diff --git a/codex-rs/exec/src/cli.rs b/codex-rs/exec/src/cli.rs index 2f5fe4fb..0df114cb 100644 --- a/codex-rs/exec/src/cli.rs +++ b/codex-rs/exec/src/cli.rs @@ -64,9 +64,20 @@ pub struct Cli { pub color: Color, /// Print events to stdout as JSONL. - #[arg(long = "json", default_value_t = false)] + #[arg( + long = "json", + default_value_t = false, + conflicts_with = "experimental_json" + )] pub json: bool, + #[arg( + long = "experimental-json", + default_value_t = false, + conflicts_with = "json" + )] + pub experimental_json: bool, + /// Whether to include the plan tool in the conversation. #[arg(long = "include-plan-tool", default_value_t = false)] pub include_plan_tool: bool, diff --git a/codex-rs/exec/src/event_processor.rs b/codex-rs/exec/src/event_processor.rs index 1ba10c34..92114290 100644 --- a/codex-rs/exec/src/event_processor.rs +++ b/codex-rs/exec/src/event_processor.rs @@ -2,6 +2,7 @@ use std::path::Path; use codex_core::config::Config; use codex_core::protocol::Event; +use codex_core::protocol::SessionConfiguredEvent; pub(crate) enum CodexStatus { Running, @@ -11,7 +12,12 @@ pub(crate) enum CodexStatus { pub(crate) trait EventProcessor { /// Print summary of effective configuration and user prompt. - fn print_config_summary(&mut self, config: &Config, prompt: &str); + fn print_config_summary( + &mut self, + config: &Config, + prompt: &str, + session_configured: &SessionConfiguredEvent, + ); /// Handle a single event emitted by the agent. fn process_event(&mut self, event: Event) -> CodexStatus; diff --git a/codex-rs/exec/src/event_processor_with_human_output.rs b/codex-rs/exec/src/event_processor_with_human_output.rs index cb972bf9..4f231bab 100644 --- a/codex-rs/exec/src/event_processor_with_human_output.rs +++ b/codex-rs/exec/src/event_processor_with_human_output.rs @@ -141,7 +141,7 @@ impl EventProcessor for EventProcessorWithHumanOutput { /// Print a concise summary of the effective configuration that will be used /// for the session. This mirrors the information shown in the TUI welcome /// screen. - fn print_config_summary(&mut self, config: &Config, prompt: &str) { + fn print_config_summary(&mut self, config: &Config, prompt: &str, _: &SessionConfiguredEvent) { const VERSION: &str = env!("CARGO_PKG_VERSION"); ts_println!( self, diff --git a/codex-rs/exec/src/event_processor_with_json_output.rs b/codex-rs/exec/src/event_processor_with_json_output.rs index 11e9732e..c9d1127f 100644 --- a/codex-rs/exec/src/event_processor_with_json_output.rs +++ b/codex-rs/exec/src/event_processor_with_json_output.rs @@ -4,6 +4,7 @@ use std::path::PathBuf; use codex_core::config::Config; use codex_core::protocol::Event; use codex_core::protocol::EventMsg; +use codex_core::protocol::SessionConfiguredEvent; use codex_core::protocol::TaskCompleteEvent; use serde_json::json; @@ -23,7 +24,7 @@ impl EventProcessorWithJsonOutput { } impl EventProcessor for EventProcessorWithJsonOutput { - fn print_config_summary(&mut self, config: &Config, prompt: &str) { + fn print_config_summary(&mut self, config: &Config, prompt: &str, _: &SessionConfiguredEvent) { let entries = create_config_summary_entries(config) .into_iter() .map(|(key, value)| (key.to_string(), value)) diff --git a/codex-rs/exec/src/exec_events.rs b/codex-rs/exec/src/exec_events.rs new file mode 100644 index 00000000..68c790b8 --- /dev/null +++ b/codex-rs/exec/src/exec_events.rs @@ -0,0 +1,146 @@ +use serde::Deserialize; +use serde::Serialize; +use ts_rs::TS; + +/// Top-level events emitted on the Codex Exec conversation stream. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)] +#[serde(tag = "type")] +pub enum ConversationEvent { + #[serde(rename = "session.created")] + SessionCreated(SessionCreatedEvent), + #[serde(rename = "item.completed")] + ItemCompleted(ItemCompletedEvent), + #[serde(rename = "error")] + Error(ConversationErrorEvent), +} + +/// Payload describing a newly created conversation item. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)] +pub struct SessionCreatedEvent { + pub session_id: String, +} + +/// Payload describing the completion of an existing conversation item. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)] +pub struct ItemCompletedEvent { + pub item: ConversationItem, +} + +/// Fatal error emitted by the stream. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)] +pub struct ConversationErrorEvent { + pub message: String, +} + +/// Canonical representation of a conversation item and its domain-specific payload. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)] +pub struct ConversationItem { + pub id: String, + #[serde(flatten)] + pub details: ConversationItemDetails, +} + +/// Typed payloads for each supported conversation item type. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)] +#[serde(tag = "item_type", rename_all = "snake_case")] +pub enum ConversationItemDetails { + AssistantMessage(AssistantMessageItem), + Reasoning(ReasoningItem), + CommandExecution(CommandExecutionItem), + FileChange(FileChangeItem), + McpToolCall(McpToolCallItem), + WebSearch(WebSearchItem), + Error(ErrorItem), +} + +/// Session conversation metadata. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)] +pub struct SessionItem { + pub session_id: String, +} + +/// Assistant message payload. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)] +pub struct AssistantMessageItem { + pub text: String, +} + +/// Model reasoning summary payload. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)] +pub struct ReasoningItem { + pub text: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default, TS)] +#[serde(rename_all = "snake_case")] +pub enum CommandExecutionStatus { + #[default] + InProgress, + Completed, + Failed, +} + +/// Local shell command execution payload. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)] +pub struct CommandExecutionItem { + pub command: String, + pub aggregated_output: String, + pub exit_code: i32, + pub status: CommandExecutionStatus, +} + +/// Single file change summary for a patch. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)] +pub struct FileUpdateChange { + pub path: String, + pub kind: PatchChangeKind, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)] +#[serde(rename_all = "snake_case")] +pub enum PatchApplyStatus { + Completed, + Failed, +} + +/// Patch application payload. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)] +pub struct FileChangeItem { + pub changes: Vec, + pub status: PatchApplyStatus, +} + +/// Known change kinds for a patch. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)] +#[serde(rename_all = "snake_case")] +pub enum PatchChangeKind { + Add, + Delete, + Update, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default, TS)] +#[serde(rename_all = "snake_case")] +pub enum McpToolCallStatus { + #[default] + InProgress, + Completed, + Failed, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)] +pub struct McpToolCallItem { + pub server: String, + pub tool: String, + pub status: McpToolCallStatus, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)] +pub struct WebSearchItem { + pub query: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)] +pub struct ErrorItem { + pub message: String, +} diff --git a/codex-rs/exec/src/experimental_event_processor_with_json_output.rs b/codex-rs/exec/src/experimental_event_processor_with_json_output.rs new file mode 100644 index 00000000..6abbda7c --- /dev/null +++ b/codex-rs/exec/src/experimental_event_processor_with_json_output.rs @@ -0,0 +1,229 @@ +use std::collections::HashMap; +use std::path::PathBuf; +use std::sync::atomic::AtomicU64; + +use crate::event_processor::CodexStatus; +use crate::event_processor::EventProcessor; +use crate::event_processor::handle_last_message; +use crate::exec_events::AssistantMessageItem; +use crate::exec_events::CommandExecutionItem; +use crate::exec_events::CommandExecutionStatus; +use crate::exec_events::ConversationErrorEvent; +use crate::exec_events::ConversationEvent; +use crate::exec_events::ConversationItem; +use crate::exec_events::ConversationItemDetails; +use crate::exec_events::FileChangeItem; +use crate::exec_events::FileUpdateChange; +use crate::exec_events::ItemCompletedEvent; +use crate::exec_events::PatchApplyStatus; +use crate::exec_events::PatchChangeKind; +use crate::exec_events::ReasoningItem; +use crate::exec_events::SessionCreatedEvent; +use codex_core::config::Config; +use codex_core::protocol::AgentMessageEvent; +use codex_core::protocol::AgentReasoningEvent; +use codex_core::protocol::Event; +use codex_core::protocol::EventMsg; +use codex_core::protocol::ExecCommandBeginEvent; +use codex_core::protocol::ExecCommandEndEvent; +use codex_core::protocol::FileChange; +use codex_core::protocol::PatchApplyBeginEvent; +use codex_core::protocol::PatchApplyEndEvent; +use codex_core::protocol::SessionConfiguredEvent; +use codex_core::protocol::TaskCompleteEvent; +use tracing::error; + +pub struct ExperimentalEventProcessorWithJsonOutput { + last_message_path: Option, + next_event_id: AtomicU64, + running_commands: HashMap>, + running_patch_applies: HashMap, +} + +impl ExperimentalEventProcessorWithJsonOutput { + pub fn new(last_message_path: Option) -> Self { + Self { + last_message_path, + next_event_id: AtomicU64::new(0), + running_commands: HashMap::new(), + running_patch_applies: HashMap::new(), + } + } + + pub fn collect_conversation_events(&mut self, event: &Event) -> Vec { + match &event.msg { + EventMsg::SessionConfigured(ev) => self.handle_session_configured(ev), + EventMsg::AgentMessage(ev) => self.handle_agent_message(ev), + EventMsg::AgentReasoning(ev) => self.handle_reasoning_event(ev), + EventMsg::ExecCommandBegin(ev) => self.handle_exec_command_begin(ev), + EventMsg::ExecCommandEnd(ev) => self.handle_exec_command_end(ev), + EventMsg::PatchApplyBegin(ev) => self.handle_patch_apply_begin(ev), + EventMsg::PatchApplyEnd(ev) => self.handle_patch_apply_end(ev), + EventMsg::Error(ev) => vec![ConversationEvent::Error(ConversationErrorEvent { + message: ev.message.clone(), + })], + EventMsg::StreamError(ev) => vec![ConversationEvent::Error(ConversationErrorEvent { + message: ev.message.clone(), + })], + _ => Vec::new(), + } + } + + fn get_next_item_id(&self) -> String { + format!( + "item_{}", + self.next_event_id + .fetch_add(1, std::sync::atomic::Ordering::SeqCst) + ) + } + + fn handle_session_configured( + &self, + payload: &SessionConfiguredEvent, + ) -> Vec { + vec![ConversationEvent::SessionCreated(SessionCreatedEvent { + session_id: payload.session_id.to_string(), + })] + } + + fn handle_agent_message(&self, payload: &AgentMessageEvent) -> Vec { + let item = ConversationItem { + id: self.get_next_item_id(), + + details: ConversationItemDetails::AssistantMessage(AssistantMessageItem { + text: payload.message.clone(), + }), + }; + + vec![ConversationEvent::ItemCompleted(ItemCompletedEvent { + item, + })] + } + + fn handle_reasoning_event(&self, ev: &AgentReasoningEvent) -> Vec { + let item = ConversationItem { + id: self.get_next_item_id(), + + details: ConversationItemDetails::Reasoning(ReasoningItem { + text: ev.text.clone(), + }), + }; + + vec![ConversationEvent::ItemCompleted(ItemCompletedEvent { + item, + })] + } + fn handle_exec_command_begin(&mut self, ev: &ExecCommandBeginEvent) -> Vec { + self.running_commands + .insert(ev.call_id.clone(), ev.command.clone()); + + Vec::new() + } + + fn handle_patch_apply_begin(&mut self, ev: &PatchApplyBeginEvent) -> Vec { + self.running_patch_applies + .insert(ev.call_id.clone(), ev.clone()); + + Vec::new() + } + + fn map_change_kind(&self, kind: &FileChange) -> PatchChangeKind { + match kind { + FileChange::Add { .. } => PatchChangeKind::Add, + FileChange::Delete { .. } => PatchChangeKind::Delete, + FileChange::Update { .. } => PatchChangeKind::Update, + } + } + + fn handle_patch_apply_end(&mut self, ev: &PatchApplyEndEvent) -> Vec { + if let Some(running_patch_apply) = self.running_patch_applies.remove(&ev.call_id) { + let status = if ev.success { + PatchApplyStatus::Completed + } else { + PatchApplyStatus::Failed + }; + let item = ConversationItem { + id: self.get_next_item_id(), + + details: ConversationItemDetails::FileChange(FileChangeItem { + changes: running_patch_apply + .changes + .iter() + .map(|(path, change)| FileUpdateChange { + path: path.to_str().unwrap_or("").to_string(), + kind: self.map_change_kind(change), + }) + .collect(), + status, + }), + }; + + return vec![ConversationEvent::ItemCompleted(ItemCompletedEvent { + item, + })]; + } + + Vec::new() + } + + fn handle_exec_command_end(&mut self, ev: &ExecCommandEndEvent) -> Vec { + let command = self + .running_commands + .remove(&ev.call_id) + .map(|command| command.join(" ")) + .unwrap_or_default(); + let status = if ev.exit_code == 0 { + CommandExecutionStatus::Completed + } else { + CommandExecutionStatus::Failed + }; + let item = ConversationItem { + id: self.get_next_item_id(), + + details: ConversationItemDetails::CommandExecution(CommandExecutionItem { + command, + aggregated_output: ev.aggregated_output.clone(), + exit_code: ev.exit_code, + status, + }), + }; + + vec![ConversationEvent::ItemCompleted(ItemCompletedEvent { + item, + })] + } +} + +impl EventProcessor for ExperimentalEventProcessorWithJsonOutput { + fn print_config_summary(&mut self, _: &Config, _: &str, ev: &SessionConfiguredEvent) { + self.process_event(Event { + id: "".to_string(), + msg: EventMsg::SessionConfigured(ev.clone()), + }); + } + + fn process_event(&mut self, event: Event) -> CodexStatus { + let aggregated = self.collect_conversation_events(&event); + for conv_event in aggregated { + match serde_json::to_string(&conv_event) { + Ok(line) => { + println!("{line}"); + } + Err(e) => { + error!("Failed to serialize event: {e:?}"); + } + } + } + + let Event { msg, .. } = event; + + if let EventMsg::TaskComplete(TaskCompleteEvent { last_agent_message }) = msg { + if let Some(output_file) = self.last_message_path.as_deref() { + handle_last_message(last_agent_message.as_deref(), output_file); + } + CodexStatus::InitiateShutdown + } else { + CodexStatus::Running + } + } +} diff --git a/codex-rs/exec/src/lib.rs b/codex-rs/exec/src/lib.rs index 9c561cfb..0ccb1e01 100644 --- a/codex-rs/exec/src/lib.rs +++ b/codex-rs/exec/src/lib.rs @@ -1,7 +1,9 @@ mod cli; mod event_processor; mod event_processor_with_human_output; -mod event_processor_with_json_output; +pub mod event_processor_with_json_output; +pub mod exec_events; +pub mod experimental_event_processor_with_json_output; use std::io::IsTerminal; use std::io::Read; @@ -24,7 +26,7 @@ use codex_core::protocol::TaskCompleteEvent; use codex_ollama::DEFAULT_OSS_MODEL; use codex_protocol::config_types::SandboxMode; use event_processor_with_human_output::EventProcessorWithHumanOutput; -use event_processor_with_json_output::EventProcessorWithJsonOutput; +use experimental_event_processor_with_json_output::ExperimentalEventProcessorWithJsonOutput; use serde_json::Value; use tracing::debug; use tracing::error; @@ -34,6 +36,7 @@ use tracing_subscriber::EnvFilter; use crate::cli::Command as ExecCommand; use crate::event_processor::CodexStatus; use crate::event_processor::EventProcessor; +use crate::event_processor_with_json_output::EventProcessorWithJsonOutput; use codex_core::find_conversation_path_by_id_str; pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option) -> anyhow::Result<()> { @@ -50,6 +53,7 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option) -> any color, last_message_file, json: json_mode, + experimental_json, sandbox_mode: sandbox_mode_cli_arg, prompt, output_schema: output_schema_path, @@ -178,14 +182,22 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option) -> any }; let config = Config::load_with_cli_overrides(cli_kv_overrides, overrides)?; - let mut event_processor: Box = if json_mode { - Box::new(EventProcessorWithJsonOutput::new(last_message_file.clone())) - } else { - Box::new(EventProcessorWithHumanOutput::create_with_ansi( + let mut event_processor: Box = match (json_mode, experimental_json) { + (_, true) => Box::new(ExperimentalEventProcessorWithJsonOutput::new( + last_message_file.clone(), + )), + (true, _) => { + eprintln!( + "The existing `--json` output format is being deprecated. Please try the new format using `--experimental-json`." + ); + + Box::new(EventProcessorWithJsonOutput::new(last_message_file.clone())) + } + _ => Box::new(EventProcessorWithHumanOutput::create_with_ansi( stdout_with_ansi, &config, last_message_file.clone(), - )) + )), }; if oss { @@ -194,10 +206,6 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option) -> any .map_err(|e| anyhow::anyhow!("OSS setup failed: {e}"))?; } - // Print the effective configuration and prompt so users can see what Codex - // is using. - event_processor.print_config_summary(&config, &prompt); - let default_cwd = config.cwd.to_path_buf(); let default_approval_policy = config.approval_policy; let default_sandbox_policy = config.sandbox_policy.clone(); @@ -230,11 +238,19 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option) -> any ) .await? } else { - conversation_manager.new_conversation(config).await? + conversation_manager + .new_conversation(config.clone()) + .await? } } else { - conversation_manager.new_conversation(config).await? + conversation_manager + .new_conversation(config.clone()) + .await? }; + // Print the effective configuration and prompt so users can see what Codex + // is using. + event_processor.print_config_summary(&config, &prompt, &session_configured); + info!("Codex initialized with event: {session_configured:?}"); let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::(); diff --git a/codex-rs/exec/tests/all.rs b/codex-rs/exec/tests/all.rs index 7e136e4c..6fd2c163 100644 --- a/codex-rs/exec/tests/all.rs +++ b/codex-rs/exec/tests/all.rs @@ -1,3 +1,5 @@ // Single integration test binary that aggregates all test modules. // The submodules live in `tests/suite/`. mod suite; + +mod event_processor_with_json_output; diff --git a/codex-rs/exec/tests/event_processor_with_json_output.rs b/codex-rs/exec/tests/event_processor_with_json_output.rs new file mode 100644 index 00000000..0ff8ed13 --- /dev/null +++ b/codex-rs/exec/tests/event_processor_with_json_output.rs @@ -0,0 +1,371 @@ +use codex_core::protocol::AgentMessageEvent; +use codex_core::protocol::AgentReasoningEvent; +use codex_core::protocol::Event; +use codex_core::protocol::EventMsg; +use codex_core::protocol::ExecCommandBeginEvent; +use codex_core::protocol::ExecCommandEndEvent; +use codex_core::protocol::FileChange; +use codex_core::protocol::PatchApplyBeginEvent; +use codex_core::protocol::PatchApplyEndEvent; +use codex_core::protocol::SessionConfiguredEvent; +use codex_exec::exec_events::AssistantMessageItem; +use codex_exec::exec_events::CommandExecutionItem; +use codex_exec::exec_events::CommandExecutionStatus; +use codex_exec::exec_events::ConversationErrorEvent; +use codex_exec::exec_events::ConversationEvent; +use codex_exec::exec_events::ConversationItem; +use codex_exec::exec_events::ConversationItemDetails; +use codex_exec::exec_events::ItemCompletedEvent; +use codex_exec::exec_events::PatchApplyStatus; +use codex_exec::exec_events::PatchChangeKind; +use codex_exec::exec_events::ReasoningItem; +use codex_exec::exec_events::SessionCreatedEvent; +use codex_exec::experimental_event_processor_with_json_output::ExperimentalEventProcessorWithJsonOutput; +use pretty_assertions::assert_eq; +use std::path::PathBuf; +use std::time::Duration; + +fn event(id: &str, msg: EventMsg) -> Event { + Event { + id: id.to_string(), + msg, + } +} + +#[test] +fn session_configured_produces_session_created_event() { + let mut ep = ExperimentalEventProcessorWithJsonOutput::new(None); + let session_id = codex_protocol::mcp_protocol::ConversationId::from_string( + "67e55044-10b1-426f-9247-bb680e5fe0c8", + ) + .unwrap(); + let rollout_path = PathBuf::from("/tmp/rollout.json"); + let ev = event( + "e1", + EventMsg::SessionConfigured(SessionConfiguredEvent { + session_id, + model: "codex-mini-latest".to_string(), + reasoning_effort: None, + history_log_id: 0, + history_entry_count: 0, + initial_messages: None, + rollout_path, + }), + ); + let out = ep.collect_conversation_events(&ev); + assert_eq!( + out, + vec![ConversationEvent::SessionCreated(SessionCreatedEvent { + session_id: "67e55044-10b1-426f-9247-bb680e5fe0c8".to_string(), + })] + ); +} + +#[test] +fn agent_reasoning_produces_item_completed_reasoning() { + let mut ep = ExperimentalEventProcessorWithJsonOutput::new(None); + let ev = event( + "e1", + EventMsg::AgentReasoning(AgentReasoningEvent { + text: "thinking...".to_string(), + }), + ); + let out = ep.collect_conversation_events(&ev); + assert_eq!( + out, + vec![ConversationEvent::ItemCompleted(ItemCompletedEvent { + item: ConversationItem { + id: "item_0".to_string(), + details: ConversationItemDetails::Reasoning(ReasoningItem { + text: "thinking...".to_string(), + }), + }, + })] + ); +} + +#[test] +fn agent_message_produces_item_completed_assistant_message() { + let mut ep = ExperimentalEventProcessorWithJsonOutput::new(None); + let ev = event( + "e1", + EventMsg::AgentMessage(AgentMessageEvent { + message: "hello".to_string(), + }), + ); + let out = ep.collect_conversation_events(&ev); + assert_eq!( + out, + vec![ConversationEvent::ItemCompleted(ItemCompletedEvent { + item: ConversationItem { + id: "item_0".to_string(), + details: ConversationItemDetails::AssistantMessage(AssistantMessageItem { + text: "hello".to_string(), + }), + }, + })] + ); +} + +#[test] +fn error_event_produces_error() { + let mut ep = ExperimentalEventProcessorWithJsonOutput::new(None); + let out = ep.collect_conversation_events(&event( + "e1", + EventMsg::Error(codex_core::protocol::ErrorEvent { + message: "boom".to_string(), + }), + )); + assert_eq!( + out, + vec![ConversationEvent::Error(ConversationErrorEvent { + message: "boom".to_string(), + })] + ); +} + +#[test] +fn stream_error_event_produces_error() { + let mut ep = ExperimentalEventProcessorWithJsonOutput::new(None); + let out = ep.collect_conversation_events(&event( + "e1", + EventMsg::StreamError(codex_core::protocol::StreamErrorEvent { + message: "retrying".to_string(), + }), + )); + assert_eq!( + out, + vec![ConversationEvent::Error(ConversationErrorEvent { + message: "retrying".to_string(), + })] + ); +} + +#[test] +fn exec_command_end_success_produces_completed_command_item() { + let mut ep = ExperimentalEventProcessorWithJsonOutput::new(None); + + // Begin -> no output + let begin = event( + "c1", + EventMsg::ExecCommandBegin(ExecCommandBeginEvent { + call_id: "1".to_string(), + command: vec!["bash".to_string(), "-lc".to_string(), "echo hi".to_string()], + cwd: std::env::current_dir().unwrap(), + parsed_cmd: Vec::new(), + }), + ); + let out_begin = ep.collect_conversation_events(&begin); + assert!(out_begin.is_empty()); + + // End (success) -> item.completed (item_0) + let end_ok = event( + "c2", + EventMsg::ExecCommandEnd(ExecCommandEndEvent { + call_id: "1".to_string(), + stdout: String::new(), + stderr: String::new(), + aggregated_output: "hi\n".to_string(), + exit_code: 0, + duration: Duration::from_millis(5), + formatted_output: String::new(), + }), + ); + let out_ok = ep.collect_conversation_events(&end_ok); + assert_eq!( + out_ok, + vec![ConversationEvent::ItemCompleted(ItemCompletedEvent { + item: ConversationItem { + id: "item_0".to_string(), + details: ConversationItemDetails::CommandExecution(CommandExecutionItem { + command: "bash -lc echo hi".to_string(), + aggregated_output: "hi\n".to_string(), + exit_code: 0, + status: CommandExecutionStatus::Completed, + }), + }, + })] + ); +} + +#[test] +fn exec_command_end_failure_produces_failed_command_item() { + let mut ep = ExperimentalEventProcessorWithJsonOutput::new(None); + + // Begin -> no output + let begin = event( + "c1", + EventMsg::ExecCommandBegin(ExecCommandBeginEvent { + call_id: "2".to_string(), + command: vec!["sh".to_string(), "-c".to_string(), "exit 1".to_string()], + cwd: std::env::current_dir().unwrap(), + parsed_cmd: Vec::new(), + }), + ); + assert!(ep.collect_conversation_events(&begin).is_empty()); + + // End (failure) -> item.completed (item_0) + let end_fail = event( + "c2", + EventMsg::ExecCommandEnd(ExecCommandEndEvent { + call_id: "2".to_string(), + stdout: String::new(), + stderr: String::new(), + aggregated_output: String::new(), + exit_code: 1, + duration: Duration::from_millis(2), + formatted_output: String::new(), + }), + ); + let out_fail = ep.collect_conversation_events(&end_fail); + assert_eq!( + out_fail, + vec![ConversationEvent::ItemCompleted(ItemCompletedEvent { + item: ConversationItem { + id: "item_0".to_string(), + details: ConversationItemDetails::CommandExecution(CommandExecutionItem { + command: "sh -c exit 1".to_string(), + aggregated_output: String::new(), + exit_code: 1, + status: CommandExecutionStatus::Failed, + }), + }, + })] + ); +} + +#[test] +fn patch_apply_success_produces_item_completed_patchapply() { + let mut ep = ExperimentalEventProcessorWithJsonOutput::new(None); + + // Prepare a patch with multiple kinds of changes + let mut changes = std::collections::HashMap::new(); + changes.insert( + PathBuf::from("a/added.txt"), + FileChange::Add { + content: "+hello".to_string(), + }, + ); + changes.insert( + PathBuf::from("b/deleted.txt"), + FileChange::Delete { + content: "-goodbye".to_string(), + }, + ); + changes.insert( + PathBuf::from("c/modified.txt"), + FileChange::Update { + unified_diff: "--- c/modified.txt\n+++ c/modified.txt\n@@\n-old\n+new\n".to_string(), + move_path: Some(PathBuf::from("c/renamed.txt")), + }, + ); + + // Begin -> no output + let begin = event( + "p1", + EventMsg::PatchApplyBegin(PatchApplyBeginEvent { + call_id: "call-1".to_string(), + auto_approved: true, + changes: changes.clone(), + }), + ); + let out_begin = ep.collect_conversation_events(&begin); + assert!(out_begin.is_empty()); + + // End (success) -> item.completed (item_0) + let end = event( + "p2", + EventMsg::PatchApplyEnd(PatchApplyEndEvent { + call_id: "call-1".to_string(), + stdout: "applied 3 changes".to_string(), + stderr: String::new(), + success: true, + }), + ); + let out_end = ep.collect_conversation_events(&end); + assert_eq!(out_end.len(), 1); + + // Validate structure without relying on HashMap iteration order + match &out_end[0] { + ConversationEvent::ItemCompleted(ItemCompletedEvent { item }) => { + assert_eq!(&item.id, "item_0"); + match &item.details { + ConversationItemDetails::FileChange(file_update) => { + assert_eq!(file_update.status, PatchApplyStatus::Completed); + + let mut actual: Vec<(String, PatchChangeKind)> = file_update + .changes + .iter() + .map(|c| (c.path.clone(), c.kind.clone())) + .collect(); + actual.sort_by(|a, b| a.0.cmp(&b.0)); + + let mut expected = vec![ + ("a/added.txt".to_string(), PatchChangeKind::Add), + ("b/deleted.txt".to_string(), PatchChangeKind::Delete), + ("c/modified.txt".to_string(), PatchChangeKind::Update), + ]; + expected.sort_by(|a, b| a.0.cmp(&b.0)); + + assert_eq!(actual, expected); + } + other => panic!("unexpected details: {other:?}"), + } + } + other => panic!("unexpected event: {other:?}"), + } +} + +#[test] +fn patch_apply_failure_produces_item_completed_patchapply_failed() { + let mut ep = ExperimentalEventProcessorWithJsonOutput::new(None); + + let mut changes = std::collections::HashMap::new(); + changes.insert( + PathBuf::from("file.txt"), + FileChange::Update { + unified_diff: "--- file.txt\n+++ file.txt\n@@\n-old\n+new\n".to_string(), + move_path: None, + }, + ); + + // Begin -> no output + let begin = event( + "p1", + EventMsg::PatchApplyBegin(PatchApplyBeginEvent { + call_id: "call-2".to_string(), + auto_approved: false, + changes: changes.clone(), + }), + ); + assert!(ep.collect_conversation_events(&begin).is_empty()); + + // End (failure) -> item.completed (item_0) with Failed status + let end = event( + "p2", + EventMsg::PatchApplyEnd(PatchApplyEndEvent { + call_id: "call-2".to_string(), + stdout: String::new(), + stderr: "failed to apply".to_string(), + success: false, + }), + ); + let out_end = ep.collect_conversation_events(&end); + assert_eq!(out_end.len(), 1); + + match &out_end[0] { + ConversationEvent::ItemCompleted(ItemCompletedEvent { item }) => { + assert_eq!(&item.id, "item_0"); + match &item.details { + ConversationItemDetails::FileChange(file_update) => { + assert_eq!(file_update.status, PatchApplyStatus::Failed); + assert_eq!(file_update.changes.len(), 1); + assert_eq!(file_update.changes[0].path, "file.txt".to_string()); + assert_eq!(file_update.changes[0].kind, PatchChangeKind::Update); + } + other => panic!("unexpected details: {other:?}"), + } + } + other => panic!("unexpected event: {other:?}"), + } +}