Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

streams: implement min option for ReadableStreamBYOBReader.read(view) #50888

Merged
merged 11 commits into from
Jan 4, 2024
Merged
15 changes: 12 additions & 3 deletions doc/api/webstreams.md
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ added: v16.5.0
-->

* Returns: A promise fulfilled with an object:
* `value` {ArrayBuffer}
* `value` {any}
MattiasBuelens marked this conversation as resolved.
Show resolved Hide resolved
* `done` {boolean}

Requests the next chunk of data from the underlying {ReadableStream}
Expand Down Expand Up @@ -617,15 +617,24 @@ added: v16.5.0
{ReadableStream} is closed or rejected if the stream errors or the reader's
lock is released before the stream finishes closing.

#### `readableStreamBYOBReader.read(view)`
#### `readableStreamBYOBReader.read(view[, options])`

<!-- YAML
added: v16.5.0
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/50888
description: Added `min` option.
-->

* `view` {Buffer|TypedArray|DataView}
* `options` {Object}
* `min` {number} When set, the returned promise will only be
fulfilled as soon as `min` number of elements are available.
When not set, the promise fulfills when at least one element
is available.
* Returns: A promise fulfilled with an object:
* `value` {ArrayBuffer}
* `value` {TypedArray|DataView}
* `done` {boolean}

