From 03e496446ba06bf7f147ec7f8258d0209d618682 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sat, 4 Jul 2020 18:55:50 +0200 Subject: [PATCH 1/3] stream: cleanup and fix Readable.wrap Cleans up Readable.wrap and also ensures destroy is called for certain events. --- lib/_stream_readable.js | 77 ++++++------------- .../test-stream2-readable-wrap-destroy.js | 27 +++++++ test/parallel/test-stream2-readable-wrap.js | 10 +++ 3 files changed, 61 insertions(+), 53 deletions(-) create mode 100644 test/parallel/test-stream2-readable-wrap-destroy.js diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 8ddb9562a41af2..58cf9eb5e741d1 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -26,6 +26,7 @@ const { NumberIsInteger, NumberIsNaN, ObjectDefineProperties, + ObjectKeys, ObjectSetPrototypeOf, Set, SymbolAsyncIterator, @@ -1007,83 +1008,53 @@ function flow(stream) { // This is *not* part of the readable stream interface. // It is an ugly unfortunate mess of history. Readable.prototype.wrap = function(stream) { - const state = this._readableState; - let paused = false; + let paused; - stream.on('end', () => { - debug('wrapped end'); - if (state.decoder && !state.ended) { - const chunk = state.decoder.end(); - if (chunk && chunk.length) - this.push(chunk); - } - - this.push(null); - }); + // TODO (ronag): Should this.destroy(err) emit + // 'error' on the wrapped stream? Would require + // a static factory method, e.g. Readable.wrap(stream). stream.on('data', (chunk) => { - debug('wrapped data'); - if (state.decoder) - chunk = state.decoder.write(chunk); - - // Don't skip over falsy values in objectMode. - if (state.objectMode && (chunk === null || chunk === undefined)) - return; - else if (!state.objectMode && (!chunk || !chunk.length)) - return; - - const ret = this.push(chunk); - if (!ret) { + if (!this.push(chunk) && stream.pause) { paused = true; stream.pause(); } }); - // Proxy all the other methods. Important when wrapping filters and duplexes. - for (const i in stream) { - if (this[i] === undefined && typeof stream[i] === 'function') { - this[i] = function methodWrap(method) { - return function methodWrapReturnFunction() { - return stream[method].apply(stream, arguments); - }; - }(i); - } - } + stream.on('end', () => { + this.push(null); + }); stream.on('error', (err) => { errorOrDestroy(this, err); }); stream.on('close', () => { - // TODO(ronag): Update readable state? - this.emit('close'); + this.destroy(); }); stream.on('destroy', () => { - // TODO(ronag): this.destroy()? - this.emit('destroy'); + this.destroy(); }); - stream.on('pause', () => { - // TODO(ronag): this.pause()? - this.emit('pause'); - }); - - stream.on('resume', () => { - // TODO(ronag): this.resume()? - this.emit('resume'); - }); - - // When we try to consume some more bytes, simply unpause the - // underlying stream. - this._read = (n) => { - debug('wrapped _read', n); - if (paused) { + this._read = () => { + if (paused && stream.resume) { paused = false; stream.resume(); } }; + // Proxy all the other methods. Important when wrapping filters and duplexes. + for (const i of ObjectKeys(stream)) { + if (this[i] === undefined && typeof stream[i] === 'function') { + this[i] = function methodWrap(method) { + return function methodWrapReturnFunction() { + return stream[method].apply(stream, arguments); + }; + }(i); + } + } + return this; }; diff --git a/test/parallel/test-stream2-readable-wrap-destroy.js b/test/parallel/test-stream2-readable-wrap-destroy.js new file mode 100644 index 00000000000000..b0f4714c741202 --- /dev/null +++ b/test/parallel/test-stream2-readable-wrap-destroy.js @@ -0,0 +1,27 @@ +'use strict'; +const common = require('../common'); + +const Readable = require('_stream_readable'); +const EE = require('events').EventEmitter; + +const oldStream = new EE(); +oldStream.pause = () => {}; +oldStream.resume = () => {}; + +{ + new Readable({ + autoDestroy: false, + destroy: common.mustCall() + }) + .wrap(oldStream); + oldStream.emit('destroy'); +} + +{ + new Readable({ + autoDestroy: false, + destroy: common.mustCall() + }) + .wrap(oldStream); + oldStream.emit('close'); +} diff --git a/test/parallel/test-stream2-readable-wrap.js b/test/parallel/test-stream2-readable-wrap.js index 0c9cb5861d936e..69f055fd7e535e 100644 --- a/test/parallel/test-stream2-readable-wrap.js +++ b/test/parallel/test-stream2-readable-wrap.js @@ -44,6 +44,16 @@ function runTest(highWaterMark, objectMode, produce) { flow(); }; + // Make sure pause is only emitted once. + let pausing = false; + r.on('pause', () => { + assert.strictEqual(pausing, false); + pausing = true; + process.nextTick(() => { + pausing = false; + }); + }); + let flowing; let chunks = 10; let oldEnded = false; From 7f91aa4351d55c656eafecaa06a4fa81d372c8ea Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sun, 5 Jul 2020 00:22:39 +0200 Subject: [PATCH 2/3] fixup --- lib/_stream_readable.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 58cf9eb5e741d1..636752890090f8 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -1008,7 +1008,7 @@ function flow(stream) { // This is *not* part of the readable stream interface. // It is an ugly unfortunate mess of history. Readable.prototype.wrap = function(stream) { - let paused; + let paused = false; // TODO (ronag): Should this.destroy(err) emit // 'error' on the wrapped stream? Would require From c2e7f03acb3a76d9a8ba0c90393143afebeacc31 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Mon, 6 Jul 2020 22:07:11 +0200 Subject: [PATCH 3/3] fixup --- lib/_stream_readable.js | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 636752890090f8..cab1f4d8785028 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -1047,11 +1047,7 @@ Readable.prototype.wrap = function(stream) { // Proxy all the other methods. Important when wrapping filters and duplexes. for (const i of ObjectKeys(stream)) { if (this[i] === undefined && typeof stream[i] === 'function') { - this[i] = function methodWrap(method) { - return function methodWrapReturnFunction() { - return stream[method].apply(stream, arguments); - }; - }(i); + this[i] = stream[i].bind(stream); } }