[mcp-server] Add reply tool call (#1643)

## Summary
Adds a new mcp tool call, `codex-reply`, so we can continue existing
sessions. This is a first draft and does not yet support sessions from
previous processes.

## Testing
- [x] tested with mcp client
This commit is contained in:
Dylan
2025-07-21 21:01:56 -07:00
committed by GitHub
parent d49d802b06
commit 18b2b30841
14 changed files with 301 additions and 47 deletions

View File

@@ -33,6 +33,7 @@ tokio = { version = "1", features = [
"rt-multi-thread",
"signal",
] }
uuid = { version = "1", features = ["serde", "v4"] }
[dev-dependencies]
assert_cmd = "2"

View File

@@ -160,6 +160,47 @@ impl CodexToolCallParam {
}
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub(crate) struct CodexToolCallReplyParam {
/// The *session id* for this conversation.
pub session_id: String,
/// The *next user prompt* to continue the Codex conversation.
pub prompt: String,
}
/// Builds a `Tool` definition for the `codex-reply` tool-call.
pub(crate) fn create_tool_for_codex_tool_call_reply_param() -> Tool {
let schema = SchemaSettings::draft2019_09()
.with(|s| {
s.inline_subschemas = true;
s.option_add_null_type = false;
})
.into_generator()
.into_root_schema_for::<CodexToolCallReplyParam>();
#[expect(clippy::expect_used)]
let schema_value =
serde_json::to_value(&schema).expect("Codex reply tool schema should serialise to JSON");
let tool_input_schema =
serde_json::from_value::<ToolInputSchema>(schema_value).unwrap_or_else(|e| {
panic!("failed to create Tool from schema: {e}");
});
Tool {
name: "codex-reply".to_string(),
title: Some("Codex Reply".to_string()),
input_schema: tool_input_schema,
output_schema: None,
description: Some(
"Continue a Codex session by providing the session id and prompt.".to_string(),
),
annotations: None,
}
}
#[cfg(test)]
mod tests {
use super::*;
@@ -235,4 +276,34 @@ mod tests {
});
assert_eq!(expected_tool_json, tool_json);
}
#[test]
fn verify_codex_tool_reply_json_schema() {
let tool = create_tool_for_codex_tool_call_reply_param();
#[expect(clippy::expect_used)]
let tool_json = serde_json::to_value(&tool).expect("tool serializes");
let expected_tool_json = serde_json::json!({
"description": "Continue a Codex session by providing the session id and prompt.",
"inputSchema": {
"properties": {
"prompt": {
"description": "The *next user prompt* to continue the Codex conversation.",
"type": "string"
},
"sessionId": {
"description": "The *session id* for this conversation.",
"type": "string"
},
},
"required": [
"prompt",
"sessionId",
],
"type": "object",
},
"name": "codex-reply",
"title": "Codex Reply",
});
assert_eq!(expected_tool_json, tool_json);
}
}

View File

