Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lib: expose FixedQueue internally and fix nextTick bug #20468

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 113 additions & 0 deletions lib/internal/fixed_queue.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
'use strict';

// Currently optimal queue size, tested on V8 6.0 - 6.6. Must be power of two.
const kSize = 2048;
const kMask = kSize - 1;

// The FixedQueue is implemented as a singly-linked list of fixed-size
// circular buffers. It looks something like this:
//
// head tail
// | |
// v v
// +-----------+ <-----\ +-----------+ <------\ +-----------+
// | [null] | \----- | next | \------- | next |
// +-----------+ +-----------+ +-----------+
// | item | <-- bottom | item | <-- bottom | [empty] |
// | item | | item | | [empty] |
// | item | | item | | [empty] |
// | item | | item | | [empty] |
// | item | | item | bottom --> | item |
// | item | | item | | item |
// | ... | | ... | | ... |
// | item | | item | | item |
// | item | | item | | item |
// | [empty] | <-- top | item | | item |
// | [empty] | | item | | item |
// | [empty] | | [empty] | <-- top top --> | [empty] |
// +-----------+ +-----------+ +-----------+
//
// Or, if there is only one circular buffer, it looks something
// like either of these:
//
// head tail head tail
// | | | |
// v v v v
// +-----------+ +-----------+
// | [null] | | [null] |
// +-----------+ +-----------+
// | [empty] | | item |
// | [empty] | | item |
// | item | <-- bottom top --> | [empty] |
// | item | | [empty] |
// | [empty] | <-- top bottom --> | item |
// | [empty] | | item |
// +-----------+ +-----------+
//
// Adding a value means moving `top` forward by one, removing means
// moving `bottom` forward by one. After reaching the end, the queue
// wraps around.
//
// When `top === bottom` the current queue is empty and when
// `top + 1 === bottom` it's full. This wastes a single space of storage
// but allows much quicker checks.

const FixedCircularBuffer = class FixedCircularBuffer {
constructor() {
this.bottom = 0;
this.top = 0;
this.list = new Array(kSize);
this.next = null;
}

isEmpty() {
return this.top === this.bottom;
}

isFull() {
return ((this.top + 1) & kMask) === this.bottom;
}

push(data) {
this.list[this.top] = data;
this.top = (this.top + 1) & kMask;
}

shift() {
const nextItem = this.list[this.bottom];
if (nextItem === undefined)
return null;
this.list[this.bottom] = undefined;
this.bottom = (this.bottom + 1) & kMask;
return nextItem;
}
};

module.exports = class FixedQueue {
constructor() {
this.head = this.tail = new FixedCircularBuffer();
}

isEmpty() {
return this.head.isEmpty();
}

push(data) {
if (this.head.isFull()) {
// Head is full: Creates a new queue, sets the old queue's `.next` to it,
// and sets it as the new main queue.
this.head = this.head.next = new FixedCircularBuffer();
}
this.head.push(data);
}

shift() {
const { tail } = this;
const next = tail.shift();
if (tail.isEmpty() && tail.next !== null) {
// If there is another queue, it forms the new tail.
this.tail = tail.next;
}
return next;
}
};
124 changes: 8 additions & 116 deletions lib/internal/process/next_tick.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ function setupNextTick() {
} = require('internal/async_hooks');
const promises = require('internal/process/promises');
const { ERR_INVALID_CALLBACK } = require('internal/errors').codes;
const FixedQueue = require('internal/fixed_queue');
const { emitPromiseRejectionWarnings } = promises;

