diff --git a/codex-rs/core/src/event_mapping.rs b/codex-rs/core/src/event_mapping.rs index c9edd540..d0aa1d81 100644 --- a/codex-rs/core/src/event_mapping.rs +++ b/codex-rs/core/src/event_mapping.rs @@ -14,6 +14,7 @@ use tracing::warn; use uuid::Uuid; use crate::user_instructions::UserInstructions; +use crate::user_shell_command::is_user_shell_command_text; fn is_session_prefix(text: &str) -> bool { let trimmed = text.trim_start(); @@ -31,7 +32,7 @@ fn parse_user_message(message: &[ContentItem]) -> Option { for content_item in message.iter() { match content_item { ContentItem::InputText { text } => { - if is_session_prefix(text) { + if is_session_prefix(text) || is_user_shell_command_text(text) { return None; } content.push(UserInput::Text { text: text.clone() }); @@ -197,7 +198,14 @@ mod tests { text: "# AGENTS.md instructions for test_directory\n\n\ntest_text\n".to_string(), }], }, - ]; + ResponseItem::Message { + id: None, + role: "user".to_string(), + content: vec![ContentItem::InputText { + text: "echo 42".to_string(), + }], + }, + ]; for item in items { let turn_item = parse_turn_item(&item); diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs index 0738e7af..5229d006 100644 --- a/codex-rs/core/src/lib.rs +++ b/codex-rs/core/src/lib.rs @@ -81,6 +81,7 @@ mod function_tool; mod state; mod tasks; mod user_notification; +mod user_shell_command; pub mod util; pub use apply_patch::CODEX_APPLY_PATCH_ARG1; diff --git a/codex-rs/core/src/tasks/user_shell.rs b/codex-rs/core/src/tasks/user_shell.rs index 0e57e1b7..28a84f23 100644 --- a/codex-rs/core/src/tasks/user_shell.rs +++ b/codex-rs/core/src/tasks/user_shell.rs @@ -1,28 +1,35 @@ use std::sync::Arc; +use std::time::Duration; use async_trait::async_trait; -use codex_protocol::models::ShellToolCallParams; +use codex_async_utils::CancelErr; +use codex_async_utils::OrCancelExt; use codex_protocol::user_input::UserInput; -use tokio::sync::Mutex; use tokio_util::sync::CancellationToken; use tracing::error; use uuid::Uuid; use crate::codex::TurnContext; +use crate::exec::ExecToolCallOutput; +use crate::exec::SandboxType; +use crate::exec::StdoutStream; +use crate::exec::StreamOutput; +use crate::exec::execute_exec_env; +use crate::exec_env::create_env; +use crate::parse_command::parse_command; use crate::protocol::EventMsg; +use crate::protocol::ExecCommandBeginEvent; +use crate::protocol::ExecCommandEndEvent; +use crate::protocol::SandboxPolicy; use crate::protocol::TaskStartedEvent; +use crate::sandboxing::ExecEnv; use crate::state::TaskKind; -use crate::tools::context::ToolPayload; -use crate::tools::parallel::ToolCallRuntime; -use crate::tools::router::ToolCall; -use crate::tools::router::ToolRouter; -use crate::turn_diff_tracker::TurnDiffTracker; +use crate::tools::format_exec_output_str; +use crate::user_shell_command::user_shell_command_record_item; use super::SessionTask; use super::SessionTaskContext; -const USER_SHELL_TOOL_NAME: &str = "local_shell"; - #[derive(Clone)] pub(crate) struct UserShellCommandTask { command: String, @@ -78,34 +85,126 @@ impl SessionTask for UserShellCommandTask { } }; - let params = ShellToolCallParams { + let call_id = Uuid::new_v4().to_string(); + let raw_command = self.command.clone(); + + let parsed_cmd = parse_command(&shell_invocation); + session + .send_event( + turn_context.as_ref(), + EventMsg::ExecCommandBegin(ExecCommandBeginEvent { + call_id: call_id.clone(), + command: shell_invocation.clone(), + cwd: turn_context.cwd.clone(), + parsed_cmd, + is_user_shell_command: true, + }), + ) + .await; + + let exec_env = ExecEnv { command: shell_invocation, - workdir: None, + cwd: turn_context.cwd.clone(), + env: create_env(&turn_context.shell_environment_policy), timeout_ms: None, + sandbox: SandboxType::None, with_escalated_permissions: None, justification: None, + arg0: None, }; - let tool_call = ToolCall { - tool_name: USER_SHELL_TOOL_NAME.to_string(), - call_id: Uuid::new_v4().to_string(), - payload: ToolPayload::LocalShell { params }, - }; + let stdout_stream = Some(StdoutStream { + sub_id: turn_context.sub_id.clone(), + call_id: call_id.clone(), + tx_event: session.get_tx_event(), + }); - let router = Arc::new(ToolRouter::from_config(&turn_context.tools_config, None)); - let tracker = Arc::new(Mutex::new(TurnDiffTracker::new())); - let runtime = ToolCallRuntime::new( - Arc::clone(&router), - Arc::clone(&session), - Arc::clone(&turn_context), - Arc::clone(&tracker), - ); + let sandbox_policy = SandboxPolicy::DangerFullAccess; + let exec_result = execute_exec_env(exec_env, &sandbox_policy, stdout_stream) + .or_cancel(&cancellation_token) + .await; - if let Err(err) = runtime - .handle_tool_call(tool_call, cancellation_token) - .await - { - error!("user shell command failed: {err:?}"); + match exec_result { + Err(CancelErr::Cancelled) => { + let aborted_message = "command aborted by user".to_string(); + let exec_output = ExecToolCallOutput { + exit_code: -1, + stdout: StreamOutput::new(String::new()), + stderr: StreamOutput::new(aborted_message.clone()), + aggregated_output: StreamOutput::new(aborted_message.clone()), + duration: Duration::ZERO, + timed_out: false, + }; + let output_items = [user_shell_command_record_item(&raw_command, &exec_output)]; + session + .record_conversation_items(turn_context.as_ref(), &output_items) + .await; + session + .send_event( + turn_context.as_ref(), + EventMsg::ExecCommandEnd(ExecCommandEndEvent { + call_id, + stdout: String::new(), + stderr: aborted_message.clone(), + aggregated_output: aborted_message.clone(), + exit_code: -1, + duration: Duration::ZERO, + formatted_output: aborted_message, + }), + ) + .await; + } + Ok(Ok(output)) => { + session + .send_event( + turn_context.as_ref(), + EventMsg::ExecCommandEnd(ExecCommandEndEvent { + call_id: call_id.clone(), + stdout: output.stdout.text.clone(), + stderr: output.stderr.text.clone(), + aggregated_output: output.aggregated_output.text.clone(), + exit_code: output.exit_code, + duration: output.duration, + formatted_output: format_exec_output_str(&output), + }), + ) + .await; + + let output_items = [user_shell_command_record_item(&raw_command, &output)]; + session + .record_conversation_items(turn_context.as_ref(), &output_items) + .await; + } + Ok(Err(err)) => { + error!("user shell command failed: {err:?}"); + let message = format!("execution error: {err:?}"); + let exec_output = ExecToolCallOutput { + exit_code: -1, + stdout: StreamOutput::new(String::new()), + stderr: StreamOutput::new(message.clone()), + aggregated_output: StreamOutput::new(message.clone()), + duration: Duration::ZERO, + timed_out: false, + }; + session + .send_event( + turn_context.as_ref(), + EventMsg::ExecCommandEnd(ExecCommandEndEvent { + call_id, + stdout: exec_output.stdout.text.clone(), + stderr: exec_output.stderr.text.clone(), + aggregated_output: exec_output.aggregated_output.text.clone(), + exit_code: exec_output.exit_code, + duration: exec_output.duration, + formatted_output: format_exec_output_str(&exec_output), + }), + ) + .await; + let output_items = [user_shell_command_record_item(&raw_command, &exec_output)]; + session + .record_conversation_items(turn_context.as_ref(), &output_items) + .await; + } } None } diff --git a/codex-rs/core/src/user_shell_command.rs b/codex-rs/core/src/user_shell_command.rs new file mode 100644 index 00000000..7f0731c9 --- /dev/null +++ b/codex-rs/core/src/user_shell_command.rs @@ -0,0 +1,108 @@ +use std::time::Duration; + +use codex_protocol::models::ContentItem; +use codex_protocol::models::ResponseItem; + +use crate::exec::ExecToolCallOutput; +use crate::tools::format_exec_output_str; + +pub const USER_SHELL_COMMAND_OPEN: &str = ""; +pub const USER_SHELL_COMMAND_CLOSE: &str = ""; + +pub fn is_user_shell_command_text(text: &str) -> bool { + let trimmed = text.trim_start(); + let lowered = trimmed.to_ascii_lowercase(); + lowered.starts_with(USER_SHELL_COMMAND_OPEN) +} + +fn format_duration_line(duration: Duration) -> String { + let duration_seconds = duration.as_secs_f64(); + format!("Duration: {duration_seconds:.4} seconds") +} + +fn format_user_shell_command_body(command: &str, exec_output: &ExecToolCallOutput) -> String { + let mut sections = Vec::new(); + sections.push("".to_string()); + sections.push(command.to_string()); + sections.push("".to_string()); + sections.push("".to_string()); + sections.push(format!("Exit code: {}", exec_output.exit_code)); + sections.push(format_duration_line(exec_output.duration)); + sections.push("Output:".to_string()); + sections.push(format_exec_output_str(exec_output)); + sections.push("".to_string()); + sections.join("\n") +} + +pub fn format_user_shell_command_record(command: &str, exec_output: &ExecToolCallOutput) -> String { + let body = format_user_shell_command_body(command, exec_output); + format!("{USER_SHELL_COMMAND_OPEN}\n{body}\n{USER_SHELL_COMMAND_CLOSE}") +} + +pub fn user_shell_command_record_item( + command: &str, + exec_output: &ExecToolCallOutput, +) -> ResponseItem { + ResponseItem::Message { + id: None, + role: "user".to_string(), + content: vec![ContentItem::InputText { + text: format_user_shell_command_record(command, exec_output), + }], + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::exec::StreamOutput; + use pretty_assertions::assert_eq; + + #[test] + fn detects_user_shell_command_text_variants() { + assert!(is_user_shell_command_text( + "\necho hi\n" + )); + assert!(!is_user_shell_command_text("echo hi")); + } + + #[test] + fn formats_basic_record() { + let exec_output = ExecToolCallOutput { + exit_code: 0, + stdout: StreamOutput::new("hi".to_string()), + stderr: StreamOutput::new(String::new()), + aggregated_output: StreamOutput::new("hi".to_string()), + duration: Duration::from_secs(1), + timed_out: false, + }; + let item = user_shell_command_record_item("echo hi", &exec_output); + let ResponseItem::Message { content, .. } = item else { + panic!("expected message"); + }; + let [ContentItem::InputText { text }] = content.as_slice() else { + panic!("expected input text"); + }; + assert_eq!( + text, + "\n\necho hi\n\n\nExit code: 0\nDuration: 1.0000 seconds\nOutput:\nhi\n\n" + ); + } + + #[test] + fn uses_aggregated_output_over_streams() { + let exec_output = ExecToolCallOutput { + exit_code: 42, + stdout: StreamOutput::new("stdout-only".to_string()), + stderr: StreamOutput::new("stderr-only".to_string()), + aggregated_output: StreamOutput::new("combined output wins".to_string()), + duration: Duration::from_millis(120), + timed_out: false, + }; + let record = format_user_shell_command_record("false", &exec_output); + assert_eq!( + record, + "\n\nfalse\n\n\nExit code: 42\nDuration: 0.1200 seconds\nOutput:\ncombined output wins\n\n" + ); + } +} diff --git a/codex-rs/core/tests/common/responses.rs b/codex-rs/core/tests/common/responses.rs index 4fd6dcf7..1eaf9d0c 100644 --- a/codex-rs/core/tests/common/responses.rs +++ b/codex-rs/core/tests/common/responses.rs @@ -61,6 +61,18 @@ impl ResponsesRequest { self.0.body_json().unwrap() } + /// Returns all `input_text` spans from `message` inputs for the provided role. + pub fn message_input_texts(&self, role: &str) -> Vec { + self.inputs_of_type("message") + .into_iter() + .filter(|item| item.get("role").and_then(Value::as_str) == Some(role)) + .filter_map(|item| item.get("content").and_then(Value::as_array).cloned()) + .flatten() + .filter(|span| span.get("type").and_then(Value::as_str) == Some("input_text")) + .filter_map(|span| span.get("text").and_then(Value::as_str).map(str::to_owned)) + .collect() + } + pub fn input(&self) -> Vec { self.0.body_json::().unwrap()["input"] .as_array() diff --git a/codex-rs/core/tests/suite/user_shell_cmd.rs b/codex-rs/core/tests/suite/user_shell_cmd.rs index 0832d7e6..fa09f005 100644 --- a/codex-rs/core/tests/suite/user_shell_cmd.rs +++ b/codex-rs/core/tests/suite/user_shell_cmd.rs @@ -2,35 +2,20 @@ use codex_core::ConversationManager; use codex_core::NewConversation; use codex_core::protocol::EventMsg; use codex_core::protocol::ExecCommandEndEvent; +use codex_core::protocol::ExecOutputStream; use codex_core::protocol::Op; use codex_core::protocol::TurnAbortReason; +use core_test_support::assert_regex_match; use core_test_support::load_default_config_for_test; +use core_test_support::responses; use core_test_support::wait_for_event; +use core_test_support::wait_for_event_match; +use regex_lite::escape; use std::path::PathBuf; -use std::process::Command; -use std::process::Stdio; use tempfile::TempDir; -fn detect_python_executable() -> Option { - let candidates = ["python3", "python"]; - candidates.iter().find_map(|candidate| { - Command::new(candidate) - .arg("--version") - .stdout(Stdio::null()) - .stderr(Stdio::null()) - .status() - .ok() - .and_then(|status| status.success().then(|| (*candidate).to_string())) - }) -} - #[tokio::test] async fn user_shell_cmd_ls_and_cat_in_temp_dir() { - let Some(python) = detect_python_executable() else { - eprintln!("skipping test: python3 not found in PATH"); - return; - }; - // Create a temporary working directory with a known file. let cwd = TempDir::new().unwrap(); let file_name = "hello.txt"; @@ -55,10 +40,8 @@ async fn user_shell_cmd_ls_and_cat_in_temp_dir() { .await .expect("create new conversation"); - // 1) python should list the file - let list_cmd = format!( - "{python} -c \"import pathlib; print('\\n'.join(sorted(p.name for p in pathlib.Path('.').iterdir())))\"" - ); + // 1) shell command should list the file + let list_cmd = "ls".to_string(); codex .submit(Op::RunUserShellCommand { command: list_cmd }) .await @@ -76,10 +59,8 @@ async fn user_shell_cmd_ls_and_cat_in_temp_dir() { "ls output should include {file_name}, got: {stdout:?}" ); - // 2) python should print the file contents verbatim - let cat_cmd = format!( - "{python} -c \"import pathlib; print(pathlib.Path('{file_name}').read_text(), end='')\"" - ); + // 2) shell command should print the file contents verbatim + let cat_cmd = format!("cat {file_name}"); codex .submit(Op::RunUserShellCommand { command: cat_cmd }) .await @@ -95,7 +76,7 @@ async fn user_shell_cmd_ls_and_cat_in_temp_dir() { }; assert_eq!(exit_code, 0); if cfg!(windows) { - // Windows' Python writes CRLF line endings; normalize so the assertion remains portable. + // Windows shells emit CRLF line endings; normalize so the assertion remains portable. stdout = stdout.replace("\r\n", "\n"); } assert_eq!(stdout, contents); @@ -103,10 +84,6 @@ async fn user_shell_cmd_ls_and_cat_in_temp_dir() { #[tokio::test] async fn user_shell_cmd_can_be_interrupted() { - let Some(python) = detect_python_executable() else { - eprintln!("skipping test: python3 not found in PATH"); - return; - }; // Set up isolated config and conversation. let codex_home = TempDir::new().unwrap(); let config = load_default_config_for_test(&codex_home); @@ -121,7 +98,7 @@ async fn user_shell_cmd_can_be_interrupted() { .expect("create new conversation"); // Start a long-running command and then interrupt it. - let sleep_cmd = format!("{python} -c \"import time; time.sleep(5)\""); + let sleep_cmd = "sleep 5".to_string(); codex .submit(Op::RunUserShellCommand { command: sleep_cmd }) .await @@ -138,3 +115,137 @@ async fn user_shell_cmd_can_be_interrupted() { }; assert_eq!(ev.reason, TurnAbortReason::Interrupted); } + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn user_shell_command_history_is_persisted_and_shared_with_model() -> anyhow::Result<()> { + let server = responses::start_mock_server().await; + let mut builder = core_test_support::test_codex::test_codex(); + let test = builder.build(&server).await?; + + #[cfg(windows)] + let command = r#"$val = $env:CODEX_SANDBOX; if ([string]::IsNullOrEmpty($val)) { $val = 'not-set' } ; [System.Console]::Write($val)"#.to_string(); + #[cfg(not(windows))] + let command = r#"sh -c "printf '%s' \"${CODEX_SANDBOX:-not-set}\"""#.to_string(); + + test.codex + .submit(Op::RunUserShellCommand { + command: command.clone(), + }) + .await?; + + let begin_event = wait_for_event_match(&test.codex, |ev| match ev { + EventMsg::ExecCommandBegin(event) => Some(event.clone()), + _ => None, + }) + .await; + assert!(begin_event.is_user_shell_command); + let matches_last_arg = begin_event.command.last() == Some(&command); + let matches_split = shlex::split(&command).is_some_and(|split| split == begin_event.command); + assert!( + matches_last_arg || matches_split, + "user command begin event should include the original command; got: {:?}", + begin_event.command + ); + + let delta_event = wait_for_event_match(&test.codex, |ev| match ev { + EventMsg::ExecCommandOutputDelta(event) => Some(event.clone()), + _ => None, + }) + .await; + assert_eq!(delta_event.stream, ExecOutputStream::Stdout); + let chunk_text = + String::from_utf8(delta_event.chunk.clone()).expect("user command chunk is valid utf-8"); + assert_eq!(chunk_text.trim(), "not-set"); + + let end_event = wait_for_event_match(&test.codex, |ev| match ev { + EventMsg::ExecCommandEnd(event) => Some(event.clone()), + _ => None, + }) + .await; + assert_eq!(end_event.exit_code, 0); + assert_eq!(end_event.stdout.trim(), "not-set"); + + let _ = wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; + + let responses = vec![responses::sse(vec![ + responses::ev_response_created("resp-1"), + responses::ev_assistant_message("msg-1", "done"), + responses::ev_completed("resp-1"), + ])]; + let mock = responses::mount_sse_sequence(&server, responses).await; + + test.submit_turn("follow-up after shell command").await?; + + let request = mock.single_request(); + + let command_message = request + .message_input_texts("user") + .into_iter() + .find(|text| text.contains("")) + .expect("command message recorded in request"); + let command_message = command_message.replace("\r\n", "\n"); + let escaped_command = escape(&command); + let expected_pattern = format!( + r"(?m)\A\n\n{escaped_command}\n\n\nExit code: 0\nDuration: [0-9]+(?:\.[0-9]+)? seconds\nOutput:\nnot-set\n\n\z" + ); + assert_regex_match(&expected_pattern, &command_message); + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn user_shell_command_output_is_truncated_in_history() -> anyhow::Result<()> { + let server = responses::start_mock_server().await; + let mut builder = core_test_support::test_codex::test_codex(); + let test = builder.build(&server).await?; + + #[cfg(windows)] + let command = r#"for ($i=1; $i -le 400; $i++) { Write-Output $i }"#.to_string(); + #[cfg(not(windows))] + let command = "seq 1 400".to_string(); + + test.codex + .submit(Op::RunUserShellCommand { + command: command.clone(), + }) + .await?; + + let end_event = wait_for_event_match(&test.codex, |ev| match ev { + EventMsg::ExecCommandEnd(event) => Some(event.clone()), + _ => None, + }) + .await; + assert_eq!(end_event.exit_code, 0); + + let _ = wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; + + let responses = vec![responses::sse(vec![ + responses::ev_response_created("resp-1"), + responses::ev_assistant_message("msg-1", "done"), + responses::ev_completed("resp-1"), + ])]; + let mock = responses::mount_sse_sequence(&server, responses).await; + + test.submit_turn("follow-up after shell command").await?; + + let request = mock.single_request(); + let command_message = request + .message_input_texts("user") + .into_iter() + .find(|text| text.contains("")) + .expect("command message recorded in request"); + let command_message = command_message.replace("\r\n", "\n"); + + let head = (1..=128).map(|i| format!("{i}\n")).collect::(); + let tail = (273..=400).map(|i| format!("{i}\n")).collect::(); + let truncated_body = + format!("Total output lines: 400\n\n{head}\n[... omitted 144 of 400 lines ...]\n\n{tail}"); + let escaped_command = escape(&command); + let escaped_truncated_body = escape(&truncated_body); + let expected_pattern = format!( + r"(?m)\A\n\n{escaped_command}\n\n\nExit code: 0\nDuration: [0-9]+(?:\.[0-9]+)? seconds\nOutput:\n{escaped_truncated_body}\n\n\z" + ); + assert_regex_match(&expected_pattern, &command_message); + + Ok(()) +} diff --git a/codex-rs/tui/src/exec_cell/render.rs b/codex-rs/tui/src/exec_cell/render.rs index 3ccc0527..8ebc1251 100644 --- a/codex-rs/tui/src/exec_cell/render.rs +++ b/codex-rs/tui/src/exec_cell/render.rs @@ -317,7 +317,13 @@ impl ExecCell { Some(false) => "•".red().bold(), None => spinner(call.start_time), }; - let title = if self.is_active() { "Running" } else { "Ran" }; + let title = if self.is_active() { + "Running" + } else if call.is_user_shell_command { + "You ran" + } else { + "Ran" + }; let mut header_line = Line::from(vec![bullet.clone(), " ".into(), title.bold(), " ".into()]);