Add user command event types (#6246)

adding new user command event, logic in TUI to render user command
events
This commit is contained in:
zhao-oai
2025-11-10 11:18:45 -08:00
committed by GitHub
parent e743d251a7
commit 980886498c
7 changed files with 411 additions and 66 deletions

View File

@@ -14,6 +14,7 @@ use tracing::warn;
use uuid::Uuid;
use crate::user_instructions::UserInstructions;
use crate::user_shell_command::is_user_shell_command_text;
fn is_session_prefix(text: &str) -> bool {
let trimmed = text.trim_start();
@@ -31,7 +32,7 @@ fn parse_user_message(message: &[ContentItem]) -> Option<UserMessageItem> {
for content_item in message.iter() {
match content_item {
ContentItem::InputText { text } => {
if is_session_prefix(text) {
if is_session_prefix(text) || is_user_shell_command_text(text) {
return None;
}
content.push(UserInput::Text { text: text.clone() });
@@ -197,7 +198,14 @@ mod tests {
text: "# AGENTS.md instructions for test_directory\n\n<INSTRUCTIONS>\ntest_text\n</INSTRUCTIONS>".to_string(),
}],
},
];
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "<user_shell_command>echo 42</user_shell_command>".to_string(),
}],
},
];
for item in items {
let turn_item = parse_turn_item(&item);

View File

@@ -81,6 +81,7 @@ mod function_tool;
mod state;
mod tasks;
mod user_notification;
mod user_shell_command;
pub mod util;
pub use apply_patch::CODEX_APPLY_PATCH_ARG1;

View File

@@ -1,28 +1,35 @@
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use codex_protocol::models::ShellToolCallParams;
use codex_async_utils::CancelErr;
use codex_async_utils::OrCancelExt;
use codex_protocol::user_input::UserInput;
use tokio::sync::Mutex;
use tokio_util::sync::CancellationToken;
use tracing::error;
use uuid::Uuid;
use crate::codex::TurnContext;
use crate::exec::ExecToolCallOutput;
use crate::exec::SandboxType;
use crate::exec::StdoutStream;
use crate::exec::StreamOutput;
use crate::exec::execute_exec_env;
use crate::exec_env::create_env;
use crate::parse_command::parse_command;
use crate::protocol::EventMsg;
use crate::protocol::ExecCommandBeginEvent;
use crate::protocol::ExecCommandEndEvent;
use crate::protocol::SandboxPolicy;
use crate::protocol::TaskStartedEvent;
use crate::sandboxing::ExecEnv;
use crate::state::TaskKind;
use crate::tools::context::ToolPayload;
use crate::tools::parallel::ToolCallRuntime;
use crate::tools::router::ToolCall;
use crate::tools::router::ToolRouter;
use crate::turn_diff_tracker::TurnDiffTracker;
use crate::tools::format_exec_output_str;
use crate::user_shell_command::user_shell_command_record_item;
use super::SessionTask;
use super::SessionTaskContext;
const USER_SHELL_TOOL_NAME: &str = "local_shell";
#[derive(Clone)]
pub(crate) struct UserShellCommandTask {
command: String,
@@ -78,34 +85,126 @@ impl SessionTask for UserShellCommandTask {
}
};
let params = ShellToolCallParams {
let call_id = Uuid::new_v4().to_string();
let raw_command = self.command.clone();
let parsed_cmd = parse_command(&shell_invocation);
session
.send_event(
turn_context.as_ref(),
EventMsg::ExecCommandBegin(ExecCommandBeginEvent {
call_id: call_id.clone(),
command: shell_invocation.clone(),
cwd: turn_context.cwd.clone(),
parsed_cmd,
is_user_shell_command: true,
}),
)
.await;
let exec_env = ExecEnv {
command: shell_invocation,
workdir: None,
cwd: turn_context.cwd.clone(),
env: create_env(&turn_context.shell_environment_policy),
timeout_ms: None,
sandbox: SandboxType::None,
with_escalated_permissions: None,
justification: None,
arg0: None,
};
let tool_call = ToolCall {
tool_name: USER_SHELL_TOOL_NAME.to_string(),
call_id: Uuid::new_v4().to_string(),
payload: ToolPayload::LocalShell { params },
};
let stdout_stream = Some(StdoutStream {
sub_id: turn_context.sub_id.clone(),
call_id: call_id.clone(),
tx_event: session.get_tx_event(),
});
let router = Arc::new(ToolRouter::from_config(&turn_context.tools_config, None));
let tracker = Arc::new(Mutex::new(TurnDiffTracker::new()));
let runtime = ToolCallRuntime::new(
Arc::clone(&router),
Arc::clone(&session),
Arc::clone(&turn_context),
Arc::clone(&tracker),
);
let sandbox_policy = SandboxPolicy::DangerFullAccess;
let exec_result = execute_exec_env(exec_env, &sandbox_policy, stdout_stream)
.or_cancel(&cancellation_token)
.await;
if let Err(err) = runtime
.handle_tool_call(tool_call, cancellation_token)
.await
{
error!("user shell command failed: {err:?}");
match exec_result {
Err(CancelErr::Cancelled) => {
let aborted_message = "command aborted by user".to_string();
let exec_output = ExecToolCallOutput {
exit_code: -1,
stdout: StreamOutput::new(String::new()),
stderr: StreamOutput::new(aborted_message.clone()),
aggregated_output: StreamOutput::new(aborted_message.clone()),
duration: Duration::ZERO,
timed_out: false,
};
let output_items = [user_shell_command_record_item(&raw_command, &exec_output)];
session
.record_conversation_items(turn_context.as_ref(), &output_items)
.await;
session
.send_event(
turn_context.as_ref(),
EventMsg::ExecCommandEnd(ExecCommandEndEvent {
call_id,
stdout: String::new(),
stderr: aborted_message.clone(),
aggregated_output: aborted_message.clone(),
exit_code: -1,
duration: Duration::ZERO,
formatted_output: aborted_message,
}),
)
.await;
}
Ok(Ok(output)) => {
session
.send_event(
turn_context.as_ref(),
EventMsg::ExecCommandEnd(ExecCommandEndEvent {
call_id: call_id.clone(),
stdout: output.stdout.text.clone(),
stderr: output.stderr.text.clone(),
aggregated_output: output.aggregated_output.text.clone(),
exit_code: output.exit_code,
duration: output.duration,
formatted_output: format_exec_output_str(&output),
}),
)
.await;
let output_items = [user_shell_command_record_item(&raw_command, &output)];
session
.record_conversation_items(turn_context.as_ref(), &output_items)
.await;
}
Ok(Err(err)) => {
error!("user shell command failed: {err:?}");
let message = format!("execution error: {err:?}");
let exec_output = ExecToolCallOutput {
exit_code: -1,
stdout: StreamOutput::new(String::new()),
stderr: StreamOutput::new(message.clone()),
aggregated_output: StreamOutput::new(message.clone()),
duration: Duration::ZERO,
timed_out: false,
};
session
.send_event(
turn_context.as_ref(),
EventMsg::ExecCommandEnd(ExecCommandEndEvent {
call_id,
stdout: exec_output.stdout.text.clone(),
stderr: exec_output.stderr.text.clone(),
aggregated_output: exec_output.aggregated_output.text.clone(),
exit_code: exec_output.exit_code,
duration: exec_output.duration,
formatted_output: format_exec_output_str(&exec_output),
}),
)
.await;
let output_items = [user_shell_command_record_item(&raw_command, &exec_output)];
session
.record_conversation_items(turn_context.as_ref(), &output_items)
.await;
}
}
None
}

View File

@@ -0,0 +1,108 @@
use std::time::Duration;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use crate::exec::ExecToolCallOutput;
use crate::tools::format_exec_output_str;
pub const USER_SHELL_COMMAND_OPEN: &str = "<user_shell_command>";
pub const USER_SHELL_COMMAND_CLOSE: &str = "</user_shell_command>";
pub fn is_user_shell_command_text(text: &str) -> bool {
let trimmed = text.trim_start();
let lowered = trimmed.to_ascii_lowercase();
lowered.starts_with(USER_SHELL_COMMAND_OPEN)
}
fn format_duration_line(duration: Duration) -> String {
let duration_seconds = duration.as_secs_f64();
format!("Duration: {duration_seconds:.4} seconds")
}
fn format_user_shell_command_body(command: &str, exec_output: &ExecToolCallOutput) -> String {
let mut sections = Vec::new();
sections.push("<command>".to_string());
sections.push(command.to_string());
sections.push("</command>".to_string());
sections.push("<result>".to_string());
sections.push(format!("Exit code: {}", exec_output.exit_code));
sections.push(format_duration_line(exec_output.duration));
sections.push("Output:".to_string());
sections.push(format_exec_output_str(exec_output));
sections.push("</result>".to_string());
sections.join("\n")
}
pub fn format_user_shell_command_record(command: &str, exec_output: &ExecToolCallOutput) -> String {
let body = format_user_shell_command_body(command, exec_output);
format!("{USER_SHELL_COMMAND_OPEN}\n{body}\n{USER_SHELL_COMMAND_CLOSE}")
}
pub fn user_shell_command_record_item(
command: &str,
exec_output: &ExecToolCallOutput,
) -> ResponseItem {
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: format_user_shell_command_record(command, exec_output),
}],
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::exec::StreamOutput;
use pretty_assertions::assert_eq;
#[test]
fn detects_user_shell_command_text_variants() {
assert!(is_user_shell_command_text(
"<user_shell_command>\necho hi\n</user_shell_command>"
));
assert!(!is_user_shell_command_text("echo hi"));
}
#[test]
fn formats_basic_record() {
let exec_output = ExecToolCallOutput {
exit_code: 0,
stdout: StreamOutput::new("hi".to_string()),
stderr: StreamOutput::new(String::new()),
aggregated_output: StreamOutput::new("hi".to_string()),
duration: Duration::from_secs(1),
timed_out: false,
};
let item = user_shell_command_record_item("echo hi", &exec_output);
let ResponseItem::Message { content, .. } = item else {
panic!("expected message");
};
let [ContentItem::InputText { text }] = content.as_slice() else {
panic!("expected input text");
};
assert_eq!(
text,
"<user_shell_command>\n<command>\necho hi\n</command>\n<result>\nExit code: 0\nDuration: 1.0000 seconds\nOutput:\nhi\n</result>\n</user_shell_command>"
);
}
#[test]
fn uses_aggregated_output_over_streams() {
let exec_output = ExecToolCallOutput {
exit_code: 42,
stdout: StreamOutput::new("stdout-only".to_string()),
stderr: StreamOutput::new("stderr-only".to_string()),
aggregated_output: StreamOutput::new("combined output wins".to_string()),
duration: Duration::from_millis(120),
timed_out: false,
};
let record = format_user_shell_command_record("false", &exec_output);
assert_eq!(
record,
"<user_shell_command>\n<command>\nfalse\n</command>\n<result>\nExit code: 42\nDuration: 0.1200 seconds\nOutput:\ncombined output wins\n</result>\n</user_shell_command>"
);
}
}

