diff --git a/lib/api/readable.js b/lib/api/readable.js index f4d00c34118..4e440b213c8 100644 --- a/lib/api/readable.js +++ b/lib/api/readable.js @@ -19,7 +19,20 @@ const kBytesRead = Symbol('kBytesRead') const noop = () => {} +/** + * @class + * @extends {Readable} + * @see https://fetch.spec.whatwg.org/#body + */ class BodyReadable extends Readable { + /** + * @param {object} opts + * @param {(this: Readable, size: number) => void} opts.resume + * @param {() => (void | null)} opts.abort + * @param {string} [opts.contentType = ''] + * @param {number} [opts.contentLength] + * @param {number} [opts.highWaterMark = 64 * 1024] + */ constructor ({ resume, abort, @@ -36,8 +49,15 @@ class BodyReadable extends Readable { this._readableState.dataEmitted = false this[kAbort] = abort + + /** + * @type {Consume | null} + */ this[kConsume] = null this[kBytesRead] = 0 + /** + * @type {ReadableStream|null} + */ this[kBody] = null this[kUsed] = false this[kContentType] = contentType @@ -50,6 +70,11 @@ class BodyReadable extends Readable { this[kReading] = false } + /** + * @param {Error|null} err + * @param {(error:(Error|null)) => void} callback + * @returns {void} + */ _destroy (err, callback) { if (!err && !this._readableState.endEmitted) { err = new RequestAbortedError() @@ -72,21 +97,36 @@ class BodyReadable extends Readable { } } - on (ev, ...args) { - if (ev === 'data' || ev === 'readable') { + /** + * @param {string} event + * @param {(...args: any[]) => void} listener + * @returns {this} + */ + on (event, listener) { + if (event === 'data' || event === 'readable') { this[kReading] = true this[kUsed] = true } - return super.on(ev, ...args) + return super.on(event, listener) } - addListener (ev, ...args) { - return this.on(ev, ...args) + /** + * @param {string} event + * @param {(...args: any[]) => void} listener + * @returns {this} + */ + addListener (event, listener) { + return this.on(event, listener) } - off (ev, ...args) { - const ret = super.off(ev, ...args) - if (ev === 'data' || ev === 'readable') { + /** + * @param {string|symbol} event + * @param {(...args: any[]) => void} listener + * @returns {this} + */ + off (event, listener) { + const ret = super.off(event, listener) + if (event === 'data' || event === 'readable') { this[kReading] = ( this.listenerCount('data') > 0 || this.listenerCount('readable') > 0 @@ -95,10 +135,19 @@ class BodyReadable extends Readable { return ret } - removeListener (ev, ...args) { - return this.off(ev, ...args) + /** + * @param {string|symbol} event + * @param {(...args: any[]) => void} listener + * @returns {this} + */ + removeListener (event, listener) { + return this.off(event, listener) } + /** + * @param {Buffer|null} chunk + * @returns {boolean} + */ push (chunk) { this[kBytesRead] += chunk ? chunk.length : 0 @@ -109,43 +158,84 @@ class BodyReadable extends Readable { return super.push(chunk) } - // https://fetch.spec.whatwg.org/#dom-body-text + /** + * Consumes and returns the body as a string. + * + * @see https://fetch.spec.whatwg.org/#dom-body-text + * @returns {Promise} + */ async text () { return consume(this, 'text') } - // https://fetch.spec.whatwg.org/#dom-body-json + /** + * Consumes and returns the body as a JavaScript Object. + * + * @see https://fetch.spec.whatwg.org/#dom-body-json + * @returns {Promise} + */ async json () { return consume(this, 'json') } - // https://fetch.spec.whatwg.org/#dom-body-blob + /** + * Consumes and returns the body as a Blob + * + * @see https://fetch.spec.whatwg.org/#dom-body-blob + * @returns {Promise} + */ async blob () { return consume(this, 'blob') } - // https://fetch.spec.whatwg.org/#dom-body-bytes + /** + * Consumes and returns the body as an Uint8Array. + * + * @see https://fetch.spec.whatwg.org/#dom-body-bytes + * @returns {Promise} + */ async bytes () { return consume(this, 'bytes') } - // https://fetch.spec.whatwg.org/#dom-body-arraybuffer + /** + * Consumes and returns the body as an ArrayBuffer. + * + * @see https://fetch.spec.whatwg.org/#dom-body-arraybuffer + * @returns {Promise} + */ async arrayBuffer () { return consume(this, 'arrayBuffer') } - // https://fetch.spec.whatwg.org/#dom-body-formdata + /** + * Not implemented + * + * @see https://fetch.spec.whatwg.org/#dom-body-formdata + * @throws {NotSupportedError} + */ async formData () { // TODO: Implement. throw new NotSupportedError() } - // https://fetch.spec.whatwg.org/#dom-body-bodyused + /** + * Returns true if the body is not null and the body has been consumed. + * Otherwise, returns false. + * + * @see https://fetch.spec.whatwg.org/#dom-body-bodyused + * @readonly + * @returns {boolean} + */ get bodyUsed () { return util.isDisturbed(this) } - // https://fetch.spec.whatwg.org/#dom-body-body + /** + * @see https://fetch.spec.whatwg.org/#dom-body-body + * @readonly + * @returns {ReadableStream} + */ get body () { if (!this[kBody]) { this[kBody] = ReadableStreamFrom(this) @@ -158,6 +248,13 @@ class BodyReadable extends Readable { return this[kBody] } + /** + * Dumps the response body by reading `limit` number of bytes. + * @param {object} opts + * @param {number} [opts.limit = 131072] Number of bytes to read. + * @param {AbortSignal} [opts.signal] An AbortSignal to cancel the dump. + * @returns {Promise} + */ async dump (opts) { const signal = opts?.signal @@ -165,7 +262,9 @@ class BodyReadable extends Readable { throw new InvalidArgumentError('signal must be an AbortSignal') } - const limit = Number.isFinite(opts?.limit) ? opts.limit : 128 * 1024 + const limit = opts?.limit && Number.isFinite(opts.limit) + ? opts.limit + : 128 * 1024 signal?.throwIfAborted() @@ -174,26 +273,34 @@ class BodyReadable extends Readable { } return await new Promise((resolve, reject) => { - if (this[kContentLength] > limit || this[kBytesRead] > limit) { + if ( + (this[kContentLength] && (this[kContentLength] > limit)) || + this[kBytesRead] > limit + ) { this.destroy(new AbortError()) } - const onAbort = () => { - this.destroy(signal.reason ?? new AbortError()) + if (signal) { + const onAbort = () => { + this.destroy(signal.reason ?? new AbortError()) + } + signal.addEventListener('abort', onAbort) + this + .on('close', function () { + signal.removeEventListener('abort', onAbort) + if (signal.aborted) { + reject(signal.reason ?? new AbortError()) + } else { + resolve(null) + } + }) + } else { + this.on('close', resolve) } - signal?.addEventListener('abort', onAbort) this - .on('close', function () { - signal?.removeEventListener('abort', onAbort) - if (signal?.aborted) { - reject(signal.reason ?? new AbortError()) - } else { - resolve(null) - } - }) .on('error', noop) - .on('data', function (chunk) { + .on('data', () => { if (this[kBytesRead] > limit) { this.destroy() } @@ -204,7 +311,7 @@ class BodyReadable extends Readable { /** * @param {BufferEncoding} encoding - * @returns {BodyReadable} + * @returns {this} */ setEncoding (encoding) { if (Buffer.isEncoding(encoding)) { @@ -214,17 +321,40 @@ class BodyReadable extends Readable { } } -// https://streams.spec.whatwg.org/#readablestream-locked -function isLocked (self) { +/** + * @see https://streams.spec.whatwg.org/#readablestream-locked + * @param {BodyReadable} bodyReadable + * @returns {boolean} + */ +function isLocked (bodyReadable) { // Consume is an implicit lock. - return (self[kBody] && self[kBody].locked === true) || self[kConsume] + return bodyReadable[kBody]?.locked === true || bodyReadable[kConsume] !== null } -// https://fetch.spec.whatwg.org/#body-unusable -function isUnusable (self) { - return util.isDisturbed(self) || isLocked(self) +/** + * @see https://fetch.spec.whatwg.org/#body-unusable + * @param {BodyReadable} bodyReadable + * @returns {boolean} + */ +function isUnusable (bodyReadable) { + return util.isDisturbed(bodyReadable) || isLocked(bodyReadable) } +/** + * @typedef {object} Consume + * @property {string} type + * @property {BodyReadable} stream + * @property {((value?: any) => void)} resolve + * @property {((err: Error) => void)} reject + * @property {number} length + * @property {Buffer[]} body + */ + +/** + * @param {BodyReadable} stream + * @param {string} type + * @returns {Promise} + */ async function consume (stream, type) { assert(!stream[kConsume]) @@ -269,6 +399,10 @@ async function consume (stream, type) { }) } +/** + * @param {Consume} consume + * @returns {void} + */ function consumeStart (consume) { if (consume.body === null) { return @@ -356,6 +490,11 @@ function chunksConcat (chunks, length) { return buffer } +/** + * @param {Consume} consume + * @param {BufferEncoding} encoding + * @returns {void} + */ function consumeEnd (consume, encoding) { const { type, body, resolve, stream, length } = consume @@ -378,11 +517,21 @@ function consumeEnd (consume, encoding) { } } +/** + * @param {Consume} consume + * @param {Buffer} chunk + * @returns {void} + */ function consumePush (consume, chunk) { consume.length += chunk.length consume.body.push(chunk) } +/** + * @param {Consume} consume + * @param {Error} [err] + * @returns {void} + */ function consumeFinish (consume, err) { if (consume.body === null) { return @@ -394,6 +543,7 @@ function consumeFinish (consume, err) { consume.resolve() } + // Reset the consume object to allow for garbage collection. consume.type = null consume.stream = null consume.resolve = null @@ -402,4 +552,7 @@ function consumeFinish (consume, err) { consume.body = null } -module.exports = { Readable: BodyReadable, chunksDecode } +module.exports = { + Readable: BodyReadable, + chunksDecode +} diff --git a/lib/core/util.js b/lib/core/util.js index 56c895cf25d..cbdadebd728 100644 --- a/lib/core/util.js +++ b/lib/core/util.js @@ -558,7 +558,7 @@ function getSocketInfo (socket) { } /** - * @returns {globalThis['ReadableStream']} + * @returns {ReadableStream} */ function ReadableStreamFrom (iterable) { // We cannot use ReadableStream.from here because it does not return a byte stream. diff --git a/test/types/readable.test-d.ts b/test/types/readable.test-d.ts index b5d32f6c221..02ebf8aa717 100644 --- a/test/types/readable.test-d.ts +++ b/test/types/readable.test-d.ts @@ -2,10 +2,16 @@ import { expectAssignable } from 'tsd' import BodyReadable from '../../types/readable' import { Blob } from 'buffer' -expectAssignable(new BodyReadable()) +expectAssignable(new BodyReadable({ + abort: () => null, + resume: () => null +})) { - const readable = new BodyReadable() + const readable = new BodyReadable({ + abort: () => null, + resume: () => null + }) // dump expectAssignable>(readable.dump()) diff --git a/types/readable.d.ts b/types/readable.d.ts index 703f6fa806f..e4f314b4a0e 100644 --- a/types/readable.d.ts +++ b/types/readable.d.ts @@ -4,11 +4,13 @@ import { Blob } from 'buffer' export default BodyReadable declare class BodyReadable extends Readable { - constructor ( - resume?: (this: Readable, size: number) => void | null, - abort?: () => void | null, - contentType?: string - ) + constructor (opts: { + resume: (this: Readable, size: number) => void | null; + abort: () => void | null; + contentType?: string; + contentLength?: number; + highWaterMark?: number; + }) /** Consumes and returns the body as a string * https://fetch.spec.whatwg.org/#dom-body-text @@ -59,7 +61,8 @@ declare class BodyReadable extends Readable { readonly body: never | undefined /** Dumps the response body by reading `limit` number of bytes. - * @param opts.limit Number of bytes to read (optional) - Default: 262144 + * @param opts.limit Number of bytes to read (optional) - Default: 131072 + * @param opts.signal AbortSignal to cancel the operation (optional) */ - dump (opts?: { limit: number }): Promise + dump (opts?: { limit: number; signal?: AbortSignal }): Promise }