Skip to content

Commit

Permalink
jsdoc: lib/api/readable.js, fix some types (#3567)
Browse files Browse the repository at this point in the history
* jsdoc: lib/api/readable.js

* use semi

* correct the types

* make resume and abort mandatory in types

* on second thought remove trunc
  • Loading branch information
Uzlopak authored Sep 8, 2024
1 parent ebe6b15 commit 8ce5ff3
Show file tree
Hide file tree
Showing 4 changed files with 212 additions and 50 deletions.
233 changes: 193 additions & 40 deletions lib/api/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,20 @@ const kBytesRead = Symbol('kBytesRead')

const noop = () => {}

/**
* @class
* @extends {Readable}
* @see https://fetch.spec.whatwg.org/#body
*/
class BodyReadable extends Readable {
/**
* @param {object} opts
* @param {(this: Readable, size: number) => void} opts.resume
* @param {() => (void | null)} opts.abort
* @param {string} [opts.contentType = '']
* @param {number} [opts.contentLength]
* @param {number} [opts.highWaterMark = 64 * 1024]
*/
constructor ({
resume,
abort,
Expand All @@ -36,8 +49,15 @@ class BodyReadable extends Readable {
this._readableState.dataEmitted = false

this[kAbort] = abort

/**
* @type {Consume | null}
*/
this[kConsume] = null
this[kBytesRead] = 0
/**
* @type {ReadableStream|null}
*/
this[kBody] = null
this[kUsed] = false
this[kContentType] = contentType
Expand All @@ -50,6 +70,11 @@ class BodyReadable extends Readable {
this[kReading] = false
}

/**
* @param {Error|null} err
* @param {(error:(Error|null)) => void} callback
* @returns {void}
*/
_destroy (err, callback) {
if (!err && !this._readableState.endEmitted) {
err = new RequestAbortedError()
Expand All @@ -72,21 +97,36 @@ class BodyReadable extends Readable {
}
}

on (ev, ...args) {
if (ev === 'data' || ev === 'readable') {
/**
* @param {string} event
* @param {(...args: any[]) => void} listener
* @returns {this}
*/
on (event, listener) {
if (event === 'data' || event === 'readable') {
this[kReading] = true
this[kUsed] = true
}
return super.on(ev, ...args)
return super.on(event, listener)
}

addListener (ev, ...args) {
return this.on(ev, ...args)
/**
* @param {string} event
* @param {(...args: any[]) => void} listener
* @returns {this}
*/
addListener (event, listener) {
return this.on(event, listener)
}

off (ev, ...args) {
const ret = super.off(ev, ...args)
if (ev === 'data' || ev === 'readable') {
/**
* @param {string|symbol} event
* @param {(...args: any[]) => void} listener
* @returns {this}
*/
off (event, listener) {
const ret = super.off(event, listener)
if (event === 'data' || event === 'readable') {
this[kReading] = (
this.listenerCount('data') > 0 ||
this.listenerCount('readable') > 0
Expand All @@ -95,10 +135,19 @@ class BodyReadable extends Readable {
return ret
}

removeListener (ev, ...args) {
return this.off(ev, ...args)
/**
* @param {string|symbol} event
* @param {(...args: any[]) => void} listener
* @returns {this}
*/
removeListener (event, listener) {
return this.off(event, listener)
}

/**
* @param {Buffer|null} chunk
* @returns {boolean}
*/
push (chunk) {
this[kBytesRead] += chunk ? chunk.length : 0

Expand All @@ -109,43 +158,84 @@ class BodyReadable extends Readable {
return super.push(chunk)
}

// https://fetch.spec.whatwg.org/#dom-body-text
/**
* Consumes and returns the body as a string.
*
* @see https://fetch.spec.whatwg.org/#dom-body-text
* @returns {Promise<string>}
*/
async text () {
return consume(this, 'text')
}

// https://fetch.spec.whatwg.org/#dom-body-json
/**
* Consumes and returns the body as a JavaScript Object.
*
* @see https://fetch.spec.whatwg.org/#dom-body-json
* @returns {Promise<unknown>}
*/
async json () {
return consume(this, 'json')
}

// https://fetch.spec.whatwg.org/#dom-body-blob
/**
* Consumes and returns the body as a Blob
*
* @see https://fetch.spec.whatwg.org/#dom-body-blob
* @returns {Promise<Blob>}
*/
async blob () {
return consume(this, 'blob')
}

// https://fetch.spec.whatwg.org/#dom-body-bytes
/**
* Consumes and returns the body as an Uint8Array.
*
* @see https://fetch.spec.whatwg.org/#dom-body-bytes
* @returns {Promise<Uint8Array>}
*/
async bytes () {
return consume(this, 'bytes')
}

// https://fetch.spec.whatwg.org/#dom-body-arraybuffer
/**
* Consumes and returns the body as an ArrayBuffer.
*
* @see https://fetch.spec.whatwg.org/#dom-body-arraybuffer
* @returns {Promise<ArrayBuffer>}
*/
async arrayBuffer () {
return consume(this, 'arrayBuffer')
}

// https://fetch.spec.whatwg.org/#dom-body-formdata
/**
* Not implemented
*
* @see https://fetch.spec.whatwg.org/#dom-body-formdata
* @throws {NotSupportedError}
*/
async formData () {
// TODO: Implement.
throw new NotSupportedError()
}

// https://fetch.spec.whatwg.org/#dom-body-bodyused
/**
* Returns true if the body is not null and the body has been consumed.
* Otherwise, returns false.
*
* @see https://fetch.spec.whatwg.org/#dom-body-bodyused
* @readonly
* @returns {boolean}
*/
get bodyUsed () {
return util.isDisturbed(this)
}

// https://fetch.spec.whatwg.org/#dom-body-body
/**
* @see https://fetch.spec.whatwg.org/#dom-body-body
* @readonly
* @returns {ReadableStream}
*/
get body () {
if (!this[kBody]) {
this[kBody] = ReadableStreamFrom(this)
Expand All @@ -158,14 +248,23 @@ class BodyReadable extends Readable {
return this[kBody]
}

/**
* Dumps the response body by reading `limit` number of bytes.
* @param {object} opts
* @param {number} [opts.limit = 131072] Number of bytes to read.
* @param {AbortSignal} [opts.signal] An AbortSignal to cancel the dump.
* @returns {Promise<null>}
*/
async dump (opts) {
const signal = opts?.signal

if (signal != null && (typeof signal !== 'object' || !('aborted' in signal))) {
throw new InvalidArgumentError('signal must be an AbortSignal')
}

const limit = Number.isFinite(opts?.limit) ? opts.limit : 128 * 1024
const limit = opts?.limit && Number.isFinite(opts.limit)
? opts.limit
: 128 * 1024

signal?.throwIfAborted()

Expand All @@ -174,26 +273,34 @@ class BodyReadable extends Readable {
}

return await new Promise((resolve, reject) => {
if (this[kContentLength] > limit || this[kBytesRead] > limit) {
if (
(this[kContentLength] && (this[kContentLength] > limit)) ||
this[kBytesRead] > limit
) {
this.destroy(new AbortError())
}

const onAbort = () => {
this.destroy(signal.reason ?? new AbortError())
if (signal) {
const onAbort = () => {
this.destroy(signal.reason ?? new AbortError())
}
signal.addEventListener('abort', onAbort)
this
.on('close', function () {
signal.removeEventListener('abort', onAbort)
if (signal.aborted) {
reject(signal.reason ?? new AbortError())
} else {
resolve(null)
}
})
} else {
this.on('close', resolve)
}
signal?.addEventListener('abort', onAbort)

this
.on('close', function () {
signal?.removeEventListener('abort', onAbort)
if (signal?.aborted) {
reject(signal.reason ?? new AbortError())
} else {
resolve(null)
}
})
.on('error', noop)
.on('data', function (chunk) {
.on('data', () => {
if (this[kBytesRead] > limit) {
this.destroy()
}
Expand All @@ -204,7 +311,7 @@ class BodyReadable extends Readable {

/**
* @param {BufferEncoding} encoding
* @returns {BodyReadable}
* @returns {this}
*/
setEncoding (encoding) {
if (Buffer.isEncoding(encoding)) {
Expand All @@ -214,17 +321,40 @@ class BodyReadable extends Readable {
}
}

// https://streams.spec.whatwg.org/#readablestream-locked
function isLocked (self) {
/**
* @see https://streams.spec.whatwg.org/#readablestream-locked
* @param {BodyReadable} bodyReadable
* @returns {boolean}
*/
function isLocked (bodyReadable) {
// Consume is an implicit lock.
return (self[kBody] && self[kBody].locked === true) || self[kConsume]
return bodyReadable[kBody]?.locked === true || bodyReadable[kConsume] !== null
}

// https://fetch.spec.whatwg.org/#body-unusable
function isUnusable (self) {
return util.isDisturbed(self) || isLocked(self)
/**
* @see https://fetch.spec.whatwg.org/#body-unusable
* @param {BodyReadable} bodyReadable
* @returns {boolean}
*/
function isUnusable (bodyReadable) {
return util.isDisturbed(bodyReadable) || isLocked(bodyReadable)
}

/**
* @typedef {object} Consume
* @property {string} type
* @property {BodyReadable} stream
* @property {((value?: any) => void)} resolve
* @property {((err: Error) => void)} reject
* @property {number} length
* @property {Buffer[]} body
*/

/**
* @param {BodyReadable} stream
* @param {string} type
* @returns {Promise<any>}
*/
async function consume (stream, type) {
assert(!stream[kConsume])

Expand Down Expand Up @@ -269,6 +399,10 @@ async function consume (stream, type) {
})
}

/**
* @param {Consume} consume
* @returns {void}
*/
function consumeStart (consume) {
if (consume.body === null) {
return
Expand Down Expand Up @@ -356,6 +490,11 @@ function chunksConcat (chunks, length) {
return buffer
}

/**
* @param {Consume} consume
* @param {BufferEncoding} encoding
* @returns {void}
*/
function consumeEnd (consume, encoding) {
const { type, body, resolve, stream, length } = consume

Expand All @@ -378,11 +517,21 @@ function consumeEnd (consume, encoding) {
}
}

/**
* @param {Consume} consume
* @param {Buffer} chunk
* @returns {void}
*/
function consumePush (consume, chunk) {
consume.length += chunk.length
consume.body.push(chunk)
}

/**
* @param {Consume} consume
* @param {Error} [err]
* @returns {void}
*/
function consumeFinish (consume, err) {
if (consume.body === null) {
return
Expand All @@ -394,6 +543,7 @@ function consumeFinish (consume, err) {
consume.resolve()
}

// Reset the consume object to allow for garbage collection.
consume.type = null
consume.stream = null
consume.resolve = null
Expand All @@ -402,4 +552,7 @@ function consumeFinish (consume, err) {
consume.body = null
}

module.exports = { Readable: BodyReadable, chunksDecode }
module.exports = {
Readable: BodyReadable,
chunksDecode
}
Loading

0 comments on commit 8ce5ff3

Please sign in to comment.