Skip to content

Commit

Permalink
lib: expose FixedQueue internally and fix nextTick bug
Browse files Browse the repository at this point in the history
A bug was introduced together with the FixedQueue implementation for
process.nextTick which meant that the queue wouldn't necessarily
fully clear on each run through. Fix it and abstract the data
structure into an internal module that can later be used elsewhere.

PR-URL: #20468
Reviewed-By: Benjamin Gruenbaum <[email protected]>
Reviewed-By: James M Snell <[email protected]>
Reviewed-By: Anna Henningsen <[email protected]>
Reviewed-By: Ruben Bridgewater <[email protected]>
Reviewed-By: Matteo Collina <[email protected]>
  • Loading branch information
apapirovski authored and MylesBorins committed May 8, 2018
1 parent 73f2dab commit d0cbb4c
Show file tree
Hide file tree
Showing 5 changed files with 174 additions and 116 deletions.
113 changes: 113 additions & 0 deletions lib/internal/fixed_queue.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
'use strict';

// Currently optimal queue size, tested on V8 6.0 - 6.6. Must be power of two.
const kSize = 2048;
const kMask = kSize - 1;

// The FixedQueue is implemented as a singly-linked list of fixed-size
// circular buffers. It looks something like this:
//
// head tail
// | |
// v v
// +-----------+ <-----\ +-----------+ <------\ +-----------+
// | [null] | \----- | next | \------- | next |
// +-----------+ +-----------+ +-----------+
// | item | <-- bottom | item | <-- bottom | [empty] |
// | item | | item | | [empty] |
// | item | | item | | [empty] |
// | item | | item | | [empty] |
// | item | | item | bottom --> | item |
// | item | | item | | item |
// | ... | | ... | | ... |
// | item | | item | | item |
// | item | | item | | item |
// | [empty] | <-- top | item | | item |
// | [empty] | | item | | item |
// | [empty] | | [empty] | <-- top top --> | [empty] |
// +-----------+ +-----------+ +-----------+
//
// Or, if there is only one circular buffer, it looks something
// like either of these:
//
// head tail head tail
// | | | |
// v v v v
// +-----------+ +-----------+
// | [null] | | [null] |
// +-----------+ +-----------+
// | [empty] | | item |
// | [empty] | | item |
// | item | <-- bottom top --> | [empty] |
// | item | | [empty] |
// | [empty] | <-- top bottom --> | item |
// | [empty] | | item |
// +-----------+ +-----------+
//
// Adding a value means moving `top` forward by one, removing means
// moving `bottom` forward by one. After reaching the end, the queue
// wraps around.
//
// When `top === bottom` the current queue is empty and when
// `top + 1 === bottom` it's full. This wastes a single space of storage
// but allows much quicker checks.

const FixedCircularBuffer = class FixedCircularBuffer {
constructor() {
this.bottom = 0;
this.top = 0;
this.list = new Array(kSize);
this.next = null;
}

isEmpty() {
return this.top === this.bottom;
}

isFull() {
return ((this.top + 1) & kMask) === this.bottom;
}

push(data) {
this.list[this.top] = data;
this.top = (this.top + 1) & kMask;
}

shift() {
const nextItem = this.list[this.bottom];
if (nextItem === undefined)
return null;
this.list[this.bottom] = undefined;
this.bottom = (this.bottom + 1) & kMask;
return nextItem;
}
};

module.exports = class FixedQueue {
constructor() {
this.head = this.tail = new FixedCircularBuffer();
}

isEmpty() {
return this.head.isEmpty();
}

push(data) {
if (this.head.isFull()) {
// Head is full: Creates a new queue, sets the old queue's `.next` to it,
// and sets it as the new main queue.
this.head = this.head.next = new FixedCircularBuffer();
}
this.head.push(data);
}

shift() {
const { tail } = this;
const next = tail.shift();
if (tail.isEmpty() && tail.next !== null) {
// If there is another queue, it forms the new tail.
this.tail = tail.next;
}
return next;
}
};
124 changes: 8 additions & 116 deletions lib/internal/process/next_tick.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ function setupNextTick() {
} = require('internal/async_hooks');
const promises = require('internal/process/promises');
const { ERR_INVALID_CALLBACK } = require('internal/errors').codes;
const FixedQueue = require('internal/fixed_queue');
const { emitPromiseRejectionWarnings } = promises;

