From 6993eb08971431d25c71a0f8fe2cf4fdcb5741f3 Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Sat, 29 Apr 2017 20:25:35 +0200 Subject: [PATCH] stream: fix y.pipe(x)+y.pipe(x)+y.unpipe(x) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fix the uncommon situation when a readable stream is piped twice into the same destination stream, and then unpiped once. Previously, the `unpipe` event handlers weren’t able to tell whether they were corresponding to the “right” conceptual pipe that was being removed; this fixes this by adding a counter to the `unpipe` event handler and only removing a single piping destination at most. Fixes: https://github.com/nodejs/node/issues/12718 PR-URL: https://github.com/nodejs/node/pull/12746 Reviewed-By: Benjamin Gruenbaum Reviewed-By: Matteo Collina --- lib/_stream_readable.js | 14 ++-- ...test-stream-pipe-same-destination-twice.js | 78 +++++++++++++++++++ 2 files changed, 87 insertions(+), 5 deletions(-) create mode 100644 test/parallel/test-stream-pipe-same-destination-twice.js diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 7c5f17378292fd..9887bd774e1159 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -518,10 +518,13 @@ Readable.prototype.pipe = function(dest, pipeOpts) { src.once('end', endFn); dest.on('unpipe', onunpipe); - function onunpipe(readable) { + function onunpipe(readable, unpipeInfo) { debug('onunpipe'); if (readable === src) { - cleanup(); + if (unpipeInfo && unpipeInfo.hasUnpiped === false) { + unpipeInfo.hasUnpiped = true; + cleanup(); + } } } @@ -647,6 +650,7 @@ function pipeOnDrain(src) { Readable.prototype.unpipe = function(dest) { var state = this._readableState; + var unpipeInfo = { hasUnpiped: false }; // if we're not piping anywhere, then do nothing. if (state.pipesCount === 0) @@ -666,7 +670,7 @@ Readable.prototype.unpipe = function(dest) { state.pipesCount = 0; state.flowing = false; if (dest) - dest.emit('unpipe', this); + dest.emit('unpipe', this, unpipeInfo); return this; } @@ -681,7 +685,7 @@ Readable.prototype.unpipe = function(dest) { state.flowing = false; for (var i = 0; i < len; i++) - dests[i].emit('unpipe', this); + dests[i].emit('unpipe', this, unpipeInfo); return this; } @@ -695,7 +699,7 @@ Readable.prototype.unpipe = function(dest) { if (state.pipesCount === 1) state.pipes = state.pipes[0]; - dest.emit('unpipe', this); + dest.emit('unpipe', this, unpipeInfo); return this; }; diff --git a/test/parallel/test-stream-pipe-same-destination-twice.js b/test/parallel/test-stream-pipe-same-destination-twice.js new file mode 100644 index 00000000000000..1824c0606451a2 --- /dev/null +++ b/test/parallel/test-stream-pipe-same-destination-twice.js @@ -0,0 +1,78 @@ +'use strict'; +const common = require('../common'); + +// Regression test for https://github.com/nodejs/node/issues/12718. +// Tests that piping a source stream twice to the same destination stream +// works, and that a subsequent unpipe() call only removes the pipe *once*. +const assert = require('assert'); +const { PassThrough, Writable } = require('stream'); + +{ + const passThrough = new PassThrough(); + const dest = new Writable({ + write: common.mustCall((chunk, encoding, cb) => { + assert.strictEqual(`${chunk}`, 'foobar'); + cb(); + }) + }); + + passThrough.pipe(dest); + passThrough.pipe(dest); + + assert.strictEqual(passThrough._events.data.length, 2); + assert.strictEqual(passThrough._readableState.pipesCount, 2); + assert.strictEqual(passThrough._readableState.pipes[0], dest); + assert.strictEqual(passThrough._readableState.pipes[1], dest); + + passThrough.unpipe(dest); + + assert.strictEqual(passThrough._events.data.length, 1); + assert.strictEqual(passThrough._readableState.pipesCount, 1); + assert.strictEqual(passThrough._readableState.pipes, dest); + + passThrough.write('foobar'); + passThrough.pipe(dest); +} + +{ + const passThrough = new PassThrough(); + const dest = new Writable({ + write: common.mustCall((chunk, encoding, cb) => { + assert.strictEqual(`${chunk}`, 'foobar'); + cb(); + }, 2) + }); + + passThrough.pipe(dest); + passThrough.pipe(dest); + + assert.strictEqual(passThrough._events.data.length, 2); + assert.strictEqual(passThrough._readableState.pipesCount, 2); + assert.strictEqual(passThrough._readableState.pipes[0], dest); + assert.strictEqual(passThrough._readableState.pipes[1], dest); + + passThrough.write('foobar'); +} + +{ + const passThrough = new PassThrough(); + const dest = new Writable({ + write: common.mustNotCall() + }); + + passThrough.pipe(dest); + passThrough.pipe(dest); + + assert.strictEqual(passThrough._events.data.length, 2); + assert.strictEqual(passThrough._readableState.pipesCount, 2); + assert.strictEqual(passThrough._readableState.pipes[0], dest); + assert.strictEqual(passThrough._readableState.pipes[1], dest); + + passThrough.unpipe(dest); + passThrough.unpipe(dest); + + assert.strictEqual(passThrough._events.data, undefined); + assert.strictEqual(passThrough._readableState.pipesCount, 0); + + passThrough.write('foobar'); +}