Skip to content

Commit

Permalink
ensure q.workersList() contains items being processed [fixes #1428] (#…
Browse files Browse the repository at this point in the history
…1429)

* ensure q.workersList() contains items being processed [fixes #1428]

* remove newline

* improve q.workersList() test
  • Loading branch information
hargasinski authored Jun 11, 2017
1 parent ec9dab7 commit 0364ff3
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 5 deletions.
5 changes: 3 additions & 2 deletions lib/internal/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,10 @@ export default function queue(worker, concurrency, payload) {

for (var i = 0, l = tasks.length; i < l; i++) {
var task = tasks[i];

var index = indexOf(workersList, task, 0);
if (index >= 0) {
workersList.splice(index)
workersList.splice(index, 1);
}

task.callback.apply(task, arguments);
Expand Down Expand Up @@ -118,11 +119,11 @@ export default function queue(worker, concurrency, payload) {
for (var i = 0; i < l; i++) {
var node = q._tasks.shift();
tasks.push(node);
workersList.push(node);
data.push(node.data);
}

numRunning += 1;
workersList.push(tasks[0]);

if (q._tasks.length === 0) {
q.empty();
Expand Down
55 changes: 54 additions & 1 deletion mocha_test/cargo.js
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ describe('cargo', function () {

it('expose payload', function (done) {
var called_once = false;
var cargo= async.cargo(function(tasks, cb) {
var cargo = async.cargo(function(tasks, cb) {
if (!called_once) {
expect(cargo.payload).to.equal(1);
assert(tasks.length === 1, 'should start with payload = 1');
Expand All @@ -261,4 +261,57 @@ describe('cargo', function () {
}, 15);
});

it('workersList', function(done) {
var called_once = false;

function getWorkersListData(cargo) {
return cargo.workersList().map(function(v) {
return v.data;
});
}

var cargo = async.cargo(function(tasks, cb) {
if (!called_once) {
expect(tasks).to.eql(['foo', 'bar']);
} else {
expect(tasks).to.eql(['baz']);
}
expect(getWorkersListData(cargo)).to.eql(tasks);
async.setImmediate(function() {
// ensure nothing has changed
expect(getWorkersListData(cargo)).to.eql(tasks);
called_once = true;
cb();
});
}, 2);

cargo.drain = function() {
expect(cargo.workersList()).to.eql([]);
expect(cargo.running()).to.equal(0);
done();
};

cargo.push('foo');
cargo.push('bar');
cargo.push('baz');
});

it('running', function(done) {
var cargo = async.cargo(function(tasks, cb) {
expect(cargo.running()).to.equal(1);
async.setImmediate(function() {
expect(cargo.running()).to.equal(1);
cb();
});
}, 2);

cargo.drain = function() {
expect(cargo.running()).to.equal(0);
done();
};

cargo.push('foo');
cargo.push('bar');
cargo.push('baz');
})
});
65 changes: 63 additions & 2 deletions mocha_test/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,7 @@ describe('queue', function(){
});
});

context('q.unsaturated(): ',function() {
context('q.unsaturated(): ', function() {
it('should have a default buffer property that equals 25% of the concurrenct rate', function(done){
var calls = [];
var q = async.queue(function(task, cb) {
Expand Down Expand Up @@ -719,6 +719,68 @@ describe('queue', function(){
});
});

context('workersList', function() {
it('should be the same length as running()', function(done) {
var q = async.queue(function(task, cb) {
async.setImmediate(function() {
expect(q.workersList().length).to.equal(q.running());
cb();
});
}, 2);

q.drain = function() {
expect(q.workersList().length).to.equal(0);
expect(q.running()).to.equal(0);
done();
};

q.push('foo');
q.push('bar');
q.push('baz');
});

it('should contain the items being processed', function(done) {
var itemsBeingProcessed = {
'foo': ['foo'],
'foo_cb': ['foo', 'bar'],
'bar': ['foo', 'bar'],
'bar_cb': ['bar', 'baz'],
'baz': ['bar', 'baz'],
'baz_cb': ['baz']
};

function getWorkersListData(q) {
return q.workersList().map(function(v) {
return v.data;
});
}

var q = async.queue(function(task, cb) {
expect(
getWorkersListData(q)
).to.eql(itemsBeingProcessed[task]);
expect(q.workersList().length).to.equal(q.running());
async.setImmediate(function() {
expect(
getWorkersListData(q)
).to.eql(itemsBeingProcessed[task+'_cb']);
expect(q.workersList().length).to.equal(q.running());
cb();
});
}, 2);

q.drain = function() {
expect(q.workersList()).to.eql([]);
expect(q.workersList().length).to.equal(q.running());
done();
};

q.push('foo');
q.push('bar');
q.push('baz');
});
})

it('remove', function(done) {
var result = [];
var q = async.queue(function(data, cb) {
Expand All @@ -738,4 +800,3 @@ describe('queue', function(){
}
});
});

0 comments on commit 0364ff3

Please sign in to comment.