Why can a blocked tokio worker still work concurrently?

When I run

use tokio::sync::mpsc;

#[tokio::main(flavor = "multi_thread", worker_threads = 1)]
async fn main() {
    let (tx, mut rx) = mpsc::channel(1);

    tokio::spawn(async move {
        while let Some(i) = rx.recv().await {
            println!("got = {}", i);
        } 
    });

    for i in 0..5 {
        // busy calculation
        std::thread::sleep(std::time::Duration::from_millis(10));

        match tx.try_send(i) {
            Ok(_) => {
                println!("sent = {}", i);
            }
            Err(err) => {
                println!("{}", err);
            }
        };
    }
}

I got

sent = 0
got = 0
sent = 1
got = 1
sent = 2
got = 2
sent = 3
got = 3
sent = 4

From my understanding, the only worker is busy with the for loop because the loop never yields, so it should have no chance to run the receiving task. Therefore, the channel should be full after the first send.
It turns out I am wrong. What am I missing?

Solution:

The code in a #[tokio::main] function does not actually run on a worker thread. Therefore, the spawned task is sent to the only worker thread, while the for loop is executed on the program’s main thread.

Under the hood, tokio::main lifts the function body into an async block, builds a runtime, then passes the future for the generated async block to Runtime::block_on. According to this method’s documentation:

Note that the future required by this function does not run as a worker. The expectation is that other tasks are spawned by the future here. Awaiting on other futures from the future provided here will not perform as fast as those spawned as workers.
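The effect of having two threads can be sketched without tokio at all. The example below is only an analogy, under these assumptions: std::sync::mpsc::sync_channel(1) stands in for tokio's mpsc::channel(1), a plain OS thread stands in for the single worker, and the function name run_with_separate_receiver_thread is hypothetical. Because the sleep blocks only the calling thread, the receiver thread is free to drain the channel between sends.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};
use std::thread;
use std::time::Duration;

/// Sends 0..5 from the calling thread while a separate thread receives.
/// Returns (values accepted by try_send, values seen by the receiver).
fn run_with_separate_receiver_thread() -> (Vec<i32>, Vec<i32>) {
    // Bounded channel with capacity 1 (assumption: a std stand-in for
    // tokio's mpsc::channel(1)).
    let (tx, rx) = sync_channel::<i32>(1);

    // Stand-in for the single worker thread that runs the spawned task.
    let receiver = thread::spawn(move || {
        let mut got = Vec::new();
        // recv() returns Err once the sender is dropped and the buffer is empty.
        while let Ok(i) = rx.recv() {
            got.push(i);
        }
        got
    });

    let mut sent = Vec::new();
    for i in 0..5 {
        // Blocks only the calling thread; the receiver thread keeps running.
        thread::sleep(Duration::from_millis(10));
        match tx.try_send(i) {
            Ok(()) => sent.push(i),
            Err(TrySendError::Full(_)) => {} // buffer still occupied, value dropped
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    drop(tx); // close the channel so the receiver loop ends

    let got = receiver.join().expect("receiver thread panicked");
    (sent, got)
}

fn main() {
    let (sent, got) = run_with_separate_receiver_thread();
    println!("sent = {:?}", sent);
    println!("got  = {:?}", got);
}
```

How many sends succeed depends on thread scheduling, so no exact output is claimed here. With a 10 ms pause between sends the receiver usually has emptied the buffer in time, which mirrors the output in the question.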

You can get the behavior you expected in two ways.

The first way is to lift the body of your main function into a new task:

use tokio::sync::mpsc;

#[tokio::main(flavor = "multi_thread", worker_threads = 1)]
async fn main() {
    tokio::spawn(async move {
        let (tx, mut rx) = mpsc::channel(1);

        tokio::spawn(async move {
            while let Some(i) = rx.recv().await {
                println!("got = {}", i);
            }
        });

        for i in 0..5 {
            // busy calculation
            std::thread::sleep(std::time::Duration::from_millis(10));

            match tx.try_send(i) {
                Ok(_) => {
                    println!("sent = {}", i);
                }
                Err(err) => {
                    println!("{}", err);
                }
            };
        }
    })
    .await
    .unwrap();
}

The second way is to use the "current thread" runtime instead of the multi-threaded runtime. This runs all tasks, including the body of main, on the main thread:

use tokio::sync::mpsc;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let (tx, mut rx) = mpsc::channel(1);

    tokio::spawn(async move {
        while let Some(i) = rx.recv().await {
            println!("got = {}", i);
        }
    });

    for i in 0..5 {
        // busy calculation
        std::thread::sleep(std::time::Duration::from_millis(10));

        match tx.try_send(i) {
            Ok(_) => {
                println!("sent = {}", i);
            }
            Err(err) => {
                println!("{}", err);
            }
        };
    }
}

Both of these print "sent = 0", after which every remaining try_send fails with "no available capacity" because the receiving task never gets a chance to run while the loop is executing.
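The reason only the first send succeeds can be modelled with the standard library alone. This is a rough sketch, not tokio code: std::sync::mpsc::sync_channel(1) is assumed as a stand-in for tokio's channel, the function name run_without_yielding is hypothetical, and "the receiver only runs after the loop" is simulated by draining the channel once the loop has finished.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Runs the whole send loop before the receiver gets a turn, modelling a
/// task that never yields on a thread it shares with the receiver.
/// Returns (one bool per try_send, values the receiver eventually sees).
fn run_without_yielding() -> (Vec<bool>, Vec<i32>) {
    // Capacity-1 bounded channel (assumption: stand-in for mpsc::channel(1)).
    let (tx, rx) = sync_channel::<i32>(1);

    let mut outcomes = Vec::new();
    for i in 0..5 {
        match tx.try_send(i) {
            // The single slot was free, so the value is buffered.
            Ok(()) => outcomes.push(true),
            // Nobody has received yet, so the slot is still occupied.
            Err(TrySendError::Full(_)) => outcomes.push(false),
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    drop(tx); // close the channel so the drain below terminates

    // Only now does the "receiver" run and drain what was buffered.
    let got: Vec<i32> = rx.iter().collect();
    (outcomes, got)
}

fn main() {
    let (outcomes, got) = run_without_yielding();
    println!("accepted = {:?}", outcomes); // [true, false, false, false, false]
    println!("got = {:?}", got); // [0]
}
```

Whether the tokio receiver actually gets that final turn before the runtime shuts down is not something this sketch models; it only shows why sends 1 through 4 find the channel full.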