// tickInfo is used so that the C++ code in src/node.cc can
Expand All @@ -31,119 +32,7 @@ function setupNextTick() {
const kHasScheduled = 0;
const kHasPromiseRejections = 1;

// Queue size for each tick array. Must be a power of two.
const kQueueSize = 2048;
const kQueueMask = kQueueSize - 1;

// The next tick queue is implemented as a singly-linked list of fixed-size
// circular buffers. It looks something like this:
//
// head tail
// | |
// v v
// +-----------+ <-----\ +-----------+ <------\ +-----------+
// | [null] | \----- | next | \------- | next |
// +-----------+ +-----------+ +-----------+
// | tick | <-- bottom | tick | <-- bottom | [empty] |
// | tick | | tick | | [empty] |
// | tick | | tick | | [empty] |
// | tick | | tick | | [empty] |
// | tick | | tick | bottom --> | tick |
// | tick | | tick | | tick |
// | ... | | ... | | ... |
// | tick | | tick | | tick |
// | tick | | tick | | tick |
// | [empty] | <-- top | tick | | tick |
// | [empty] | | tick | | tick |
// | [empty] | | tick | | tick |
// +-----------+ +-----------+ <-- top top --> +-----------+
//
// Or, if there is only one fixed-size queue, it looks something
// like either of these:
//
// head tail head tail
// | | | |
// v v v v
// +-----------+ +-----------+
// | [null] | | [null] |
// +-----------+ +-----------+
// | [empty] | | tick |
// | [empty] | | tick |
// | tick | <-- bottom top --> | [empty] |
// | tick | | [empty] |
// | [empty] | <-- top bottom --> | tick |
// | [empty] | | tick |
// +-----------+ +-----------+
//
// Adding a value means moving `top` forward by one, removing means
// moving `bottom` forward by one.
//
// We let `bottom` and `top` wrap around, so when `top` is conceptually
// pointing to the end of the list, that means that the actual value is `0`.
//
// In particular, when `top === bottom`, this can mean *either* that the
// current queue is empty or that it is full. We can differentiate by
// checking whether an entry in the queue is empty (a.k.a. `=== undefined`).

class FixedQueue {
constructor() {
this.bottom = 0;
this.top = 0;
this.list = new Array(kQueueSize);
this.next = null;
}

push(data) {
this.list[this.top] = data;
this.top = (this.top + 1) & kQueueMask;
}

shift() {
const nextItem = this.list[this.bottom];
if (nextItem === undefined)
return null;
this.list[this.bottom] = undefined;
this.bottom = (this.bottom + 1) & kQueueMask;
return nextItem;
}
}

var head = new FixedQueue();
var tail = head;

function push(data) {
if (head.bottom === head.top) {
// Either empty or full:
if (head.list[head.top] !== undefined) {
// It's full: Creates a new queue, sets the old queue's `.next` to it,
// and sets it as the new main queue.
head = head.next = new FixedQueue();
} else {
// If the head is empty, that means that it was the only fixed-sized
// queue in existence.
DCHECK_EQ(head.next, null);
// This is the first tick object in existence, so we need to inform
// the C++ side that we do want to run `_tickCallback()`.
tickInfo[kHasScheduled] = 1;
}
}
head.push(data);
}

function shift() {
const next = tail.shift();
if (tail.top === tail.bottom) { // -> .shift() emptied the current queue.
if (tail.next !== null) {
// If there is another queue, it forms the new tail.
tail = tail.next;
} else {
// We've just run out of items. Let the native side know that it
// doesn't need to bother calling into JS to run the queue.
tickInfo[kHasScheduled] = 0;
}
}
return next;
}
const queue = new FixedQueue();

process.nextTick = nextTick;
// Needs to be accessible from beyond this scope.
Expand All @@ -152,7 +41,7 @@ function setupNextTick() {
function _tickCallback() {
let tock;
do {
while (tock = shift()) {
while (tock = queue.shift()) {
const asyncId = tock[async_id_symbol];
emitBefore(asyncId, tock[trigger_async_id_symbol]);
// emitDestroy() places the async_id_symbol into an asynchronous queue
Expand All @@ -175,8 +64,9 @@ function setupNextTick() {

emitAfter(asyncId);
}
tickInfo[kHasScheduled] = 0;
runMicrotasks();
} while (head.top !== head.bottom || emitPromiseRejectionWarnings());
} while (!queue.isEmpty() || emitPromiseRejectionWarnings());
tickInfo[kHasPromiseRejections] = 0;
}

Expand Down Expand Up @@ -222,6 +112,8 @@ function setupNextTick() {
args[i - 1] = arguments[i];
}

push(new TickObject(callback, args, getDefaultTriggerAsyncId()));
if (queue.isEmpty())
tickInfo[kHasScheduled] = 1;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice find!

queue.push(new TickObject(callback, args, getDefaultTriggerAsyncId()));
}
}
1 change: 1 addition & 0 deletions node.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@
'lib/internal/constants.js',
'lib/internal/encoding.js',
'lib/internal/errors.js',
'lib/internal/fixed_queue.js',
'lib/internal/freelist.js',
'lib/internal/fs.js',
'lib/internal/http.js',
Expand Down
34 changes: 34 additions & 0 deletions test/parallel/test-fixed-queue.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Flags: --expose-internals
'use strict';

require('../common');

const assert = require('assert');
const FixedQueue = require('internal/fixed_queue');

{
const queue = new FixedQueue();
assert.strictEqual(queue.head, queue.tail);
assert(queue.isEmpty());
queue.push('a');
assert(!queue.isEmpty());
assert.strictEqual(queue.shift(), 'a');
assert.strictEqual(queue.shift(), null);
}

{
const queue = new FixedQueue();
for (let i = 0; i < 2047; i++)
queue.push('a');
assert(queue.head.isFull());
queue.push('a');
assert(!queue.head.isFull());

assert.notStrictEqual(queue.head, queue.tail);
for (let i = 0; i < 2047; i++)
assert.strictEqual(queue.shift(), 'a');
assert.strictEqual(queue.head, queue.tail);
assert(!queue.isEmpty());
assert.strictEqual(queue.shift(), 'a');
assert(queue.isEmpty());
}
18 changes: 18 additions & 0 deletions test/parallel/test-next-tick-fixed-queue-regression.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
'use strict';

const common = require('../common');

// This tests a highly specific regression tied to the FixedQueue size, which
// was introduced in Node.js 9.7.0: https://github.com/nodejs/node/pull/18617
// More specifically, a nextTick list could potentially end up not fully
// clearing in one run through if exactly 2048 ticks were added after
// microtasks were executed within the nextTick loop.

process.nextTick(() => {
Promise.resolve(1).then(() => {
for (let i = 0; i < 2047; i++)
process.nextTick(common.mustCall());
const immediate = setImmediate(common.mustNotCall());
process.nextTick(common.mustCall(() => clearImmediate(immediate)));
});
});