From 698a29420f92844478101ec1fccdc81b46954e2e Mon Sep 17 00:00:00 2001
From: ran
Date: Mon, 26 Aug 2019 17:00:06 +0800
Subject: [PATCH] stream: fix readable state `awaitDrain` increase in recursion

PR-URL: https://github.com/nodejs/node/pull/27572
Reviewed-By: Anna Henningsen
---
 lib/_stream_readable.js                       | 65 +++++++++++++++----
 ...riters-in-synchronously-recursion-write.js | 28 ++++++++
 ...t-stream-pipe-await-drain-manual-resume.js | 25 +++----
 ...tream-pipe-await-drain-push-while-write.js |  6 +-
 test/parallel/test-stream-pipe-await-drain.js | 12 ++--
 test/parallel/test-stream2-basic.js           |  1 -
 6 files changed, 101 insertions(+), 36 deletions(-)
 create mode 100644 test/parallel/test-stream-await-drain-writers-in-synchronously-recursion-write.js

diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js
index ac37930d122d91..cfa36731e3d661 100644
--- a/lib/_stream_readable.js
+++ b/lib/_stream_readable.js
@@ -134,8 +134,10 @@ function ReadableState(options, stream, isDuplex) {
   // Everything else in the universe uses 'utf8', though.
   this.defaultEncoding = options.defaultEncoding || 'utf8';
 
-  // The number of writers that are awaiting a drain event in .pipe()s
-  this.awaitDrain = 0;
+  // Ref the piped dest which we need a drain event on it
+  // type: null | Writable | Set
+  this.awaitDrainWriters = null;
+  this.multiAwaitDrain = false;
 
   // If true, a maybeReadMore has been scheduled
   this.readingMore = false;
@@ -310,7 +312,13 @@ function readableAddChunk(stream, chunk, encoding, addToFront) {
 
 function addChunk(stream, state, chunk, addToFront) {
   if (state.flowing && state.length === 0 && !state.sync) {
-    state.awaitDrain = 0;
+    // Use the guard to avoid creating `Set()` repeatedly
+    // when we have multiple pipes.
+    if (state.multiAwaitDrain) {
+      state.awaitDrainWriters.clear();
+    } else {
+      state.awaitDrainWriters = null;
+    }
     stream.emit('data', chunk);
   } else {
     // Update the buffer info.
@@ -511,7 +519,11 @@ Readable.prototype.read = function(n) {
     n = 0;
   } else {
     state.length -= n;
-    state.awaitDrain = 0;
+    if (state.multiAwaitDrain) {
+      state.awaitDrainWriters.clear();
+    } else {
+      state.awaitDrainWriters = null;
+    }
   }
 
   if (state.length === 0) {
@@ -656,6 +668,15 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
   const src = this;
   const state = this._readableState;
 
+  if (state.pipes.length === 1) {
+    if (!state.multiAwaitDrain) {
+      state.multiAwaitDrain = true;
+      state.awaitDrainWriters = new Set(
+        state.awaitDrainWriters ? [state.awaitDrainWriters] : []
+      );
+    }
+  }
+
   state.pipes.push(dest);
   debug('pipe count=%d opts=%j', state.pipes.length, pipeOpts);
 
@@ -709,7 +730,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
     // flowing again.
     // So, if this is awaiting a drain, then we just call it now.
     // If we don't know, then assume that we are waiting for one.
-    if (ondrain && state.awaitDrain &&
+    if (ondrain && state.awaitDrainWriters &&
         (!dest._writableState || dest._writableState.needDrain))
       ondrain();
   }
@@ -724,16 +745,22 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
       // to get stuck in a permanently paused state if that write
      // also returned false.
       // => Check whether `dest` is still a piping destination.
-      if (state.pipes.length > 0 && state.pipes.includes(dest) && !cleanedUp) {
-        debug('false write response, pause', state.awaitDrain);
-        state.awaitDrain++;
+      if (!cleanedUp) {
+        if (state.pipes.length === 1 && state.pipes[0] === dest) {
+          debug('false write response, pause', 0);
+          state.awaitDrainWriters = dest;
+          state.multiAwaitDrain = false;
+        } else if (state.pipes.length > 1 && state.pipes.includes(dest)) {
+          debug('false write response, pause', state.awaitDrainWriters.size);
+          state.awaitDrainWriters.add(dest);
+        }
       }
       if (!ondrain) {
         // When the dest drains, it reduces the awaitDrain counter
         // on the source. This would be more elegant with a .once()
         // handler in flow(), but adding and removing repeatedly is
         // too slow.
-        ondrain = pipeOnDrain(src);
+        ondrain = pipeOnDrain(src, dest);
         dest.on('drain', ondrain);
       }
       src.pause();
@@ -783,13 +810,23 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
   return dest;
 };
 
-function pipeOnDrain(src) {
+function pipeOnDrain(src, dest) {
   return function pipeOnDrainFunctionResult() {
     const state = src._readableState;
-    debug('pipeOnDrain', state.awaitDrain);
-    if (state.awaitDrain)
-      state.awaitDrain--;
-    if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) {
+
+    // `ondrain` will call directly,
+    // `this` maybe not a reference to dest,
+    // so we use the real dest here.
+    if (state.awaitDrainWriters === dest) {
+      debug('pipeOnDrain', 1);
+      state.awaitDrainWriters = null;
+    } else if (state.multiAwaitDrain) {
+      debug('pipeOnDrain', state.awaitDrainWriters.size);
+      state.awaitDrainWriters.delete(dest);
+    }
+
+    if ((!state.awaitDrainWriters || state.awaitDrainWriters.size === 0) &&
+      EE.listenerCount(src, 'data')) {
       state.flowing = true;
       flow(src);
     }
diff --git a/test/parallel/test-stream-await-drain-writers-in-synchronously-recursion-write.js b/test/parallel/test-stream-await-drain-writers-in-synchronously-recursion-write.js
new file mode 100644
index 00000000000000..110d46bb9f23cc
--- /dev/null
+++ b/test/parallel/test-stream-await-drain-writers-in-synchronously-recursion-write.js
@@ -0,0 +1,28 @@
+'use strict';
+const common = require('../common');
+const { PassThrough } = require('stream');
+
+const encode = new PassThrough({
+  highWaterMark: 1
+});
+
+const decode = new PassThrough({
+  highWaterMark: 1
+});
+
+const send = common.mustCall((buf) => {
+  encode.write(buf);
+}, 4);
+
+let i = 0;
+const onData = common.mustCall(() => {
+  if (++i === 2) {
+    send(Buffer.from([0x3]));
+    send(Buffer.from([0x4]));
+  }
+}, 4);
+
+encode.pipe(decode).on('data', onData);
+
+send(Buffer.from([0x1]));
+send(Buffer.from([0x2]));
diff --git a/test/parallel/test-stream-pipe-await-drain-manual-resume.js b/test/parallel/test-stream-pipe-await-drain-manual-resume.js
index 37acead9969443..a95a5e05aea9ef 100644
--- a/test/parallel/test-stream-pipe-await-drain-manual-resume.js
+++ b/test/parallel/test-stream-pipe-await-drain-manual-resume.js
@@ -28,10 +28,10 @@ readable.pipe(writable);
 
 readable.once('pause', common.mustCall(() => {
   assert.strictEqual(
-    readable._readableState.awaitDrain,
-    1,
-    'Expected awaitDrain to equal 1 but instead got ' +
-    `${readable._readableState.awaitDrain}`
+    readable._readableState.awaitDrainWriters,
+    writable,
+    'Expected awaitDrainWriters to be a Writable but instead got ' +
+    `${readable._readableState.awaitDrainWriters}`
   );
   // First pause, resume manually. The next write() to writable will still
   // return false, because chunks are still being buffered, so it will increase
@@ -43,10 +43,10 @@ readable.once('pause', common.mustCall(() => {
 
 readable.once('pause', common.mustCall(() => {
   assert.strictEqual(
-    readable._readableState.awaitDrain,
-    1,
-    '.resume() should not reset the counter but instead got ' +
-    `${readable._readableState.awaitDrain}`
+    readable._readableState.awaitDrainWriters,
+    writable,
+    '.resume() should not reset the awaitDrainWriters, but instead got ' +
+    `${readable._readableState.awaitDrainWriters}`
   );
   // Second pause, handle all chunks from now on. Once all callbacks that
   // are currently queued up are handled, the awaitDrain drain counter should
@@ -65,10 +65,11 @@ readable.push(null);
 
 writable.on('finish', common.mustCall(() => {
   assert.strictEqual(
-    readable._readableState.awaitDrain,
-    0,
-    'awaitDrain should equal 0 after all chunks are written but instead got' +
-    `${readable._readableState.awaitDrain}`
+    readable._readableState.awaitDrainWriters,
+    null,
+    `awaitDrainWriters should be reset to null
+    after all chunks are written but instead got
+    ${readable._readableState.awaitDrainWriters}`
   );
   // Everything okay, all chunks were written.
 }));
diff --git a/test/parallel/test-stream-pipe-await-drain-push-while-write.js b/test/parallel/test-stream-pipe-await-drain-push-while-write.js
index d14ad46cb066ec..6dbf3c669bc177 100644
--- a/test/parallel/test-stream-pipe-await-drain-push-while-write.js
+++ b/test/parallel/test-stream-pipe-await-drain-push-while-write.js
@@ -6,8 +6,8 @@ const assert = require('assert');
 const writable = new stream.Writable({
   write: common.mustCall(function(chunk, encoding, cb) {
     assert.strictEqual(
-      readable._readableState.awaitDrain,
-      0
+      readable._readableState.awaitDrainWriters,
+      null,
     );
 
     if (chunk.length === 32 * 1024) { // first chunk
@@ -15,7 +15,7 @@ const writable = new stream.Writable({
       // We should check if awaitDrain counter is increased in the next
      // tick, because awaitDrain is incremented after this method finished
       process.nextTick(() => {
-        assert.strictEqual(readable._readableState.awaitDrain, 1);
+        assert.strictEqual(readable._readableState.awaitDrainWriters, writable);
       });
     }
diff --git a/test/parallel/test-stream-pipe-await-drain.js b/test/parallel/test-stream-pipe-await-drain.js
index 9286ceb791ce5f..3ae248e08b854f 100644
--- a/test/parallel/test-stream-pipe-await-drain.js
+++ b/test/parallel/test-stream-pipe-await-drain.js
@@ -24,10 +24,10 @@ writer1._write = common.mustCall(function(chunk, encoding, cb) {
 
 writer1.once('chunk-received', () => {
   assert.strictEqual(
-    reader._readableState.awaitDrain,
+    reader._readableState.awaitDrainWriters.size,
     0,
     'awaitDrain initial value should be 0, actual is ' +
-    reader._readableState.awaitDrain
+    reader._readableState.awaitDrainWriters
   );
   setImmediate(() => {
     // This one should *not* get through to writer1 because writer2 is not
@@ -39,10 +39,10 @@ writer1.once('chunk-received', () => {
 // A "slow" consumer:
 writer2._write = common.mustCall((chunk, encoding, cb) => {
   assert.strictEqual(
-    reader._readableState.awaitDrain,
+    reader._readableState.awaitDrainWriters.size,
     1,
     'awaitDrain should be 1 after first push, actual is ' +
-    reader._readableState.awaitDrain
+    reader._readableState.awaitDrainWriters
   );
   // Not calling cb here to "simulate" slow stream.
   // This should be called exactly once, since the first .write() call
@@ -51,10 +51,10 @@ writer2._write = common.mustCall((chunk, encoding, cb) => {
 
 writer3._write = common.mustCall((chunk, encoding, cb) => {
   assert.strictEqual(
-    reader._readableState.awaitDrain,
+    reader._readableState.awaitDrainWriters.size,
     2,
     'awaitDrain should be 2 after second push, actual is ' +
-    reader._readableState.awaitDrain
+    reader._readableState.awaitDrainWriters
   );
   // Not calling cb here to "simulate" slow stream.
   // This should be called exactly once, since the first .write() call
diff --git a/test/parallel/test-stream2-basic.js b/test/parallel/test-stream2-basic.js
index 5e0f9c6e913d72..7121f7bda75b02 100644
--- a/test/parallel/test-stream2-basic.js
+++ b/test/parallel/test-stream2-basic.js
@@ -355,7 +355,6 @@ class TestWriter extends EE {
   assert.strictEqual(v, null);
 
   const w = new R();
-
   w.write = function(buffer) {
     written = true;
     assert.strictEqual(ended, false);
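
A note on the idea behind the change, separate from the patch itself: before this commit, `ReadableState.awaitDrain` was a bare counter of pending 'drain' events. When a 'data' handler wrote back into the pipeline synchronously (the situation exercised by the new regression test above), the same destination could return false from write() more than once before its single 'drain' event fired, so the counter was incremented twice, never got back to zero, and the source stayed paused. The patch replaces the counter with `awaitDrainWriters` (null, a single Writable, or a Set of Writables), so each destination is only ever counted once. The sketch below is not part of the patch and not the stream internals; the `DrainTracker` class and the placeholder object `destA` are invented purely to illustrate that idea.

'use strict';
// Minimal sketch: remember *which* destinations still owe a 'drain' event
// instead of counting how many times write() returned false.
class DrainTracker {
  constructor() {
    // Destinations we are still awaiting a 'drain' event from.
    this.waiting = new Set();
  }

  // Record that dest.write() returned false. Re-adding the same destination
  // is a no-op, which is what prevents the over-count during recursive writes.
  wait(dest) {
    this.waiting.add(dest);
  }

  // A destination emitted 'drain'; returns true once no destination is left
  // waiting, i.e. it is safe to resume the source.
  drained(dest) {
    this.waiting.delete(dest);
    return this.waiting.size === 0;
  }
}

// With a plain counter, the two wait() calls below would demand two 'drain'
// events before resuming; with a Set, a single 'drain' is enough.
const tracker = new DrainTracker();
const destA = { name: 'A' }; // stand-in for a Writable destination
tracker.wait(destA);
tracker.wait(destA); // second false write() from the same destination
console.log(tracker.drained(destA)); // prints: true

This mirrors how the patched pipe() code behaves: with a single pipe it stores the destination directly in `awaitDrainWriters`, and with multiple pipes it adds each back-pressuring destination to a Set, so `pipeOnDrain()` only resumes the source once every recorded destination has drained.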