Skip to content

Commit

Permalink
stream: use lazy registration for drain for fast destinations
Browse files Browse the repository at this point in the history
PR-URL: #29095
Reviewed-By: Anna Henningsen <[email protected]>
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: Rich Trott <[email protected]>
  • Loading branch information
ronag authored and Trott committed Aug 16, 2019
1 parent 4111c57 commit 7195cd6
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 15 deletions.
21 changes: 13 additions & 8 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -676,20 +676,17 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
dest.end();
}

// When the dest drains, it reduces the awaitDrain counter
// on the source. This would be more elegant with a .once()
// handler in flow(), but adding and removing repeatedly is
// too slow.
const ondrain = pipeOnDrain(src);
dest.on('drain', ondrain);
let ondrain;

var cleanedUp = false;
function cleanup() {
debug('cleanup');
// Cleanup event handlers once the pipe is broken
dest.removeListener('close', onclose);
dest.removeListener('finish', onfinish);
dest.removeListener('drain', ondrain);
if (ondrain) {
dest.removeListener('drain', ondrain);
}
dest.removeListener('error', onerror);
dest.removeListener('unpipe', onunpipe);
src.removeListener('end', onend);
Expand All @@ -703,7 +700,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
// flowing again.
// So, if this is awaiting a drain, then we just call it now.
// If we don't know, then assume that we are waiting for one.
if (state.awaitDrain &&
if (ondrain && state.awaitDrain &&
(!dest._writableState || dest._writableState.needDrain))
ondrain();
}
Expand All @@ -722,6 +719,14 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
debug('false write response, pause', state.awaitDrain);
state.awaitDrain++;
}
if (!ondrain) {
// When the dest drains, it reduces the awaitDrain counter
// on the source. This would be more elegant with a .once()
// handler in flow(), but adding and removing repeatedly is
// too slow.
ondrain = pipeOnDrain(src);
dest.on('drain', ondrain);
}
src.pause();
}
}
Expand Down
23 changes: 23 additions & 0 deletions test/parallel/test-stream-pipe-flow.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const { Readable, Writable, PassThrough } = require('stream');

{
Expand Down Expand Up @@ -65,3 +66,25 @@ const { Readable, Writable, PassThrough } = require('stream');
wrapper.resume();
wrapper.on('end', common.mustCall());
}

{
// Only register drain if there is backpressure.
const rs = new Readable({ read() {} });

const pt = rs
.pipe(new PassThrough({ objectMode: true, highWaterMark: 2 }));
assert.strictEqual(pt.listenerCount('drain'), 0);
pt.on('finish', () => {
assert.strictEqual(pt.listenerCount('drain'), 0);
});

rs.push('asd');
assert.strictEqual(pt.listenerCount('drain'), 0);

process.nextTick(() => {
rs.push('asd');
assert.strictEqual(pt.listenerCount('drain'), 0);
rs.push(null);
assert.strictEqual(pt.listenerCount('drain'), 0);
});
}
7 changes: 0 additions & 7 deletions test/parallel/test-stream2-readable-legacy-drain.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,4 @@ function drain() {

w.end = common.mustCall();

// Just for kicks, let's mess with the drain count.
// This verifies that even if it gets negative in the
// pipe() cleanup function, we'll still function properly.
r.on('readable', function() {
w.emit('drain');
});

r.pipe(w);

0 comments on commit 7195cd6

Please sign in to comment.