// tickInfo is used so that the C++ code in src/node.cc can
Expand All @@ -31,119 +32,7 @@ function setupNextTick() {
const kHasScheduled = 0;
const kHasPromiseRejections = 1;

// Queue size for each tick array. Must be a power of two.
const kQueueSize = 2048;
const kQueueMask = kQueueSize - 1;

// The next tick queue is implemented as a singly-linked list of fixed-size
// circular buffers. It looks something like this:
//
// head tail
// | |
// v v
// +-----------+ <-----\ +-----------+ <------\ +-----------+
// | [null] | \----- | next | \------- | next |
// +-----------+ +-----------+ +-----------+
// | tick | <-- bottom | tick | <-- bottom | [empty] |
// | tick | | tick | | [empty] |
// | tick | | tick | | [empty] |
// | tick | | tick | | [empty] |
// | tick | | tick | bottom --> | tick |
// | tick | | tick | | tick |
// | ... | | ... | | ... |
// | tick | | tick | | tick |
// | tick | | tick | | tick |
// | [empty] | <-- top | tick | | tick |
// | [empty] | | tick | | tick |
// | [empty] | | tick | | tick |
// +-----------+ +-----------+ <-- top top --> +-----------+
//
// Or, if there is only one fixed-size queue, it looks something
// like either of these:
//
// head tail head tail
// | | | |
// v v v v
// +-----------+ +-----------+
// | [null] | | [null] |
// +-----------+ +-----------+
// | [empty] | | tick |
// | [empty] | | tick |
// | tick | <-- bottom top --> | [empty] |
// | tick | | [empty] |
// | [empty] | <-- top bottom --> | tick |
// | [empty] | | tick |
// +-----------+ +-----------+
//
// Adding a value means moving `top` forward by one, removing means
// moving `bottom` forward by one.
//
// We let `bottom` and `top` wrap around, so when `top` is conceptually
// pointing to the end of the list, that means that the actual value is `0`.
//
// In particular, when `top === bottom`, this can mean *either* that the
// current queue is empty or that it is full. We can differentiate by
// checking whether an entry in the queue is empty (a.k.a. `=== undefined`).

class FixedQueue {
constructor() {
this.bottom = 0;
this.top = 0;
this.list = new Array(kQueueSize);
this.next = null;
}

push(data) {
this.list[this.top] = data;
this.top = (this.top + 1) & kQueueMask;
}

shift() {
const nextItem = this.list[this.bottom];
if (nextItem === undefined)
return null;
this.list[this.bottom] = undefined;
this.bottom = (this.bottom + 1) & kQueueMask;
return nextItem;
}
}

var head = new FixedQueue();
var tail = head;

function push(data) {
if (head.bottom === head.top) {
// Either empty or full:
if (head.list[head.top] !== undefined) {
// It's full: Creates a new queue, sets the old queue's `.next` to it,
// and sets it as the new main queue.
head = head.next = new FixedQueue();
} else {
// If the head is empty, that means that it was the only fixed-sized
// queue in existence.
DCHECK_EQ(head.next, null);
// This is the first tick object in existence, so we need to inform
// the C++ side that we do want to run `_tickCallback()`.
tickInfo[kHasScheduled] = 1;
}
}
head.push(data);
}

function shift() {
const next = tail.shift();
if (tail.top === tail.bottom) { // -> .shift() emptied the current queue.
if (tail.next !== null) {
// If there is another queue, it forms the new tail.
tail = tail.next;
} else {
// We've just run out of items. Let the native side know that it
// doesn't need to bother calling into JS to run the queue.
tickInfo[kHasScheduled] = 0;
}
}
return next;
}
const queue = new FixedQueue();

process.nextTick = nextTick;
// Needs to be accessible from beyond this scope.
Expand All @@ -152,7 +41,7 @@ function setupNextTick() {
function _tickCallback() {
let tock;
do {
while (tock = shift()) {
while (tock = queue.shift()) {
const asyncId = tock[async_id_symbol];
emitBefore(asyncId, tock[trigger_async_id_symbol]);
// emitDestroy() places the async_id_symbol into an asynchronous queue
Expand All @@ -175,8 +64,9 @@ function setupNextTick() {

emitAfter(asyncId);
}
tickInfo[kHasScheduled] = 0;
runMicrotasks();
} while (head.top !== head.bottom || emitPromiseRejectionWarnings());
} while (!queue.isEmpty() || emitPromiseRejectionWarnings());
tickInfo[kHasPromiseRejections] = 0;
}

Expand Down Expand Up @@ -222,6 +112,8 @@ function setupNextTick() {
args[i - 1] = arguments[i];
}

push(new TickObject(callback, args, getDefaultTriggerAsyncId()));
if (queue.isEmpty())
tickInfo[kHasScheduled] = 1;
queue.push(new TickObject(callback, args, getDefaultTriggerAsyncId()));
}
}
1 change: 1 addition & 0 deletions node.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@
'lib/internal/constants.js',
'lib/internal/encoding.js',
'lib/internal/errors.js',
'lib/internal/fixed_queue.js',
'lib/internal/freelist.js',
'lib/internal/fs.js',
'lib/internal/http.js',
Expand Down
34 changes: 34 additions & 0 deletions test/parallel/test-fixed-queue.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Flags: --expose-internals
'use strict';

require('../common');

const assert = require('assert');
const FixedQueue = require('internal/fixed_queue');

{
const queue = new FixedQueue();
assert.strictEqual(queue.head, queue.tail);
assert(queue.isEmpty());
queue.push('a');
assert(!queue.isEmpty());
assert.strictEqual(queue.shift(), 'a');
assert.strictEqual(queue.shift(), null);
}

{
const queue = new FixedQueue();
for (let i = 0; i < 2047; i++)
queue.push('a');
assert(queue.head.isFull());
queue.push('a');
assert(!queue.head.isFull());

assert.notStrictEqual(queue.head, queue.tail);
for (let i = 0; i < 2047; i++)
assert.strictEqual(queue.shift(), 'a');
assert.strictEqual(queue.head, queue.tail);
assert(!queue.isEmpty());
assert.strictEqual(queue.shift(), 'a');
assert(queue.isEmpty());
}
18 changes: 18 additions & 0 deletions test/parallel/test-next-tick-fixed-queue-regression.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
'use strict';

const common = require('../common');

// This tests a highly specific regression tied to the FixedQueue size, which
// was introduced in Node.js 9.7.0: https://github.com/nodejs/node/pull/18617
// More specifically, a nextTick list could potentially end up not fully
// clearing in one run through if exactly 2048 ticks were added after
// microtasks were executed within the nextTick loop.

process.nextTick(() => {
Promise.resolve(1).then(() => {
for (let i = 0; i < 2047; i++)
process.nextTick(common.mustCall());
const immediate = setImmediate(common.mustNotCall());
process.nextTick(common.mustCall(() => clearImmediate(immediate)));
});
});

0 comments on commit d0cbb4c

Please sign in to comment.