Async channel can't stop

I am a beginner in rust asynchronous programming. I’m writing a program to implement the Sleep Sort algorithm, which can sort a non-negative array. The principle of its implementation is that for each element in the array, wait for a period of time before outputting, and the waiting time is proportional to the value of the element. In this way, after a period of time, the array will be output in order from small to large.

use std::time::Duration;

use text_io::read;
use tokio::sync::mpsc::channel;

#[tokio::main]
async fn main() {
    let n: u32 = read!();
    let (tx, mut rx) = channel::<u32>(n as usize);
    let mut before: Vec<u32> = Vec::new();
    for _ in 0..n {
        let a = read!();
        before.push(a);
    }

    for i in before.into_iter() {
        let tx = tx.clone();
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis((50 * i) as u64)).await;
            if let Err(_) = tx.send(i).await {
                panic!();
            }
        });
    }

    while let Some(i) = rx.recv().await {
        println!("{}", i);
    }
}

This program prints the correct answer, but it doesn’t stop. I don’t konw why and how to solve it.

>Solution :

From the recv documentation (emphasis mine):

This method returns None if the channel has been closed and there are no remaining messages in the channel’s buffer. […] The channel is closed when all senders have been dropped, or when close is called.

So, to stop receiving, you need to either drop the extra sender you’re still holding in main before receiving:

// -- snip --

std::mem::drop(tx);
while let Some(i) = rx.recv().await {
    println!("{}", i);
}

or else call rx.close(). However, this will cause senders trying to send messages afterwards to fail, and thus will break your sleep-sort implementation. So you need to simply drop the extra sender in this case.

Leave a Reply