Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: use bitmap in readable state #49745

Merged
merged 10 commits into from
Sep 24, 2023
150 changes: 91 additions & 59 deletions lib/internal/streams/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,75 @@ const nop = () => {};

const { errorOrDestroy } = destroyImpl;

const kObjectMode = 1 << 0;
const kEnded = 1 << 1;
const kEndEmitted = 1 << 2;
const kReading = 1 << 3;
const kConstructed = 1 << 4;
const kSync = 1 << 5;
const kNeedReadable = 1 << 6;
const kEmittedReadable = 1 << 7;
const kReadableListening = 1 << 8;
const kResumeScheduled = 1 << 9;
const kErrorEmitted = 1 << 10;
const kEmitClose = 1 << 11;
const kAutoDestroy = 1 << 12;
const kDestroyed = 1 << 13;
const kClosed = 1 << 14;
const kCloseEmitted = 1 << 15;
const kMultiAwaitDrain = 1 << 16;
const kReadingMore = 1 << 17;
const kDataEmitted = 1 << 18;

// TODO(benjamingr) it is likely slower to do it this way than with free functions
function makeBitMapDescriptor(bit) {
return {
enumerable: false,
get() { return (this.state & bit) !== 0; },
set(value) {
if (value) this.state |= bit;
else this.state &= ~bit;
},
};
}
ObjectDefineProperties(ReadableState.prototype, {
objectMode: makeBitMapDescriptor(kObjectMode),
ended: makeBitMapDescriptor(kEnded),
endEmitted: makeBitMapDescriptor(kEndEmitted),
reading: makeBitMapDescriptor(kReading),
// Stream is still being constructed and cannot be
// destroyed until construction finished or failed.
// Async construction is opt in, therefore we start as
// constructed.
constructed: makeBitMapDescriptor(kConstructed),
// A flag to be able to tell if the event 'readable'/'data' is emitted
// immediately, or on a later tick. We set this to true at first, because
// any actions that shouldn't happen until "later" should generally also
// not happen before the first read call.
sync: makeBitMapDescriptor(kSync),
// Whenever we return null, then we set a flag to say
// that we're awaiting a 'readable' event emission.
needReadable: makeBitMapDescriptor(kNeedReadable),
emittedReadable: makeBitMapDescriptor(kEmittedReadable),
readableListening: makeBitMapDescriptor(kReadableListening),
resumeScheduled: makeBitMapDescriptor(kResumeScheduled),
// True if the error was already emitted and should not be thrown again.
errorEmitted: makeBitMapDescriptor(kErrorEmitted),
emitClose: makeBitMapDescriptor(kEmitClose),
autoDestroy: makeBitMapDescriptor(kAutoDestroy),
// Has it been destroyed.
destroyed: makeBitMapDescriptor(kDestroyed),
// Indicates whether the stream has finished destroying.
closed: makeBitMapDescriptor(kClosed),
// True if close has been emitted or would have been emitted
// depending on emitClose.
closeEmitted: makeBitMapDescriptor(kCloseEmitted),
multiAwaitDrain: makeBitMapDescriptor(kMultiAwaitDrain),
// If true, a maybeReadMore has been scheduled.
readingMore: makeBitMapDescriptor(kReadingMore),
dataEmitted: makeBitMapDescriptor(kDataEmitted),
});

