Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added rateLimit parameter to the queue #1425

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/cargo.js
Original file line number Diff line number Diff line change
Expand Up @@ -78,5 +78,5 @@ import queue from './internal/queue';
* });
*/
export default function cargo(worker, payload) {
return queue(worker, 1, payload);
return queue(worker, 1, null, payload);
}
30 changes: 28 additions & 2 deletions lib/internal/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,34 @@ import setImmediate from './setImmediate';
import DLL from './DoublyLinkedList';
import wrapAsync from './wrapAsync';

export default function queue(worker, concurrency, payload) {
export default function queue(worker, concurrency, rateLimit, payload) {
if (concurrency == null) {
concurrency = 1;
}
else if(concurrency === 0) {
throw new Error('Concurrency must not be zero');
}

if (typeof rateLimit === 'number' && rateLimit <= 0) {
throw new Error('rateLimit must greater than zero');
}

var _worker = wrapAsync(worker);
var numRunning = 0;
var workersList = [];
var tokens = 0;
var interval = null;

// add tokens to the token count at the given rateLimit
if (rateLimit) {
tokens = (rateLimit > 1)? rateLimit : 1;
interval = setInterval(function() {
if (tokens < 1) {
tokens += rateLimit;
}
q.process();
}, 1000);
}

function _insert(data, insertAtFront, callback) {
if (callback != null && typeof callback !== 'function') {
Expand Down Expand Up @@ -97,6 +114,11 @@ export default function queue(worker, concurrency, payload) {
kill: function () {
q.drain = noop;
q._tasks.empty();
if (interval) {
clearInterval(interval);
interval = null;
}

},
unshift: function (data, callback) {
_insert(data, true, callback);
Expand All @@ -111,7 +133,7 @@ export default function queue(worker, concurrency, payload) {
return;
}
isProcessing = true;
while(!q.paused && numRunning < q.concurrency && q._tasks.length){
while(!q.paused && numRunning < q.concurrency && q._tasks.length && (!rateLimit || (rateLimit && tokens >= 1))) {
var tasks = [], data = [];
var l = q._tasks.length;
if (q.payload) l = Math.min(l, q.payload);
Expand All @@ -134,6 +156,10 @@ export default function queue(worker, concurrency, payload) {

var cb = onlyOnce(_next(tasks));
_worker(data, cb);
if (rateLimit) {
tokens--;
}

}
isProcessing = false;
},
Expand Down
8 changes: 6 additions & 2 deletions lib/priorityQueue.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,19 @@ import queue from './queue';
* @param {number} concurrency - An `integer` for determining how many `worker`
* functions should be run in parallel. If omitted, the concurrency defaults to
* `1`. If the concurrency is `0`, an error is thrown.
* @param {number} [rateLimit=null] - A `number` that determines the maximum number
* of times per second work will be consumed from the queue. If omitted, the rateLimit
* defaults to `1` - 1 item per second. If the rateLimit is `0` or less, an error is thrown.
* A rate limit of `null` means "no rate limit - as fast as possible"
* @returns {module:ControlFlow.QueueObject} A priorityQueue object to manage the tasks. There are two
* differences between `queue` and `priorityQueue` objects:
* * `push(task, priority, [callback])` - `priority` should be a number. If an
* array of `tasks` is given, all tasks will be assigned the same priority.
* * The `unshift` method was removed.
*/
export default function(worker, concurrency) {
export default function(worker, concurrency, rateLimit) {
// Start with a normal queue
var q = queue(worker, concurrency);
var q = queue(worker, concurrency, rateLimit);

// Override push to accept second parameter representing priority
q.push = function(data, priority, callback) {
Expand Down
9 changes: 7 additions & 2 deletions lib/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ import wrapAsync from './internal/wrapAsync';
* @param {number} [concurrency=1] - An `integer` for determining how many
* `worker` functions should be run in parallel. If omitted, the concurrency
* defaults to `1`. If the concurrency is `0`, an error is thrown.
* @param {number} [rateLimit=null] - A `number` that determines the maximum number
* of times per second work will be consumed from the queue. If omitted, the rateLimit
* defaults to `1` - 1 item per second. If the rateLimit is `0` or less, an error is thrown.
* A rate limit of `null` means "no rate limit - as fast as possible"
* @returns {module:ControlFlow.QueueObject} A queue object to manage the tasks. Callbacks can
* attached as certain properties to listen for specific events during the
* lifecycle of the queue.
Expand Down Expand Up @@ -106,9 +110,10 @@ import wrapAsync from './internal/wrapAsync';
* console.log('finished processing bar');
* });
*/
export default function (worker, concurrency) {
export default function (worker, concurrency, rateLimit) {
if (typeof rateLimit != 'number') rateLimit = null;
var _worker = wrapAsync(worker);
return queue(function (items, cb) {
_worker(items[0], cb);
}, concurrency, 1);
}, concurrency, rateLimit, 1);
}
70 changes: 70 additions & 0 deletions mocha_test/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,76 @@ describe('queue', function(){
done();
});

it('rate limiting', function(done) {
// this is a long-running test
this.timeout(4000);

var call_order = [],
delays = [40,10,60,10];

// order of completion: 1,2,3,4
// create queue that only complete 1 task a second
var q = async.queue(function (task, callback) {
setTimeout(function () {
call_order.push('process ' + task);
callback('error', 'arg');
}, delays.shift());
},1, 1);
var start = new Date().getTime();

q.push(1, function (err, arg) {
expect(err).to.equal('error');
expect(arg).to.equal('arg');
expect(q.length()).to.equal(3);
call_order.push('callback ' + 1);
});
q.push(2, function (err, arg) {
expect(err).to.equal('error');
expect(arg).to.equal('arg');
expect(q.length()).to.equal(2);
call_order.push('callback ' + 2);
});
q.push(3, function (err, arg) {
expect(err).to.equal('error');
expect(arg).to.equal('arg');
expect(q.length()).to.equal(1);
call_order.push('callback ' + 3);
});
q.push(4, function (err, arg) {
expect(err).to.equal('error');
expect(arg).to.equal('arg');
expect(q.length()).to.equal(0);
call_order.push('callback ' + 4);
});
expect(q.length()).to.equal(4);
expect(q.concurrency).to.equal(1);

q.drain = function () {
expect(call_order).to.eql([
'process 1', 'callback 1',
'process 2', 'callback 2',
'process 3', 'callback 3',
'process 4', 'callback 4'
]);
expect(q.concurrency).to.equal(1);
expect(q.length()).to.equal(0);
q.kill();
// with 4 tasks, running at 1 per second
// this test should take over 3 seconds
expect((new Date().getTime() - start > 3)).to.equal(true);
done();
};
});

it('zero concurrency', function(done){
expect(function () {
async.queue(function (task, callback) {
callback(null, task);
}, 0);
}).to.throw();
done();
});

it('error propagation', function(done){
var results = [];

Expand Down