From 970e466ab3465ad037542042fc5086718ba6f98b Mon Sep 17 00:00:00 2001 From: Michael Bolin Date: Thu, 28 Aug 2025 22:20:10 -0700 Subject: [PATCH] fix: switch to unbounded channel (#2874) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit #2747 encouraged me to audit our codebase for similar issues, as now I am particularly suspicious that our flaky tests are due to a racy deadlock. I asked Codex to audit our code, and one of its suggestions was this: > **High-Risk Patterns** > > All `send_*` methods await on a bounded `mpsc::Sender`. If the writer blocks, the channel fills and the processor task blocks on send, stops draining incoming requests, and stdin reader eventually blocks on its send. This creates a backpressure deadlock cycle across the three tasks. > > **Recommendations** > * Server outgoing path: break the backpressure cycle > * Option A (minimal risk): Change `OutgoingMessageSender` to use an unbounded channel to decouple producer from stdout. Add rate logging so floods are visible. > * Option B (bounded + drop policy): Change `send_*` to try_send and drop messages (or coalesce) when the queue is full, logging a warning. This prevents processor stalls at the cost of losing messages under extreme backpressure. > * Option C (two-stage buffer): Keep bounded channel, but have a dedicated “egress” task that drains an unbounded internal queue, writing to stdout with retries and a shutdown timeout. This centralizes backpressure policy. So this PR is Option A. Indeed, we previously used a bounded channel with a capacity of `128`, but as we discovered recently with #2776, there are certainly cases where we can get flooded with events. That said, `test_shell_command_approval_triggers_elicitation` just failed on one build when I put up this PR, so clearly we are not out of the woods yet... **Update:** I think I found the true source of the deadlock! 
See https://github.com/openai/codex/pull/2876 --- codex-rs/mcp-server/src/lib.rs | 2 +- codex-rs/mcp-server/src/outgoing_message.rs | 18 +++++++++--------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/codex-rs/mcp-server/src/lib.rs b/codex-rs/mcp-server/src/lib.rs index 702cb501..f4bebdf5 100644 --- a/codex-rs/mcp-server/src/lib.rs +++ b/codex-rs/mcp-server/src/lib.rs @@ -59,7 +59,7 @@ pub async fn run_main( // Set up channels. let (incoming_tx, mut incoming_rx) = mpsc::channel::(CHANNEL_CAPACITY); - let (outgoing_tx, mut outgoing_rx) = mpsc::channel::(CHANNEL_CAPACITY); + let (outgoing_tx, mut outgoing_rx) = mpsc::unbounded_channel::(); // Task: read from stdin, push to `incoming_tx`. let stdin_reader_handle = tokio::spawn({ diff --git a/codex-rs/mcp-server/src/outgoing_message.rs b/codex-rs/mcp-server/src/outgoing_message.rs index 5f206cb0..abfc542e 100644 --- a/codex-rs/mcp-server/src/outgoing_message.rs +++ b/codex-rs/mcp-server/src/outgoing_message.rs @@ -24,12 +24,12 @@ use crate::error_code::INTERNAL_ERROR_CODE; /// Sends messages to the client and manages request callbacks. 
pub(crate) struct OutgoingMessageSender { next_request_id: AtomicI64, - sender: mpsc::Sender, + sender: mpsc::UnboundedSender, request_id_to_callback: Mutex>>, } impl OutgoingMessageSender { - pub(crate) fn new(sender: mpsc::Sender) -> Self { + pub(crate) fn new(sender: mpsc::UnboundedSender) -> Self { Self { next_request_id: AtomicI64::new(0), sender, @@ -55,7 +55,7 @@ impl OutgoingMessageSender { method: method.to_string(), params, }); - let _ = self.sender.send(outgoing_message).await; + let _ = self.sender.send(outgoing_message); rx_approve } @@ -81,7 +81,7 @@ impl OutgoingMessageSender { match serde_json::to_value(response) { Ok(result) => { let outgoing_message = OutgoingMessage::Response(OutgoingResponse { id, result }); - let _ = self.sender.send(outgoing_message).await; + let _ = self.sender.send(outgoing_message); } Err(err) => { self.send_error( @@ -130,17 +130,17 @@ impl OutgoingMessageSender { }; let outgoing_message = OutgoingMessage::Notification(OutgoingNotification { method, params }); - let _ = self.sender.send(outgoing_message).await; + let _ = self.sender.send(outgoing_message); } pub(crate) async fn send_notification(&self, notification: OutgoingNotification) { let outgoing_message = OutgoingMessage::Notification(notification); - let _ = self.sender.send(outgoing_message).await; + let _ = self.sender.send(outgoing_message); } pub(crate) async fn send_error(&self, id: RequestId, error: JSONRPCErrorError) { let outgoing_message = OutgoingMessage::Error(OutgoingError { id, error }); - let _ = self.sender.send(outgoing_message).await; + let _ = self.sender.send(outgoing_message); } } @@ -250,7 +250,7 @@ mod tests { #[tokio::test] async fn test_send_event_as_notification() { - let (outgoing_tx, mut outgoing_rx) = mpsc::channel::(2); + let (outgoing_tx, mut outgoing_rx) = mpsc::unbounded_channel::(); let outgoing_message_sender = OutgoingMessageSender::new(outgoing_tx); let event = Event { @@ -281,7 +281,7 @@ mod tests { #[tokio::test] async fn 
test_send_event_as_notification_with_meta() { - let (outgoing_tx, mut outgoing_rx) = mpsc::channel::(2); + let (outgoing_tx, mut outgoing_rx) = mpsc::unbounded_channel::(); let outgoing_message_sender = OutgoingMessageSender::new(outgoing_tx); let session_configured_event = SessionConfiguredEvent {