Skip to content

Commit

Permalink
stream: fix enqueue race condition on esm modules
Browse files Browse the repository at this point in the history
stream: use nextTick on close

PR-URL: #40901
Reviewed-By: Robert Nagy <[email protected]>
Reviewed-By: James M Snell <[email protected]>
Reviewed-By: Matteo Collina <[email protected]>
  • Loading branch information
RafaelGSS authored and danielleadams committed Feb 1, 2022
1 parent a6e7cf5 commit 8cf507a
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 7 deletions.
19 changes: 12 additions & 7 deletions lib/internal/webstreams/readablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -1446,13 +1446,18 @@ function readableStreamTee(stream, cloneForBranch2) {
});
},
[kClose]() {
reading = false;
if (!canceled1)
readableStreamDefaultControllerClose(branch1[kState].controller);
if (!canceled2)
readableStreamDefaultControllerClose(branch2[kState].controller);
if (!canceled1 || !canceled2)
cancelPromise.resolve();
// The `process.nextTick()` is not part of the spec.
// This approach was needed to avoid a race condition working with esm
// Further information, see: https://github.com/nodejs/node/issues/39758
process.nextTick(() => {
reading = false;
if (!canceled1)
readableStreamDefaultControllerClose(branch1[kState].controller);
if (!canceled2)
readableStreamDefaultControllerClose(branch2[kState].controller);
if (!canceled1 || !canceled2)
cancelPromise.resolve();
});
},
[kError]() {
reading = false;
Expand Down
36 changes: 36 additions & 0 deletions test/parallel/test-whatwg-readablestream.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import { mustCall } from '../common/index.mjs';
import { ReadableStream } from 'stream/web';
import assert from 'assert';

{
// Test tee() with close in the nextTick after enqueue
async function read(stream) {
const chunks = [];
for await (const chunk of stream)
chunks.push(chunk);
return Buffer.concat(chunks).toString();
}

const [r1, r2] = new ReadableStream({
start(controller) {
process.nextTick(() => {
controller.enqueue(new Uint8Array([102, 111, 111, 98, 97, 114]));

process.nextTick(() => {
controller.close();
});
});
}
}).tee();

(async () => {
const [dataReader1, dataReader2] = await Promise.all([
read(r1),
read(r2),
]);

assert.strictEqual(dataReader1, dataReader2);
assert.strictEqual(dataReader1, 'foobar');
assert.strictEqual(dataReader2, 'foobar');
})().then(mustCall());
}

0 comments on commit 8cf507a

Please sign in to comment.