Skip to content

Commit

Permalink
stream: fix y.pipe(x)+y.pipe(x)+y.unpipe(x)
Browse files Browse the repository at this point in the history
Fix the uncommon situation when a readable stream is piped twice into
the same destination stream, and then unpiped once.

Previously, the `unpipe` event handlers weren’t able to tell whether
they were corresponding to the “right” conceptual pipe that was being
removed; this fixes this by adding a counter to the `unpipe` event
handler and only removing a single piping destination at most.

Fixes: nodejs#12718
  • Loading branch information
addaleax committed May 2, 2017
1 parent 0f58d3c commit 86b9243
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 5 deletions.
14 changes: 9 additions & 5 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -518,10 +518,13 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
src.once('end', endFn);

dest.on('unpipe', onunpipe);
function onunpipe(readable) {
function onunpipe(readable, unpipeInfo) {
debug('onunpipe');
if (readable === src) {
cleanup();
if (unpipeInfo && unpipeInfo.hasUnpiped === false) {
unpipeInfo.hasUnpiped = true;
cleanup();
}
}
}

Expand Down Expand Up @@ -647,6 +650,7 @@ function pipeOnDrain(src) {

Readable.prototype.unpipe = function(dest) {
var state = this._readableState;
var unpipeInfo = { hasUnpiped: false };

// if we're not piping anywhere, then do nothing.
if (state.pipesCount === 0)
Expand All @@ -666,7 +670,7 @@ Readable.prototype.unpipe = function(dest) {
state.pipesCount = 0;
state.flowing = false;
if (dest)
dest.emit('unpipe', this);
dest.emit('unpipe', this, unpipeInfo);
return this;
}

Expand All @@ -681,7 +685,7 @@ Readable.prototype.unpipe = function(dest) {
state.flowing = false;

for (var i = 0; i < len; i++)
dests[i].emit('unpipe', this);
dests[i].emit('unpipe', this, unpipeInfo);
return this;
}

Expand All @@ -695,7 +699,7 @@ Readable.prototype.unpipe = function(dest) {
if (state.pipesCount === 1)
state.pipes = state.pipes[0];

dest.emit('unpipe', this);
dest.emit('unpipe', this, unpipeInfo);

return this;
};
Expand Down
78 changes: 78 additions & 0 deletions test/parallel/test-stream-pipe-same-destination-twice.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
'use strict';
const common = require('../common');

// Regression test for https://github.com/nodejs/node/issues/12718.
// Tests that piping a source stream twice to the same destination stream
// works, and that a subsequent unpipe() call only removes the pipe *once*.
const assert = require('assert');
const { PassThrough, Writable } = require('stream');

{
const passThrough = new PassThrough();
const dest = new Writable({
write: common.mustCall((chunk, encoding, cb) => {
assert.strictEqual(`${chunk}`, 'foobar');
cb();
})
});

passThrough.pipe(dest);
passThrough.pipe(dest);

assert.strictEqual(passThrough._events.data.length, 2);
assert.strictEqual(passThrough._readableState.pipesCount, 2);
assert.strictEqual(passThrough._readableState.pipes[0], dest);
assert.strictEqual(passThrough._readableState.pipes[1], dest);

passThrough.unpipe(dest);

assert.strictEqual(passThrough._events.data.length, 1);
assert.strictEqual(passThrough._readableState.pipesCount, 1);
assert.strictEqual(passThrough._readableState.pipes, dest);

passThrough.write('foobar');
passThrough.pipe(dest);
}

{
const passThrough = new PassThrough();
const dest = new Writable({
write: common.mustCall((chunk, encoding, cb) => {
assert.strictEqual(`${chunk}`, 'foobar');
cb();
}, 2)
});

passThrough.pipe(dest);
passThrough.pipe(dest);

assert.strictEqual(passThrough._events.data.length, 2);
assert.strictEqual(passThrough._readableState.pipesCount, 2);
assert.strictEqual(passThrough._readableState.pipes[0], dest);
assert.strictEqual(passThrough._readableState.pipes[1], dest);

passThrough.write('foobar');
}

{
const passThrough = new PassThrough();
const dest = new Writable({
write: common.mustNotCall()
});

passThrough.pipe(dest);
passThrough.pipe(dest);

assert.strictEqual(passThrough._events.data.length, 2);
assert.strictEqual(passThrough._readableState.pipesCount, 2);
assert.strictEqual(passThrough._readableState.pipes[0], dest);
assert.strictEqual(passThrough._readableState.pipes[1], dest);

passThrough.unpipe(dest);
passThrough.unpipe(dest);

assert.strictEqual(passThrough._events.data, undefined);
assert.strictEqual(passThrough._readableState.pipesCount, 0);

passThrough.write('foobar');
}

0 comments on commit 86b9243

Please sign in to comment.