Requests the next chunk of data from the underlying {ReadableStream}
Expand Down
8 changes: 1 addition & 7 deletions lib/internal/encoding.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,7 @@ const {
const {
validateString,
validateObject,
kValidateObjectAllowNullable,
kValidateObjectAllowArray,
kValidateObjectAllowFunction,
kValidateObjectAllowObjectsAndNull,
} = require('internal/validators');
const binding = internalBinding('encoding_binding');
const {
Expand Down Expand Up @@ -393,10 +391,6 @@ const TextDecoder =
makeTextDecoderICU() :
makeTextDecoderJS();

const kValidateObjectAllowObjectsAndNull = kValidateObjectAllowNullable |
kValidateObjectAllowArray |
kValidateObjectAllowFunction;

function makeTextDecoderICU() {
const {
decode: _decode,
Expand Down
7 changes: 7 additions & 0 deletions lib/internal/validators.js
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,11 @@ const kValidateObjectNone = 0;
const kValidateObjectAllowNullable = 1 << 0;
const kValidateObjectAllowArray = 1 << 1;
const kValidateObjectAllowFunction = 1 << 2;
const kValidateObjectAllowObjects = kValidateObjectAllowArray |
kValidateObjectAllowFunction;
const kValidateObjectAllowObjectsAndNull = kValidateObjectAllowNullable |
kValidateObjectAllowArray |
kValidateObjectAllowFunction;

/**
* @callback validateObject
Expand Down Expand Up @@ -583,6 +588,8 @@ module.exports = {
kValidateObjectAllowNullable,
kValidateObjectAllowArray,
kValidateObjectAllowFunction,
kValidateObjectAllowObjects,
kValidateObjectAllowObjectsAndNull,
validateOneOf,
validatePlainFunction,
validatePort,
Expand Down
72 changes: 52 additions & 20 deletions lib/internal/webstreams/readablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const {
SymbolAsyncIterator,
SymbolDispose,
SymbolToStringTag,
TypedArrayPrototypeGetLength,
Uint8Array,
} = primordials;

Expand All @@ -33,6 +34,7 @@ const {
ERR_INVALID_ARG_TYPE,
ERR_INVALID_STATE,
ERR_INVALID_THIS,
ERR_OUT_OF_RANGE,
},
} = require('internal/errors');

Expand All @@ -58,8 +60,8 @@ const {
validateAbortSignal,
validateBuffer,
validateObject,
kValidateObjectAllowNullable,
kValidateObjectAllowFunction,
kValidateObjectAllowObjects,
kValidateObjectAllowObjectsAndNull,
} = require('internal/validators');

const {
Expand Down Expand Up @@ -248,8 +250,8 @@ class ReadableStream {
*/
constructor(source = {}, strategy = kEmptyObject) {
markTransferMode(this, false, true);
if (source === null)
throw new ERR_INVALID_ARG_VALUE('source', 'Object', source);
validateObject(source, 'source', kValidateObjectAllowObjects);
validateObject(strategy, 'strategy', kValidateObjectAllowObjectsAndNull);
this[kState] = createReadableStreamState();

this[kIsClosedPromise] = createDeferredPromise();
Expand Down Expand Up @@ -332,7 +334,7 @@ class ReadableStream {
getReader(options = kEmptyObject) {
if (!isReadableStream(this))
throw new ERR_INVALID_THIS('ReadableStream');
validateObject(options, 'options', kValidateObjectAllowNullable | kValidateObjectAllowFunction);
validateObject(options, 'options', kValidateObjectAllowObjectsAndNull);
const mode = options?.mode;

if (mode === undefined)
Expand Down Expand Up @@ -370,6 +372,7 @@ class ReadableStream {

// The web platform tests require that these be handled one at a
// time and in a specific order. options can be null or undefined.
validateObject(options, 'options', kValidateObjectAllowObjectsAndNull);
const preventAbort = options?.preventAbort;
const preventCancel = options?.preventCancel;
const preventClose = options?.preventClose;
Expand Down Expand Up @@ -412,6 +415,7 @@ class ReadableStream {
destination);
}

validateObject(options, 'options', kValidateObjectAllowObjectsAndNull);
const preventAbort = options?.preventAbort;
const preventCancel = options?.preventCancel;
const preventClose = options?.preventClose;
Expand Down Expand Up @@ -456,10 +460,8 @@ class ReadableStream {
values(options = kEmptyObject) {
if (!isReadableStream(this))
throw new ERR_INVALID_THIS('ReadableStream');
validateObject(options, 'options');
const {
preventCancel = false,
} = options;
validateObject(options, 'options', kValidateObjectAllowObjectsAndNull);
const preventCancel = !!(options?.preventCancel);

// eslint-disable-next-line no-use-before-define
const reader = new ReadableStreamDefaultReader(this);
Expand Down Expand Up @@ -931,12 +933,15 @@ class ReadableStreamBYOBReader {

/**
* @param {ArrayBufferView} view
* @param {{
* min? : number
* }} [options]
* @returns {Promise<{
* view : ArrayBufferView,
* done : boolean,
* }>}
*/
read(view) {
read(view, options = kEmptyObject) {
MattiasBuelens marked this conversation as resolved.
Show resolved Hide resolved
if (!isReadableStreamBYOBReader(this))
return PromiseReject(new ERR_INVALID_THIS('ReadableStreamBYOBReader'));
if (!isArrayBufferView(view)) {
Expand All @@ -950,6 +955,7 @@ class ReadableStreamBYOBReader {
],
view));
}
validateObject(options, 'options', kValidateObjectAllowObjectsAndNull);

const viewByteLength = ArrayBufferViewGetByteLength(view);
const viewBuffer = ArrayBufferViewGetBuffer(view);
Expand All @@ -965,13 +971,29 @@ class ReadableStreamBYOBReader {

// Supposed to assert here that the view's buffer is not
// detached, but there's no API available to use to check that.

const min = options?.min ?? 1;
if (typeof min !== 'number')
return PromiseReject(new ERR_INVALID_ARG_TYPE('options.min', 'number', min));
if (!NumberIsInteger(min))
return PromiseReject(new ERR_INVALID_ARG_VALUE('options.min', min, 'must be an integer'));
if (min <= 0)
return PromiseReject(new ERR_INVALID_ARG_VALUE('options.min', min, 'must be greater than 0'));
MattiasBuelens marked this conversation as resolved.
Show resolved Hide resolved
if (!isDataView(view)) {
if (min > TypedArrayPrototypeGetLength(view)) {
return PromiseReject(new ERR_OUT_OF_RANGE('options.min', '<= view.length', min));
}
} else if (min > viewByteLength) {
return PromiseReject(new ERR_OUT_OF_RANGE('options.min', '<= view.byteLength', min));
}

if (this[kState].stream === undefined) {
return PromiseReject(
new ERR_INVALID_STATE.TypeError(
'The reader is not attached to a stream'));
}
const readIntoRequest = new ReadIntoRequest();
readableStreamBYOBReaderRead(this, view, readIntoRequest);
readableStreamBYOBReaderRead(this, view, min, readIntoRequest);
return readIntoRequest.promise;
}

Expand Down Expand Up @@ -1885,7 +1907,7 @@ function readableByteStreamTee(stream) {
reading = false;
},
};
readableStreamBYOBReaderRead(reader, view, readIntoRequest);
readableStreamBYOBReaderRead(reader, view, 1, readIntoRequest);
}

function pull1Algorithm() {
Expand Down Expand Up @@ -2212,7 +2234,7 @@ function readableStreamReaderGenericRelease(reader) {
reader[kState].stream = undefined;
}

function readableStreamBYOBReaderRead(reader, view, readIntoRequest) {
function readableStreamBYOBReaderRead(reader, view, min, readIntoRequest) {
const {
stream,
} = reader[kState];
Expand All @@ -2225,6 +2247,7 @@ function readableStreamBYOBReaderRead(reader, view, readIntoRequest) {
readableByteStreamControllerPullInto(
stream[kState].controller,
view,
min,
readIntoRequest);
}

Expand Down Expand Up @@ -2497,7 +2520,7 @@ function readableByteStreamControllerClose(controller) {

if (pendingPullIntos.length) {
const firstPendingPullInto = pendingPullIntos[0];
if (firstPendingPullInto.bytesFilled > 0) {
if (firstPendingPullInto.bytesFilled % firstPendingPullInto.elementSize !== 0) {
const error = new ERR_INVALID_STATE.TypeError('Partial read');
readableByteStreamControllerError(controller, error);
throw error;
Expand All @@ -2514,7 +2537,7 @@ function readableByteStreamControllerCommitPullIntoDescriptor(stream, desc) {

let done = false;
if (stream[kState].state === 'closed') {
desc.bytesFilled = 0;
assert(desc.bytesFilled % desc.elementSize === 0);
done = true;
}

Expand Down Expand Up @@ -2603,6 +2626,7 @@ function readableByteStreamControllerHandleQueueDrain(controller) {
function readableByteStreamControllerPullInto(
controller,
view,
min,
readIntoRequest) {
const {
closeRequested,
Expand All @@ -2615,6 +2639,11 @@ function readableByteStreamControllerPullInto(
elementSize = view.constructor.BYTES_PER_ELEMENT;
ctor = view.constructor;
}

const minimumFill = min * elementSize;
assert(minimumFill >= elementSize && minimumFill <= view.byteLength);
assert(minimumFill % elementSize === 0);

const buffer = ArrayBufferViewGetBuffer(view);
const byteOffset = ArrayBufferViewGetByteOffset(view);
const byteLength = ArrayBufferViewGetByteLength(view);
Expand All @@ -2633,6 +2662,7 @@ function readableByteStreamControllerPullInto(
byteOffset,
byteLength,
bytesFilled: 0,
minimumFill,
elementSize,
ctor,
type: 'byob',
Expand Down Expand Up @@ -2720,7 +2750,7 @@ function readableByteStreamControllerRespond(controller, bytesWritten) {
}

function readableByteStreamControllerRespondInClosedState(controller, desc) {
assert(!desc.bytesFilled);
assert(desc.bytesFilled % desc.elementSize === 0);
if (desc.type === 'none') {
readableByteStreamControllerShiftPendingPullInto(controller);
}
Expand Down Expand Up @@ -2897,17 +2927,18 @@ function readableByteStreamControllerFillPullIntoDescriptorFromQueue(
byteLength,
byteOffset,
bytesFilled,
minimumFill,
elementSize,
} = desc;
const currentAlignedBytes = bytesFilled - (bytesFilled % elementSize);
const maxBytesToCopy = MathMin(
controller[kState].queueTotalSize,
byteLength - bytesFilled);
const maxBytesFilled = bytesFilled + maxBytesToCopy;
const maxAlignedBytes = maxBytesFilled - (maxBytesFilled % elementSize);
let totalBytesToCopyRemaining = maxBytesToCopy;
let ready = false;
if (maxAlignedBytes > currentAlignedBytes) {
assert(bytesFilled < minimumFill);
if (maxAlignedBytes >= minimumFill) {
totalBytesToCopyRemaining = maxAlignedBytes - bytesFilled;
ready = true;
}
Expand Down Expand Up @@ -2950,7 +2981,7 @@ function readableByteStreamControllerFillPullIntoDescriptorFromQueue(
if (!ready) {
assert(!controller[kState].queueTotalSize);
assert(desc.bytesFilled > 0);
assert(desc.bytesFilled < elementSize);
assert(desc.bytesFilled < minimumFill);
}
return ready;
}
Expand Down Expand Up @@ -3006,7 +3037,7 @@ function readableByteStreamControllerRespondInReadableState(
return;
}

if (desc.bytesFilled < desc.elementSize)
if (desc.bytesFilled < desc.minimumFill)
return;

readableByteStreamControllerShiftPendingPullInto(controller);
Expand Down Expand Up @@ -3191,6 +3222,7 @@ function readableByteStreamControllerPullSteps(controller, readRequest) {
byteOffset: 0,
byteLength: autoAllocateChunkSize,
bytesFilled: 0,
minimumFill: 1,
elementSize: 1,
ctor: Uint8Array,
type: 'default',
Expand Down
11 changes: 10 additions & 1 deletion lib/internal/webstreams/transformstream.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ const {
kEnumerableProperty,
} = require('internal/util');

const {
validateObject,
kValidateObjectAllowObjects,
kValidateObjectAllowObjectsAndNull,
} = require('internal/validators');

const {
kDeserialize,
kTransfer,
Expand Down Expand Up @@ -119,10 +125,13 @@ class TransformStream {
* @param {QueuingStrategy} [readableStrategy]
*/
constructor(
transformer = null,
transformer = {},
MattiasBuelens marked this conversation as resolved.
Show resolved Hide resolved
writableStrategy = kEmptyObject,
readableStrategy = kEmptyObject) {
markTransferMode(this, false, true);
validateObject(transformer, 'transformer', kValidateObjectAllowObjects);
validateObject(writableStrategy, 'writableStrategy', kValidateObjectAllowObjectsAndNull);
validateObject(readableStrategy, 'readableStrategy', kValidateObjectAllowObjectsAndNull);
const readableType = transformer?.readableType;
const writableType = transformer?.writableType;
const start = transformer?.start;
Expand Down
10 changes: 9 additions & 1 deletion lib/internal/webstreams/writablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ const {
SideEffectFreeRegExpPrototypeSymbolReplace,
} = require('internal/util');

const {
validateObject,
kValidateObjectAllowObjects,
kValidateObjectAllowObjectsAndNull,
} = require('internal/validators');

const {
MessageChannel,
} = require('internal/worker/io');
Expand Down Expand Up @@ -154,8 +160,10 @@ class WritableStream {
* @param {UnderlyingSink} [sink]
* @param {QueuingStrategy} [strategy]
*/
constructor(sink = null, strategy = kEmptyObject) {
constructor(sink = {}, strategy = kEmptyObject) {
MattiasBuelens marked this conversation as resolved.
Show resolved Hide resolved
markTransferMode(this, false, true);
validateObject(sink, 'sink', kValidateObjectAllowObjects);
validateObject(strategy, 'strategy', kValidateObjectAllowObjectsAndNull);
const type = sink?.type;
if (type !== undefined)
throw new ERR_INVALID_ARG_VALUE.RangeError('type', type);
Expand Down
Loading
Loading