Follow

Keep Up to Date with the Most Important News

By pressing the Subscribe button, you confirm that you have read and are agreeing to our Privacy Policy and Terms of Use
Contact

Javascript serial queue using async?

I have the following code:

var async  = require('async');

const timeout = ms => new Promise(resolve => setTimeout(resolve, ms));

var serialQueue = async.queue(function(task, callback) { 
    console.log(`done with tasks...`);
    callback(); 
}, 1);

async function fx(msg, waitTime) {
    console.log(`START FX: ${msg}. waitTime: ${waitTime}`);
    await timeout(waitTime);
    console.log(`DONE FX: ${msg}`);
}

async function test() {
    serialQueue.push(fx("1", 10000));
    serialQueue.push(fx("2", 2000));
    serialQueue.push(fx("3", 1000));
    await serialQueue.drain();
    console.log(`DRAINED!!!`);
}

test();

When I run the code above I get this:

START FX: 1. waitTime: 10000
START FX: 2. waitTime: 2000
START FX: 3. waitTime: 1000
DRAINED!!!
done with tasks...
done with tasks...
done with tasks...
all items have been processed
DONE FX: 3
DONE FX: 2
DONE FX: 1

To do that I referenced this link:

MEDevel.com: Open-source for Healthcare and Education

Collecting and validating open-source software for healthcare, education, enterprise, development, medical imaging, medical records, and digital pathology.

Visit Medevel

https://caolan.github.io/async/v3/docs.html#queue

Could someone help me understand why the "serial" nature of the queue is not respected here? I set concurrency to 1 when I created serialQueue but, it looks like the queue isn’t really serializing the execution of the three functions passed in.

How can I get this queue to execute the three functions in order, so have 2 wait for 1 and 3 wait for 2…? Also, why is drained called before all functions are actually done executing?

>Solution :

There are a few things wrong here.

First: serialQueue.push(fx(...)) invokes fk immediately, does not wait for it to return (you have no await) and places the return value (a promise) to your queue. You need to pass something that actually captures your intent so that it can be invoked later. There are lots of ways of doing this. You could pass arrays, in the form [func, arg1, arg2, ...], or you could pass functions in the form async () => { await func(arg1, arg2, ...) }, or you could use currying to produce a new function with predefined arguments, etc. Regardless…

Second: You don’t do anything with task in your queue’s handler. It’s your job to do something, the queue just passes in the items you’ve given it. If you want the items to be functions, you have to invoke something, and await the promise it returns, if that’s what you want the queue to do for you. Right now, you’re ignoring task entirely.

Something like this:

var async  = require('async');

const timeout = ms => new Promise(resolve => setTimeout(resolve, ms));

var serialQueue = async.queue(async function(task, callback) {
    await task(); // Actually invoke the task
    console.log(`done with tasks...`);
    callback();
}, 1);

async function fx(msg, waitTime) {
    console.log(`START FX: ${msg}. waitTime: ${waitTime}`);
    await timeout(waitTime);
    console.log(`DONE FX: ${msg}`);
}

async function test() {
    // Don't invoke the function, instead pass a thing that can be invoked later, in the queue's handler function
    serialQueue.push(async () => { await fx("1", 10000) });
    serialQueue.push(async () => { await fx("2", 2000) });
    serialQueue.push(async () => { await fx("3", 1000) });
    await serialQueue.drain();
    console.log(`DRAINED!!!`);
}

test();

Output:

START FX: 1. waitTime: 10000
DONE FX: 1
done with tasks...
START FX: 2. waitTime: 2000
DONE FX: 2
done with tasks...
START FX: 3. waitTime: 1000
DONE FX: 3
done with tasks...
DRAINED!!!
Add a comment

Leave a Reply

Keep Up to Date with the Most Important News

By pressing the Subscribe button, you confirm that you have read and are agreeing to our Privacy Policy and Terms of Use

Discover more from Dev solutions

Subscribe now to keep reading and get access to the full archive.

Continue reading