diff --git a/lib/cargo.js b/lib/cargo.js
index 5060223fa..62d9a99e0 100644
--- a/lib/cargo.js
+++ b/lib/cargo.js
@@ -78,5 +78,5 @@ import queue from './internal/queue';
  * });
  */
 export default function cargo(worker, payload) {
-    return queue(worker, 1, payload);
+    return queue(worker, 1, null, payload);
 }
diff --git a/lib/internal/queue.js b/lib/internal/queue.js
index e357447f9..3624c1f46 100644
--- a/lib/internal/queue.js
+++ b/lib/internal/queue.js
@@ -7,7 +7,7 @@ import setImmediate from './setImmediate';
 import DLL from './DoublyLinkedList';
 import wrapAsync from './wrapAsync';
 
-export default function queue(worker, concurrency, payload) {
+export default function queue(worker, concurrency, rateLimit, payload) {
     if (concurrency == null) {
         concurrency = 1;
     }
@@ -15,9 +15,26 @@ export default function queue(worker, concurrency, payload) {
         throw new Error('Concurrency must not be zero');
     }
 
+    if (typeof rateLimit === 'number' && rateLimit <= 0) {
+        throw new Error('rateLimit must be greater than zero');
+    }
+
     var _worker = wrapAsync(worker);
     var numRunning = 0;
     var workersList = [];
+    var tokens = 0;
+    var interval = null;
+
+    // add tokens to the token count at the given rateLimit
+    if (rateLimit) {
+        tokens = (rateLimit > 1) ? rateLimit : 1;
+        interval = setInterval(function() {
+            if (tokens < 1) {
+                tokens += rateLimit;
+            }
+            q.process();
+        }, 1000);
+    }
 
     function _insert(data, insertAtFront, callback) {
         if (callback != null && typeof callback !== 'function') {
@@ -97,6 +114,11 @@ export default function queue(worker, concurrency, payload) {
         kill: function () {
             q.drain = noop;
             q._tasks.empty();
+            if (interval) {
+                clearInterval(interval);
+                interval = null;
+            }
+
         },
         unshift: function (data, callback) {
             _insert(data, true, callback);
@@ -111,7 +133,7 @@ export default function queue(worker, concurrency, payload) {
                 return;
             }
             isProcessing = true;
-            while(!q.paused && numRunning < q.concurrency && q._tasks.length){
+            while(!q.paused && numRunning < q.concurrency && q._tasks.length && (!rateLimit || tokens >= 1)) {
                 var tasks = [], data = [];
                 var l = q._tasks.length;
                 if (q.payload) l = Math.min(l, q.payload);
@@ -134,6 +156,10 @@ export default function queue(worker, concurrency, payload) {
 
                 var cb = onlyOnce(_next(tasks));
                 _worker(data, cb);
+                if (rateLimit) {
+                    tokens--;
+                }
+
             }
             isProcessing = false;
         },
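The rate limiting added above is essentially a token bucket: the queue starts with roughly `rateLimit` tokens (at least 1), each dispatched batch of work consumes one token, and a once-per-second `setInterval` refills the bucket and re-kicks `q.process()`. Below is a minimal standalone sketch of the same idea; the names (`makeThrottle`, `ratePerSecond`, `tryRun`) are illustrative only and are not part of the patch or of async's API.

    // Illustrative token-bucket throttle mirroring the approach in the patch above.
    function makeThrottle(ratePerSecond) {
        var tokens = Math.max(ratePerSecond, 1); // start with a full bucket
        var timer = setInterval(function () {
            // refill once per second whenever the bucket has run dry
            if (tokens < 1) tokens += ratePerSecond;
        }, 1000);
        return {
            tryRun: function (task) {
                if (tokens < 1) return false; // out of tokens; retry after the next refill
                tokens -= 1;
                task();
                return true;
            },
            stop: function () { clearInterval(timer); } // mirrors the clearInterval in kill()
        };
    }

Note that the interval keeps the Node.js event loop alive until it is cleared, which is why the patch clears it in `kill()`.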
diff --git a/lib/priorityQueue.js b/lib/priorityQueue.js
index aff729379..a73fdc5e1 100644
--- a/lib/priorityQueue.js
+++ b/lib/priorityQueue.js
@@ -22,15 +22,19 @@ import queue from './queue';
 * @param {number} concurrency - An `integer` for determining how many `worker`
 * functions should be run in parallel. If omitted, the concurrency defaults to
 * `1`. If the concurrency is `0`, an error is thrown.
+* @param {number} [rateLimit=null] - A `number` that determines the maximum number
+* of times per second work will be consumed from the queue. If omitted, the rateLimit
+* defaults to `null` - no rate limit. If the rateLimit is `0` or less, an error is thrown.
+* A rateLimit of `null` means "no rate limit - consume work as fast as possible".
 * @returns {module:ControlFlow.QueueObject} A priorityQueue object to manage the tasks. There are two
 * differences between `queue` and `priorityQueue` objects:
 * * `push(task, priority, [callback])` - `priority` should be a number. If an
 *   array of `tasks` is given, all tasks will be assigned the same priority.
 * * The `unshift` method was removed.
 */
-export default function(worker, concurrency) {
+export default function(worker, concurrency, rateLimit) {
     // Start with a normal queue
-    var q = queue(worker, concurrency);
+    var q = queue(worker, concurrency, rateLimit);
 
     // Override push to accept second parameter representing priority
     q.push = function(data, priority, callback) {
diff --git a/lib/queue.js b/lib/queue.js
index 2ba1747b3..970efa78d 100644
--- a/lib/queue.js
+++ b/lib/queue.js
@@ -72,6 +72,10 @@ import wrapAsync from './internal/wrapAsync';
 * @param {number} [concurrency=1] - An `integer` for determining how many
 * `worker` functions should be run in parallel. If omitted, the concurrency
 * defaults to `1`. If the concurrency is `0`, an error is thrown.
+* @param {number} [rateLimit=null] - A `number` that determines the maximum number
+* of times per second work will be consumed from the queue. If omitted, the rateLimit
+* defaults to `null` - no rate limit. If the rateLimit is `0` or less, an error is thrown.
+* A rateLimit of `null` means "no rate limit - consume work as fast as possible".
 * @returns {module:ControlFlow.QueueObject} A queue object to manage the tasks. Callbacks can
 * attached as certain properties to listen for specific events during the
 * lifecycle of the queue.
@@ -106,9 +110,10 @@ import wrapAsync from './internal/wrapAsync';
 *     console.log('finished processing bar');
 * });
 */
-export default function (worker, concurrency) {
+export default function (worker, concurrency, rateLimit) {
+    if (typeof rateLimit !== 'number') rateLimit = null;
     var _worker = wrapAsync(worker);
     return queue(function (items, cb) {
         _worker(items[0], cb);
-    }, concurrency, 1);
+    }, concurrency, rateLimit, 1);
 }
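For context, here is a minimal usage sketch of the new `rateLimit` argument documented above, based on the signature this patch adds (`async.queue(worker, concurrency, rateLimit)`); the worker, delay, and task values are illustrative only and not part of the patch.

    var async = require('async');

    // worker that simulates a slow I/O call (~50 ms)
    var q = async.queue(function (task, callback) {
        setTimeout(function () {
            console.log('processed', task.name);
            callback(null);
        }, 50);
    }, 2, 1); // concurrency 2, but consume at most 1 task per second

    q.drain = function () {
        console.log('all items have been processed');
    };

    q.push({name: 'foo'});
    q.push({name: 'bar'});
    q.push({name: 'baz'}); // with rateLimit 1, tasks start roughly one second apart

Omitting the third argument (or passing a non-number) leaves `rateLimit` at `null`, so the existing unthrottled behaviour is preserved.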
diff --git a/mocha_test/queue.js b/mocha_test/queue.js
index 7525c3056..99790fa81 100644
--- a/mocha_test/queue.js
+++ b/mocha_test/queue.js
@@ -126,6 +126,76 @@ describe('queue', function(){
             done();
         });
 
+    it('rate limiting', function(done) {
+        // this is a long-running test
+        this.timeout(4000);
+
+        var call_order = [],
+            delays = [40,10,60,10];
+
+        // order of completion: 1,2,3,4
+        // create a queue that only completes 1 task per second
+        var q = async.queue(function (task, callback) {
+            setTimeout(function () {
+                call_order.push('process ' + task);
+                callback('error', 'arg');
+            }, delays.shift());
+        }, 1, 1);
+        var start = new Date().getTime();
+
+        q.push(1, function (err, arg) {
+            expect(err).to.equal('error');
+            expect(arg).to.equal('arg');
+            expect(q.length()).to.equal(3);
+            call_order.push('callback ' + 1);
+        });
+        q.push(2, function (err, arg) {
+            expect(err).to.equal('error');
+            expect(arg).to.equal('arg');
+            expect(q.length()).to.equal(2);
+            call_order.push('callback ' + 2);
+        });
+        q.push(3, function (err, arg) {
+            expect(err).to.equal('error');
+            expect(arg).to.equal('arg');
+            expect(q.length()).to.equal(1);
+            call_order.push('callback ' + 3);
+        });
+        q.push(4, function (err, arg) {
+            expect(err).to.equal('error');
+            expect(arg).to.equal('arg');
+            expect(q.length()).to.equal(0);
+            call_order.push('callback ' + 4);
+        });
+        expect(q.length()).to.equal(4);
+        expect(q.concurrency).to.equal(1);
+
+        q.drain = function () {
+            expect(call_order).to.eql([
+                'process 1', 'callback 1',
+                'process 2', 'callback 2',
+                'process 3', 'callback 3',
+                'process 4', 'callback 4'
+            ]);
+            expect(q.concurrency).to.equal(1);
+            expect(q.length()).to.equal(0);
+            q.kill();
+            // with 4 tasks, running at 1 per second,
+            // this test should take over 3 seconds (3000 ms)
+            expect((new Date().getTime() - start > 3000)).to.equal(true);
+            done();
+        };
+    });
+
+    it('zero concurrency', function(done){
+        expect(function () {
+            async.queue(function (task, callback) {
+                callback(null, task);
+            }, 0);
+        }).to.throw();
+        done();
+    });
+
     it('error propagation', function(done){
         var results = [];