diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 1830eea66decc9..01f95104b8e7d8 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -676,12 +676,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) { dest.end(); } - // When the dest drains, it reduces the awaitDrain counter - // on the source. This would be more elegant with a .once() - // handler in flow(), but adding and removing repeatedly is - // too slow. - const ondrain = pipeOnDrain(src); - dest.on('drain', ondrain); + let ondrain; var cleanedUp = false; function cleanup() { @@ -689,7 +684,9 @@ Readable.prototype.pipe = function(dest, pipeOpts) { // Cleanup event handlers once the pipe is broken dest.removeListener('close', onclose); dest.removeListener('finish', onfinish); - dest.removeListener('drain', ondrain); + if (ondrain) { + dest.removeListener('drain', ondrain); + } dest.removeListener('error', onerror); dest.removeListener('unpipe', onunpipe); src.removeListener('end', onend); @@ -703,7 +700,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) { // flowing again. // So, if this is awaiting a drain, then we just call it now. // If we don't know, then assume that we are waiting for one. - if (state.awaitDrain && + if (ondrain && state.awaitDrain && (!dest._writableState || dest._writableState.needDrain)) ondrain(); } @@ -722,6 +719,14 @@ Readable.prototype.pipe = function(dest, pipeOpts) { debug('false write response, pause', state.awaitDrain); state.awaitDrain++; } + if (!ondrain) { + // When the dest drains, it reduces the awaitDrain counter + // on the source. This would be more elegant with a .once() + // handler in flow(), but adding and removing repeatedly is + // too slow. + ondrain = pipeOnDrain(src); + dest.on('drain', ondrain); + } src.pause(); } } diff --git a/test/parallel/test-stream-pipe-flow.js b/test/parallel/test-stream-pipe-flow.js index b696821c0d51fb..1f2e8f54cec409 100644 --- a/test/parallel/test-stream-pipe-flow.js +++ b/test/parallel/test-stream-pipe-flow.js @@ -1,5 +1,6 @@ 'use strict'; const common = require('../common'); +const assert = require('assert'); const { Readable, Writable, PassThrough } = require('stream'); { @@ -65,3 +66,25 @@ const { Readable, Writable, PassThrough } = require('stream'); wrapper.resume(); wrapper.on('end', common.mustCall()); } + +{ + // Only register drain if there is backpressure. + const rs = new Readable({ read() {} }); + + const pt = rs + .pipe(new PassThrough({ objectMode: true, highWaterMark: 2 })); + assert.strictEqual(pt.listenerCount('drain'), 0); + pt.on('finish', () => { + assert.strictEqual(pt.listenerCount('drain'), 0); + }); + + rs.push('asd'); + assert.strictEqual(pt.listenerCount('drain'), 0); + + process.nextTick(() => { + rs.push('asd'); + assert.strictEqual(pt.listenerCount('drain'), 0); + rs.push(null); + assert.strictEqual(pt.listenerCount('drain'), 0); + }); +} diff --git a/test/parallel/test-stream2-readable-legacy-drain.js b/test/parallel/test-stream2-readable-legacy-drain.js index 69780a078db216..beb36577766137 100644 --- a/test/parallel/test-stream2-readable-legacy-drain.js +++ b/test/parallel/test-stream2-readable-legacy-drain.js @@ -52,11 +52,4 @@ function drain() { w.end = common.mustCall(); -// Just for kicks, let's mess with the drain count. -// This verifies that even if it gets negative in the -// pipe() cleanup function, we'll still function properly. -r.on('readable', function() { - w.emit('drain'); -}); - r.pipe(w);