@@ -2,6 +2,7 @@
//! Tokio task. Separated from `message_processor.rs` to keep that file small
//! and to make future feature-growth easier to manage.
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
@@ -27,7 +28,9 @@ use mcp_types::TextContent;
use serde::Deserialize;
use serde::Serialize;
use serde_json::json;
use tokio::sync::Mutex;
use tracing::error;
use uuid::Uuid;
use crate::outgoing_message::OutgoingMessageSender;
@@ -42,8 +45,9 @@ pub async fn run_codex_tool_session(
initial_prompt: String,
config: CodexConfig,
outgoing: Arc<OutgoingMessageSender>,
session_map: Arc<Mutex<HashMap<Uuid, Arc<Codex>>>>,
) {
let (codex, first_event, _ctrl_c) = match init_codex(config).await {
let (codex, first_event, _ctrl_c, session_id) = match init_codex(config).await {
Ok(res) => res,
Err(e) => {
let result = CallToolResult {
@@ -61,6 +65,11 @@ pub async fn run_codex_tool_session(
};
let codex = Arc::new(codex);
// update the session map so we can retrieve the session in a reply, and then drop it, since
// we no longer need it for this function
session_map.lock().await.insert(session_id, codex.clone());
drop(session_map);
// Send initial SessionConfigured event.
outgoing.send_event_as_notification(&first_event).await;
@@ -85,6 +94,37 @@ pub async fn run_codex_tool_session(
tracing::error!("Failed to submit initial prompt: {e}");
}
run_codex_tool_session_inner(codex, outgoing, id).await;
}
pub async fn run_codex_tool_session_reply(
codex: Arc<Codex>,
outgoing: Arc<OutgoingMessageSender>,
request_id: RequestId,
prompt: String,
) {
if let Err(e) = codex
.submit(Op::UserInput {
items: vec![InputItem::Text { text: prompt }],
})
.await
{
tracing::error!("Failed to submit user input: {e}");
}
run_codex_tool_session_inner(codex, outgoing, request_id).await;
}
async fn run_codex_tool_session_inner(
codex: Arc<Codex>,
outgoing: Arc<OutgoingMessageSender>,
request_id: RequestId,
) {
let sub_id = match &request_id {
RequestId::String(s) => s.clone(),
RequestId::Integer(n) => n.to_string(),
};
// Stream events until the task needs to pause for user interaction or
// completes.
loop {
@@ -128,7 +168,7 @@ pub async fn run_codex_tool_session(
outgoing
.send_error(
id.clone(),
request_id.clone(),
JSONRPCErrorError {
code: INVALID_PARAMS_ERROR_CODE,
message,
@@ -168,7 +208,9 @@ pub async fn run_codex_tool_session(
is_error: None,
structured_content: None,
};
outgoing.send_response(id.clone(), result.into()).await;
outgoing
.send_response(request_id.clone(), result.into())
.await;
// Continue, don't break so the session continues.
continue;
}
@@ -186,7 +228,9 @@ pub async fn run_codex_tool_session(
is_error: None,
structured_content: None,
};
outgoing.send_response(id.clone(), result.into()).await;
outgoing
.send_response(request_id.clone(), result.into())
.await;
break;
}
EventMsg::SessionConfigured(_) => {
@@ -234,7 +278,9 @@ pub async fn run_codex_tool_session(
// structured way.
structured_content: None,
};
outgoing.send_response(id.clone(), result.into()).await;
outgoing
.send_response(request_id.clone(), result.into())
.await;
break;
}
}

View File

@@ -1,10 +1,14 @@
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use crate::codex_tool_config::CodexToolCallParam;
use crate::codex_tool_config::CodexToolCallReplyParam;
use crate::codex_tool_config::create_tool_for_codex_tool_call_param;
use crate::codex_tool_config::create_tool_for_codex_tool_call_reply_param;
use crate::outgoing_message::OutgoingMessageSender;
use codex_core::Codex;
use codex_core::config::Config as CodexConfig;
use mcp_types::CallToolRequestParams;
use mcp_types::CallToolResult;
@@ -22,12 +26,15 @@ use mcp_types::ServerCapabilitiesTools;
use mcp_types::ServerNotification;
use mcp_types::TextContent;
use serde_json::json;
use tokio::sync::Mutex;
use tokio::task;
use uuid::Uuid;
pub(crate) struct MessageProcessor {
outgoing: Arc<OutgoingMessageSender>,
initialized: bool,
codex_linux_sandbox_exe: Option<PathBuf>,
session_map: Arc<Mutex<HashMap<Uuid, Arc<Codex>>>>,
}
impl MessageProcessor {
@@ -41,6 +48,7 @@ impl MessageProcessor {
outgoing: Arc::new(outgoing),
initialized: false,
codex_linux_sandbox_exe,
session_map: Arc::new(Mutex::new(HashMap::new())),
}
}
@@ -272,7 +280,10 @@ impl MessageProcessor {
) {
tracing::trace!("tools/list -> {params:?}");
let result = ListToolsResult {
tools: vec![create_tool_for_codex_tool_call_param()],
tools: vec![
create_tool_for_codex_tool_call_param(),
create_tool_for_codex_tool_call_reply_param(),
],
next_cursor: None,
};
@@ -288,23 +299,29 @@ impl MessageProcessor {
tracing::info!("tools/call -> params: {:?}", params);
let CallToolRequestParams { name, arguments } = params;
// We only support the "codex" tool for now.
if name != "codex" {
// Tool not found return error result so the LLM can react.
let result = CallToolResult {
content: vec![ContentBlock::TextContent(TextContent {
r#type: "text".to_string(),
text: format!("Unknown tool '{name}'"),
annotations: None,
})],
is_error: Some(true),
structured_content: None,
};
self.send_response::<mcp_types::CallToolRequest>(id, result)
.await;
return;
match name.as_str() {
"codex" => self.handle_tool_call_codex(id, arguments).await,
"codex-reply" => {
self.handle_tool_call_codex_session_reply(id, arguments)
.await
}
_ => {
let result = CallToolResult {
content: vec![ContentBlock::TextContent(TextContent {
r#type: "text".to_string(),
text: format!("Unknown tool '{name}'"),
annotations: None,
})],
is_error: Some(true),
structured_content: None,
};
self.send_response::<mcp_types::CallToolRequest>(id, result)
.await;
}
}
}
async fn handle_tool_call_codex(&self, id: RequestId, arguments: Option<serde_json::Value>) {
let (initial_prompt, config): (String, CodexConfig) = match arguments {
Some(json_val) => match serde_json::from_value::<CodexToolCallParam>(json_val) {
Ok(tool_cfg) => match tool_cfg.into_config(self.codex_linux_sandbox_exe.clone()) {
@@ -359,15 +376,127 @@ impl MessageProcessor {
}
};
// Clone outgoing sender to move into async task.
// Clone outgoing and session map to move into async task.
let outgoing = self.outgoing.clone();
let session_map = self.session_map.clone();
// Spawn an async task to handle the Codex session so that we do not
// block the synchronous message-processing loop.
task::spawn(async move {
// Run the Codex session and stream events back to the client.
crate::codex_tool_runner::run_codex_tool_session(id, initial_prompt, config, outgoing)
.await;
crate::codex_tool_runner::run_codex_tool_session(
id,
initial_prompt,
config,
outgoing,
session_map,
)
.await;
});
}
async fn handle_tool_call_codex_session_reply(
&self,
request_id: RequestId,
arguments: Option<serde_json::Value>,
) {
tracing::info!("tools/call -> params: {:?}", arguments);
// parse arguments
let CodexToolCallReplyParam { session_id, prompt } = match arguments {
Some(json_val) => match serde_json::from_value::<CodexToolCallReplyParam>(json_val) {
Ok(params) => params,
Err(e) => {
tracing::error!("Failed to parse Codex tool call reply parameters: {e}");
let result = CallToolResult {
content: vec![ContentBlock::TextContent(TextContent {
r#type: "text".to_owned(),
text: format!("Failed to parse configuration for Codex tool: {e}"),
annotations: None,
})],
is_error: Some(true),
structured_content: None,
};
self.send_response::<mcp_types::CallToolRequest>(request_id, result)
.await;
return;
}
},
None => {
tracing::error!(
"Missing arguments for codex-reply tool-call; the `session_id` and `prompt` fields are required."
);
let result = CallToolResult {
content: vec![ContentBlock::TextContent(TextContent {
r#type: "text".to_owned(),
text: "Missing arguments for codex-reply tool-call; the `session_id` and `prompt` fields are required.".to_owned(),
annotations: None,
})],
is_error: Some(true),
structured_content: None,
};
self.send_response::<mcp_types::CallToolRequest>(request_id, result)
.await;
return;
}
};
let session_id = match Uuid::parse_str(&session_id) {
Ok(id) => id,
Err(e) => {
tracing::error!("Failed to parse session_id: {e}");
let result = CallToolResult {
content: vec![ContentBlock::TextContent(TextContent {
r#type: "text".to_owned(),
text: format!("Failed to parse session_id: {e}"),
annotations: None,
})],
is_error: Some(true),
structured_content: None,
};
self.send_response::<mcp_types::CallToolRequest>(request_id, result)
.await;
return;
}
};
// load codex from session map
let session_map_mutex = Arc::clone(&self.session_map);
// Clone outgoing and session map to move into async task.
let outgoing = self.outgoing.clone();
// Spawn an async task to handle the Codex session so that we do not
// block the synchronous message-processing loop.
task::spawn(async move {
let session_map = session_map_mutex.lock().await;
let codex = match session_map.get(&session_id) {
Some(codex) => codex,
None => {
tracing::warn!("Session not found for session_id: {session_id}");
let result = CallToolResult {
content: vec![ContentBlock::TextContent(TextContent {
r#type: "text".to_owned(),
text: format!("Session not found for session_id: {session_id}"),
annotations: None,
})],
is_error: Some(true),
structured_content: None,
};
// unwrap_or_default is fine here because we know the result is valid JSON
outgoing
.send_response(request_id, serde_json::to_value(result).unwrap_or_default())
.await;
return;
}
};
crate::codex_tool_runner::run_codex_tool_session_reply(
codex.clone(),
outgoing,
request_id,
prompt.clone(),
)
.await;
});
}