
Queue concurrent tasks delay #1113

Closed
charbel14 opened this issue Apr 16, 2016 · 5 comments

Comments

@charbel14

charbel14 commented Apr 16, 2016

Is there a way to introduce a delay between concurrent tasks?

For example, say I set my concurrency to 5. Is there a way to wait until all 5 tasks have completed before processing the next set of 5, or to wait 30 seconds before running the next batch of 5 tasks?

I tried something like the code below. The alternative is to not push 5 at a time, but to wait for the queue to be empty and then push the next 5, which is not as clean. If the queue had a concurrentDelay(maxDelayTimeout, proceedIfIdle) option, it would be a lot cleaner.

q.saturated = function () {
    // Queue concurrency limit reached; pause until it drains or times out.
    console.log('Saturated');
    q.pause();

    var ticks = 0;
    var intervalObject = setInterval(function () {
        ticks++;
        console.log(ticks * 5, 'seconds passed');
        // Resume when idle or after 30 seconds (6 ticks of 5s) have elapsed.
        if (q.idle() || ticks === 6) {
            console.log('exiting');
            clearInterval(intervalObject);
            q.resume();
        }
    }, 5000);
};

Thanks a lot.

@hargasinski
Collaborator

hargasinski commented Apr 16, 2016

If you're going to be adding your tasks synchronously, or your worker function's delay is long enough, you could look into cargo. It processes tasks in batches whose size is specified by the payload argument. You could then just add a delay to your worker function.

Here is the cargo example from the README.md, modified for your case:

var cargo = async.cargo(function (tasks, callback) {
    // Wait 30 seconds before processing each batch of (up to) 2 tasks.
    setTimeout(function () {
        for (var i = 0; i < tasks.length; i++) {
            console.log('hello ' + tasks[i].name);
        }
        callback();
    }, 30 * 1000);
}, 2);

cargo.push({name: 'foo'}, function (err) {
    console.log('finished processing foo');
});
cargo.push({name: 'bar'}, function (err) {
    console.log('finished processing bar');
});

cargo.push({name: 'baz'}, function (err) {
    console.log('finished processing baz');
});

// hello foo
// hello bar
// finished processing foo
// finished processing bar
// hello baz
// finished processing baz

@charbel14
Author

charbel14 commented Apr 16, 2016

This is interesting. I did not realize cargo had that feature. Thanks for that.

In my case the task is an HTTP request. In fact, I am using the npm "request" library, and I was also going to introduce a retry in case the request failed due to load...

This is actually batching POST requests to avoid being blocked or throttled by the API provider.

I am still trying to figure out how the retry would work and how I can specify which responses are retryable.

Reading through the entire README, I am a bit lost about which way to go given my requirements.

I was hoping to keep track of all the POST requests, to later report which ones passed, which retried and failed, which retried and passed, etc., and in case of failure the error. I want to throttle my requests to run 5 in parallel; once they all pass, run the next 5. If any fail, retry up to 3 times. To avoid a block I also want a timeout: for example, if a request exceeds 30 seconds, just purge it as an error.
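Something like this is what I am picturing for the retry and timeout part (a rough sketch; postItem() and the item shape are hypothetical, but async.retry and the request timeout option are real):

var async = require('async');
var request = require('request');

// Hypothetical wrapper: POST one item, treating anything over
// 30 seconds as an error by aborting the request.
function postItem(item, callback) {
    request.post({
        url: item.url,
        json: item.body,
        timeout: 30 * 1000
    }, callback);
}

// Retry each POST up to 3 times before reporting it as failed.
function postWithRetry(item, callback) {
    async.retry({ times: 3 }, function (cb) {
        postItem(item, cb);
    }, callback);
}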

Thanks.

@aearly
Collaborator

aearly commented Apr 22, 2016

There's a market for some sort of "throttled queue" functionality: a queue whose worker is only allowed to run n times every x milliseconds. People have described problems in other issues that would be solved by something like that. Nothing like it exists in Async today.

It would be a bit tricky -- left unbounded, a rush of requests could overload memory. You'd have to have a strategy for dealing with backpressure. If it needed to be reliable, you'd probably want to use a more dedicated message queue service like RabbitMQ or similar.
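As a rough sketch of that idea (not an Async API, just one way to approximate it): wrap the worker so each queue slot is held for a minimum interval, which limits the queue to roughly concurrency task starts per interval milliseconds.

var async = require('async');

function throttledQueue(worker, concurrency, interval) {
    return async.queue(function (task, callback) {
        var start = Date.now();
        worker(task, function (err, result) {
            // Hold the slot until the interval has elapsed, so the
            // next task in this slot cannot start early.
            var wait = Math.max(0, interval - (Date.now() - start));
            setTimeout(function () {
                callback(err, result);
            }, wait);
        });
    }, concurrency);
}

// e.g. at most 5 workers running, each slot reused at most once per 30s:
// var q = throttledQueue(sendRequest, 5, 30 * 1000);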

@charbel14
Author

charbel14 commented Apr 26, 2016

What I did is:
1 - Use cargo.
2 - For each set of tasks, use async.each.
3 - For each task, perform an HTTP request using the requestretry library.

Seems to work nicely.

The only comment I have is that the async.each callbacks do not forward any arguments beyond the error. I was hoping to have the cargo's final callback keep track of all the responses and collect the info needed to return in the response.

var cargo = async.cargo(function (tasks, callback) {
    async.each(tasks, function (task, taskCallback) {
        var callbackWrapper = function (error, response, body) {
            taskCallback(error, response, body); // extra arguments are ignored
        };
        sendRequest(task, callbackWrapper);
    }, function (err, response, body) {
        callback(err, response, body); // each only passes the error along
    });
}, 5);
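If you need the per-task results, swapping async.each for async.map would collect one result per task into an array for the final callback. A sketch, assuming the same sendRequest as above:

var cargo = async.cargo(function (tasks, callback) {
    async.map(tasks, function (task, taskCallback) {
        sendRequest(task, function (error, response, body) {
            // map forwards a single result value per task
            taskCallback(error, { response: response, body: body });
        });
    }, function (err, results) {
        callback(err, results); // results holds one entry per task
    });
}, 5);

Note that async.map calls its final callback as soon as any task errors, so to report every pass/fail you may want to record errors inside the result object instead of passing them to taskCallback.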

@aearly
Collaborator

aearly commented Apr 1, 2017

We've decided Async is going to stay out of the throttling business. There's too much complexity to handle here.

@aearly aearly closed this as completed Apr 1, 2017