function ReadableState(options, stream, isDuplex) {
// Duplex streams are both readable and writable, but share
// the same options object.
Expand All @@ -92,13 +161,15 @@ function ReadableState(options, stream, isDuplex) {
if (typeof isDuplex !== 'boolean')
isDuplex = stream instanceof Stream.Duplex;

// Bit map field to store ReadableState more effciently with 1 bit per field
// instead of a V8 slot per field.
this.state = kEmitClose | kAutoDestroy | kConstructed | kSync;
// Object stream flag. Used to make read(n) ignore n and to
// make all the buffer merging and length checks go away.
this.objectMode = !!(options && options.objectMode);
if (options && options.objectMode) this.state |= kObjectMode;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (options && options.objectMode) this.state |= kObjectMode;
if (options?.objectMode) this.state |= kObjectMode;

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought that was slower so I followed the pattern, do you know which of the following is faster?

 if (options && options.objectMode) this.state |= kObjectMode;
 if (options?.objectMode) this.state |= kObjectMode;
this.state |= kObjectMode & +options?.objectMode;


if (isDuplex)
this.objectMode = this.objectMode ||
!!(options && options.readableObjectMode);
if (isDuplex && options && options.readableObjectMode)
this.state |= kObjectMode;

// The point at which it stops calling _read() to fill the buffer
// Note: 0 is a valid value, means "don't call _read preemptively ever"
Expand All @@ -113,54 +184,22 @@ function ReadableState(options, stream, isDuplex) {
this.length = 0;
this.pipes = [];
this.flowing = null;
this.ended = false;
this.endEmitted = false;
this.reading = false;

// Stream is still being constructed and cannot be
// destroyed until construction finished or failed.
// Async construction is opt in, therefore we start as
// constructed.
this.constructed = true;

// A flag to be able to tell if the event 'readable'/'data' is emitted
// immediately, or on a later tick. We set this to true at first, because
// any actions that shouldn't happen until "later" should generally also
// not happen before the first read call.
this.sync = true;

// Whenever we return null, then we set a flag to say
// that we're awaiting a 'readable' event emission.
this.needReadable = false;
this.emittedReadable = false;
this.readableListening = false;
this.resumeScheduled = false;
this[kPaused] = null;

// True if the error was already emitted and should not be thrown again.
this.errorEmitted = false;

// Should close be emitted on destroy. Defaults to true.
this.emitClose = !options || options.emitClose !== false;
if (options && options.emitClose === false) this.state &= ~kEmitClose;

// Should .destroy() be called after 'end' (and potentially 'finish').
this.autoDestroy = !options || options.autoDestroy !== false;
if (options && options.autoDestroy === false) this.state &= ~kAutoDestroy;

// Has it been destroyed.
this.destroyed = false;

// Indicates whether the stream has errored. When true no further
// _read calls, 'data' or 'readable' events should occur. This is needed
// since when autoDestroy is disabled we need a way to tell whether the
// stream has failed.
this.errored = null;

// Indicates whether the stream has finished destroying.
this.closed = false;

// True if close has been emitted or would have been emitted
// depending on emitClose.
this.closeEmitted = false;

// Crypto is kind of old and crusty. Historically, its default string
// encoding is 'binary' so we have to make this configurable.
Expand All @@ -177,12 +216,6 @@ function ReadableState(options, stream, isDuplex) {
// Ref the piped dest which we need a drain event on it
// type: null | Writable | Set<Writable>.
this.awaitDrainWriters = null;
this.multiAwaitDrain = false;

// If true, a maybeReadMore has been scheduled.
this.readingMore = false;

this.dataEmitted = false;

this.decoder = null;
this.encoding = null;
Expand Down Expand Up @@ -263,7 +296,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront) {
const state = stream._readableState;

let err;
if (!state.objectMode) {
if ((state.state & kObjectMode) === 0) {
if (typeof chunk === 'string') {
encoding = encoding || state.defaultEncoding;
if (state.encoding !== encoding) {
Expand All @@ -290,11 +323,11 @@ function readableAddChunk(stream, chunk, encoding, addToFront) {
if (err) {
errorOrDestroy(stream, err);
} else if (chunk === null) {
state.reading = false;
state.state &= ~kReading;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you keep this line and the rest of the assignment as is, wouldn't v8 just inline the setter?

I'm asking as it's not straightforward

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When you do the changes for Writable we'll go over the system analyzer together and see I basically did the changes based on generated IR/maps

onEofChunk(stream, state);
} else if (state.objectMode || (chunk && chunk.length > 0)) {
} else if (((state.state & kObjectMode) !== 0) || (chunk && chunk.length > 0)) {
if (addToFront) {
if (state.endEmitted)
if ((state.state & kEndEmitted) !== 0)
errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());
else if (state.destroyed || state.errored)
return false;
Expand All @@ -305,7 +338,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront) {
} else if (state.destroyed || state.errored) {
return false;
} else {
state.reading = false;
state.state &= ~kReading;
if (state.decoder && !encoding) {
chunk = state.decoder.write(chunk);
if (state.objectMode || chunk.length !== 0)
Expand All @@ -317,7 +350,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront) {
}
}
} else if (!addToFront) {
state.reading = false;
state.state &= ~kReading;
maybeReadMore(stream, state);
}

Expand All @@ -333,7 +366,7 @@ function addChunk(stream, state, chunk, addToFront) {
stream.listenerCount('data') > 0) {
// Use the guard to avoid creating `Set()` repeatedly
// when we have multiple pipes.
if (state.multiAwaitDrain) {
if ((state.state & kMultiAwaitDrain) !== 0) {
state.awaitDrainWriters.clear();
} else {
state.awaitDrainWriters = null;
Expand All @@ -349,7 +382,7 @@ function addChunk(stream, state, chunk, addToFront) {
else
state.buffer.push(chunk);

if (state.needReadable)
if ((state.state & kNeedReadable) !== 0)
emitReadable(stream);
}
maybeReadMore(stream, state);
Expand Down Expand Up @@ -404,7 +437,7 @@ function computeNewHighWaterMark(n) {
function howMuchToRead(n, state) {
if (n <= 0 || (state.length === 0 && state.ended))
rluvaton marked this conversation as resolved.
Show resolved Hide resolved
return 0;
if (state.objectMode)
if ((state.state & kObjectMode) !== 0)
return 1;
if (NumberIsNaN(n)) {
// Only flow one buffer at a time.
Expand Down Expand Up @@ -435,7 +468,7 @@ Readable.prototype.read = function(n) {
state.highWaterMark = computeNewHighWaterMark(n);

if (n !== 0)
state.emittedReadable = false;
state.state &= ~kEmittedReadable;

// If we're doing read(0) to trigger a readable event, but we
// already have a bunch of data in the buffer, then just trigger
Expand Down Expand Up @@ -486,7 +519,7 @@ Readable.prototype.read = function(n) {
// 3. Actually pull the requested chunks out of the buffer and return.

// if we need a readable event, then we need to do some reading.
let doRead = state.needReadable;
let doRead = (state.state & kNeedReadable) !== 0;
ronag marked this conversation as resolved.
Show resolved Hide resolved
debug('need readable', doRead);

// If we currently have less than the highWaterMark, then also read some.
Expand All @@ -504,20 +537,19 @@ Readable.prototype.read = function(n) {
debug('reading, ended or constructing', doRead);
} else if (doRead) {
debug('do read');
state.reading = true;
state.sync = true;
state.state |= kReading | kSync;
// If the length is currently zero, then we *need* a readable event.
if (state.length === 0)
state.needReadable = true;
state.state |= kNeedReadable;

// Call internal read method
try {
this._read(state.highWaterMark);
} catch (err) {
errorOrDestroy(this, err);
}
state.state &= ~kSync;

state.sync = false;
// If _read pushed data synchronously, then `reading` will be false,
// and we need to re-evaluate how much data we can return to the user.
if (!state.reading)
rluvaton marked this conversation as resolved.
Show resolved Hide resolved
Expand Down