View File

@@ -61,6 +61,18 @@ impl ResponsesRequest {
self.0.body_json().unwrap()
}
/// Returns all `input_text` spans from `message` inputs for the provided role.
pub fn message_input_texts(&self, role: &str) -> Vec<String> {
self.inputs_of_type("message")
.into_iter()
.filter(|item| item.get("role").and_then(Value::as_str) == Some(role))
.filter_map(|item| item.get("content").and_then(Value::as_array).cloned())
.flatten()
.filter(|span| span.get("type").and_then(Value::as_str) == Some("input_text"))
.filter_map(|span| span.get("text").and_then(Value::as_str).map(str::to_owned))
.collect()
}
pub fn input(&self) -> Vec<Value> {
self.0.body_json::<Value>().unwrap()["input"]
.as_array()

View File

@@ -2,35 +2,20 @@ use codex_core::ConversationManager;
use codex_core::NewConversation;
use codex_core::protocol::EventMsg;
use codex_core::protocol::ExecCommandEndEvent;
use codex_core::protocol::ExecOutputStream;
use codex_core::protocol::Op;
use codex_core::protocol::TurnAbortReason;
use core_test_support::assert_regex_match;
use core_test_support::load_default_config_for_test;
use core_test_support::responses;
use core_test_support::wait_for_event;
use core_test_support::wait_for_event_match;
use regex_lite::escape;
use std::path::PathBuf;
use std::process::Command;
use std::process::Stdio;
use tempfile::TempDir;
fn detect_python_executable() -> Option<String> {
let candidates = ["python3", "python"];
candidates.iter().find_map(|candidate| {
Command::new(candidate)
.arg("--version")
.stdout(Stdio::null())
.stderr(Stdio::null())
.status()
.ok()
.and_then(|status| status.success().then(|| (*candidate).to_string()))
})
}
#[tokio::test]
async fn user_shell_cmd_ls_and_cat_in_temp_dir() {
let Some(python) = detect_python_executable() else {
eprintln!("skipping test: python3 not found in PATH");
return;
};
// Create a temporary working directory with a known file.
let cwd = TempDir::new().unwrap();
let file_name = "hello.txt";
@@ -55,10 +40,8 @@ async fn user_shell_cmd_ls_and_cat_in_temp_dir() {
.await
.expect("create new conversation");
// 1) python should list the file
let list_cmd = format!(
"{python} -c \"import pathlib; print('\\n'.join(sorted(p.name for p in pathlib.Path('.').iterdir())))\""
);
// 1) shell command should list the file
let list_cmd = "ls".to_string();
codex
.submit(Op::RunUserShellCommand { command: list_cmd })
.await
@@ -76,10 +59,8 @@ async fn user_shell_cmd_ls_and_cat_in_temp_dir() {
"ls output should include {file_name}, got: {stdout:?}"
);
// 2) python should print the file contents verbatim
let cat_cmd = format!(
"{python} -c \"import pathlib; print(pathlib.Path('{file_name}').read_text(), end='')\""
);
// 2) shell command should print the file contents verbatim
let cat_cmd = format!("cat {file_name}");
codex
.submit(Op::RunUserShellCommand { command: cat_cmd })
.await
@@ -95,7 +76,7 @@ async fn user_shell_cmd_ls_and_cat_in_temp_dir() {
};
assert_eq!(exit_code, 0);
if cfg!(windows) {
// Windows' Python writes CRLF line endings; normalize so the assertion remains portable.
// Windows shells emit CRLF line endings; normalize so the assertion remains portable.
stdout = stdout.replace("\r\n", "\n");
}
assert_eq!(stdout, contents);
@@ -103,10 +84,6 @@ async fn user_shell_cmd_ls_and_cat_in_temp_dir() {
#[tokio::test]
async fn user_shell_cmd_can_be_interrupted() {
let Some(python) = detect_python_executable() else {
eprintln!("skipping test: python3 not found in PATH");
return;
};
// Set up isolated config and conversation.
let codex_home = TempDir::new().unwrap();
let config = load_default_config_for_test(&codex_home);
@@ -121,7 +98,7 @@ async fn user_shell_cmd_can_be_interrupted() {
.expect("create new conversation");
// Start a long-running command and then interrupt it.
let sleep_cmd = format!("{python} -c \"import time; time.sleep(5)\"");
let sleep_cmd = "sleep 5".to_string();
codex
.submit(Op::RunUserShellCommand { command: sleep_cmd })
.await
@@ -138,3 +115,137 @@ async fn user_shell_cmd_can_be_interrupted() {
};
assert_eq!(ev.reason, TurnAbortReason::Interrupted);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn user_shell_command_history_is_persisted_and_shared_with_model() -> anyhow::Result<()> {
let server = responses::start_mock_server().await;
let mut builder = core_test_support::test_codex::test_codex();
let test = builder.build(&server).await?;
#[cfg(windows)]
let command = r#"$val = $env:CODEX_SANDBOX; if ([string]::IsNullOrEmpty($val)) { $val = 'not-set' } ; [System.Console]::Write($val)"#.to_string();
#[cfg(not(windows))]
let command = r#"sh -c "printf '%s' \"${CODEX_SANDBOX:-not-set}\"""#.to_string();
test.codex
.submit(Op::RunUserShellCommand {
command: command.clone(),
})
.await?;
let begin_event = wait_for_event_match(&test.codex, |ev| match ev {
EventMsg::ExecCommandBegin(event) => Some(event.clone()),
_ => None,
})
.await;
assert!(begin_event.is_user_shell_command);
let matches_last_arg = begin_event.command.last() == Some(&command);
let matches_split = shlex::split(&command).is_some_and(|split| split == begin_event.command);
assert!(
matches_last_arg || matches_split,
"user command begin event should include the original command; got: {:?}",
begin_event.command
);
let delta_event = wait_for_event_match(&test.codex, |ev| match ev {
EventMsg::ExecCommandOutputDelta(event) => Some(event.clone()),
_ => None,
})
.await;
assert_eq!(delta_event.stream, ExecOutputStream::Stdout);
let chunk_text =
String::from_utf8(delta_event.chunk.clone()).expect("user command chunk is valid utf-8");
assert_eq!(chunk_text.trim(), "not-set");
let end_event = wait_for_event_match(&test.codex, |ev| match ev {
EventMsg::ExecCommandEnd(event) => Some(event.clone()),
_ => None,
})
.await;
assert_eq!(end_event.exit_code, 0);
assert_eq!(end_event.stdout.trim(), "not-set");
let _ = wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
let responses = vec![responses::sse(vec![
responses::ev_response_created("resp-1"),
responses::ev_assistant_message("msg-1", "done"),
responses::ev_completed("resp-1"),
])];
let mock = responses::mount_sse_sequence(&server, responses).await;
test.submit_turn("follow-up after shell command").await?;
let request = mock.single_request();
let command_message = request
.message_input_texts("user")
.into_iter()
.find(|text| text.contains("<user_shell_command>"))
.expect("command message recorded in request");
let command_message = command_message.replace("\r\n", "\n");
let escaped_command = escape(&command);
let expected_pattern = format!(
r"(?m)\A<user_shell_command>\n<command>\n{escaped_command}\n</command>\n<result>\nExit code: 0\nDuration: [0-9]+(?:\.[0-9]+)? seconds\nOutput:\nnot-set\n</result>\n</user_shell_command>\z"
);
assert_regex_match(&expected_pattern, &command_message);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn user_shell_command_output_is_truncated_in_history() -> anyhow::Result<()> {
let server = responses::start_mock_server().await;
let mut builder = core_test_support::test_codex::test_codex();
let test = builder.build(&server).await?;
#[cfg(windows)]
let command = r#"for ($i=1; $i -le 400; $i++) { Write-Output $i }"#.to_string();
#[cfg(not(windows))]
let command = "seq 1 400".to_string();
test.codex
.submit(Op::RunUserShellCommand {
command: command.clone(),
})
.await?;
let end_event = wait_for_event_match(&test.codex, |ev| match ev {
EventMsg::ExecCommandEnd(event) => Some(event.clone()),
_ => None,
})
.await;
assert_eq!(end_event.exit_code, 0);
let _ = wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
let responses = vec![responses::sse(vec![
responses::ev_response_created("resp-1"),
responses::ev_assistant_message("msg-1", "done"),
responses::ev_completed("resp-1"),
])];
let mock = responses::mount_sse_sequence(&server, responses).await;
test.submit_turn("follow-up after shell command").await?;
let request = mock.single_request();
let command_message = request
.message_input_texts("user")
.into_iter()
.find(|text| text.contains("<user_shell_command>"))
.expect("command message recorded in request");
let command_message = command_message.replace("\r\n", "\n");
let head = (1..=128).map(|i| format!("{i}\n")).collect::<String>();
let tail = (273..=400).map(|i| format!("{i}\n")).collect::<String>();
let truncated_body =
format!("Total output lines: 400\n\n{head}\n[... omitted 144 of 400 lines ...]\n\n{tail}");
let escaped_command = escape(&command);
let escaped_truncated_body = escape(&truncated_body);
let expected_pattern = format!(
r"(?m)\A<user_shell_command>\n<command>\n{escaped_command}\n</command>\n<result>\nExit code: 0\nDuration: [0-9]+(?:\.[0-9]+)? seconds\nOutput:\n{escaped_truncated_body}\n</result>\n</user_shell_command>\z"
);
assert_regex_match(&expected_pattern, &command_message);
Ok(())
}

View File

@@ -317,7 +317,13 @@ impl ExecCell {
Some(false) => "".red().bold(),
None => spinner(call.start_time),
};
let title = if self.is_active() { "Running" } else { "Ran" };
let title = if self.is_active() {
"Running"
} else if call.is_user_shell_command {
"You ran"
} else {
"Ran"
};
let mut header_line =
Line::from(vec![bullet.clone(), " ".into(), title.bold(), " ".into()]);