diff --git a/codex-rs/mcp-server/src/lib.rs b/codex-rs/mcp-server/src/lib.rs index 702cb501..f4bebdf5 100644 --- a/codex-rs/mcp-server/src/lib.rs +++ b/codex-rs/mcp-server/src/lib.rs @@ -59,7 +59,7 @@ pub async fn run_main( // Set up channels. let (incoming_tx, mut incoming_rx) = mpsc::channel::(CHANNEL_CAPACITY); - let (outgoing_tx, mut outgoing_rx) = mpsc::channel::(CHANNEL_CAPACITY); + let (outgoing_tx, mut outgoing_rx) = mpsc::unbounded_channel::(); // Task: read from stdin, push to `incoming_tx`. let stdin_reader_handle = tokio::spawn({ diff --git a/codex-rs/mcp-server/src/outgoing_message.rs b/codex-rs/mcp-server/src/outgoing_message.rs index 5f206cb0..abfc542e 100644 --- a/codex-rs/mcp-server/src/outgoing_message.rs +++ b/codex-rs/mcp-server/src/outgoing_message.rs @@ -24,12 +24,12 @@ use crate::error_code::INTERNAL_ERROR_CODE; /// Sends messages to the client and manages request callbacks. pub(crate) struct OutgoingMessageSender { next_request_id: AtomicI64, - sender: mpsc::Sender, + sender: mpsc::UnboundedSender, request_id_to_callback: Mutex>>, } impl OutgoingMessageSender { - pub(crate) fn new(sender: mpsc::Sender) -> Self { + pub(crate) fn new(sender: mpsc::UnboundedSender) -> Self { Self { next_request_id: AtomicI64::new(0), sender, @@ -55,7 +55,7 @@ impl OutgoingMessageSender { method: method.to_string(), params, }); - let _ = self.sender.send(outgoing_message).await; + let _ = self.sender.send(outgoing_message); rx_approve } @@ -81,7 +81,7 @@ impl OutgoingMessageSender { match serde_json::to_value(response) { Ok(result) => { let outgoing_message = OutgoingMessage::Response(OutgoingResponse { id, result }); - let _ = self.sender.send(outgoing_message).await; + let _ = self.sender.send(outgoing_message); } Err(err) => { self.send_error( @@ -130,17 +130,17 @@ impl OutgoingMessageSender { }; let outgoing_message = OutgoingMessage::Notification(OutgoingNotification { method, params }); - let _ = self.sender.send(outgoing_message).await; + let _ = self.sender.send(outgoing_message); } pub(crate) async fn send_notification(&self, notification: OutgoingNotification) { let outgoing_message = OutgoingMessage::Notification(notification); - let _ = self.sender.send(outgoing_message).await; + let _ = self.sender.send(outgoing_message); } pub(crate) async fn send_error(&self, id: RequestId, error: JSONRPCErrorError) { let outgoing_message = OutgoingMessage::Error(OutgoingError { id, error }); - let _ = self.sender.send(outgoing_message).await; + let _ = self.sender.send(outgoing_message); } } @@ -250,7 +250,7 @@ mod tests { #[tokio::test] async fn test_send_event_as_notification() { - let (outgoing_tx, mut outgoing_rx) = mpsc::channel::(2); + let (outgoing_tx, mut outgoing_rx) = mpsc::unbounded_channel::(); let outgoing_message_sender = OutgoingMessageSender::new(outgoing_tx); let event = Event { @@ -281,7 +281,7 @@ mod tests { #[tokio::test] async fn test_send_event_as_notification_with_meta() { - let (outgoing_tx, mut outgoing_rx) = mpsc::channel::(2); + let (outgoing_tx, mut outgoing_rx) = mpsc::unbounded_channel::(); let outgoing_message_sender = OutgoingMessageSender::new(outgoing_tx); let session_configured_event = SessionConfiguredEvent {