From 824dc576db6399195298206270ce5f446ee34359 Mon Sep 17 00:00:00 2001 From: Weijia Wang Date: Thu, 11 Jul 2019 23:14:49 +0800 Subject: [PATCH] stream: simplify `.pipe()` and `.unpipe()` in Readable Now we are using `pipes` and `pipesCount` in Readable state and the `pipes` value can be a stream or an array of streams. This change reducing them into one `pipes` value, which is an array of streams. PR-URL: https://github.com/nodejs/node/pull/28583 Reviewed-By: Ruben Bridgewater Reviewed-By: Matteo Collina Reviewed-By: Anna Henningsen --- lib/_stream_readable.js | 62 +++++-------------- ...test-stream-pipe-same-destination-twice.js | 12 ++-- .../test-stream-pipe-unpipe-streams.js | 7 +-- test/parallel/test-stream-unpipe-event.js | 12 ++-- test/parallel/test-stream2-basic.js | 4 +- 5 files changed, 33 insertions(+), 64 deletions(-) diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index d6db7188750ebd..d2de4122bb0e85 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -97,8 +97,7 @@ function ReadableState(options, stream, isDuplex) { // array.shift() this.buffer = new BufferList(); this.length = 0; - this.pipes = null; - this.pipesCount = 0; + this.pipes = []; this.flowing = null; this.ended = false; this.endEmitted = false; @@ -148,6 +147,13 @@ function ReadableState(options, stream, isDuplex) { } } +// Legacy getter for `pipesCount` +Object.defineProperty(ReadableState.prototype, 'pipesCount', { + get() { + return this.pipes.length; + } +}); + function Readable(options) { if (!(this instanceof Readable)) return new Readable(options); @@ -635,19 +641,8 @@ Readable.prototype.pipe = function(dest, pipeOpts) { const src = this; const state = this._readableState; - switch (state.pipesCount) { - case 0: - state.pipes = dest; - break; - case 1: - state.pipes = [state.pipes, dest]; - break; - default: - state.pipes.push(dest); - break; - } - state.pipesCount += 1; - debug('pipe count=%d opts=%j', state.pipesCount, pipeOpts); + state.pipes.push(dest); + debug('pipe count=%d opts=%j', state.pipes.length, pipeOpts); const doEnd = (!pipeOpts || pipeOpts.end !== false) && dest !== process.stdout && @@ -717,9 +712,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) { // to get stuck in a permanently paused state if that write // also returned false. // => Check whether `dest` is still a piping destination. - if (((state.pipesCount === 1 && state.pipes === dest) || - (state.pipesCount > 1 && state.pipes.includes(dest))) && - !cleanedUp) { + if (state.pipes.length > 0 && state.pipes.includes(dest) && !cleanedUp) { debug('false write response, pause', state.awaitDrain); state.awaitDrain++; } @@ -789,38 +782,16 @@ Readable.prototype.unpipe = function(dest) { const unpipeInfo = { hasUnpiped: false }; // If we're not piping anywhere, then do nothing. - if (state.pipesCount === 0) + if (state.pipes.length === 0) return this; - // Just one destination. most common case. - if (state.pipesCount === 1) { - // Passed in one, but it's not the right one. - if (dest && dest !== state.pipes) - return this; - - if (!dest) - dest = state.pipes; - - // got a match. - state.pipes = null; - state.pipesCount = 0; - state.flowing = false; - if (dest) - dest.emit('unpipe', this, unpipeInfo); - return this; - } - - // Slow case with multiple pipe destinations. - if (!dest) { // remove all. var dests = state.pipes; - var len = state.pipesCount; - state.pipes = null; - state.pipesCount = 0; + state.pipes = []; state.flowing = false; - for (var i = 0; i < len; i++) + for (var i = 0; i < dests.length; i++) dests[i].emit('unpipe', this, { hasUnpiped: false }); return this; } @@ -831,9 +802,8 @@ Readable.prototype.unpipe = function(dest) { return this; state.pipes.splice(index, 1); - state.pipesCount -= 1; - if (state.pipesCount === 1) - state.pipes = state.pipes[0]; + if (state.pipes.length === 0) + state.flowing = false; dest.emit('unpipe', this, unpipeInfo); diff --git a/test/parallel/test-stream-pipe-same-destination-twice.js b/test/parallel/test-stream-pipe-same-destination-twice.js index 1824c0606451a2..ff71639588ea49 100644 --- a/test/parallel/test-stream-pipe-same-destination-twice.js +++ b/test/parallel/test-stream-pipe-same-destination-twice.js @@ -20,15 +20,15 @@ const { PassThrough, Writable } = require('stream'); passThrough.pipe(dest); assert.strictEqual(passThrough._events.data.length, 2); - assert.strictEqual(passThrough._readableState.pipesCount, 2); + assert.strictEqual(passThrough._readableState.pipes.length, 2); assert.strictEqual(passThrough._readableState.pipes[0], dest); assert.strictEqual(passThrough._readableState.pipes[1], dest); passThrough.unpipe(dest); assert.strictEqual(passThrough._events.data.length, 1); - assert.strictEqual(passThrough._readableState.pipesCount, 1); - assert.strictEqual(passThrough._readableState.pipes, dest); + assert.strictEqual(passThrough._readableState.pipes.length, 1); + assert.deepStrictEqual(passThrough._readableState.pipes, [dest]); passThrough.write('foobar'); passThrough.pipe(dest); @@ -47,7 +47,7 @@ const { PassThrough, Writable } = require('stream'); passThrough.pipe(dest); assert.strictEqual(passThrough._events.data.length, 2); - assert.strictEqual(passThrough._readableState.pipesCount, 2); + assert.strictEqual(passThrough._readableState.pipes.length, 2); assert.strictEqual(passThrough._readableState.pipes[0], dest); assert.strictEqual(passThrough._readableState.pipes[1], dest); @@ -64,7 +64,7 @@ const { PassThrough, Writable } = require('stream'); passThrough.pipe(dest); assert.strictEqual(passThrough._events.data.length, 2); - assert.strictEqual(passThrough._readableState.pipesCount, 2); + assert.strictEqual(passThrough._readableState.pipes.length, 2); assert.strictEqual(passThrough._readableState.pipes[0], dest); assert.strictEqual(passThrough._readableState.pipes[1], dest); @@ -72,7 +72,7 @@ const { PassThrough, Writable } = require('stream'); passThrough.unpipe(dest); assert.strictEqual(passThrough._events.data, undefined); - assert.strictEqual(passThrough._readableState.pipesCount, 0); + assert.strictEqual(passThrough._readableState.pipes.length, 0); passThrough.write('foobar'); } diff --git a/test/parallel/test-stream-pipe-unpipe-streams.js b/test/parallel/test-stream-pipe-unpipe-streams.js index c8a383bc61a24b..4cb8413af22f18 100644 --- a/test/parallel/test-stream-pipe-unpipe-streams.js +++ b/test/parallel/test-stream-pipe-unpipe-streams.js @@ -22,7 +22,7 @@ assert.strictEqual(source._readableState.pipes.length, 2); source.unpipe(dest2); -assert.strictEqual(source._readableState.pipes, dest1); +assert.deepStrictEqual(source._readableState.pipes, [dest1]); assert.notStrictEqual(source._readableState.pipes, dest2); dest2.on('unpipe', common.mustNotCall()); @@ -30,7 +30,7 @@ source.unpipe(dest2); source.unpipe(dest1); -assert.strictEqual(source._readableState.pipes, null); +assert.strictEqual(source._readableState.pipes.length, 0); { // Test `cleanup()` if we unpipe all streams. @@ -43,8 +43,7 @@ assert.strictEqual(source._readableState.pipes, null); const destCheckEventNames = ['close', 'finish', 'drain', 'error', 'unpipe']; const checkSrcCleanup = common.mustCall(() => { - assert.strictEqual(source._readableState.pipes, null); - assert.strictEqual(source._readableState.pipesCount, 0); + assert.strictEqual(source._readableState.pipes.length, 0); assert.strictEqual(source._readableState.flowing, false); srcCheckEventNames.forEach((eventName) => { diff --git a/test/parallel/test-stream-unpipe-event.js b/test/parallel/test-stream-unpipe-event.js index 340502d1a98b7c..46cc8e8cb0ae9e 100644 --- a/test/parallel/test-stream-unpipe-event.js +++ b/test/parallel/test-stream-unpipe-event.js @@ -23,7 +23,7 @@ class NeverEndReadable extends Readable { dest.on('unpipe', common.mustCall()); src.pipe(dest); setImmediate(() => { - assert.strictEqual(src._readableState.pipesCount, 0); + assert.strictEqual(src._readableState.pipes.length, 0); }); } @@ -34,7 +34,7 @@ class NeverEndReadable extends Readable { dest.on('unpipe', common.mustNotCall('unpipe should not have been emitted')); src.pipe(dest); setImmediate(() => { - assert.strictEqual(src._readableState.pipesCount, 1); + assert.strictEqual(src._readableState.pipes.length, 1); }); } @@ -46,7 +46,7 @@ class NeverEndReadable extends Readable { src.pipe(dest); src.unpipe(dest); setImmediate(() => { - assert.strictEqual(src._readableState.pipesCount, 0); + assert.strictEqual(src._readableState.pipes.length, 0); }); } @@ -57,7 +57,7 @@ class NeverEndReadable extends Readable { dest.on('unpipe', common.mustCall()); src.pipe(dest, { end: false }); setImmediate(() => { - assert.strictEqual(src._readableState.pipesCount, 0); + assert.strictEqual(src._readableState.pipes.length, 0); }); } @@ -68,7 +68,7 @@ class NeverEndReadable extends Readable { dest.on('unpipe', common.mustNotCall('unpipe should not have been emitted')); src.pipe(dest, { end: false }); setImmediate(() => { - assert.strictEqual(src._readableState.pipesCount, 1); + assert.strictEqual(src._readableState.pipes.length, 1); }); } @@ -80,6 +80,6 @@ class NeverEndReadable extends Readable { src.pipe(dest, { end: false }); src.unpipe(dest); setImmediate(() => { - assert.strictEqual(src._readableState.pipesCount, 0); + assert.strictEqual(src._readableState.pipes.length, 0); }); } diff --git a/test/parallel/test-stream2-basic.js b/test/parallel/test-stream2-basic.js index fa1443fd2ac3a6..094ecabd498171 100644 --- a/test/parallel/test-stream2-basic.js +++ b/test/parallel/test-stream2-basic.js @@ -171,10 +171,10 @@ class TestWriter extends EE { w[0].on('write', function() { if (--writes === 0) { r.unpipe(); - assert.strictEqual(r._readableState.pipes, null); + assert.deepStrictEqual(r._readableState.pipes, []); w[0].end(); r.pipe(w[1]); - assert.strictEqual(r._readableState.pipes, w[1]); + assert.deepStrictEqual(r._readableState.pipes, [w[1]]); } });