Bug fix: clone of incoming_tx can lead to deadlock (#2747)
POC code
```rust
use tokio::sync::mpsc;
use std::time::Duration;
#[tokio::main]
async fn main() {
println!("=== Test 1: Simulating original MCP server pattern ===");
test_original_pattern().await;
}
async fn test_original_pattern() {
println!("Testing the original pattern from MCP server...");
// Create channel - this simulates the original incoming_tx/incoming_rx
let (tx, mut rx) = mpsc::channel::<String>(10);
// Task 1: Simulates stdin reader that will naturally terminate
let stdin_task = tokio::spawn({
let tx_clone = tx.clone();
async move {
println!(" stdin_task: Started, will send 3 messages then exit");
for i in 0..3 {
let msg = format!("Message {}", i);
if tx_clone.send(msg.clone()).await.is_err() {
println!(" stdin_task: Receiver dropped, exiting");
break;
}
println!(" stdin_task: Sent {}", msg);
tokio::time::sleep(Duration::from_millis(300)).await;
}
println!(" stdin_task: Finished (simulating EOF)");
// tx_clone is dropped here
}
});
// Task 2: Simulates message processor
let processor_task = tokio::spawn(async move {
println!(" processor_task: Started, waiting for messages");
while let Some(msg) = rx.recv().await {
println!(" processor_task: Processing {}", msg);
tokio::time::sleep(Duration::from_millis(100)).await;
}
println!(" processor_task: Finished (channel closed)");
});
// Task 3: Simulates stdout writer or other background task
let background_task = tokio::spawn(async move {
for i in 0..2 {
tokio::time::sleep(Duration::from_millis(500)).await;
println!(" background_task: Tick {}", i);
}
println!(" background_task: Finished");
});
println!(" main: Original tx is still alive here");
println!(" main: About to call tokio::join! - will this deadlock?");
// This is the pattern from the original code
let _ = tokio::join!(stdin_task, processor_task, background_task);
}
```
---------
Co-authored-by: Michael Bolin <bolinfest@gmail.com>
This commit is contained in:
@@ -63,7 +63,6 @@ pub async fn run_main(
|
||||
|
||||
// Task: read from stdin, push to `incoming_tx`.
|
||||
let stdin_reader_handle = tokio::spawn({
|
||||
let incoming_tx = incoming_tx.clone();
|
||||
async move {
|
||||
let stdin = io::stdin();
|
||||
let reader = BufReader::new(stdin);
|
||||
|
||||
Reference in New Issue
Block a user