This adds `parsed_cmd: Vec<ParsedCommand>` to `ExecApprovalRequestEvent` in the core protocol (`protocol/src/protocol.rs`), which is also what this field is named on `ExecCommandBeginEvent`. Honestly, I don't love the name (it sounds like a single command, but it is actually a list of them), but I don't want to get distracted by a naming discussion right now. This also adds `parsed_cmd` to `ExecCommandApprovalParams` in `codex-rs/app-server-protocol/src/protocol.rs`, so it will be available via `codex app-server`, as well. For consistency, I also updated `ExecApprovalElicitRequestParams` in `codex-rs/mcp-server/src/exec_approval.rs` to include this field under the name `codex_parsed_cmd`, as that struct already has a number of special `codex_*` fields. Note this is the code for when Codex is used as an MCP _server_ and therefore has to conform to the official spec for an MCP elicitation type.
315 lines
12 KiB
Rust
315 lines
12 KiB
Rust
//! Asynchronous worker that executes a **Codex** tool-call inside a spawned
|
|
//! Tokio task. Separated from `message_processor.rs` to keep that file small
|
|
//! and to make future feature-growth easier to manage.
|
|
|
|
use std::collections::HashMap;
|
|
use std::sync::Arc;
|
|
|
|
use crate::exec_approval::handle_exec_approval_request;
|
|
use crate::outgoing_message::OutgoingMessageSender;
|
|
use crate::outgoing_message::OutgoingNotificationMeta;
|
|
use crate::patch_approval::handle_patch_approval_request;
|
|
use codex_core::CodexConversation;
|
|
use codex_core::ConversationManager;
|
|
use codex_core::NewConversation;
|
|
use codex_core::config::Config as CodexConfig;
|
|
use codex_core::protocol::AgentMessageEvent;
|
|
use codex_core::protocol::ApplyPatchApprovalRequestEvent;
|
|
use codex_core::protocol::Event;
|
|
use codex_core::protocol::EventMsg;
|
|
use codex_core::protocol::ExecApprovalRequestEvent;
|
|
use codex_core::protocol::InputItem;
|
|
use codex_core::protocol::Op;
|
|
use codex_core::protocol::Submission;
|
|
use codex_core::protocol::TaskCompleteEvent;
|
|
use codex_protocol::ConversationId;
|
|
use mcp_types::CallToolResult;
|
|
use mcp_types::ContentBlock;
|
|
use mcp_types::RequestId;
|
|
use mcp_types::TextContent;
|
|
use serde_json::json;
|
|
use tokio::sync::Mutex;
|
|
|
|
/// Error code reported when a request carries invalid parameters
/// (`-32602` is the "Invalid params" code defined by JSON-RPC 2.0).
pub(crate) const INVALID_PARAMS_ERROR_CODE: i64 = -32602;
|
|
|
|
/// Run a complete Codex session and stream events back to the client.
|
|
///
|
|
/// On completion (success or error) the function sends the appropriate
|
|
/// `tools/call` response so the LLM can continue the conversation.
|
|
pub async fn run_codex_tool_session(
|
|
id: RequestId,
|
|
initial_prompt: String,
|
|
config: CodexConfig,
|
|
outgoing: Arc<OutgoingMessageSender>,
|
|
conversation_manager: Arc<ConversationManager>,
|
|
running_requests_id_to_codex_uuid: Arc<Mutex<HashMap<RequestId, ConversationId>>>,
|
|
) {
|
|
let NewConversation {
|
|
conversation_id,
|
|
conversation,
|
|
session_configured,
|
|
} = match conversation_manager.new_conversation(config).await {
|
|
Ok(res) => res,
|
|
Err(e) => {
|
|
let result = CallToolResult {
|
|
content: vec![ContentBlock::TextContent(TextContent {
|
|
r#type: "text".to_string(),
|
|
text: format!("Failed to start Codex session: {e}"),
|
|
annotations: None,
|
|
})],
|
|
is_error: Some(true),
|
|
structured_content: None,
|
|
};
|
|
outgoing.send_response(id.clone(), result).await;
|
|
return;
|
|
}
|
|
};
|
|
|
|
let session_configured_event = Event {
|
|
// Use a fake id value for now.
|
|
id: "".to_string(),
|
|
msg: EventMsg::SessionConfigured(session_configured.clone()),
|
|
};
|
|
outgoing
|
|
.send_event_as_notification(
|
|
&session_configured_event,
|
|
Some(OutgoingNotificationMeta::new(Some(id.clone()))),
|
|
)
|
|
.await;
|
|
|
|
// Use the original MCP request ID as the `sub_id` for the Codex submission so that
|
|
// any events emitted for this tool-call can be correlated with the
|
|
// originating `tools/call` request.
|
|
let sub_id = match &id {
|
|
RequestId::String(s) => s.clone(),
|
|
RequestId::Integer(n) => n.to_string(),
|
|
};
|
|
running_requests_id_to_codex_uuid
|
|
.lock()
|
|
.await
|
|
.insert(id.clone(), conversation_id);
|
|
let submission = Submission {
|
|
id: sub_id.clone(),
|
|
op: Op::UserInput {
|
|
items: vec![InputItem::Text {
|
|
text: initial_prompt.clone(),
|
|
}],
|
|
},
|
|
};
|
|
|
|
if let Err(e) = conversation.submit_with_id(submission).await {
|
|
tracing::error!("Failed to submit initial prompt: {e}");
|
|
// unregister the id so we don't keep it in the map
|
|
running_requests_id_to_codex_uuid.lock().await.remove(&id);
|
|
return;
|
|
}
|
|
|
|
run_codex_tool_session_inner(
|
|
conversation,
|
|
outgoing,
|
|
id,
|
|
running_requests_id_to_codex_uuid,
|
|
)
|
|
.await;
|
|
}
|
|
|
|
pub async fn run_codex_tool_session_reply(
|
|
conversation: Arc<CodexConversation>,
|
|
outgoing: Arc<OutgoingMessageSender>,
|
|
request_id: RequestId,
|
|
prompt: String,
|
|
running_requests_id_to_codex_uuid: Arc<Mutex<HashMap<RequestId, ConversationId>>>,
|
|
conversation_id: ConversationId,
|
|
) {
|
|
running_requests_id_to_codex_uuid
|
|
.lock()
|
|
.await
|
|
.insert(request_id.clone(), conversation_id);
|
|
if let Err(e) = conversation
|
|
.submit(Op::UserInput {
|
|
items: vec![InputItem::Text { text: prompt }],
|
|
})
|
|
.await
|
|
{
|
|
tracing::error!("Failed to submit user input: {e}");
|
|
// unregister the id so we don't keep it in the map
|
|
running_requests_id_to_codex_uuid
|
|
.lock()
|
|
.await
|
|
.remove(&request_id);
|
|
return;
|
|
}
|
|
|
|
run_codex_tool_session_inner(
|
|
conversation,
|
|
outgoing,
|
|
request_id,
|
|
running_requests_id_to_codex_uuid,
|
|
)
|
|
.await;
|
|
}
|
|
|
|
async fn run_codex_tool_session_inner(
|
|
codex: Arc<CodexConversation>,
|
|
outgoing: Arc<OutgoingMessageSender>,
|
|
request_id: RequestId,
|
|
running_requests_id_to_codex_uuid: Arc<Mutex<HashMap<RequestId, ConversationId>>>,
|
|
) {
|
|
let request_id_str = match &request_id {
|
|
RequestId::String(s) => s.clone(),
|
|
RequestId::Integer(n) => n.to_string(),
|
|
};
|
|
|
|
// Stream events until the task needs to pause for user interaction or
|
|
// completes.
|
|
loop {
|
|
match codex.next_event().await {
|
|
Ok(event) => {
|
|
outgoing
|
|
.send_event_as_notification(
|
|
&event,
|
|
Some(OutgoingNotificationMeta::new(Some(request_id.clone()))),
|
|
)
|
|
.await;
|
|
|
|
match event.msg {
|
|
EventMsg::ExecApprovalRequest(ExecApprovalRequestEvent {
|
|
command,
|
|
cwd,
|
|
call_id,
|
|
reason: _,
|
|
parsed_cmd,
|
|
}) => {
|
|
handle_exec_approval_request(
|
|
command,
|
|
cwd,
|
|
outgoing.clone(),
|
|
codex.clone(),
|
|
request_id.clone(),
|
|
request_id_str.clone(),
|
|
event.id.clone(),
|
|
call_id,
|
|
parsed_cmd,
|
|
)
|
|
.await;
|
|
continue;
|
|
}
|
|
EventMsg::Error(err_event) => {
|
|
// Return a response to conclude the tool call when the Codex session reports an error (e.g., interruption).
|
|
let result = json!({
|
|
"error": err_event.message,
|
|
});
|
|
outgoing.send_response(request_id.clone(), result).await;
|
|
break;
|
|
}
|
|
EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
|
|
call_id,
|
|
reason,
|
|
grant_root,
|
|
changes,
|
|
}) => {
|
|
handle_patch_approval_request(
|
|
call_id,
|
|
reason,
|
|
grant_root,
|
|
changes,
|
|
outgoing.clone(),
|
|
codex.clone(),
|
|
request_id.clone(),
|
|
request_id_str.clone(),
|
|
event.id.clone(),
|
|
)
|
|
.await;
|
|
continue;
|
|
}
|
|
EventMsg::TaskComplete(TaskCompleteEvent { last_agent_message }) => {
|
|
let text = match last_agent_message {
|
|
Some(msg) => msg,
|
|
None => "".to_string(),
|
|
};
|
|
let result = CallToolResult {
|
|
content: vec![ContentBlock::TextContent(TextContent {
|
|
r#type: "text".to_string(),
|
|
text,
|
|
annotations: None,
|
|
})],
|
|
is_error: None,
|
|
structured_content: None,
|
|
};
|
|
outgoing.send_response(request_id.clone(), result).await;
|
|
// unregister the id so we don't keep it in the map
|
|
running_requests_id_to_codex_uuid
|
|
.lock()
|
|
.await
|
|
.remove(&request_id);
|
|
break;
|
|
}
|
|
EventMsg::SessionConfigured(_) => {
|
|
tracing::error!("unexpected SessionConfigured event");
|
|
}
|
|
EventMsg::AgentMessageDelta(_) => {
|
|
// TODO: think how we want to support this in the MCP
|
|
}
|
|
EventMsg::AgentReasoningDelta(_) => {
|
|
// TODO: think how we want to support this in the MCP
|
|
}
|
|
EventMsg::AgentMessage(AgentMessageEvent { .. }) => {
|
|
// TODO: think how we want to support this in the MCP
|
|
}
|
|
EventMsg::AgentReasoningRawContent(_)
|
|
| EventMsg::AgentReasoningRawContentDelta(_)
|
|
| EventMsg::TaskStarted(_)
|
|
| EventMsg::TokenCount(_)
|
|
| EventMsg::AgentReasoning(_)
|
|
| EventMsg::AgentReasoningSectionBreak(_)
|
|
| EventMsg::McpToolCallBegin(_)
|
|
| EventMsg::McpToolCallEnd(_)
|
|
| EventMsg::McpListToolsResponse(_)
|
|
| EventMsg::ListCustomPromptsResponse(_)
|
|
| EventMsg::ExecCommandBegin(_)
|
|
| EventMsg::ExecCommandOutputDelta(_)
|
|
| EventMsg::ExecCommandEnd(_)
|
|
| EventMsg::BackgroundEvent(_)
|
|
| EventMsg::StreamError(_)
|
|
| EventMsg::PatchApplyBegin(_)
|
|
| EventMsg::PatchApplyEnd(_)
|
|
| EventMsg::TurnDiff(_)
|
|
| EventMsg::WebSearchBegin(_)
|
|
| EventMsg::WebSearchEnd(_)
|
|
| EventMsg::GetHistoryEntryResponse(_)
|
|
| EventMsg::PlanUpdate(_)
|
|
| EventMsg::TurnAborted(_)
|
|
| EventMsg::ConversationPath(_)
|
|
| EventMsg::UserMessage(_)
|
|
| EventMsg::ShutdownComplete
|
|
| EventMsg::ViewImageToolCall(_)
|
|
| EventMsg::EnteredReviewMode(_)
|
|
| EventMsg::ExitedReviewMode(_) => {
|
|
// For now, we do not do anything extra for these
|
|
// events. Note that
|
|
// send(codex_event_to_notification(&event)) above has
|
|
// already dispatched these events as notifications,
|
|
// though we may want to do give different treatment to
|
|
// individual events in the future.
|
|
}
|
|
}
|
|
}
|
|
Err(e) => {
|
|
let result = CallToolResult {
|
|
content: vec![ContentBlock::TextContent(TextContent {
|
|
r#type: "text".to_string(),
|
|
text: format!("Codex runtime error: {e}"),
|
|
annotations: None,
|
|
})],
|
|
is_error: Some(true),
|
|
// TODO(mbolin): Could present the error in a more
|
|
// structured way.
|
|
structured_content: None,
|
|
};
|
|
outgoing.send_response(request_id.clone(), result).await;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
}
|