fix: switch to unbounded channel (#2874)
#2747 encouraged me to audit our codebase for similar issues, as now I am particularly suspicious that our flaky tests are due to a racy deadlock. I asked Codex to audit our code, and one of its suggestions was this: > **High-Risk Patterns** > > All `send_*` methods await on a bounded `mpsc::Sender<OutgoingMessage>`. If the writer blocks, the channel fills and the processor task blocks on send, stops draining incoming requests, and stdin reader eventually blocks on its send. This creates a backpressure deadlock cycle across the three tasks. > > **Recommendations** > * Server outgoing path: break the backpressure cycle > * Option A (minimal risk): Change `OutgoingMessageSender` to use an unbounded channel to decouple producer from stdout. Add rate logging so floods are visible. > * Option B (bounded + drop policy): Change `send_*` to try_send and drop messages (or coalesce) when the queue is full, logging a warning. This prevents processor stalls at the cost of losing messages under extreme backpressure. > * Option C (two-stage buffer): Keep bounded channel, but have a dedicated “egress” task that drains an unbounded internal queue, writing to stdout with retries and a shutdown timeout. This centralizes backpressure policy. So this PR is Option A. Indeed, we previously used a bounded channel with a capacity of `128`, but as we discovered recently with #2776, there are certainly cases where we can get flooded with events. That said, `test_shell_command_approval_triggers_elicitation` just failed on one build when I put up this PR, so clearly we are not out of the woods yet... **Update:** I think I found the true source of the deadlock! See https://github.com/openai/codex/pull/2876
This commit is contained in:
@@ -59,7 +59,7 @@ pub async fn run_main(
|
|||||||
|
|
||||||
// Set up channels.
|
// Set up channels.
|
||||||
let (incoming_tx, mut incoming_rx) = mpsc::channel::<JSONRPCMessage>(CHANNEL_CAPACITY);
|
let (incoming_tx, mut incoming_rx) = mpsc::channel::<JSONRPCMessage>(CHANNEL_CAPACITY);
|
||||||
let (outgoing_tx, mut outgoing_rx) = mpsc::channel::<OutgoingMessage>(CHANNEL_CAPACITY);
|
let (outgoing_tx, mut outgoing_rx) = mpsc::unbounded_channel::<OutgoingMessage>();
|
||||||
|
|
||||||
// Task: read from stdin, push to `incoming_tx`.
|
// Task: read from stdin, push to `incoming_tx`.
|
||||||
let stdin_reader_handle = tokio::spawn({
|
let stdin_reader_handle = tokio::spawn({
|
||||||
|
|||||||
@@ -24,12 +24,12 @@ use crate::error_code::INTERNAL_ERROR_CODE;
|
|||||||
/// Sends messages to the client and manages request callbacks.
|
/// Sends messages to the client and manages request callbacks.
|
||||||
pub(crate) struct OutgoingMessageSender {
|
pub(crate) struct OutgoingMessageSender {
|
||||||
next_request_id: AtomicI64,
|
next_request_id: AtomicI64,
|
||||||
sender: mpsc::Sender<OutgoingMessage>,
|
sender: mpsc::UnboundedSender<OutgoingMessage>,
|
||||||
request_id_to_callback: Mutex<HashMap<RequestId, oneshot::Sender<Result>>>,
|
request_id_to_callback: Mutex<HashMap<RequestId, oneshot::Sender<Result>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl OutgoingMessageSender {
|
impl OutgoingMessageSender {
|
||||||
pub(crate) fn new(sender: mpsc::Sender<OutgoingMessage>) -> Self {
|
pub(crate) fn new(sender: mpsc::UnboundedSender<OutgoingMessage>) -> Self {
|
||||||
Self {
|
Self {
|
||||||
next_request_id: AtomicI64::new(0),
|
next_request_id: AtomicI64::new(0),
|
||||||
sender,
|
sender,
|
||||||
@@ -55,7 +55,7 @@ impl OutgoingMessageSender {
|
|||||||
method: method.to_string(),
|
method: method.to_string(),
|
||||||
params,
|
params,
|
||||||
});
|
});
|
||||||
let _ = self.sender.send(outgoing_message).await;
|
let _ = self.sender.send(outgoing_message);
|
||||||
rx_approve
|
rx_approve
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -81,7 +81,7 @@ impl OutgoingMessageSender {
|
|||||||
match serde_json::to_value(response) {
|
match serde_json::to_value(response) {
|
||||||
Ok(result) => {
|
Ok(result) => {
|
||||||
let outgoing_message = OutgoingMessage::Response(OutgoingResponse { id, result });
|
let outgoing_message = OutgoingMessage::Response(OutgoingResponse { id, result });
|
||||||
let _ = self.sender.send(outgoing_message).await;
|
let _ = self.sender.send(outgoing_message);
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
self.send_error(
|
self.send_error(
|
||||||
@@ -130,17 +130,17 @@ impl OutgoingMessageSender {
|
|||||||
};
|
};
|
||||||
let outgoing_message =
|
let outgoing_message =
|
||||||
OutgoingMessage::Notification(OutgoingNotification { method, params });
|
OutgoingMessage::Notification(OutgoingNotification { method, params });
|
||||||
let _ = self.sender.send(outgoing_message).await;
|
let _ = self.sender.send(outgoing_message);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn send_notification(&self, notification: OutgoingNotification) {
|
pub(crate) async fn send_notification(&self, notification: OutgoingNotification) {
|
||||||
let outgoing_message = OutgoingMessage::Notification(notification);
|
let outgoing_message = OutgoingMessage::Notification(notification);
|
||||||
let _ = self.sender.send(outgoing_message).await;
|
let _ = self.sender.send(outgoing_message);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn send_error(&self, id: RequestId, error: JSONRPCErrorError) {
|
pub(crate) async fn send_error(&self, id: RequestId, error: JSONRPCErrorError) {
|
||||||
let outgoing_message = OutgoingMessage::Error(OutgoingError { id, error });
|
let outgoing_message = OutgoingMessage::Error(OutgoingError { id, error });
|
||||||
let _ = self.sender.send(outgoing_message).await;
|
let _ = self.sender.send(outgoing_message);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -250,7 +250,7 @@ mod tests {
|
|||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_send_event_as_notification() {
|
async fn test_send_event_as_notification() {
|
||||||
let (outgoing_tx, mut outgoing_rx) = mpsc::channel::<OutgoingMessage>(2);
|
let (outgoing_tx, mut outgoing_rx) = mpsc::unbounded_channel::<OutgoingMessage>();
|
||||||
let outgoing_message_sender = OutgoingMessageSender::new(outgoing_tx);
|
let outgoing_message_sender = OutgoingMessageSender::new(outgoing_tx);
|
||||||
|
|
||||||
let event = Event {
|
let event = Event {
|
||||||
@@ -281,7 +281,7 @@ mod tests {
|
|||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_send_event_as_notification_with_meta() {
|
async fn test_send_event_as_notification_with_meta() {
|
||||||
let (outgoing_tx, mut outgoing_rx) = mpsc::channel::<OutgoingMessage>(2);
|
let (outgoing_tx, mut outgoing_rx) = mpsc::unbounded_channel::<OutgoingMessage>();
|
||||||
let outgoing_message_sender = OutgoingMessageSender::new(outgoing_tx);
|
let outgoing_message_sender = OutgoingMessageSender::new(outgoing_tx);
|
||||||
|
|
||||||
let session_configured_event = SessionConfiguredEvent {
|
let session_configured_event = SessionConfiguredEvent {
|
||||||
|
|||||||
Reference in New Issue
Block a user