diff --git a/codex-rs/core/src/tools/events.rs b/codex-rs/core/src/tools/events.rs index 74cf1ade..0a0bbaed 100644 --- a/codex-rs/core/src/tools/events.rs +++ b/codex-rs/core/src/tools/events.rs @@ -76,6 +76,13 @@ pub(crate) enum ToolEmitter { changes: HashMap, auto_approved: bool, }, + UnifiedExec { + command: String, + cwd: PathBuf, + // True for `exec_command` and false for `write_stdin`. + #[allow(dead_code)] + is_startup_command: bool, + }, } impl ToolEmitter { @@ -90,6 +97,14 @@ impl ToolEmitter { } } + pub fn unified_exec(command: String, cwd: PathBuf, is_startup_command: bool) -> Self { + Self::UnifiedExec { + command, + cwd, + is_startup_command, + } + } + pub async fn emit(&self, ctx: ToolEventCtx<'_>, stage: ToolEventStage) { match (self, stage) { (Self::Shell { command, cwd }, ToolEventStage::Begin) => { @@ -181,6 +196,10 @@ impl ToolEmitter { ) => { emit_patch_end(ctx, String::new(), (*message).to_string(), false).await; } + (Self::UnifiedExec { command, cwd, .. }, _) => { + // TODO(jif) add end and failures. + emit_exec_command_begin(ctx, &[command.to_string()], cwd.as_path()).await; + } } } } diff --git a/codex-rs/core/src/tools/handlers/unified_exec.rs b/codex-rs/core/src/tools/handlers/unified_exec.rs index 2c238909..0ad3739c 100644 --- a/codex-rs/core/src/tools/handlers/unified_exec.rs +++ b/codex-rs/core/src/tools/handlers/unified_exec.rs @@ -1,35 +1,68 @@ +use std::time::Duration; + use async_trait::async_trait; use serde::Deserialize; +use serde::Serialize; use crate::function_tool::FunctionCallError; use crate::tools::context::ToolInvocation; use crate::tools::context::ToolOutput; use crate::tools::context::ToolPayload; +use crate::tools::events::ToolEmitter; +use crate::tools::events::ToolEventCtx; +use crate::tools::events::ToolEventStage; use crate::tools::registry::ToolHandler; use crate::tools::registry::ToolKind; -use crate::unified_exec::UnifiedExecRequest; +use crate::unified_exec::ExecCommandRequest; +use crate::unified_exec::UnifiedExecContext; +use crate::unified_exec::UnifiedExecResponse; +use crate::unified_exec::UnifiedExecSessionManager; +use crate::unified_exec::WriteStdinRequest; pub struct UnifiedExecHandler; -#[derive(Deserialize)] -struct UnifiedExecArgs { - input: Vec, +#[derive(Debug, Deserialize)] +struct ExecCommandArgs { + cmd: String, + #[serde(default = "default_shell")] + shell: String, + #[serde(default = "default_login")] + login: bool, #[serde(default)] - session_id: Option, + yield_time_ms: Option, #[serde(default)] - timeout_ms: Option, + max_output_tokens: Option, +} + +#[derive(Debug, Deserialize)] +struct WriteStdinArgs { + session_id: i32, + #[serde(default)] + chars: String, + #[serde(default)] + yield_time_ms: Option, + #[serde(default)] + max_output_tokens: Option, +} + +fn default_shell() -> String { + "/bin/bash".to_string() +} + +fn default_login() -> bool { + true } #[async_trait] impl ToolHandler for UnifiedExecHandler { fn kind(&self) -> ToolKind { - ToolKind::UnifiedExec + ToolKind::Function } fn matches_kind(&self, payload: &ToolPayload) -> bool { matches!( payload, - ToolPayload::UnifiedExec { .. } | ToolPayload::Function { .. } + ToolPayload::Function { .. } | ToolPayload::UnifiedExec { .. } ) } @@ -38,19 +71,14 @@ impl ToolHandler for UnifiedExecHandler { session, turn, call_id, - tool_name: _tool_name, + tool_name, payload, .. } = invocation; - let args = match payload { - ToolPayload::UnifiedExec { arguments } | ToolPayload::Function { arguments } => { - serde_json::from_str::(&arguments).map_err(|err| { - FunctionCallError::RespondToModel(format!( - "failed to parse function arguments: {err:?}" - )) - })? - } + let arguments = match payload { + ToolPayload::Function { arguments } => arguments, + ToolPayload::UnifiedExec { arguments } => arguments, _ => { return Err(FunctionCallError::RespondToModel( "unified_exec handler received unsupported payload".to_string(), @@ -58,58 +86,69 @@ impl ToolHandler for UnifiedExecHandler { } }; - let UnifiedExecArgs { - input, - session_id, - timeout_ms, - } = args; + let manager: &UnifiedExecSessionManager = &session.services.unified_exec_manager; + let context = UnifiedExecContext { + session: &session, + turn: turn.as_ref(), + call_id: &call_id, + }; - let parsed_session_id = if let Some(session_id) = session_id { - match session_id.parse::() { - Ok(parsed) => Some(parsed), - Err(output) => { - return Err(FunctionCallError::RespondToModel(format!( - "invalid session_id: {session_id} due to error {output:?}" - ))); - } + let response = match tool_name.as_str() { + "exec_command" => { + let args: ExecCommandArgs = serde_json::from_str(&arguments).map_err(|err| { + FunctionCallError::RespondToModel(format!( + "failed to parse exec_command arguments: {err:?}" + )) + })?; + + let event_ctx = + ToolEventCtx::new(context.session, context.turn, context.call_id, None); + let emitter = + ToolEmitter::unified_exec(args.cmd.clone(), context.turn.cwd.clone(), true); + emitter.emit(event_ctx, ToolEventStage::Begin).await; + + manager + .exec_command( + ExecCommandRequest { + command: &args.cmd, + shell: &args.shell, + login: args.login, + yield_time_ms: args.yield_time_ms, + max_output_tokens: args.max_output_tokens, + }, + &context, + ) + .await + .map_err(|err| { + FunctionCallError::RespondToModel(format!("exec_command failed: {err:?}")) + })? + } + "write_stdin" => { + let args: WriteStdinArgs = serde_json::from_str(&arguments).map_err(|err| { + FunctionCallError::RespondToModel(format!( + "failed to parse write_stdin arguments: {err:?}" + )) + })?; + manager + .write_stdin(WriteStdinRequest { + session_id: args.session_id, + input: &args.chars, + yield_time_ms: args.yield_time_ms, + max_output_tokens: args.max_output_tokens, + }) + .await + .map_err(|err| { + FunctionCallError::RespondToModel(format!("write_stdin failed: {err:?}")) + })? + } + other => { + return Err(FunctionCallError::RespondToModel(format!( + "unsupported unified exec function {other}" + ))); } - } else { - None }; - let request = UnifiedExecRequest { - input_chunks: &input, - timeout_ms, - }; - - let value = session - .services - .unified_exec_manager - .handle_request( - request, - crate::unified_exec::UnifiedExecContext { - session: &session, - turn: turn.as_ref(), - call_id: &call_id, - session_id: parsed_session_id, - }, - ) - .await - .map_err(|err| { - FunctionCallError::RespondToModel(format!("unified exec failed: {err:?}")) - })?; - - #[derive(serde::Serialize)] - struct SerializedUnifiedExecResult { - session_id: Option, - output: String, - } - - let content = serde_json::to_string(&SerializedUnifiedExecResult { - session_id: value.session_id.map(|id| id.to_string()), - output: value.output, - }) - .map_err(|err| { + let content = serialize_response(&response).map_err(|err| { FunctionCallError::RespondToModel(format!( "failed to serialize unified exec output: {err:?}" )) @@ -121,3 +160,33 @@ impl ToolHandler for UnifiedExecHandler { }) } } + +#[derive(Serialize)] +struct SerializedUnifiedExecResponse<'a> { + chunk_id: &'a str, + wall_time_seconds: f64, + output: &'a str, + #[serde(skip_serializing_if = "Option::is_none")] + session_id: Option, + #[serde(skip_serializing_if = "Option::is_none")] + exit_code: Option, + #[serde(skip_serializing_if = "Option::is_none")] + original_token_count: Option, +} + +fn serialize_response(response: &UnifiedExecResponse) -> Result { + let payload = SerializedUnifiedExecResponse { + chunk_id: &response.chunk_id, + wall_time_seconds: duration_to_seconds(response.wall_time), + output: &response.output, + session_id: response.session_id, + exit_code: response.exit_code, + original_token_count: response.original_token_count, + }; + + serde_json::to_string(&payload) +} + +fn duration_to_seconds(duration: Duration) -> f64 { + duration.as_secs_f64() +} diff --git a/codex-rs/core/src/tools/registry.rs b/codex-rs/core/src/tools/registry.rs index c44cdbd8..87692597 100644 --- a/codex-rs/core/src/tools/registry.rs +++ b/codex-rs/core/src/tools/registry.rs @@ -15,7 +15,6 @@ use crate::tools::context::ToolPayload; #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] pub enum ToolKind { Function, - UnifiedExec, Mcp, } @@ -27,7 +26,6 @@ pub trait ToolHandler: Send + Sync { matches!( (self.kind(), payload), (ToolKind::Function, ToolPayload::Function { .. }) - | (ToolKind::UnifiedExec, ToolPayload::UnifiedExec { .. }) | (ToolKind::Mcp, ToolPayload::Mcp { .. }) ) } diff --git a/codex-rs/core/src/tools/spec.rs b/codex-rs/core/src/tools/spec.rs index 84baff80..fdf95abb 100644 --- a/codex-rs/core/src/tools/spec.rs +++ b/codex-rs/core/src/tools/spec.rs @@ -136,48 +136,99 @@ impl From for AdditionalProperties { } } -fn create_unified_exec_tool() -> ToolSpec { +fn create_exec_command_tool() -> ToolSpec { let mut properties = BTreeMap::new(); properties.insert( - "input".to_string(), - JsonSchema::Array { - items: Box::new(JsonSchema::String { description: None }), - description: Some( - "When no session_id is provided, treat the array as the command and arguments \ - to launch. When session_id is set, concatenate the strings (in order) and write \ - them to the session's stdin." - .to_string(), - ), - }, - ); - properties.insert( - "session_id".to_string(), + "cmd".to_string(), JsonSchema::String { + description: Some("Shell command to execute.".to_string()), + }, + ); + properties.insert( + "shell".to_string(), + JsonSchema::String { + description: Some("Shell binary to launch. Defaults to /bin/bash.".to_string()), + }, + ); + properties.insert( + "login".to_string(), + JsonSchema::Boolean { description: Some( - "Identifier for an existing interactive session. If omitted, a new command \ - is spawned." - .to_string(), + "Whether to run the shell with -l/-i semantics. Defaults to true.".to_string(), ), }, ); properties.insert( - "timeout_ms".to_string(), + "yield_time_ms".to_string(), JsonSchema::Number { description: Some( - "Maximum time in milliseconds to wait for output after writing the input." - .to_string(), + "How long to wait (in milliseconds) for output before yielding.".to_string(), + ), + }, + ); + properties.insert( + "max_output_tokens".to_string(), + JsonSchema::Number { + description: Some( + "Maximum number of tokens to return. Excess output will be truncated.".to_string(), ), }, ); ToolSpec::Function(ResponsesApiTool { - name: "unified_exec".to_string(), + name: "exec_command".to_string(), description: - "Runs a command in a PTY. Provide a session_id to reuse an existing interactive session.".to_string(), + "Runs a command in a PTY, returning output or a session ID for ongoing interaction." + .to_string(), strict: false, parameters: JsonSchema::Object { properties, - required: Some(vec!["input".to_string()]), + required: Some(vec!["cmd".to_string()]), + additional_properties: Some(false.into()), + }, + }) +} + +fn create_write_stdin_tool() -> ToolSpec { + let mut properties = BTreeMap::new(); + properties.insert( + "session_id".to_string(), + JsonSchema::Number { + description: Some("Identifier of the running unified exec session.".to_string()), + }, + ); + properties.insert( + "chars".to_string(), + JsonSchema::String { + description: Some("Bytes to write to stdin (may be empty to poll).".to_string()), + }, + ); + properties.insert( + "yield_time_ms".to_string(), + JsonSchema::Number { + description: Some( + "How long to wait (in milliseconds) for output before yielding.".to_string(), + ), + }, + ); + properties.insert( + "max_output_tokens".to_string(), + JsonSchema::Number { + description: Some( + "Maximum number of tokens to return. Excess output will be truncated.".to_string(), + ), + }, + ); + + ToolSpec::Function(ResponsesApiTool { + name: "write_stdin".to_string(), + description: + "Writes characters to an existing unified exec session and returns recent output." + .to_string(), + strict: false, + parameters: JsonSchema::Object { + properties, + required: Some(vec!["session_id".to_string()]), additional_properties: Some(false.into()), }, }) @@ -839,19 +890,20 @@ pub(crate) fn build_specs( || matches!(config.shell_type, ConfigShellToolType::Streamable); if use_unified_exec { - builder.push_spec(create_unified_exec_tool()); - builder.register_handler("unified_exec", unified_exec_handler); - } else { - match &config.shell_type { - ConfigShellToolType::Default => { - builder.push_spec(create_shell_tool()); - } - ConfigShellToolType::Local => { - builder.push_spec(ToolSpec::LocalShell {}); - } - ConfigShellToolType::Streamable => { - // Already handled by use_unified_exec. - } + builder.push_spec(create_exec_command_tool()); + builder.push_spec(create_write_stdin_tool()); + builder.register_handler("exec_command", unified_exec_handler.clone()); + builder.register_handler("write_stdin", unified_exec_handler); + } + match &config.shell_type { + ConfigShellToolType::Default => { + builder.push_spec(create_shell_tool()); + } + ConfigShellToolType::Local => { + builder.push_spec(ToolSpec::LocalShell {}); + } + ConfigShellToolType::Streamable => { + // Already handled by use_unified_exec. } } @@ -986,6 +1038,14 @@ mod tests { } } + fn shell_tool_name(config: &ToolsConfig) -> Option<&'static str> { + match config.shell_type { + ConfigShellToolType::Default => Some("shell"), + ConfigShellToolType::Local => Some("local_shell"), + ConfigShellToolType::Streamable => None, + } + } + fn find_tool<'a>( tools: &'a [ConfiguredToolSpec], expected_name: &str, @@ -1009,18 +1069,20 @@ mod tests { }); let (tools, _) = build_specs(&config, Some(HashMap::new())).build(); - assert_eq_tool_names( - &tools, - &[ - "unified_exec", - "list_mcp_resources", - "list_mcp_resource_templates", - "read_mcp_resource", - "update_plan", - "web_search", - "view_image", - ], - ); + let mut expected = vec!["exec_command", "write_stdin"]; + if let Some(shell_tool) = shell_tool_name(&config) { + expected.push(shell_tool); + } + expected.extend([ + "list_mcp_resources", + "list_mcp_resource_templates", + "read_mcp_resource", + "update_plan", + "web_search", + "view_image", + ]); + + assert_eq_tool_names(&tools, &expected); } #[test] @@ -1035,18 +1097,20 @@ mod tests { }); let (tools, _) = build_specs(&config, Some(HashMap::new())).build(); - assert_eq_tool_names( - &tools, - &[ - "unified_exec", - "list_mcp_resources", - "list_mcp_resource_templates", - "read_mcp_resource", - "update_plan", - "web_search", - "view_image", - ], - ); + let mut expected = vec!["exec_command", "write_stdin"]; + if let Some(shell_tool) = shell_tool_name(&config) { + expected.push(shell_tool); + } + expected.extend([ + "list_mcp_resources", + "list_mcp_resource_templates", + "read_mcp_resource", + "update_plan", + "web_search", + "view_image", + ]); + + assert_eq_tool_names(&tools, &expected); } #[test] @@ -1063,7 +1127,8 @@ mod tests { }); let (tools, _) = build_specs(&config, None).build(); - assert!(!find_tool(&tools, "unified_exec").supports_parallel_tool_calls); + assert!(!find_tool(&tools, "exec_command").supports_parallel_tool_calls); + assert!(!find_tool(&tools, "write_stdin").supports_parallel_tool_calls); assert!(find_tool(&tools, "grep_files").supports_parallel_tool_calls); assert!(find_tool(&tools, "list_dir").supports_parallel_tool_calls); assert!(find_tool(&tools, "read_file").supports_parallel_tool_calls); @@ -1148,19 +1213,21 @@ mod tests { ) .build(); - assert_eq_tool_names( - &tools, - &[ - "unified_exec", - "list_mcp_resources", - "list_mcp_resource_templates", - "read_mcp_resource", - "update_plan", - "web_search", - "view_image", - "test_server/do_something_cool", - ], - ); + let mut expected = vec!["exec_command", "write_stdin"]; + if let Some(shell_tool) = shell_tool_name(&config) { + expected.push(shell_tool); + } + expected.extend([ + "list_mcp_resources", + "list_mcp_resource_templates", + "read_mcp_resource", + "update_plan", + "web_search", + "view_image", + "test_server/do_something_cool", + ]); + + assert_eq_tool_names(&tools, &expected); let tool = find_tool(&tools, "test_server/do_something_cool"); assert_eq!( @@ -1267,21 +1334,23 @@ mod tests { ]); let (tools, _) = build_specs(&config, Some(tools_map)).build(); - // Expect unified_exec first, followed by MCP tools sorted by fully-qualified name. - assert_eq_tool_names( - &tools, - &[ - "unified_exec", - "list_mcp_resources", - "list_mcp_resource_templates", - "read_mcp_resource", - "update_plan", - "view_image", - "test_server/cool", - "test_server/do", - "test_server/something", - ], - ); + // Expect exec_command/write_stdin first, followed by MCP tools sorted by fully-qualified name. + let mut expected = vec!["exec_command", "write_stdin"]; + if let Some(shell_tool) = shell_tool_name(&config) { + expected.push(shell_tool); + } + expected.extend([ + "list_mcp_resources", + "list_mcp_resource_templates", + "read_mcp_resource", + "update_plan", + "view_image", + "test_server/cool", + "test_server/do", + "test_server/something", + ]); + + assert_eq_tool_names(&tools, &expected); } #[test] @@ -1320,23 +1389,28 @@ mod tests { ) .build(); - assert_eq_tool_names( - &tools, - &[ - "unified_exec", - "list_mcp_resources", - "list_mcp_resource_templates", - "read_mcp_resource", - "update_plan", - "apply_patch", - "web_search", - "view_image", - "dash/search", - ], - ); + let mut expected = vec!["exec_command", "write_stdin"]; + let has_shell = if let Some(shell_tool) = shell_tool_name(&config) { + expected.push(shell_tool); + true + } else { + false + }; + expected.extend([ + "list_mcp_resources", + "list_mcp_resource_templates", + "read_mcp_resource", + "update_plan", + "apply_patch", + "web_search", + "view_image", + "dash/search", + ]); + + assert_eq_tool_names(&tools, &expected); assert_eq!( - tools[8].spec, + tools[if has_shell { 10 } else { 9 }].spec, ToolSpec::Function(ResponsesApiTool { name: "dash/search".to_string(), parameters: JsonSchema::Object { @@ -1389,22 +1463,27 @@ mod tests { ) .build(); - assert_eq_tool_names( - &tools, - &[ - "unified_exec", - "list_mcp_resources", - "list_mcp_resource_templates", - "read_mcp_resource", - "update_plan", - "apply_patch", - "web_search", - "view_image", - "dash/paginate", - ], - ); + let mut expected = vec!["exec_command", "write_stdin"]; + let has_shell = if let Some(shell_tool) = shell_tool_name(&config) { + expected.push(shell_tool); + true + } else { + false + }; + expected.extend([ + "list_mcp_resources", + "list_mcp_resource_templates", + "read_mcp_resource", + "update_plan", + "apply_patch", + "web_search", + "view_image", + "dash/paginate", + ]); + + assert_eq_tool_names(&tools, &expected); assert_eq!( - tools[8].spec, + tools[if has_shell { 10 } else { 9 }].spec, ToolSpec::Function(ResponsesApiTool { name: "dash/paginate".to_string(), parameters: JsonSchema::Object { @@ -1456,22 +1535,26 @@ mod tests { ) .build(); - assert_eq_tool_names( - &tools, - &[ - "unified_exec", - "list_mcp_resources", - "list_mcp_resource_templates", - "read_mcp_resource", - "update_plan", - "apply_patch", - "web_search", - "view_image", - "dash/tags", - ], - ); + let mut expected = vec!["exec_command", "write_stdin"]; + let has_shell = if let Some(shell_tool) = shell_tool_name(&config) { + expected.push(shell_tool); + true + } else { + false + }; + expected.extend([ + "list_mcp_resources", + "list_mcp_resource_templates", + "read_mcp_resource", + "update_plan", + "apply_patch", + "web_search", + "view_image", + "dash/tags", + ]); + assert_eq_tool_names(&tools, &expected); assert_eq!( - tools[8].spec, + tools[if has_shell { 10 } else { 9 }].spec, ToolSpec::Function(ResponsesApiTool { name: "dash/tags".to_string(), parameters: JsonSchema::Object { @@ -1525,22 +1608,26 @@ mod tests { ) .build(); - assert_eq_tool_names( - &tools, - &[ - "unified_exec", - "list_mcp_resources", - "list_mcp_resource_templates", - "read_mcp_resource", - "update_plan", - "apply_patch", - "web_search", - "view_image", - "dash/value", - ], - ); + let mut expected = vec!["exec_command", "write_stdin"]; + let has_shell = if let Some(shell_tool) = shell_tool_name(&config) { + expected.push(shell_tool); + true + } else { + false + }; + expected.extend([ + "list_mcp_resources", + "list_mcp_resource_templates", + "read_mcp_resource", + "update_plan", + "apply_patch", + "web_search", + "view_image", + "dash/value", + ]); + assert_eq_tool_names(&tools, &expected); assert_eq!( - tools[8].spec, + tools[if has_shell { 10 } else { 9 }].spec, ToolSpec::Function(ResponsesApiTool { name: "dash/value".to_string(), parameters: JsonSchema::Object { @@ -1631,23 +1718,28 @@ mod tests { ) .build(); - assert_eq_tool_names( - &tools, - &[ - "unified_exec", - "list_mcp_resources", - "list_mcp_resource_templates", - "read_mcp_resource", - "update_plan", - "apply_patch", - "web_search", - "view_image", - "test_server/do_something_cool", - ], - ); + let mut expected = vec!["exec_command", "write_stdin"]; + let has_shell = if let Some(shell_tool) = shell_tool_name(&config) { + expected.push(shell_tool); + true + } else { + false + }; + expected.extend([ + "list_mcp_resources", + "list_mcp_resource_templates", + "read_mcp_resource", + "update_plan", + "apply_patch", + "web_search", + "view_image", + "test_server/do_something_cool", + ]); + + assert_eq_tool_names(&tools, &expected); assert_eq!( - tools[8].spec, + tools[if has_shell { 10 } else { 9 }].spec, ToolSpec::Function(ResponsesApiTool { name: "test_server/do_something_cool".to_string(), parameters: JsonSchema::Object { diff --git a/codex-rs/core/src/unified_exec/mod.rs b/codex-rs/core/src/unified_exec/mod.rs index 7118e20e..d575929d 100644 --- a/codex-rs/core/src/unified_exec/mod.rs +++ b/codex-rs/core/src/unified_exec/mod.rs @@ -23,7 +23,10 @@ use std::collections::HashMap; use std::sync::atomic::AtomicI32; +use std::time::Duration; +use rand::Rng; +use rand::rng; use tokio::sync::Mutex; use crate::codex::Session; @@ -36,27 +39,43 @@ mod session_manager; pub(crate) use errors::UnifiedExecError; pub(crate) use session::UnifiedExecSession; -const DEFAULT_TIMEOUT_MS: u64 = 1_000; -const MAX_TIMEOUT_MS: u64 = 60_000; -const UNIFIED_EXEC_OUTPUT_MAX_BYTES: usize = 128 * 1024; // 128 KiB +pub(crate) const DEFAULT_YIELD_TIME_MS: u64 = 10_000; +pub(crate) const MIN_YIELD_TIME_MS: u64 = 250; +pub(crate) const MAX_YIELD_TIME_MS: u64 = 30_000; +pub(crate) const DEFAULT_MAX_OUTPUT_TOKENS: usize = 10_000; +pub(crate) const UNIFIED_EXEC_OUTPUT_MAX_BYTES: usize = 1024 * 1024; // 1 MiB pub(crate) struct UnifiedExecContext<'a> { pub session: &'a Session, pub turn: &'a TurnContext, pub call_id: &'a str, - pub session_id: Option, } #[derive(Debug)] -pub(crate) struct UnifiedExecRequest<'a> { - pub input_chunks: &'a [String], - pub timeout_ms: Option, +pub(crate) struct ExecCommandRequest<'a> { + pub command: &'a str, + pub shell: &'a str, + pub login: bool, + pub yield_time_ms: Option, + pub max_output_tokens: Option, +} + +#[derive(Debug)] +pub(crate) struct WriteStdinRequest<'a> { + pub session_id: i32, + pub input: &'a str, + pub yield_time_ms: Option, + pub max_output_tokens: Option, } #[derive(Debug, Clone, PartialEq)] -pub(crate) struct UnifiedExecResult { - pub session_id: Option, +pub(crate) struct UnifiedExecResponse { + pub chunk_id: String, + pub wall_time: Duration, pub output: String, + pub session_id: Option, + pub exit_code: Option, + pub original_token_count: Option, } #[derive(Debug, Default)] @@ -65,16 +84,66 @@ pub(crate) struct UnifiedExecSessionManager { sessions: Mutex>, } +pub(crate) fn clamp_yield_time(yield_time_ms: Option) -> u64 { + match yield_time_ms { + Some(value) => value.clamp(MIN_YIELD_TIME_MS, MAX_YIELD_TIME_MS), + None => DEFAULT_YIELD_TIME_MS, + } +} + +pub(crate) fn resolve_max_tokens(max_tokens: Option) -> usize { + max_tokens.unwrap_or(DEFAULT_MAX_OUTPUT_TOKENS) +} + +pub(crate) fn generate_chunk_id() -> String { + let mut rng = rng(); + (0..6) + .map(|_| format!("{:x}", rng.random_range(0..16))) + .collect() +} + +pub(crate) fn truncate_output_to_tokens( + output: &str, + max_tokens: usize, +) -> (String, Option) { + if max_tokens == 0 { + let total_tokens = output.chars().count(); + let message = format!("…{total_tokens} tokens truncated…"); + return (message, Some(total_tokens)); + } + + let tokens: Vec = output.chars().collect(); + let total_tokens = tokens.len(); + if total_tokens <= max_tokens { + return (output.to_string(), None); + } + + let half = max_tokens / 2; + if half == 0 { + let truncated = total_tokens.saturating_sub(max_tokens); + let message = format!("…{truncated} tokens truncated…"); + return (message, Some(total_tokens)); + } + + let truncated = total_tokens.saturating_sub(half * 2); + let mut truncated_output = String::new(); + truncated_output.extend(&tokens[..half]); + truncated_output.push_str(&format!("…{truncated} tokens truncated…")); + truncated_output.extend(&tokens[total_tokens - half..]); + (truncated_output, Some(total_tokens)) +} + #[cfg(test)] #[cfg(unix)] mod tests { use super::*; - use crate::codex::Session; use crate::codex::TurnContext; use crate::codex::make_session_and_context; use crate::protocol::AskForApproval; use crate::protocol::SandboxPolicy; + use crate::unified_exec::ExecCommandRequest; + use crate::unified_exec::WriteStdinRequest; use core_test_support::skip_if_sandbox; use std::sync::Arc; use tokio::time::Duration; @@ -88,34 +157,52 @@ mod tests { (Arc::new(session), Arc::new(turn)) } - async fn run_unified_exec_request( + async fn exec_command( session: &Arc, turn: &Arc, - session_id: Option, - input: Vec, - timeout_ms: Option, - ) -> Result { - let request_input = input; - let request = UnifiedExecRequest { - input_chunks: &request_input, - timeout_ms, + cmd: &str, + yield_time_ms: Option, + ) -> Result { + let context = UnifiedExecContext { + session, + turn: turn.as_ref(), + call_id: "call", }; session .services .unified_exec_manager - .handle_request( - request, - UnifiedExecContext { - session, - turn: turn.as_ref(), - call_id: "call", - session_id, + .exec_command( + ExecCommandRequest { + command: cmd, + shell: "/bin/bash", + login: true, + yield_time_ms, + max_output_tokens: None, }, + &context, ) .await } + async fn write_stdin( + session: &Arc, + session_id: i32, + input: &str, + yield_time_ms: Option, + ) -> Result { + session + .services + .unified_exec_manager + .write_stdin(WriteStdinRequest { + session_id, + input, + yield_time_ms, + max_output_tokens: None, + }) + .await + } + #[test] fn push_chunk_trims_only_excess_bytes() { let mut buffer = OutputBufferState::default(); @@ -140,37 +227,28 @@ mod tests { let (session, turn) = test_session_and_turn(); - let open_shell = run_unified_exec_request( - &session, - &turn, - None, - vec!["bash".to_string(), "-i".to_string()], - Some(2_500), - ) - .await?; + let open_shell = exec_command(&session, &turn, "bash -i", Some(2_500)).await?; let session_id = open_shell.session_id.expect("expected session_id"); - run_unified_exec_request( + write_stdin( &session, - &turn, - Some(session_id), - vec![ - "export".to_string(), - "CODEX_INTERACTIVE_SHELL_VAR=codex\n".to_string(), - ], + session_id, + "export CODEX_INTERACTIVE_SHELL_VAR=codex\n", Some(2_500), ) .await?; - let out_2 = run_unified_exec_request( + let out_2 = write_stdin( &session, - &turn, - Some(session_id), - vec!["echo $CODEX_INTERACTIVE_SHELL_VAR\n".to_string()], + session_id, + "echo $CODEX_INTERACTIVE_SHELL_VAR\n", Some(2_500), ) .await?; - assert!(out_2.output.contains("codex")); + assert!( + out_2.output.contains("codex"), + "expected environment variable output" + ); Ok(()) } @@ -181,47 +259,44 @@ mod tests { let (session, turn) = test_session_and_turn(); - let shell_a = run_unified_exec_request( - &session, - &turn, - None, - vec!["/bin/bash".to_string(), "-i".to_string()], - Some(2_500), - ) - .await?; + let shell_a = exec_command(&session, &turn, "bash -i", Some(2_500)).await?; let session_a = shell_a.session_id.expect("expected session id"); - run_unified_exec_request( + write_stdin( &session, - &turn, - Some(session_a), - vec!["export CODEX_INTERACTIVE_SHELL_VAR=codex\n".to_string()], + session_a, + "export CODEX_INTERACTIVE_SHELL_VAR=codex\n", Some(2_500), ) .await?; - let out_2 = run_unified_exec_request( + let out_2 = exec_command( &session, &turn, - None, - vec![ - "echo".to_string(), - "$CODEX_INTERACTIVE_SHELL_VAR\n".to_string(), - ], + "echo $CODEX_INTERACTIVE_SHELL_VAR", Some(2_500), ) .await?; - assert!(!out_2.output.contains("codex")); + assert!( + out_2.session_id.is_none(), + "short command should not retain a session" + ); + assert!( + !out_2.output.contains("codex"), + "short command should run in a fresh shell" + ); - let out_3 = run_unified_exec_request( + let out_3 = write_stdin( &session, - &turn, - Some(session_a), - vec!["echo $CODEX_INTERACTIVE_SHELL_VAR\n".to_string()], + session_a, + "echo $CODEX_INTERACTIVE_SHELL_VAR\n", Some(2_500), ) .await?; - assert!(out_3.output.contains("codex")); + assert!( + out_3.output.contains("codex"), + "session should preserve state" + ); Ok(()) } @@ -232,45 +307,37 @@ mod tests { let (session, turn) = test_session_and_turn(); - let open_shell = run_unified_exec_request( - &session, - &turn, - None, - vec!["bash".to_string(), "-i".to_string()], - Some(2_500), - ) - .await?; + let open_shell = exec_command(&session, &turn, "bash -i", Some(2_500)).await?; let session_id = open_shell.session_id.expect("expected session id"); - run_unified_exec_request( + write_stdin( &session, - &turn, - Some(session_id), - vec![ - "export".to_string(), - "CODEX_INTERACTIVE_SHELL_VAR=codex\n".to_string(), - ], + session_id, + "export CODEX_INTERACTIVE_SHELL_VAR=codex\n", Some(2_500), ) .await?; - let out_2 = run_unified_exec_request( + let out_2 = write_stdin( &session, - &turn, - Some(session_id), - vec!["sleep 5 && echo $CODEX_INTERACTIVE_SHELL_VAR\n".to_string()], + session_id, + "sleep 5 && echo $CODEX_INTERACTIVE_SHELL_VAR\n", Some(10), ) .await?; - assert!(!out_2.output.contains("codex")); + assert!( + !out_2.output.contains("codex"), + "timeout too short should yield incomplete output" + ); tokio::time::sleep(Duration::from_secs(7)).await; - let out_3 = - run_unified_exec_request(&session, &turn, Some(session_id), Vec::new(), Some(100)) - .await?; + let out_3 = write_stdin(&session, session_id, "", Some(100)).await?; - assert!(out_3.output.contains("codex")); + assert!( + out_3.output.contains("codex"), + "subsequent poll should retrieve output" + ); Ok(()) } @@ -280,18 +347,9 @@ mod tests { async fn requests_with_large_timeout_are_capped() -> anyhow::Result<()> { let (session, turn) = test_session_and_turn(); - let result = run_unified_exec_request( - &session, - &turn, - None, - vec!["echo".to_string(), "codex".to_string()], - Some(120_000), - ) - .await?; + let result = exec_command(&session, &turn, "echo codex", Some(120_000)).await?; - assert!(result.output.starts_with( - "Warning: requested timeout 120000ms exceeds maximum of 60000ms; clamping to 60000ms.\n" - )); + assert!(result.session_id.is_none()); assert!(result.output.contains("codex")); Ok(()) @@ -301,16 +359,12 @@ mod tests { #[ignore] // Ignored while we have a better way to test this. async fn completed_commands_do_not_persist_sessions() -> anyhow::Result<()> { let (session, turn) = test_session_and_turn(); - let result = run_unified_exec_request( - &session, - &turn, - None, - vec!["/bin/echo".to_string(), "codex".to_string()], - Some(2_500), - ) - .await?; + let result = exec_command(&session, &turn, "echo codex", Some(2_500)).await?; - assert!(result.session_id.is_none()); + assert!( + result.session_id.is_none(), + "completed command should not retain session" + ); assert!(result.output.contains("codex")); assert!( @@ -332,31 +386,16 @@ mod tests { let (session, turn) = test_session_and_turn(); - let open_shell = run_unified_exec_request( - &session, - &turn, - None, - vec!["/bin/bash".to_string(), "-i".to_string()], - Some(2_500), - ) - .await?; + let open_shell = exec_command(&session, &turn, "bash -i", Some(2_500)).await?; let session_id = open_shell.session_id.expect("expected session id"); - run_unified_exec_request( - &session, - &turn, - Some(session_id), - vec!["exit\n".to_string()], - Some(2_500), - ) - .await?; + write_stdin(&session, session_id, "exit\n", Some(2_500)).await?; tokio::time::sleep(Duration::from_millis(200)).await; - let err = - run_unified_exec_request(&session, &turn, Some(session_id), Vec::new(), Some(100)) - .await - .expect_err("expected unknown session error"); + let err = write_stdin(&session, session_id, "", Some(100)) + .await + .expect_err("expected unknown session error"); match err { UnifiedExecError::UnknownSessionId { session_id: err_id } => { diff --git a/codex-rs/core/src/unified_exec/session_manager.rs b/codex-rs/core/src/unified_exec/session_manager.rs index dc6e6004..77124a7d 100644 --- a/codex-rs/core/src/unified_exec/session_manager.rs +++ b/codex-rs/core/src/unified_exec/session_manager.rs @@ -7,84 +7,163 @@ use tokio::time::Instant; use crate::exec_env::create_env; use crate::sandboxing::ExecEnv; -use crate::tools::events::ToolEmitter; -use crate::tools::events::ToolEventCtx; -use crate::tools::events::ToolEventStage; use crate::tools::orchestrator::ToolOrchestrator; use crate::tools::runtimes::unified_exec::UnifiedExecRequest as UnifiedExecToolRequest; use crate::tools::runtimes::unified_exec::UnifiedExecRuntime; use crate::tools::sandboxing::ToolCtx; -use crate::truncate::truncate_middle; -use super::DEFAULT_TIMEOUT_MS; -use super::MAX_TIMEOUT_MS; -use super::UNIFIED_EXEC_OUTPUT_MAX_BYTES; +use super::ExecCommandRequest; +use super::MIN_YIELD_TIME_MS; use super::UnifiedExecContext; use super::UnifiedExecError; -use super::UnifiedExecRequest; -use super::UnifiedExecResult; +use super::UnifiedExecResponse; use super::UnifiedExecSessionManager; +use super::WriteStdinRequest; +use super::clamp_yield_time; +use super::generate_chunk_id; +use super::resolve_max_tokens; use super::session::OutputBuffer; use super::session::UnifiedExecSession; - -pub(super) struct SessionAcquisition { - pub(super) session_id: i32, - pub(super) writer_tx: mpsc::Sender>, - pub(super) output_buffer: OutputBuffer, - pub(super) output_notify: Arc, - pub(super) new_session: Option, - pub(super) reuse_requested: bool, -} +use super::truncate_output_to_tokens; impl UnifiedExecSessionManager { - pub(super) async fn acquire_session( + pub(crate) async fn exec_command( &self, - request: &UnifiedExecRequest<'_>, + request: ExecCommandRequest<'_>, context: &UnifiedExecContext<'_>, - ) -> Result { - if let Some(existing_id) = context.session_id { - let mut sessions = self.sessions.lock().await; - match sessions.get(&existing_id) { - Some(session) => { - if session.has_exited() { - sessions.remove(&existing_id); - return Err(UnifiedExecError::UnknownSessionId { - session_id: existing_id, - }); - } - let (buffer, notify) = session.output_handles(); - let writer_tx = session.writer_sender(); - Ok(SessionAcquisition { - session_id: existing_id, - writer_tx, - output_buffer: buffer, - output_notify: notify, - new_session: None, - reuse_requested: true, - }) - } - None => Err(UnifiedExecError::UnknownSessionId { - session_id: existing_id, - }), - } + ) -> Result { + let shell_flag = if request.login { "-lc" } else { "-c" }; + let command = vec![ + request.shell.to_string(), + shell_flag.to_string(), + request.command.to_string(), + ]; + + let session = self.open_session_with_sandbox(command, context).await?; + + let max_tokens = resolve_max_tokens(request.max_output_tokens); + let yield_time_ms = + clamp_yield_time(Some(request.yield_time_ms.unwrap_or(MIN_YIELD_TIME_MS))); + + let start = Instant::now(); + let (output_buffer, output_notify) = session.output_handles(); + let deadline = start + Duration::from_millis(yield_time_ms); + let collected = + Self::collect_output_until_deadline(&output_buffer, &output_notify, deadline).await; + let wall_time = Instant::now().saturating_duration_since(start); + + let text = String::from_utf8_lossy(&collected).to_string(); + let (output, original_token_count) = truncate_output_to_tokens(&text, max_tokens); + let chunk_id = generate_chunk_id(); + let exit_code = session.exit_code(); + let session_id = if session.has_exited() { + None } else { - let new_id = self - .next_session_id - .fetch_add(1, std::sync::atomic::Ordering::SeqCst); - let managed_session = self - .open_session_with_sandbox(request.input_chunks.to_vec(), context) - .await?; - let (buffer, notify) = managed_session.output_handles(); - let writer_tx = managed_session.writer_sender(); - Ok(SessionAcquisition { - session_id: new_id, - writer_tx, - output_buffer: buffer, - output_notify: notify, - new_session: Some(managed_session), - reuse_requested: false, - }) + Some(self.store_session(session).await) + }; + + Ok(UnifiedExecResponse { + chunk_id, + wall_time, + output, + session_id, + exit_code, + original_token_count, + }) + } + + pub(crate) async fn write_stdin( + &self, + request: WriteStdinRequest<'_>, + ) -> Result { + let session_id = request.session_id; + + let (writer_tx, output_buffer, output_notify) = + self.prepare_session_handles(session_id).await?; + + if !request.input.is_empty() { + Self::send_input(&writer_tx, request.input.as_bytes()).await?; + tokio::time::sleep(Duration::from_millis(100)).await; } + + let max_tokens = resolve_max_tokens(request.max_output_tokens); + let yield_time_ms = clamp_yield_time(request.yield_time_ms); + let start = Instant::now(); + let deadline = start + Duration::from_millis(yield_time_ms); + let collected = + Self::collect_output_until_deadline(&output_buffer, &output_notify, deadline).await; + let wall_time = Instant::now().saturating_duration_since(start); + + let text = String::from_utf8_lossy(&collected).to_string(); + let (output, original_token_count) = truncate_output_to_tokens(&text, max_tokens); + let chunk_id = generate_chunk_id(); + + let (session_id, exit_code) = self.refresh_session_state(session_id).await; + + Ok(UnifiedExecResponse { + chunk_id, + wall_time, + output, + session_id, + exit_code, + original_token_count, + }) + } + + async fn refresh_session_state(&self, session_id: i32) -> (Option, Option) { + let mut sessions = self.sessions.lock().await; + if !sessions.contains_key(&session_id) { + return (None, None); + } + + let has_exited = sessions + .get(&session_id) + .map(UnifiedExecSession::has_exited) + .unwrap_or(false); + let exit_code = sessions + .get(&session_id) + .and_then(UnifiedExecSession::exit_code); + + if has_exited { + sessions.remove(&session_id); + (None, exit_code) + } else { + (Some(session_id), exit_code) + } + } + + async fn prepare_session_handles( + &self, + session_id: i32, + ) -> Result<(mpsc::Sender>, OutputBuffer, Arc), UnifiedExecError> { + let sessions = self.sessions.lock().await; + let (output_buffer, output_notify, writer_tx) = + if let Some(session) = sessions.get(&session_id) { + let (buffer, notify) = session.output_handles(); + (buffer, notify, session.writer_sender()) + } else { + return Err(UnifiedExecError::UnknownSessionId { session_id }); + }; + + Ok((writer_tx, output_buffer, output_notify)) + } + + async fn send_input( + writer_tx: &mpsc::Sender>, + data: &[u8], + ) -> Result<(), UnifiedExecError> { + writer_tx + .send(data.to_vec()) + .await + .map_err(|_| UnifiedExecError::WriteToStdin) + } + + async fn store_session(&self, session: UnifiedExecSession) -> i32 { + let session_id = self + .next_session_id + .fetch_add(1, std::sync::atomic::Ordering::SeqCst); + self.sessions.lock().await.insert(session_id, session); + session_id } pub(crate) async fn open_session_with_exec_env( @@ -118,7 +197,7 @@ impl UnifiedExecSessionManager { session: context.session, turn: context.turn, call_id: context.call_id.to_string(), - tool_name: "unified_exec".to_string(), + tool_name: "exec_command".to_string(), }; orchestrator .run( @@ -175,124 +254,4 @@ impl UnifiedExecSessionManager { collected } - - pub(super) async fn should_store_session(&self, acquisition: &SessionAcquisition) -> bool { - if let Some(session) = acquisition.new_session.as_ref() { - !session.has_exited() - } else if acquisition.reuse_requested { - let mut sessions = self.sessions.lock().await; - if let Some(existing) = sessions.get(&acquisition.session_id) { - if existing.has_exited() { - sessions.remove(&acquisition.session_id); - false - } else { - true - } - } else { - false - } - } else { - true - } - } - - pub(super) async fn send_input_chunks( - writer_tx: &mpsc::Sender>, - chunks: &[String], - ) -> Result<(), UnifiedExecError> { - let mut trailing_whitespace = true; - for chunk in chunks { - if chunk.is_empty() { - continue; - } - - let leading_whitespace = chunk - .chars() - .next() - .map(char::is_whitespace) - .unwrap_or(true); - - if !trailing_whitespace - && !leading_whitespace - && writer_tx.send(vec![b' ']).await.is_err() - { - return Err(UnifiedExecError::WriteToStdin); - } - - if writer_tx.send(chunk.as_bytes().to_vec()).await.is_err() { - return Err(UnifiedExecError::WriteToStdin); - } - - trailing_whitespace = chunk - .chars() - .next_back() - .map(char::is_whitespace) - .unwrap_or(trailing_whitespace); - } - - Ok(()) - } - - pub async fn handle_request( - &self, - request: UnifiedExecRequest<'_>, - context: UnifiedExecContext<'_>, - ) -> Result { - let (timeout_ms, timeout_warning) = match request.timeout_ms { - Some(requested) if requested > MAX_TIMEOUT_MS => ( - MAX_TIMEOUT_MS, - Some(format!( - "Warning: requested timeout {requested}ms exceeds maximum of {MAX_TIMEOUT_MS}ms; clamping to {MAX_TIMEOUT_MS}ms.\n" - )), - ), - Some(requested) => (requested, None), - None => (DEFAULT_TIMEOUT_MS, None), - }; - - if !request.input_chunks.is_empty() { - let event_ctx = ToolEventCtx::new(context.session, context.turn, context.call_id, None); - let emitter = - ToolEmitter::shell(request.input_chunks.to_vec(), context.turn.cwd.clone()); - emitter.emit(event_ctx, ToolEventStage::Begin).await; - } - - let mut acquisition = self.acquire_session(&request, &context).await?; - - if acquisition.reuse_requested { - Self::send_input_chunks(&acquisition.writer_tx, request.input_chunks).await?; - } - - let deadline = Instant::now() + Duration::from_millis(timeout_ms); - let collected = Self::collect_output_until_deadline( - &acquisition.output_buffer, - &acquisition.output_notify, - deadline, - ) - .await; - - let (output, _maybe_tokens) = truncate_middle( - &String::from_utf8_lossy(&collected), - UNIFIED_EXEC_OUTPUT_MAX_BYTES, - ); - let output = if let Some(warning) = timeout_warning { - format!("{warning}{output}") - } else { - output - }; - - let should_store_session = self.should_store_session(&acquisition).await; - let session_id = if should_store_session { - if let Some(session) = acquisition.new_session.take() { - self.sessions - .lock() - .await - .insert(acquisition.session_id, session); - } - Some(acquisition.session_id) - } else { - None - }; - - Ok(UnifiedExecResult { session_id, output }) - } } diff --git a/codex-rs/core/tests/suite/tools.rs b/codex-rs/core/tests/suite/tools.rs index 46dd0ba8..e0546fa8 100644 --- a/codex-rs/core/tests/suite/tools.rs +++ b/codex-rs/core/tests/suite/tools.rs @@ -320,14 +320,22 @@ async fn unified_exec_spec_toggle_end_to_end() -> Result<()> { let tools_disabled = collect_tools(false).await?; assert!( - !tools_disabled.iter().any(|name| name == "unified_exec"), - "tools list should not include unified_exec when disabled: {tools_disabled:?}" + !tools_disabled.iter().any(|name| name == "exec_command"), + "tools list should not include exec_command when disabled: {tools_disabled:?}" + ); + assert!( + !tools_disabled.iter().any(|name| name == "write_stdin"), + "tools list should not include write_stdin when disabled: {tools_disabled:?}" ); let tools_enabled = collect_tools(true).await?; assert!( - tools_enabled.iter().any(|name| name == "unified_exec"), - "tools list should include unified_exec when enabled: {tools_enabled:?}" + tools_enabled.iter().any(|name| name == "exec_command"), + "tools list should include exec_command when enabled: {tools_enabled:?}" + ); + assert!( + tools_enabled.iter().any(|name| name == "write_stdin"), + "tools list should include write_stdin when enabled: {tools_enabled:?}" ); Ok(()) diff --git a/codex-rs/core/tests/suite/unified_exec.rs b/codex-rs/core/tests/suite/unified_exec.rs index a53cab8f..710eb0bc 100644 --- a/codex-rs/core/tests/suite/unified_exec.rs +++ b/codex-rs/core/tests/suite/unified_exec.rs @@ -4,7 +4,6 @@ use std::collections::HashMap; use anyhow::Result; use codex_core::features::Feature; -use codex_core::parse_command::parse_command; use codex_core::protocol::AskForApproval; use codex_core::protocol::EventMsg; use codex_core::protocol::Op; @@ -81,16 +80,15 @@ async fn unified_exec_emits_exec_command_begin_event() -> Result<()> { } = builder.build(&server).await?; let call_id = "uexec-begin-event"; - let command = vec!["/bin/echo".to_string(), "hello unified exec".to_string()]; let args = json!({ - "input": command.clone(), - "timeout_ms": 250, + "cmd": "/bin/echo hello unified exec".to_string(), + "yield_time_ms": 250, }); let responses = vec![ sse(vec![ ev_response_created("resp-1"), - ev_function_call(call_id, "unified_exec", &serde_json::to_string(&args)?), + ev_function_call(call_id, "exec_command", &serde_json::to_string(&args)?), ev_completed("resp-1"), ]), sse(vec![ @@ -124,9 +122,11 @@ async fn unified_exec_emits_exec_command_begin_event() -> Result<()> { }) .await; - assert_eq!(begin_event.command, command); + assert_eq!( + begin_event.command, + vec!["/bin/echo hello unified exec".to_string()] + ); assert_eq!(begin_event.cwd, cwd.path()); - assert_eq!(begin_event.parsed_cmd, parse_command(&command)); wait_for_event(&codex, |event| matches!(event, EventMsg::TaskComplete(_))).await; @@ -154,14 +154,9 @@ async fn unified_exec_skips_begin_event_for_empty_input() -> Result<()> { } = builder.build(&server).await?; let open_call_id = "uexec-open-session"; - let open_command = vec![ - "/bin/sh".to_string(), - "-c".to_string(), - "echo ready".to_string(), - ]; let open_args = json!({ - "input": open_command.clone(), - "timeout_ms": 200, + "cmd": "/bin/sh -c echo ready".to_string(), + "yield_time_ms": 250, }); let poll_call_id = "uexec-poll-empty"; @@ -176,7 +171,7 @@ async fn unified_exec_skips_begin_event_for_empty_input() -> Result<()> { ev_response_created("resp-1"), ev_function_call( open_call_id, - "unified_exec", + "exec_command", &serde_json::to_string(&open_args)?, ), ev_completed("resp-1"), @@ -185,7 +180,7 @@ async fn unified_exec_skips_begin_event_for_empty_input() -> Result<()> { ev_response_created("resp-2"), ev_function_call( poll_call_id, - "unified_exec", + "write_stdin", &serde_json::to_string(&poll_args)?, ), ev_completed("resp-2"), @@ -231,7 +226,292 @@ async fn unified_exec_skips_begin_event_for_empty_input() -> Result<()> { "expected only the initial command to emit begin event" ); assert_eq!(begin_events[0].call_id, open_call_id); - assert_eq!(begin_events[0].command, open_command); + assert_eq!(begin_events[0].command[0], "/bin/sh -c echo ready"); + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn exec_command_reports_chunk_and_exit_metadata() -> Result<()> { + skip_if_no_network!(Ok(())); + skip_if_sandbox!(Ok(())); + + let server = start_mock_server().await; + + let mut builder = test_codex().with_config(|config| { + config.features.enable(Feature::UnifiedExec); + }); + let TestCodex { + codex, + cwd, + session_configured, + .. + } = builder.build(&server).await?; + + let call_id = "uexec-metadata"; + let args = serde_json::json!({ + "cmd": "printf 'abcdefghijklmnopqrstuvwxyz'", + "yield_time_ms": 500, + "max_output_tokens": 6, + }); + + let responses = vec![ + sse(vec![ + ev_response_created("resp-1"), + ev_function_call(call_id, "exec_command", &serde_json::to_string(&args)?), + ev_completed("resp-1"), + ]), + sse(vec![ + ev_assistant_message("msg-1", "done"), + ev_completed("resp-2"), + ]), + ]; + mount_sse_sequence(&server, responses).await; + + let session_model = session_configured.model.clone(); + + codex + .submit(Op::UserTurn { + items: vec![UserInput::Text { + text: "run metadata test".into(), + }], + final_output_json_schema: None, + cwd: cwd.path().to_path_buf(), + approval_policy: AskForApproval::Never, + sandbox_policy: SandboxPolicy::DangerFullAccess, + model: session_model, + effort: None, + summary: ReasoningSummary::Auto, + }) + .await?; + + wait_for_event(&codex, |event| matches!(event, EventMsg::TaskComplete(_))).await; + + let requests = server.received_requests().await.expect("recorded requests"); + assert!(!requests.is_empty(), "expected at least one POST request"); + + let bodies = requests + .iter() + .map(|req| req.body_json::().expect("request json")) + .collect::>(); + + let outputs = collect_tool_outputs(&bodies)?; + let metadata = outputs + .get(call_id) + .expect("missing exec_command metadata output"); + + let chunk_id = metadata + .get("chunk_id") + .and_then(Value::as_str) + .expect("missing chunk_id"); + assert_eq!(chunk_id.len(), 6, "chunk id should be 6 hex characters"); + assert!( + chunk_id.chars().all(|c| c.is_ascii_hexdigit()), + "chunk id should be hexadecimal: {chunk_id}" + ); + + let wall_time = metadata + .get("wall_time_seconds") + .and_then(Value::as_f64) + .unwrap_or_default(); + assert!( + wall_time >= 0.0, + "wall_time_seconds should be non-negative, got {wall_time}" + ); + + assert!( + metadata.get("session_id").is_none(), + "exec_command for a completed process should not include session_id" + ); + + let exit_code = metadata + .get("exit_code") + .and_then(Value::as_i64) + .expect("expected exit_code"); + assert_eq!(exit_code, 0, "expected successful exit"); + + let output_text = metadata + .get("output") + .and_then(Value::as_str) + .expect("missing output text"); + assert!( + output_text.contains("tokens truncated"), + "expected truncation notice in output: {output_text:?}" + ); + + let original_tokens = metadata + .get("original_token_count") + .and_then(Value::as_u64) + .expect("missing original_token_count"); + assert!( + original_tokens as usize > 6, + "original token count should exceed max_output_tokens" + ); + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn write_stdin_returns_exit_metadata_and_clears_session() -> Result<()> { + skip_if_no_network!(Ok(())); + skip_if_sandbox!(Ok(())); + + let server = start_mock_server().await; + + let mut builder = test_codex().with_config(|config| { + config.features.enable(Feature::UnifiedExec); + }); + let TestCodex { + codex, + cwd, + session_configured, + .. + } = builder.build(&server).await?; + + let start_call_id = "uexec-cat-start"; + let send_call_id = "uexec-cat-send"; + let exit_call_id = "uexec-cat-exit"; + + let start_args = serde_json::json!({ + "cmd": "/bin/cat", + "yield_time_ms": 500, + }); + let send_args = serde_json::json!({ + "chars": "hello unified exec\n", + "session_id": 0, + "yield_time_ms": 500, + }); + let exit_args = serde_json::json!({ + "chars": "\u{0004}", + "session_id": 0, + "yield_time_ms": 500, + }); + + let responses = vec![ + sse(vec![ + ev_response_created("resp-1"), + ev_function_call( + start_call_id, + "exec_command", + &serde_json::to_string(&start_args)?, + ), + ev_completed("resp-1"), + ]), + sse(vec![ + ev_response_created("resp-2"), + ev_function_call( + send_call_id, + "write_stdin", + &serde_json::to_string(&send_args)?, + ), + ev_completed("resp-2"), + ]), + sse(vec![ + ev_response_created("resp-3"), + ev_function_call( + exit_call_id, + "write_stdin", + &serde_json::to_string(&exit_args)?, + ), + ev_completed("resp-3"), + ]), + sse(vec![ + ev_assistant_message("msg-1", "all done"), + ev_completed("resp-4"), + ]), + ]; + mount_sse_sequence(&server, responses).await; + + let session_model = session_configured.model.clone(); + + codex + .submit(Op::UserTurn { + items: vec![UserInput::Text { + text: "test write_stdin exit behavior".into(), + }], + final_output_json_schema: None, + cwd: cwd.path().to_path_buf(), + approval_policy: AskForApproval::Never, + sandbox_policy: SandboxPolicy::DangerFullAccess, + model: session_model, + effort: None, + summary: ReasoningSummary::Auto, + }) + .await?; + + wait_for_event(&codex, |event| matches!(event, EventMsg::TaskComplete(_))).await; + + let requests = server.received_requests().await.expect("recorded requests"); + assert!(!requests.is_empty(), "expected at least one POST request"); + + let bodies = requests + .iter() + .map(|req| req.body_json::().expect("request json")) + .collect::>(); + + let outputs = collect_tool_outputs(&bodies)?; + + let start_output = outputs + .get(start_call_id) + .expect("missing start output for exec_command"); + let session_id = start_output + .get("session_id") + .and_then(Value::as_i64) + .expect("expected session id from exec_command"); + assert!( + session_id >= 0, + "session_id should be non-negative, got {session_id}" + ); + assert!( + start_output.get("exit_code").is_none(), + "initial exec_command should not include exit_code while session is running" + ); + + let send_output = outputs + .get(send_call_id) + .expect("missing write_stdin echo output"); + let echoed = send_output + .get("output") + .and_then(Value::as_str) + .unwrap_or_default(); + assert!( + echoed.contains("hello unified exec"), + "expected echoed output from cat, got {echoed:?}" + ); + let echoed_session = send_output + .get("session_id") + .and_then(Value::as_i64) + .expect("write_stdin should return session id while process is running"); + assert_eq!( + echoed_session, session_id, + "write_stdin should reuse existing session id" + ); + assert!( + send_output.get("exit_code").is_none(), + "write_stdin should not include exit_code while process is running" + ); + + let exit_output = outputs + .get(exit_call_id) + .expect("missing exit metadata output"); + assert!( + exit_output.get("session_id").is_none(), + "session_id should be omitted once the process exits" + ); + let exit_code = exit_output + .get("exit_code") + .and_then(Value::as_i64) + .expect("expected exit_code after sending EOF"); + assert_eq!(exit_code, 0, "cat should exit cleanly after EOF"); + + let exit_chunk = exit_output + .get("chunk_id") + .and_then(Value::as_str) + .expect("missing chunk id for exit output"); + assert!( + exit_chunk.chars().all(|c| c.is_ascii_hexdigit()), + "chunk id should be hexadecimal: {exit_chunk}" + ); Ok(()) } @@ -255,15 +535,15 @@ async fn unified_exec_reuses_session_via_stdin() -> Result<()> { let first_call_id = "uexec-start"; let first_args = serde_json::json!({ - "input": ["/bin/cat"], - "timeout_ms": 200, + "cmd": "/bin/cat", + "yield_time_ms": 200, }); let second_call_id = "uexec-stdin"; let second_args = serde_json::json!({ - "input": ["hello unified exec\n"], - "session_id": "0", - "timeout_ms": 500, + "chars": "hello unified exec\n", + "session_id": 0, + "yield_time_ms": 500, }); let responses = vec![ @@ -271,7 +551,7 @@ async fn unified_exec_reuses_session_via_stdin() -> Result<()> { ev_response_created("resp-1"), ev_function_call( first_call_id, - "unified_exec", + "exec_command", &serde_json::to_string(&first_args)?, ), ev_completed("resp-1"), @@ -280,7 +560,7 @@ async fn unified_exec_reuses_session_via_stdin() -> Result<()> { ev_response_created("resp-2"), ev_function_call( second_call_id, - "unified_exec", + "write_stdin", &serde_json::to_string(&second_args)?, ), ev_completed("resp-2"), @@ -324,9 +604,9 @@ async fn unified_exec_reuses_session_via_stdin() -> Result<()> { let start_output = outputs .get(first_call_id) .expect("missing first unified_exec output"); - let session_id = start_output["session_id"].as_str().unwrap_or_default(); + let session_id = start_output["session_id"].as_i64().unwrap_or_default(); assert!( - !session_id.is_empty(), + session_id >= 0, "expected session id in first unified_exec response" ); assert!( @@ -340,7 +620,7 @@ async fn unified_exec_reuses_session_via_stdin() -> Result<()> { .get(second_call_id) .expect("missing reused unified_exec output"); assert_eq!( - reuse_output["session_id"].as_str().unwrap_or_default(), + reuse_output["session_id"].as_i64().unwrap_or_default(), session_id ); let echoed = reuse_output["output"].as_str().unwrap_or_default(); @@ -391,15 +671,15 @@ PY let first_call_id = "uexec-lag-start"; let first_args = serde_json::json!({ - "input": ["/bin/sh", "-c", script], - "timeout_ms": 25, + "cmd": script, + "yield_time_ms": 25, }); let second_call_id = "uexec-lag-poll"; let second_args = serde_json::json!({ - "input": Vec::::new(), - "session_id": "0", - "timeout_ms": 2_000, + "chars": "", + "session_id": 0, + "yield_time_ms": 2_000, }); let responses = vec![ @@ -407,7 +687,7 @@ PY ev_response_created("resp-1"), ev_function_call( first_call_id, - "unified_exec", + "exec_command", &serde_json::to_string(&first_args)?, ), ev_completed("resp-1"), @@ -416,7 +696,7 @@ PY ev_response_created("resp-2"), ev_function_call( second_call_id, - "unified_exec", + "write_stdin", &serde_json::to_string(&second_args)?, ), ev_completed("resp-2"), @@ -460,9 +740,9 @@ PY let start_output = outputs .get(first_call_id) .expect("missing initial unified_exec output"); - let session_id = start_output["session_id"].as_str().unwrap_or_default(); + let session_id = start_output["session_id"].as_i64().unwrap_or_default(); assert!( - !session_id.is_empty(), + session_id >= 0, "expected session id from initial unified_exec response" ); @@ -497,15 +777,15 @@ async fn unified_exec_timeout_and_followup_poll() -> Result<()> { let first_call_id = "uexec-timeout"; let first_args = serde_json::json!({ - "input": ["/bin/sh", "-c", "sleep 0.1; echo ready"], - "timeout_ms": 10, + "cmd": "sleep 0.5; echo ready", + "yield_time_ms": 10, }); let second_call_id = "uexec-poll"; let second_args = serde_json::json!({ - "input": Vec::::new(), - "session_id": "0", - "timeout_ms": 800, + "chars": "", + "session_id": 0, + "yield_time_ms": 800, }); let responses = vec![ @@ -513,7 +793,7 @@ async fn unified_exec_timeout_and_followup_poll() -> Result<()> { ev_response_created("resp-1"), ev_function_call( first_call_id, - "unified_exec", + "exec_command", &serde_json::to_string(&first_args)?, ), ev_completed("resp-1"), @@ -522,7 +802,7 @@ async fn unified_exec_timeout_and_followup_poll() -> Result<()> { ev_response_created("resp-2"), ev_function_call( second_call_id, - "unified_exec", + "write_stdin", &serde_json::to_string(&second_args)?, ), ev_completed("resp-2"), @@ -569,7 +849,7 @@ async fn unified_exec_timeout_and_followup_poll() -> Result<()> { let outputs = collect_tool_outputs(&bodies)?; let first_output = outputs.get(first_call_id).expect("missing timeout output"); - assert_eq!(first_output["session_id"], "0"); + assert_eq!(first_output["session_id"], 0); assert!( first_output["output"] .as_str()