Files
llmx/codex-rs/exec/src/event_processor.rs

409 lines
15 KiB
Rust
Raw Normal View History

use chrono::Utc;
use codex_common::elapsed::format_elapsed;
use codex_core::protocol::AgentMessageEvent;
use codex_core::protocol::BackgroundEventEvent;
use codex_core::protocol::ErrorEvent;
use codex_core::protocol::Event;
use codex_core::protocol::EventMsg;
use codex_core::protocol::ExecCommandBeginEvent;
use codex_core::protocol::ExecCommandEndEvent;
use codex_core::protocol::FileChange;
use codex_core::protocol::McpToolCallBeginEvent;
use codex_core::protocol::McpToolCallEndEvent;
use codex_core::protocol::PatchApplyBeginEvent;
use codex_core::protocol::PatchApplyEndEvent;
use codex_core::protocol::SessionConfiguredEvent;
use owo_colors::OwoColorize;
use owo_colors::Style;
use shlex::try_join;
use std::collections::HashMap;
/// Maximum number of output lines printed when summarizing a finished exec
/// command or an MCP tool call result; lines beyond this limit are not printed.
///
/// TODO: This should be configurable. When used in CI, users may not want to
/// impose a limit so they can see the full transcript.
const MAX_OUTPUT_LINES_FOR_EXEC_TOOL_CALL: usize = 20;
/// Renders protocol events as timestamped, optionally styled, lines on stdout
/// and remembers in-flight calls so the matching end event can report the
/// original invocation and its elapsed time.
pub(crate) struct EventProcessor {
/// In-flight exec commands keyed by call id: inserted when an
/// `ExecCommandBegin` event is processed and removed on `ExecCommandEnd`.
call_id_to_command: HashMap<String, ExecCommandBegin>,
/// In-flight patch applications keyed by call id: inserted when a
/// `PatchApplyBegin` event is processed and removed on `PatchApplyEnd`.
call_id_to_patch: HashMap<String, PatchApplyBegin>,
/// Tracks in-flight MCP tool calls so we can calculate duration and print
/// a concise summary when the corresponding `McpToolCallEnd` event is
/// received.
call_id_to_tool_call: HashMap<String, McpToolCallBegin>,
// To ensure that --color=never is respected, ANSI escapes _must_ be added
// using .style() with one of these fields. If you need a new style, add a
// new field here. `create_with_ansi(false)` sets every one of these to the
// plain `Style::new()`.
bold: Style,
dimmed: Style,
magenta: Style,
red: Style,
green: Style,
}
impl EventProcessor {
    /// Creates a processor with no in-flight calls recorded.
    ///
    /// When `with_ansi` is `true` the style fields carry their namesake
    /// attributes (bold, dimmed, magenta, red, green); otherwise every style
    /// field is the plain `Style::new()`.
    pub(crate) fn create_with_ansi(with_ansi: bool) -> Self {
        let plain = Style::new();
        // Pick the whole palette up front so the struct is built in one place.
        let (bold, dimmed, magenta, red, green) = if with_ansi {
            (
                plain.bold(),
                plain.dimmed(),
                plain.magenta(),
                plain.red(),
                plain.green(),
            )
        } else {
            (plain, plain, plain, plain, plain)
        };

        Self {
            call_id_to_command: HashMap::new(),
            call_id_to_patch: HashMap::new(),
            call_id_to_tool_call: HashMap::new(),
            bold,
            dimmed,
            magenta,
            red,
            green,
        }
    }
}
/// Metadata captured when an `ExecCommandBegin` event is received.
struct ExecCommandBegin {
/// The command and its arguments as carried by the begin event; reused to
/// label the summary printed for the matching `ExecCommandEnd`.
command: Vec<String>,
/// Timestamp when the command started so we can compute duration later.
start_time: chrono::DateTime<Utc>,
}
/// Metadata captured when an `McpToolCallBegin` event is received and kept
/// until the matching `McpToolCallEnd` event is processed.
struct McpToolCallBegin {
/// Formatted invocation string, e.g. `server.tool({"city":"sf"})`, or
/// `server.tool()` when the call carried no arguments.
invocation: String,
/// Timestamp when the call started so we can compute duration later.
start_time: chrono::DateTime<Utc>,
}
/// Metadata captured when a `PatchApplyBegin` event is received.
struct PatchApplyBegin {
/// Timestamp when the patch application started so we can compute
/// duration later.
start_time: chrono::DateTime<Utc>,
/// The `auto_approved` flag from the begin event; echoed in the summary
/// label printed for the matching `PatchApplyEnd`.
auto_approved: bool,
}
/// Prints a line to stdout prefixed with the current UTC time formatted as
/// `[YYYY-MM-DDTHH:MM:SS] `.
///
/// Takes the same arguments as `println!`. The prefix and the message are
/// emitted by a single `println!` call rather than a `print!` followed by a
/// `println!`, so the line is written in one stdout operation and the
/// timestamp is formatted without an intermediate `String`.
macro_rules! ts_println {
    // Zero-argument form: just the timestamp prefix followed by a newline.
    () => {
        ts_println!("")
    };
    ($($arg:tt)*) => {{
        let timestamp = Utc::now().format("%Y-%m-%dT%H:%M:%S");
        println!("[{}] {}", timestamp, format_args!($($arg)*));
    }};
}
impl EventProcessor {
pub(crate) fn process_event(&mut self, event: Event) {
let Event { id, msg } = event;
match msg {
EventMsg::Error(ErrorEvent { message }) => {
let prefix = "ERROR:".style(self.red);
ts_println!("{prefix} {message}");
}
EventMsg::BackgroundEvent(BackgroundEventEvent { message }) => {
ts_println!("{}", message.style(self.dimmed));
}
EventMsg::TaskStarted => {
let msg = format!("Task started: {id}");
ts_println!("{}", msg.style(self.dimmed));
}
EventMsg::TaskComplete => {
let msg = format!("Task complete: {id}");
ts_println!("{}", msg.style(self.bold));
}
EventMsg::AgentMessage(AgentMessageEvent { message }) => {
let prefix = "Agent message:".style(self.bold);
ts_println!("{prefix} {message}");
}
EventMsg::ExecCommandBegin(ExecCommandBeginEvent {
call_id,
command,
cwd,
}) => {
self.call_id_to_command.insert(
call_id.clone(),
ExecCommandBegin {
command: command.clone(),
start_time: Utc::now(),
},
);
ts_println!(
"{} {} in {}",
"exec".style(self.magenta),
escape_command(&command).style(self.bold),
cwd.to_string_lossy(),
);
}
EventMsg::ExecCommandEnd(ExecCommandEndEvent {
call_id,
stdout,
stderr,
exit_code,
}) => {
let exec_command = self.call_id_to_command.remove(&call_id);
let (duration, call) = if let Some(ExecCommandBegin {
command,
start_time,
}) = exec_command
{
(
format!(" in {}", format_elapsed(start_time)),
format!("{}", escape_command(&command).style(self.bold)),
)
} else {
("".to_string(), format!("exec('{call_id}')"))
};
let output = if exit_code == 0 { stdout } else { stderr };
let truncated_output = output
.lines()
.take(MAX_OUTPUT_LINES_FOR_EXEC_TOOL_CALL)
.collect::<Vec<_>>()
.join("\n");
match exit_code {
0 => {
let title = format!("{call} succeeded{duration}:");
ts_println!("{}", title.style(self.green));
}
_ => {
let title = format!("{call} exited {exit_code}{duration}:");
ts_println!("{}", title.style(self.red));
}
}
println!("{}", truncated_output.style(self.dimmed));
}
EventMsg::McpToolCallBegin(McpToolCallBeginEvent {
call_id,
server,
tool,
arguments,
}) => {
// Build fully-qualified tool name: server.tool
let fq_tool_name = format!("{server}.{tool}");
// Format arguments as compact JSON so they fit on one line.
let args_str = arguments
.as_ref()
.map(|v: &serde_json::Value| {
serde_json::to_string(v).unwrap_or_else(|_| v.to_string())
})
.unwrap_or_default();
let invocation = if args_str.is_empty() {
format!("{fq_tool_name}()")
} else {
format!("{fq_tool_name}({args_str})")
};
self.call_id_to_tool_call.insert(
call_id.clone(),
McpToolCallBegin {
invocation: invocation.clone(),
start_time: Utc::now(),
},
);
ts_println!(
"{} {}",
"tool".style(self.magenta),
invocation.style(self.bold),
);
}
EventMsg::McpToolCallEnd(McpToolCallEndEvent {
call_id,
success,
result,
}) => {
// Retrieve start time and invocation for duration calculation and labeling.
let info = self.call_id_to_tool_call.remove(&call_id);
let (duration, invocation) = if let Some(McpToolCallBegin {
invocation,
start_time,
..
}) = info
{
(format!(" in {}", format_elapsed(start_time)), invocation)
} else {
(String::new(), format!("tool('{call_id}')"))
};
let status_str = if success { "success" } else { "failed" };
let title_style = if success { self.green } else { self.red };
let title = format!("{invocation} {status_str}{duration}:");
ts_println!("{}", title.style(title_style));
if let Some(res) = result {
let val: serde_json::Value = res.into();
let pretty =
serde_json::to_string_pretty(&val).unwrap_or_else(|_| val.to_string());
for line in pretty.lines().take(MAX_OUTPUT_LINES_FOR_EXEC_TOOL_CALL) {
println!("{}", line.style(self.dimmed));
}
}
}
EventMsg::PatchApplyBegin(PatchApplyBeginEvent {
call_id,
auto_approved,
changes,
}) => {
// Store metadata so we can calculate duration later when we
// receive the corresponding PatchApplyEnd event.
self.call_id_to_patch.insert(
call_id.clone(),
PatchApplyBegin {
start_time: Utc::now(),
auto_approved,
},
);
ts_println!(
"{} auto_approved={}:",
"apply_patch".style(self.magenta),
auto_approved,
);
// Pretty-print the patch summary with colored diff markers so
// its easy to scan in the terminal output.
for (path, change) in changes.iter() {
match change {
FileChange::Add { content } => {
let header = format!(
"{} {}",
format_file_change(change),
path.to_string_lossy()
);
println!("{}", header.style(self.magenta));
for line in content.lines() {
println!("{}", line.style(self.green));
}
}
FileChange::Delete => {
let header = format!(
"{} {}",
format_file_change(change),
path.to_string_lossy()
);
println!("{}", header.style(self.magenta));
}
FileChange::Update {
unified_diff,
move_path,
} => {
let header = if let Some(dest) = move_path {
format!(
"{} {} -> {}",
format_file_change(change),
path.to_string_lossy(),
dest.to_string_lossy()
)
} else {
format!("{} {}", format_file_change(change), path.to_string_lossy())
};
println!("{}", header.style(self.magenta));
// Colorize diff lines. We keep file header lines
// (--- / +++) without extra coloring so they are
// still readable.
for diff_line in unified_diff.lines() {
if diff_line.starts_with('+') && !diff_line.starts_with("+++") {
println!("{}", diff_line.style(self.green));
} else if diff_line.starts_with('-')
&& !diff_line.starts_with("---")
{
println!("{}", diff_line.style(self.red));
} else {
println!("{diff_line}");
}
}
}
}
}
}
EventMsg::PatchApplyEnd(PatchApplyEndEvent {
call_id,
stdout,
stderr,
success,
}) => {
let patch_begin = self.call_id_to_patch.remove(&call_id);
// Compute duration and summary label similar to exec commands.
let (duration, label) = if let Some(PatchApplyBegin {
start_time,
auto_approved,
}) = patch_begin
{
(
format!(" in {}", format_elapsed(start_time)),
format!("apply_patch(auto_approved={})", auto_approved),
)
} else {
(String::new(), format!("apply_patch('{call_id}')"))
};
let (exit_code, output, title_style) = if success {
(0, stdout, self.green)
} else {
(1, stderr, self.red)
};
let title = format!("{label} exited {exit_code}{duration}:");
ts_println!("{}", title.style(title_style));
for line in output.lines() {
println!("{}", line.style(self.dimmed));
}
}
EventMsg::ExecApprovalRequest(_) => {
// Should we exit?
}
EventMsg::ApplyPatchApprovalRequest(_) => {
// Should we exit?
}
EventMsg::AgentReasoning(agent_reasoning_event) => {
println!("thinking: {}", agent_reasoning_event.text);
}
EventMsg::SessionConfigured(session_configured_event) => {
feat: record messages from user in ~/.codex/history.jsonl (#939) This is a large change to support a "history" feature like you would expect in a shell like Bash. History events are recorded in `$CODEX_HOME/history.jsonl`. Because it is a JSONL file, it is straightforward to append new entries (as opposed to the TypeScript file that uses `$CODEX_HOME/history.json`, so to be valid JSON, each new entry entails rewriting the entire file). Because it is possible for there to be multiple instances of Codex CLI writing to `history.jsonl` at once, we use advisory file locking when working with `history.jsonl` in `codex-rs/core/src/message_history.rs`. Because we believe history is a sufficiently useful feature, we enable it by default. Though to provide some safety, we set the file permissions of `history.jsonl` to be `o600` so that other users on the system cannot read the user's history. We do not yet support a default list of `SENSITIVE_PATTERNS` as the TypeScript CLI does: https://github.com/openai/codex/blob/3fdf9df1335ac9501e3fb0e61715359145711e8b/codex-cli/src/utils/storage/command-history.ts#L10-L17 We are going to take a more conservative approach to this list in the Rust CLI. For example, while `/\b[A-Za-z0-9-_]{20,}\b/` might exclude sensitive information like API tokens, it would also exclude valuable information such as references to Git commits. As noted in the updated documentation, users can opt-out of history by adding the following to `config.toml`: ```toml [history] persistence = "none" ``` Because `history.jsonl` could, in theory, be quite large, we take a[n arguably overly pedantic] approach in reading history entries into memory. Specifically, we start by telling the client the current number of entries in the history file (`history_entry_count`) as well as the inode (`history_log_id`) of `history.jsonl` (see the new fields on `SessionConfiguredEvent`). 
The client is responsible for keeping new entries in memory to create a "local history," but if the user hits up enough times to go "past" the end of local history, then the client should use the new `GetHistoryEntryRequest` in the protocol to fetch older entries. Specifically, it should pass the `history_log_id` it was given originally and work backwards from `history_entry_count`. (It should really fetch history in batches rather than one-at-a-time, but that is something we can improve upon in subsequent PRs.) The motivation behind this crazy scheme is that it is designed to defend against: * The `history.jsonl` being truncated during the session such that the index into the history is no longer consistent with what had been read up to that point. We do not yet have logic to enforce a `max_bytes` for `history.jsonl`, but once we do, we will aspire to implement it in a way that should result in a new inode for the file on most systems. * New items from concurrent Codex CLI sessions amending to the history. Because, in absence of truncation, `history.jsonl` is an append-only log, so long as the client reads backwards from `history_entry_count`, it should always get a consistent view of history. (That said, it will not be able to read _new_ commands from concurrent sessions, but perhaps we will introduce a `/` command to reload latest history or something down the road.) Admittedly, my testing of this feature thus far has been fairly light. I expect we will find bugs and introduce enhancements/fixes going forward.
2025-05-15 16:26:23 -07:00
let SessionConfiguredEvent {
session_id,
model,
history_log_id: _,
history_entry_count: _,
} = session_configured_event;
println!("session {session_id} with model {model}");
}
feat: record messages from user in ~/.codex/history.jsonl (#939) This is a large change to support a "history" feature like you would expect in a shell like Bash. History events are recorded in `$CODEX_HOME/history.jsonl`. Because it is a JSONL file, it is straightforward to append new entries (as opposed to the TypeScript file that uses `$CODEX_HOME/history.json`, so to be valid JSON, each new entry entails rewriting the entire file). Because it is possible for there to be multiple instances of Codex CLI writing to `history.jsonl` at once, we use advisory file locking when working with `history.jsonl` in `codex-rs/core/src/message_history.rs`. Because we believe history is a sufficiently useful feature, we enable it by default. Though to provide some safety, we set the file permissions of `history.jsonl` to be `o600` so that other users on the system cannot read the user's history. We do not yet support a default list of `SENSITIVE_PATTERNS` as the TypeScript CLI does: https://github.com/openai/codex/blob/3fdf9df1335ac9501e3fb0e61715359145711e8b/codex-cli/src/utils/storage/command-history.ts#L10-L17 We are going to take a more conservative approach to this list in the Rust CLI. For example, while `/\b[A-Za-z0-9-_]{20,}\b/` might exclude sensitive information like API tokens, it would also exclude valuable information such as references to Git commits. As noted in the updated documentation, users can opt-out of history by adding the following to `config.toml`: ```toml [history] persistence = "none" ``` Because `history.jsonl` could, in theory, be quite large, we take a[n arguably overly pedantic] approach in reading history entries into memory. Specifically, we start by telling the client the current number of entries in the history file (`history_entry_count`) as well as the inode (`history_log_id`) of `history.jsonl` (see the new fields on `SessionConfiguredEvent`). 
The client is responsible for keeping new entries in memory to create a "local history," but if the user hits up enough times to go "past" the end of local history, then the client should use the new `GetHistoryEntryRequest` in the protocol to fetch older entries. Specifically, it should pass the `history_log_id` it was given originally and work backwards from `history_entry_count`. (It should really fetch history in batches rather than one-at-a-time, but that is something we can improve upon in subsequent PRs.) The motivation behind this crazy scheme is that it is designed to defend against: * The `history.jsonl` being truncated during the session such that the index into the history is no longer consistent with what had been read up to that point. We do not yet have logic to enforce a `max_bytes` for `history.jsonl`, but once we do, we will aspire to implement it in a way that should result in a new inode for the file on most systems. * New items from concurrent Codex CLI sessions amending to the history. Because, in absence of truncation, `history.jsonl` is an append-only log, so long as the client reads backwards from `history_entry_count`, it should always get a consistent view of history. (That said, it will not be able to read _new_ commands from concurrent sessions, but perhaps we will introduce a `/` command to reload latest history or something down the road.) Admittedly, my testing of this feature thus far has been fairly light. I expect we will find bugs and introduce enhancements/fixes going forward.
2025-05-15 16:26:23 -07:00
EventMsg::GetHistoryEntryResponse(_) => {
// Currently ignored in exec output.
}
}
}
}
/// Renders `command` as a single display string.
///
/// The arguments are joined with `shlex::try_join`; if that returns an error,
/// they are joined with single spaces instead.
fn escape_command(command: &[String]) -> String {
    let words = command.iter().map(String::as_str);
    match try_join(words) {
        Ok(quoted) => quoted,
        Err(_) => command.join(" "),
    }
}
/// Returns the one-letter status code shown in patch summaries for `change`:
/// `"A"` for an added file, `"D"` for a deleted file, `"R"` for an update that
/// carries a `move_path`, and `"M"` for an update without one.
fn format_file_change(change: &FileChange) -> &'static str {
    match change {
        FileChange::Add { .. } => "A",
        FileChange::Delete => "D",
        FileChange::Update { move_path, .. } => {
            if move_path.is_some() {
                "R"
            } else {
                "M"
            }
        }
    }
}