diff --git a/lib/internal/streams/writable.js b/lib/internal/streams/writable.js index 718d4e48478677..595aadc23c8bec 100644 --- a/lib/internal/streams/writable.js +++ b/lib/internal/streams/writable.js @@ -74,6 +74,110 @@ function nop() {} const kOnFinished = Symbol('kOnFinished'); +const kObjectMode = 1 << 0; +const kEnded = 1 << 1; +const kConstructed = 1 << 2; +const kSync = 1 << 3; +const kErrorEmitted = 1 << 4; +const kEmitClose = 1 << 5; +const kAutoDestroy = 1 << 6; +const kDestroyed = 1 << 7; +const kClosed = 1 << 8; +const kCloseEmitted = 1 << 9; +const kFinalCalled = 1 << 10; +const kNeedDrain = 1 << 11; +const kEnding = 1 << 12; +const kFinished = 1 << 13; +const kDecodeStrings = 1 << 14; +const kWriting = 1 << 15; +const kBufferProcessing = 1 << 16; +const kPrefinished = 1 << 17; +const kAllBuffers = 1 << 18; +const kAllNoop = 1 << 19; + +// TODO(benjamingr) it is likely slower to do it this way than with free functions +function makeBitMapDescriptor(bit) { + return { + enumerable: false, + get() { return (this.state & bit) !== 0; }, + set(value) { + if (value) this.state |= bit; + else this.state &= ~bit; + }, + }; +} +ObjectDefineProperties(WritableState.prototype, { + // Object stream flag to indicate whether or not this stream + // contains buffers or objects. + objectMode: makeBitMapDescriptor(kObjectMode), + + // if _final has been called. + finalCalled: makeBitMapDescriptor(kFinalCalled), + + // drain event flag. + needDrain: makeBitMapDescriptor(kNeedDrain), + + // At the start of calling end() + ending: makeBitMapDescriptor(kEnding), + + // When end() has been called, and returned. + ended: makeBitMapDescriptor(kEnded), + + // When 'finish' is emitted. + finished: makeBitMapDescriptor(kFinished), + + // Has it been destroyed. + destroyed: makeBitMapDescriptor(kDestroyed), + + // Should we decode strings into buffers before passing to _write? + // this is here so that some node-core streams can optimize string + // handling at a lower level. + decodeStrings: makeBitMapDescriptor(kDecodeStrings), + + // A flag to see when we're in the middle of a write. + writing: makeBitMapDescriptor(kWriting), + + // A flag to be able to tell if the onwrite cb is called immediately, + // or on a later tick. We set this to true at first, because any + // actions that shouldn't happen until "later" should generally also + // not happen before the first write call. + sync: makeBitMapDescriptor(kSync), + + // A flag to know if we're processing previously buffered items, which + // may call the _write() callback in the same tick, so that we don't + // end up in an overlapped onwrite situation. + bufferProcessing: makeBitMapDescriptor(kBufferProcessing), + + // Stream is still being constructed and cannot be + // destroyed until construction finished or failed. + // Async construction is opt in, therefore we start as + // constructed. + constructed: makeBitMapDescriptor(kConstructed), + + // Emit prefinish if the only thing we're waiting for is _write cbs + // This is relevant for synchronous Transform streams. + prefinished: makeBitMapDescriptor(kPrefinished), + + // True if the error was already emitted and should not be thrown again. + errorEmitted: makeBitMapDescriptor(kErrorEmitted), + + // Should close be emitted on destroy. Defaults to true. + emitClose: makeBitMapDescriptor(kEmitClose), + + // Should .destroy() be called after 'finish' (and potentially 'end'). + autoDestroy: makeBitMapDescriptor(kAutoDestroy), + + // Indicates whether the stream has finished destroying. + closed: makeBitMapDescriptor(kClosed), + + // True if close has been emitted or would have been emitted + // depending on emitClose. + closeEmitted: makeBitMapDescriptor(kCloseEmitted), + + allBuffers: makeBitMapDescriptor(kAllBuffers), + allNoop: makeBitMapDescriptor(kAllNoop), +}); + function WritableState(options, stream, isDuplex) { // Duplex streams are both readable and writable, but share // the same options object. @@ -83,13 +187,12 @@ function WritableState(options, stream, isDuplex) { if (typeof isDuplex !== 'boolean') isDuplex = stream instanceof Stream.Duplex; - // Object stream flag to indicate whether or not this stream - // contains buffers or objects. - this.objectMode = !!(options && options.objectMode); + // Bit map field to store WritableState more effciently with 1 bit per field + // instead of a V8 slot per field. + this.state = kSync | kConstructed | kEmitClose | kAutoDestroy; - if (isDuplex) - this.objectMode = this.objectMode || - !!(options && options.writableObjectMode); + if (options && options.objectMode) this.state |= kObjectMode; + if (isDuplex && options && options.writableObjectMode) this.state |= kObjectMode; // The point at which write() starts returning false // Note: 0 is a valid value, means that we always return false if @@ -98,26 +201,13 @@ function WritableState(options, stream, isDuplex) { getHighWaterMark(this, options, 'writableHighWaterMark', isDuplex) : getDefaultHighWaterMark(false); - // if _final has been called. - this.finalCalled = false; + if (!options || options.decodeStrings !== false) this.state |= kDecodeStrings; - // drain event flag. - this.needDrain = false; - // At the start of calling end() - this.ending = false; - // When end() has been called, and returned. - this.ended = false; - // When 'finish' is emitted. - this.finished = false; + // Should close be emitted on destroy. Defaults to true. + if (options && options.emitClose === false) this.state &= ~kEmitClose; - // Has it been destroyed - this.destroyed = false; - - // Should we decode strings into buffers before passing to _write? - // this is here so that some node-core streams can optimize string - // handling at a lower level. - const noDecode = !!(options && options.decodeStrings === false); - this.decodeStrings = !noDecode; + // Should .destroy() be called after 'end' (and potentially 'finish'). + if (options && options.autoDestroy === false) this.state &= ~kAutoDestroy; // Crypto is kind of old and crusty. Historically, its default string // encoding is 'binary' so we have to make this configurable. @@ -136,23 +226,9 @@ function WritableState(options, stream, isDuplex) { // socket or file. this.length = 0; - // A flag to see when we're in the middle of a write. - this.writing = false; - // When true all writes will be buffered until .uncork() call. this.corked = 0; - // A flag to be able to tell if the onwrite cb is called immediately, - // or on a later tick. We set this to true at first, because any - // actions that shouldn't happen until "later" should generally also - // not happen before the first write call. - this.sync = true; - - // A flag to know if we're processing previously buffered items, which - // may call the _write() callback in the same tick, so that we don't - // end up in an overlapped onwrite situation. - this.bufferProcessing = false; - // The callback that's passed to _write(chunk, cb). this.onwrite = onwrite.bind(undefined, stream); @@ -172,45 +248,18 @@ function WritableState(options, stream, isDuplex) { // this must be 0 before 'finish' can be emitted. this.pendingcb = 0; - // Stream is still being constructed and cannot be - // destroyed until construction finished or failed. - // Async construction is opt in, therefore we start as - // constructed. - this.constructed = true; - - // Emit prefinish if the only thing we're waiting for is _write cbs - // This is relevant for synchronous Transform streams. - this.prefinished = false; - - // True if the error was already emitted and should not be thrown again. - this.errorEmitted = false; - - // Should close be emitted on destroy. Defaults to true. - this.emitClose = !options || options.emitClose !== false; - - // Should .destroy() be called after 'finish' (and potentially 'end'). - this.autoDestroy = !options || options.autoDestroy !== false; - // Indicates whether the stream has errored. When true all write() calls // should return false. This is needed since when autoDestroy // is disabled we need a way to tell whether the stream has failed. this.errored = null; - // Indicates whether the stream has finished destroying. - this.closed = false; - - // True if close has been emitted or would have been emitted - // depending on emitClose. - this.closeEmitted = false; - this[kOnFinished] = []; } function resetBuffer(state) { state.buffered = []; state.bufferedIndex = 0; - state.allBuffers = true; - state.allNoop = true; + state.state |= kAllBuffers | kAllNoop; } WritableState.prototype.getBuffer = function getBuffer() { @@ -307,9 +356,9 @@ function _write(stream, chunk, encoding, cb) { if (chunk === null) { throw new ERR_STREAM_NULL_VALUES(); - } else if (!state.objectMode) { + } else if ((state.state & kObjectMode) === 0) { if (typeof chunk === 'string') { - if (state.decodeStrings !== false) { + if ((state.state & kDecodeStrings) !== 0) { chunk = Buffer.from(chunk, encoding); encoding = 'buffer'; } @@ -325,9 +374,9 @@ function _write(stream, chunk, encoding, cb) { } let err; - if (state.ending) { + if ((state.state & kEnding) !== 0) { err = new ERR_STREAM_WRITE_AFTER_END(); - } else if (state.destroyed) { + } else if ((state.state & kDestroyed) !== 0) { err = new ERR_STREAM_DESTROYED('write'); } @@ -354,7 +403,7 @@ Writable.prototype.uncork = function() { if (state.corked) { state.corked--; - if (!state.writing) + if ((state.state & kWriting) === 0) clearBuffer(this, state); } }; @@ -373,7 +422,7 @@ Writable.prototype.setDefaultEncoding = function setDefaultEncoding(encoding) { // in the queue, and wait our turn. Otherwise, call _write // If we return false, then we need a drain event, so set that flag. function writeOrBuffer(stream, state, chunk, encoding, callback) { - const len = state.objectMode ? 1 : chunk.length; + const len = (state.state & kObjectMode) !== 0 ? 1 : chunk.length; state.length += len; @@ -381,42 +430,40 @@ function writeOrBuffer(stream, state, chunk, encoding, callback) { const ret = state.length < state.highWaterMark; // We must ensure that previous needDrain will not be reset to false. if (!ret) - state.needDrain = true; + state.state |= kNeedDrain; - if (state.writing || state.corked || state.errored || !state.constructed) { + if ((state.state & kWriting) !== 0 || state.corked || state.errored || (state.state & kConstructed) === 0) { state.buffered.push({ chunk, encoding, callback }); - if (state.allBuffers && encoding !== 'buffer') { - state.allBuffers = false; + if ((state.state & kAllBuffers) !== 0 && encoding !== 'buffer') { + state.state &= ~kAllBuffers; } - if (state.allNoop && callback !== nop) { - state.allNoop = false; + if ((state.state & kAllNoop) !== 0 && callback !== nop) { + state.state &= ~kAllNoop; } } else { state.writelen = len; state.writecb = callback; - state.writing = true; - state.sync = true; + state.state |= kWriting | kSync; stream._write(chunk, encoding, state.onwrite); - state.sync = false; + state.state &= ~kSync; } // Return false if errored or destroyed in order to break // any synchronous while(stream.write(data)) loops. - return ret && !state.errored && !state.destroyed; + return ret && !state.errored && (state.state & kDestroyed) === 0; } function doWrite(stream, state, writev, len, chunk, encoding, cb) { state.writelen = len; state.writecb = cb; - state.writing = true; - state.sync = true; - if (state.destroyed) + state.state |= kWriting | kSync; + if ((state.state & kDestroyed) !== 0) state.onwrite(new ERR_STREAM_DESTROYED('write')); else if (writev) stream._writev(chunk, state.onwrite); else stream._write(chunk, encoding, state.onwrite); - state.sync = false; + state.state &= ~kSync; } function onwriteError(stream, state, er, cb) { @@ -434,7 +481,7 @@ function onwriteError(stream, state, er, cb) { function onwrite(stream, er) { const state = stream._writableState; - const sync = state.sync; + const sync = (state.state & kSync) !== 0; const cb = state.writecb; if (typeof cb !== 'function') { @@ -442,7 +489,7 @@ function onwrite(stream, er) { return; } - state.writing = false; + state.state &= ~kWriting; state.writecb = null; state.length -= state.writelen; state.writelen = 0; @@ -495,10 +542,9 @@ function afterWriteTick({ stream, state, count, cb }) { } function afterWrite(stream, state, count, cb) { - const needDrain = !state.ending && !stream.destroyed && state.length === 0 && - state.needDrain; + const needDrain = (state.state & (kEnding | kNeedDrain)) === kNeedDrain && !stream.destroyed && state.length === 0; if (needDrain) { - state.needDrain = false; + state.state &= ~kNeedDrain; stream.emit('drain'); } @@ -507,7 +553,7 @@ function afterWrite(stream, state, count, cb) { cb(null); } - if (state.destroyed) { + if ((state.state & kDestroyed) !== 0) { errorBuffer(state); } @@ -516,13 +562,13 @@ function afterWrite(stream, state, count, cb) { // If there's something in the buffer waiting, then invoke callbacks. function errorBuffer(state) { - if (state.writing) { + if ((state.state & kWriting) !== 0) { return; } for (let n = state.bufferedIndex; n < state.buffered.length; ++n) { const { chunk, callback } = state.buffered[n]; - const len = state.objectMode ? 1 : chunk.length; + const len = (state.state & kObjectMode) !== 0 ? 1 : chunk.length; state.length -= len; callback(state.errored ?? new ERR_STREAM_DESTROYED('write')); } @@ -538,13 +584,13 @@ function errorBuffer(state) { // If there's something in the buffer waiting, then process it. function clearBuffer(stream, state) { if (state.corked || - state.bufferProcessing || - state.destroyed || - !state.constructed) { + (state.state & (kDestroyed | kBufferProcessing)) !== 0 || + (state.state & kConstructed) === 0) { return; } - const { buffered, bufferedIndex, objectMode } = state; + const objectMode = (state.state & kObjectMode) !== 0; + const { buffered, bufferedIndex } = state; const bufferedLength = buffered.length - bufferedIndex; if (!bufferedLength) { @@ -553,20 +599,20 @@ function clearBuffer(stream, state) { let i = bufferedIndex; - state.bufferProcessing = true; + state.state |= kBufferProcessing; if (bufferedLength > 1 && stream._writev) { state.pendingcb -= bufferedLength - 1; - const callback = state.allNoop ? nop : (err) => { + const callback = (state.state & kAllNoop) !== 0 ? nop : (err) => { for (let n = i; n < buffered.length; ++n) { buffered[n].callback(err); } }; // Make a copy of `buffered` if it's going to be used by `callback` above, // since `doWrite` will mutate the array. - const chunks = state.allNoop && i === 0 ? + const chunks = (state.state & kAllNoop) !== 0 && i === 0 ? buffered : ArrayPrototypeSlice(buffered, i); - chunks.allBuffers = state.allBuffers; + chunks.allBuffers = (state.state & kAllBuffers) !== 0; doWrite(stream, state, true, state.length, chunks, '', callback); @@ -577,7 +623,7 @@ function clearBuffer(stream, state) { buffered[i++] = null; const len = objectMode ? 1 : chunk.length; doWrite(stream, state, false, len, chunk, encoding, callback); - } while (i < buffered.length && !state.writing); + } while (i < buffered.length && (state.state & kWriting) === 0); if (i === buffered.length) { resetBuffer(state); @@ -588,7 +634,7 @@ function clearBuffer(stream, state) { state.bufferedIndex = i; } } - state.bufferProcessing = false; + state.state &= ~kBufferProcessing; } Writable.prototype._write = function(chunk, encoding, cb) { @@ -630,26 +676,26 @@ Writable.prototype.end = function(chunk, encoding, cb) { if (err) { // Do nothing... - } else if (!state.errored && !state.ending) { + } else if (!state.errored && (state.state & kEnding) === 0) { // This is forgiving in terms of unnecessary calls to end() and can hide // logic errors. However, usually such errors are harmless and causing a // hard error can be disproportionately destructive. It is not always // trivial for the user to determine whether end() needs to be called // or not. - state.ending = true; + state.state |= kEnding; finishMaybe(this, state, true); - state.ended = true; - } else if (state.finished) { + state.state |= kEnded; + } else if ((state.state & kFinished) !== 0) { err = new ERR_STREAM_ALREADY_FINISHED('end'); - } else if (state.destroyed) { + } else if ((state.state & kDestroyed) !== 0) { err = new ERR_STREAM_DESTROYED('end'); } if (typeof cb === 'function') { if (err) { process.nextTick(cb, err); - } else if (state.finished) { + } else if ((state.state & kFinished) !== 0) { process.nextTick(cb, null); } else { state[kOnFinished].push(cb); @@ -660,16 +706,20 @@ Writable.prototype.end = function(chunk, encoding, cb) { }; function needFinish(state) { - return (state.ending && - !state.destroyed && - state.constructed && + return ( + // State is ended && constructed but not destroyed, finished, writing, errorEmitted or closedEmitted + (state.state & ( + kEnding | + kDestroyed | + kConstructed | + kFinished | + kWriting | + kErrorEmitted | + kCloseEmitted + )) === (kEnding | kConstructed) && state.length === 0 && !state.errored && - state.buffered.length === 0 && - !state.finished && - !state.writing && - !state.errorEmitted && - !state.closeEmitted); + state.buffered.length === 0); } function callFinal(stream, state) { @@ -688,9 +738,9 @@ function callFinal(stream, state) { for (let i = 0; i < onfinishCallbacks.length; i++) { onfinishCallbacks[i](err); } - errorOrDestroy(stream, err, state.sync); + errorOrDestroy(stream, err, (state.state & kSync) !== 0); } else if (needFinish(state)) { - state.prefinished = true; + state.state |= kPrefinished; stream.emit('prefinish'); // Backwards compat. Don't check state.sync here. // Some streams assume 'finish' will be emitted @@ -700,7 +750,7 @@ function callFinal(stream, state) { } } - state.sync = true; + state.state |= kSync; state.pendingcb++; try { @@ -709,16 +759,16 @@ function callFinal(stream, state) { onFinish(err); } - state.sync = false; + state.state &= ~kSync; } function prefinish(stream, state) { - if (!state.prefinished && !state.finalCalled) { - if (typeof stream._final === 'function' && !state.destroyed) { - state.finalCalled = true; + if ((state.state & (kPrefinished | kFinalCalled)) === 0) { + if (typeof stream._final === 'function' && (state.state & kDestroyed) === 0) { + state.state |= kFinalCalled; callFinal(stream, state); } else { - state.prefinished = true; + state.state |= kPrefinished; stream.emit('prefinish'); } } @@ -747,7 +797,7 @@ function finishMaybe(stream, state, sync) { function finish(stream, state) { state.pendingcb--; - state.finished = true; + state.state |= kFinished; const onfinishCallbacks = state[kOnFinished].splice(0); for (let i = 0; i < onfinishCallbacks.length; i++) { @@ -756,7 +806,7 @@ function finish(stream, state) { stream.emit('finish'); - if (state.autoDestroy) { + if ((state.state & kAutoDestroy) !== 0) { // In case of duplex streams we need a way to detect // if the readable side is ready for autoDestroy as well. const rState = stream._readableState; @@ -777,20 +827,21 @@ ObjectDefineProperties(Writable.prototype, { closed: { __proto__: null, get() { - return this._writableState ? this._writableState.closed : false; + return this._writableState ? (this._writableState.state & kClosed) !== 0 : false; }, }, destroyed: { __proto__: null, get() { - return this._writableState ? this._writableState.destroyed : false; + return this._writableState ? (this._writableState.state & kDestroyed) !== 0 : false; }, set(value) { // Backward compatibility, the user is explicitly managing destroyed. - if (this._writableState) { - this._writableState.destroyed = value; - } + if (!this._writableState) return; + + if (value) this._writableState.state |= kDestroyed; + else this._writableState.state &= ~kDestroyed; }, }, @@ -802,8 +853,8 @@ ObjectDefineProperties(Writable.prototype, { // where the writable side was disabled upon construction. // Compat. The user might manually disable writable side through // deprecated setter. - return !!w && w.writable !== false && !w.destroyed && !w.errored && - !w.ending && !w.ended; + return !!w && w.writable !== false && !w.errored && + (w.state & (kEnding | kEnded | kDestroyed)) === 0; }, set(val) { // Backwards compatible. @@ -816,14 +867,14 @@ ObjectDefineProperties(Writable.prototype, { writableFinished: { __proto__: null, get() { - return this._writableState ? this._writableState.finished : false; + return this._writableState ? (this._writableState.state & kFinished) !== 0 : false; }, }, writableObjectMode: { __proto__: null, get() { - return this._writableState ? this._writableState.objectMode : false; + return this._writableState ? (this._writableState.state & kObjectMode) !== 0 : false; }, }, @@ -837,7 +888,7 @@ ObjectDefineProperties(Writable.prototype, { writableEnded: { __proto__: null, get() { - return this._writableState ? this._writableState.ending : false; + return this._writableState ? (this._writableState.state & kEnding) !== 0 : false; }, }, @@ -846,7 +897,9 @@ ObjectDefineProperties(Writable.prototype, { get() { const wState = this._writableState; if (!wState) return false; - return !wState.destroyed && !wState.ending && wState.needDrain; + + // !destroyed && !ending && needDrain + return (wState.state & (kDestroyed | kEnding | kNeedDrain)) === kNeedDrain; }, }, @@ -885,8 +938,8 @@ ObjectDefineProperties(Writable.prototype, { get: function() { return !!( this._writableState.writable !== false && - (this._writableState.destroyed || this._writableState.errored) && - !this._writableState.finished + ((this._writableState.state & kDestroyed) !== 0 || this._writableState.errored) && + (this._writableState.state & kFinished) === 0 ); }, }, @@ -897,7 +950,7 @@ Writable.prototype.destroy = function(err, cb) { const state = this._writableState; // Invoke pending callbacks. - if (!state.destroyed && + if ((state.state & kDestroyed) === 0 && (state.bufferedIndex < state.buffered.length || state[kOnFinished].length)) { process.nextTick(errorBuffer, state);