From af29120fa7e1aa52e25eb69b921f67cbab704a76 Mon Sep 17 00:00:00 2001 From: Jason Date: Mon, 13 May 2024 01:24:43 +0800 Subject: [PATCH] stream: use `ByteLengthQueuingStrategy` when not in `objectMode` Fixes: https://github.com/nodejs/node/issues/46347 PR-URL: https://github.com/nodejs/node/pull/48847 Reviewed-By: James M Snell Reviewed-By: Matteo Collina --- lib/internal/webstreams/adapters.js | 7 +-- test/parallel/test-stream-readable-to-web.js | 62 ++++++++++++++++++++ 2 files changed, 64 insertions(+), 5 deletions(-) create mode 100644 test/parallel/test-stream-readable-to-web.js diff --git a/lib/internal/webstreams/adapters.js b/lib/internal/webstreams/adapters.js index 878fd0a0d4d76b..8993ced806d1ff 100644 --- a/lib/internal/webstreams/adapters.js +++ b/lib/internal/webstreams/adapters.js @@ -25,6 +25,7 @@ const { const { CountQueuingStrategy, + ByteLengthQueuingStrategy, } = require('internal/webstreams/queuingstrategies'); const { @@ -417,11 +418,7 @@ function newReadableStreamFromStreamReadable(streamReadable, options = kEmptyObj return new CountQueuingStrategy({ highWaterMark }); } - // When not running in objectMode explicitly, we just fall - // back to a minimal strategy that just specifies the highWaterMark - // and no size algorithm. Using a ByteLengthQueuingStrategy here - // is unnecessary. - return { highWaterMark }; + return new ByteLengthQueuingStrategy({ highWaterMark }); }; const strategy = evaluateStrategyOrFallback(options?.strategy); diff --git a/test/parallel/test-stream-readable-to-web.js b/test/parallel/test-stream-readable-to-web.js new file mode 100644 index 00000000000000..753672b509c173 --- /dev/null +++ b/test/parallel/test-stream-readable-to-web.js @@ -0,0 +1,62 @@ +'use strict'; +const common = require('../common'); +if (!common.hasCrypto) { common.skip('missing crypto'); } + +const { Readable } = require('stream'); +const process = require('process'); +const { randomBytes } = require('crypto'); +const assert = require('assert'); + +// Based on: https://github.com/nodejs/node/issues/46347#issuecomment-1413886707 +// edit: make it cross-platform as /dev/urandom is not available on Windows +{ + let currentMemoryUsage = process.memoryUsage().arrayBuffers; + + // We initialize a stream, but not start consuming it + const randomNodeStream = new Readable({ + read(size) { + randomBytes(size, (err, buffer) => { + if (err) { + // If an error occurs, emit an 'error' event + this.emit('error', err); + return; + } + + // Push the random bytes to the stream + this.push(buffer); + }); + } + }); + // after 2 seconds, it'll get converted to web stream + let randomWebStream; + + // We check memory usage every second + // since it's a stream, it shouldn't be higher than the chunk size + const reportMemoryUsage = () => { + const { arrayBuffers } = process.memoryUsage(); + currentMemoryUsage = arrayBuffers; + + assert(currentMemoryUsage <= 256 * 1024 * 1024); + }; + setInterval(reportMemoryUsage, 1000); + + // after 1 second we use Readable.toWeb + // memory usage should stay pretty much the same since it's still a stream + setTimeout(() => { + randomWebStream = Readable.toWeb(randomNodeStream); + }, 1000); + + // after 2 seconds we start consuming the stream + // memory usage will grow, but the old chunks should be garbage-collected pretty quickly + setTimeout(async () => { + // eslint-disable-next-line no-unused-vars + for await (const _ of randomWebStream) { + // Do nothing, just let the stream flow + } + }, 2000); + + setTimeout(() => { + // Test considered passed if we don't crash + process.exit(0); + }, 5000); +}