From a4a83613cb1bee9a8d7d7248c8af45044fa37cc7 Mon Sep 17 00:00:00 2001 From: Mattias Buelens Date: Thu, 28 Nov 2024 00:04:18 +0100 Subject: [PATCH] stream: commit pull-into descriptors after filling from queue Fixes: https://github.com/nodejs/node/issues/56044 PR-URL: https://github.com/nodejs/node/pull/56072 Reviewed-By: Matteo Collina Reviewed-By: Matthew Aitken --- lib/internal/webstreams/readablestream.js | 55 +++++++++++++++-------- lib/internal/webstreams/util.js | 11 +++++ 2 files changed, 47 insertions(+), 19 deletions(-) diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index 2369175733c115..8d884c43c2f9c3 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -94,6 +94,7 @@ const { ArrayBufferViewGetByteLength, ArrayBufferViewGetByteOffset, AsyncIterator, + canCopyArrayBuffer, cloneAsUint8Array, copyArrayBuffer, createPromiseCallback, @@ -2552,6 +2553,15 @@ function readableByteStreamControllerCommitPullIntoDescriptor(stream, desc) { } } +function readableByteStreamControllerCommitPullIntoDescriptors(stream, descriptors) { + for (let i = 0; i < descriptors.length; ++i) { + readableByteStreamControllerCommitPullIntoDescriptor( + stream, + descriptors[i], + ); + } +} + function readableByteStreamControllerInvalidateBYOBRequest(controller) { if (controller[kState].byobRequest === null) return; @@ -2758,11 +2768,11 @@ function readableByteStreamControllerRespondInClosedState(controller, desc) { stream, } = controller[kState]; if (readableStreamHasBYOBReader(stream)) { - while (readableStreamGetNumReadIntoRequests(stream) > 0) { - readableByteStreamControllerCommitPullIntoDescriptor( - stream, - readableByteStreamControllerShiftPendingPullInto(controller)); + const filledPullIntos = []; + for (let i = 0; i < readableStreamGetNumReadIntoRequests(stream); ++i) { + ArrayPrototypePush(filledPullIntos, readableByteStreamControllerShiftPendingPullInto(controller)); } + readableByteStreamControllerCommitPullIntoDescriptors(stream, filledPullIntos); } } @@ -2843,8 +2853,9 @@ function readableByteStreamControllerEnqueue(controller, chunk) { transferredBuffer, byteOffset, byteLength); - readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue( + const filledPullIntos = readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue( controller); + readableByteStreamControllerCommitPullIntoDescriptors(stream, filledPullIntos); } else { assert(!isReadableStreamLocked(stream)); readableByteStreamControllerEnqueueChunkToQueue( @@ -2937,6 +2948,7 @@ function readableByteStreamControllerFillPullIntoDescriptorFromQueue( const maxAlignedBytes = maxBytesFilled - (maxBytesFilled % elementSize); let totalBytesToCopyRemaining = maxBytesToCopy; let ready = false; + assert(!ArrayBufferPrototypeGetDetached(buffer)); assert(bytesFilled < minimumFill); if (maxAlignedBytes >= minimumFill) { totalBytesToCopyRemaining = maxAlignedBytes - bytesFilled; @@ -2952,12 +2964,12 @@ function readableByteStreamControllerFillPullIntoDescriptorFromQueue( totalBytesToCopyRemaining, headOfQueue.byteLength); const destStart = byteOffset + desc.bytesFilled; - const arrayBufferByteLength = ArrayBufferPrototypeGetByteLength(buffer); - if (arrayBufferByteLength - destStart < bytesToCopy) { - throw new ERR_INVALID_STATE.RangeError( - 'view ArrayBuffer size is invalid'); - } - assert(arrayBufferByteLength - destStart >= bytesToCopy); + assert(canCopyArrayBuffer( + buffer, + destStart, + headOfQueue.buffer, + headOfQueue.byteOffset, + bytesToCopy)); copyArrayBuffer( buffer, destStart, @@ -2991,26 +3003,30 @@ function readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue( const { closeRequested, pendingPullIntos, - stream, } = controller[kState]; assert(!closeRequested); + const filledPullIntos = []; while (pendingPullIntos.length) { if (!controller[kState].queueTotalSize) - return; + break; const desc = pendingPullIntos[0]; if (readableByteStreamControllerFillPullIntoDescriptorFromQueue( controller, desc)) { readableByteStreamControllerShiftPendingPullInto(controller); - readableByteStreamControllerCommitPullIntoDescriptor(stream, desc); + ArrayPrototypePush(filledPullIntos, desc); } } + return filledPullIntos; } function readableByteStreamControllerRespondInReadableState( controller, bytesWritten, desc) { + const { + stream, + } = controller[kState]; const { buffer, bytesFilled, @@ -3031,9 +3047,10 @@ function readableByteStreamControllerRespondInReadableState( controller, desc, ); - readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue( + const filledPullIntos = readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue( controller, ); + readableByteStreamControllerCommitPullIntoDescriptors(stream, filledPullIntos); return; } @@ -3059,10 +3076,10 @@ function readableByteStreamControllerRespondInReadableState( ArrayBufferPrototypeGetByteLength(remainder)); } desc.bytesFilled -= remainderSize; - readableByteStreamControllerCommitPullIntoDescriptor( - controller[kState].stream, - desc); - readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller); + const filledPullIntos = readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller); + + readableByteStreamControllerCommitPullIntoDescriptor(stream, desc); + readableByteStreamControllerCommitPullIntoDescriptors(stream, filledPullIntos); } function readableByteStreamControllerRespondWithNewView(controller, view) { diff --git a/lib/internal/webstreams/util.js b/lib/internal/webstreams/util.js index 2c70ef7acdfe66..5bf016f73b7af5 100644 --- a/lib/internal/webstreams/util.js +++ b/lib/internal/webstreams/util.js @@ -1,6 +1,8 @@ 'use strict'; const { + ArrayBufferPrototypeGetByteLength, + ArrayBufferPrototypeGetDetached, ArrayBufferPrototypeSlice, ArrayPrototypePush, ArrayPrototypeShift, @@ -107,6 +109,14 @@ function cloneAsUint8Array(view) { ); } +function canCopyArrayBuffer(toBuffer, toIndex, fromBuffer, fromIndex, count) { + return toBuffer !== fromBuffer && + !ArrayBufferPrototypeGetDetached(toBuffer) && + !ArrayBufferPrototypeGetDetached(fromBuffer) && + toIndex + count <= ArrayBufferPrototypeGetByteLength(toBuffer) && + fromIndex + count <= ArrayBufferPrototypeGetByteLength(fromBuffer); +} + function isBrandCheck(brand) { return (value) => { return value != null && @@ -261,6 +271,7 @@ module.exports = { ArrayBufferViewGetByteLength, ArrayBufferViewGetByteOffset, AsyncIterator, + canCopyArrayBuffer, createPromiseCallback, cloneAsUint8Array, copyArrayBuffer,