Skip to content

Commit

Permalink
stream: Fixes missing 'unpipe' event for pipes made with {end: false}
Browse files Browse the repository at this point in the history
  Currently when the destination emits an 'error', 'finish' or 'close'
event the pipe calls unpipe to emit 'unpipe' and trigger the clean up
of all it's listeners.
  When the source emits an 'end' event without {end: false} it calls
end() on the destination leading it to emit a 'close', this will again
lead to the pipe calling unpipe. However the source emitting an 'end'
event along side {end: false} is the only time the cleanup gets ran
directly without unpipe being called. This fixes that so the 'unpipe'
event does get emitted and cleanup in turn gets ran by that event.

Fixes: nodejs#11837
  • Loading branch information
zaide-chris committed Apr 6, 2017
1 parent 65c100a commit 850cd09
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 2 deletions.
4 changes: 2 additions & 2 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
dest !== process.stdout &&
dest !== process.stderr;

var endFn = doEnd ? onend : cleanup;
var endFn = doEnd ? onend : unpipe;
if (state.endEmitted)
process.nextTick(endFn);
else
Expand Down Expand Up @@ -547,7 +547,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
dest.removeListener('error', onerror);
dest.removeListener('unpipe', onunpipe);
src.removeListener('end', onend);
src.removeListener('end', cleanup);
src.removeListener('end', unpipe);
src.removeListener('data', ondata);

cleanedUp = true;
Expand Down
69 changes: 69 additions & 0 deletions test/parallel/test-stream-unpipe-event.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const {Writable, Readable} = require('stream');
class NullWriteable extends Writable {
_write(chunk, encoding, callback) {
return callback();
}
}
class QuickEndReadable extends Readable {
_read() {
this.push(null);
}
}
class NeverEndReadable extends Readable {
_read() {}
}

const dest1 = new NullWriteable();
dest1.on('pipe', common.mustCall(() => {}));
dest1.on('unpipe', common.mustCall(() => {}));
const src1 = new QuickEndReadable();
src1.pipe(dest1);


const dest2 = new NullWriteable();
dest2.on('pipe', common.mustCall(() => {}));
dest2.on('unpipe', () => {
throw new Error('unpipe should not have been emited');
});
const src2 = new NeverEndReadable();
src2.pipe(dest2);

const dest3 = new NullWriteable();
dest3.on('pipe', common.mustCall(() => {}));
dest3.on('unpipe', common.mustCall(() => {}));
const src3 = new NeverEndReadable();
src3.pipe(dest3);
src3.unpipe(dest3);

const dest4 = new NullWriteable();
dest4.on('pipe', common.mustCall(() => {}));
dest4.on('unpipe', common.mustCall(() => {}));
const src4 = new QuickEndReadable();
src4.pipe(dest4, {end: false});

const dest5 = new NullWriteable();
dest5.on('pipe', common.mustCall(() => {}));
dest5.on('unpipe', () => {
throw new Error('unpipe should not have been emited');
});
const src5 = new NeverEndReadable();
src5.pipe(dest5, {end: false});

const dest6 = new NullWriteable();
dest6.on('pipe', common.mustCall(() => {}));
dest6.on('unpipe', common.mustCall(() => {}));
const src6 = new NeverEndReadable();
src6.pipe(dest6, {end: false});
src6.unpipe(dest6);

setImmediate(() => {
assert.strictEqual(src1._readableState.pipesCount, 0);
assert.strictEqual(src2._readableState.pipesCount, 1);
assert.strictEqual(src3._readableState.pipesCount, 0);
assert.strictEqual(src4._readableState.pipesCount, 0);
assert.strictEqual(src5._readableState.pipesCount, 1);
assert.strictEqual(src6._readableState.pipesCount, 0);
});

0 comments on commit 850cd09

Please sign in to comment.