From d44958d770ba1437a250342ce427b350325d27b5 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sun, 24 Nov 2024 21:30:36 +0100 Subject: [PATCH 1/6] feat: new hooks --- lib/core/util.js | 5 ++ lib/dispatcher/dispatcher-base.js | 3 +- lib/dispatcher/dispatcher.js | 4 ++ lib/handler/redirect-handler.js | 2 - lib/handler/unwrap-handler.js | 96 ++++++++++++++++++++++++++++++ lib/handler/wrap-handler.js | 98 +++++++++++++++++++++++++++++++ test/mock-agent.js | 83 -------------------------- test/node-test/client-dispatch.js | 36 +----------- 8 files changed, 206 insertions(+), 121 deletions(-) create mode 100644 lib/handler/unwrap-handler.js create mode 100644 lib/handler/wrap-handler.js diff --git a/lib/core/util.js b/lib/core/util.js index fcdd5da0483..dfefac6d15c 100644 --- a/lib/core/util.js +++ b/lib/core/util.js @@ -511,6 +511,11 @@ function assertRequestHandler (handler, method, upgrade) { throw new InvalidArgumentError('handler must be an object') } + if (typeof handler.onRequestStart === 'function') { + // TODO (fix): More checks... + return + } + if (typeof handler.onConnect !== 'function') { throw new InvalidArgumentError('invalid onConnect method') } diff --git a/lib/dispatcher/dispatcher-base.js b/lib/dispatcher/dispatcher-base.js index afe4e9086db..cb3f0e02a3c 100644 --- a/lib/dispatcher/dispatcher-base.js +++ b/lib/dispatcher/dispatcher-base.js @@ -1,6 +1,7 @@ 'use strict' const Dispatcher = require('./dispatcher') +const UnwrapHandler = require('../handler/unwrap-handler') const { ClientDestroyedError, ClientClosedError, @@ -142,7 +143,7 @@ class DispatcherBase extends Dispatcher { throw new ClientClosedError() } - return this[kDispatch](opts, handler) + return this[kDispatch](opts, UnwrapHandler.unwrap(handler)) } catch (err) { if (typeof handler.onError !== 'function') { throw new InvalidArgumentError('invalid onError method') diff --git a/lib/dispatcher/dispatcher.js b/lib/dispatcher/dispatcher.js index ecff2a9b168..3019ad0e1da 100644 --- a/lib/dispatcher/dispatcher.js +++ b/lib/dispatcher/dispatcher.js @@ -1,5 +1,8 @@ 'use strict' const EventEmitter = require('node:events') +const WrapHandler = require('../handler/wrap-handler') + +const wrapInterceptor = (dispatch) => (opts, handler) => dispatch(opts, WrapHandler.wrap(handler)) class Dispatcher extends EventEmitter { dispatch () { @@ -28,6 +31,7 @@ class Dispatcher extends EventEmitter { throw new TypeError(`invalid interceptor, expected function received ${typeof interceptor}`) } + dispatch = wrapInterceptor(dispatch) dispatch = interceptor(dispatch) if (dispatch == null || typeof dispatch !== 'function' || dispatch.length !== 2) { diff --git a/lib/handler/redirect-handler.js b/lib/handler/redirect-handler.js index 02c302d9aa8..df2baee19d5 100644 --- a/lib/handler/redirect-handler.js +++ b/lib/handler/redirect-handler.js @@ -40,8 +40,6 @@ class RedirectHandler { throw new InvalidArgumentError('maxRedirections must be a positive number') } - util.assertRequestHandler(handler, opts.method, opts.upgrade) - this.dispatch = dispatch this.location = null this.abort = null diff --git a/lib/handler/unwrap-handler.js b/lib/handler/unwrap-handler.js new file mode 100644 index 00000000000..5fc46b004d0 --- /dev/null +++ b/lib/handler/unwrap-handler.js @@ -0,0 +1,96 @@ +'use strict' + +const { parseHeaders } = require('../core/util') +const { InvalidArgumentError } = require('../core/errors') + +const kResume = Symbol('resume') + +class UnwrapController { + #paused = false + #reason = null + #aborted = false + #abort + + [kResume] = null + + constructor (abort) { + this.#abort = abort + } + + pause () { + this.#paused = true + } + + resume () { + if (this.#paused) { + this.#paused = false + this[kResume]?.() + } + } + + abort (reason) { + if (!this.#aborted) { + this.#aborted = true + this.#reason = reason + this.#abort(reason) + } + } + + get aborted () { + return this.#aborted + } + + get reason () { + return this.#reason + } + + get paused () { + return this.#paused + } +} + +module.exports = class UnwrapHandler { + #handler + #controller + + constructor (handler) { + this.#handler = handler + } + + static unwrap (handler) { + // TODO (fix): More checks... + return handler.onConnect ? handler : new UnwrapHandler(handler) + } + + onConnect (abort, context) { + this.#controller = new UnwrapController(abort) + this.#handler.onRequestStart?.(this.#controller, context) + } + + onUpgrade (statusCode, rawHeaders, socket) { + this.#handler.onRequestUpgrade?.(statusCode, parseHeaders(rawHeaders), socket) + } + + onHeaders (statusCode, rawHeaders, resume, statusMessage) { + this.#controller[kResume] = resume + this.#handler.onResponseStart?.(this.#controller, statusCode, parseHeaders(rawHeaders)) + return !this.#controller.paused + } + + onData (data) { + this.#handler.onResponseData?.(this.#controller, data) + return !this.#controller.paused + } + + onComplete (rawTrailers) { + this.#handler.onResponseEnd?.(this.#controller, parseHeaders(rawTrailers)) + } + + onError (err) { + if (!this.#handler.onError) { + throw new InvalidArgumentError('invalid onError method') + } + + this.#handler.onResponseError?.(this.#controller, err) + } +} diff --git a/lib/handler/wrap-handler.js b/lib/handler/wrap-handler.js new file mode 100644 index 00000000000..036d9271b52 --- /dev/null +++ b/lib/handler/wrap-handler.js @@ -0,0 +1,98 @@ +'use strict' + +const { InvalidArgumentError } = require('../core/errors') + +module.exports = class WrapHandler { + #handler + + constructor (handler) { + this.#handler = handler + } + + static wrap (handler) { + // TODO (fix): More checks... + return handler.onRequestStart ? handler : new WrapHandler(handler) + } + + // Unwrap Interface + + onConnect (abort, context) { + return this.#handler.onConnect?.(abort, context) + } + + onHeaders (statusCode, rawHeaders, resume, statusMessage) { + return this.#handler.onHeaders?.(statusCode, rawHeaders, resume, statusMessage) + } + + onUpgrade (statusCode, rawHeaders, socket) { + return this.#handler.onUpgrade?.(statusCode, rawHeaders, socket) + } + + onData (data) { + return this.#handler.onData?.(data) + } + + onComplete (trailers) { + return this.#handler.onComplete?.(trailers) + } + + onError (err) { + if (!this.#handler.onError) { + throw new InvalidArgumentError('invalid onError method') + } + + return this.#handler.onError?.(err) + } + + // Wrap Interface + + onRequestStart (controller, context) { + this.#handler.onConnect?.((reason) => controller.abort(reason), context) + } + + onRequestUpgrade (statusCode, headers, socket) { + const rawHeaders = [] + for (const [key, val] of Object.entries(headers)) { + // TODO (fix): What if val is Array + rawHeaders.push(Buffer.from(key), Buffer.from(val)) + } + + this.#handler.onUpgrade?.(statusCode, rawHeaders, socket) + } + + onResponseHeaders (controller, statusCode, statusMessage, headers) { + const rawHeaders = [] + for (const [key, val] of Object.entries(headers)) { + // TODO (fix): What if val is Array + rawHeaders.push(Buffer.from(key), Buffer.from(val)) + } + + if (this.#handler.onHeaders?.(statusCode, rawHeaders, () => controller.resume(), statusMessage) === false) { + controller.pause() + } + } + + onResponseData (controller, data) { + if (this.#handler.onData?.(data) === false) { + controller.pause() + } + } + + onResponseEnd (controller, trailers) { + const rawTrailers = [] + for (const [key, val] of Object.entries(trailers)) { + // TODO (fix): What if val is Array + rawTrailers.push(Buffer.from(key), Buffer.from(val)) + } + + this.#handler.onComplete?.(rawTrailers) + } + + onResponseError (controller, error) { + if (!this.#handler.onError) { + throw new InvalidArgumentError('invalid onError method') + } + + this.#handler.onError?.(error) + } +} diff --git a/test/mock-agent.js b/test/mock-agent.js index e8afa8b00d6..d6fa744049d 100644 --- a/test/mock-agent.js +++ b/test/mock-agent.js @@ -142,89 +142,6 @@ describe('MockAgent - dispatch', () => { onError: () => {} })) }) - - test('should throw if handler is not valid on redirect', (t) => { - t = tspl(t, { plan: 7 }) - - const baseUrl = 'http://localhost:9999' - - const mockAgent = new MockAgent() - after(() => mockAgent.close()) - - t.throws(() => mockAgent.dispatch({ - origin: baseUrl, - path: '/foo', - method: 'GET' - }, { - onError: 'INVALID' - }), new InvalidArgumentError('invalid onError method')) - - t.throws(() => mockAgent.dispatch({ - origin: baseUrl, - path: '/foo', - method: 'GET' - }, { - onError: (err) => { throw err }, - onConnect: 'INVALID' - }), new InvalidArgumentError('invalid onConnect method')) - - t.throws(() => mockAgent.dispatch({ - origin: baseUrl, - path: '/foo', - method: 'GET' - }, { - onError: (err) => { throw err }, - onConnect: () => {}, - onBodySent: 'INVALID' - }), new InvalidArgumentError('invalid onBodySent method')) - - t.throws(() => mockAgent.dispatch({ - origin: baseUrl, - path: '/foo', - method: 'CONNECT' - }, { - onError: (err) => { throw err }, - onConnect: () => {}, - onBodySent: () => {}, - onUpgrade: 'INVALID' - }), new InvalidArgumentError('invalid onUpgrade method')) - - t.throws(() => mockAgent.dispatch({ - origin: baseUrl, - path: '/foo', - method: 'GET' - }, { - onError: (err) => { throw err }, - onConnect: () => {}, - onBodySent: () => {}, - onHeaders: 'INVALID' - }), new InvalidArgumentError('invalid onHeaders method')) - - t.throws(() => mockAgent.dispatch({ - origin: baseUrl, - path: '/foo', - method: 'GET' - }, { - onError: (err) => { throw err }, - onConnect: () => {}, - onBodySent: () => {}, - onHeaders: () => {}, - onData: 'INVALID' - }), new InvalidArgumentError('invalid onData method')) - - t.throws(() => mockAgent.dispatch({ - origin: baseUrl, - path: '/foo', - method: 'GET' - }, { - onError: (err) => { throw err }, - onConnect: () => {}, - onBodySent: () => {}, - onHeaders: () => {}, - onData: () => {}, - onComplete: 'INVALID' - }), new InvalidArgumentError('invalid onComplete method')) - }) }) test('MockAgent - .close should clean up registered pools', async (t) => { diff --git a/test/node-test/client-dispatch.js b/test/node-test/client-dispatch.js index 296e3b8d075..1a0680916a3 100644 --- a/test/node-test/client-dispatch.js +++ b/test/node-test/client-dispatch.js @@ -419,40 +419,6 @@ test('connect call onUpgrade once', async (t) => { await p.completed }) -test('dispatch onConnect missing', async (t) => { - const p = tspl(t, { plan: 1 }) - - const server = http.createServer((req, res) => { - res.end('ad') - }) - t.after(closeServerAsPromise(server)) - - server.listen(0, () => { - const client = new Client(`http://localhost:${server.address().port}`) - t.after(() => { return client.close() }) - - client.dispatch({ - path: '/', - method: 'GET' - }, { - onHeaders (statusCode, headers) { - t.ok(true, 'should not throw') - }, - onData (buf) { - t.ok(true, 'should not throw') - }, - onComplete (trailers) { - t.ok(true, 'should not throw') - }, - onError (err) { - p.strictEqual(err.code, 'UND_ERR_INVALID_ARG') - } - }) - }) - - await p.completed -}) - test('dispatch onHeaders missing', async (t) => { const p = tspl(t, { plan: 1 }) @@ -667,7 +633,7 @@ test('dispatch pool onError missing', async (t) => { client.dispatch({ path: '/', method: 'GET', - upgrade: 'Websocket' + upgrade: 1 }, { }) } catch (err) { From 7e539df0fc1b2cd4b935dc031d3c07b1cab1253a Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Mon, 25 Nov 2024 09:24:02 +0100 Subject: [PATCH 2/6] fixup --- lib/cache/sqlite-cache-store.js | 18 +-- lib/dispatcher/dispatcher.js | 1 + lib/handler/cache-handler.js | 169 +++++----------------- lib/handler/cache-revalidation-handler.js | 88 +++-------- lib/handler/unwrap-handler.js | 4 +- lib/handler/wrap-handler.js | 6 +- lib/interceptor/cache.js | 62 ++++---- lib/util/cache.js | 4 +- types/cache-interceptor.d.ts | 4 +- 9 files changed, 114 insertions(+), 242 deletions(-) diff --git a/lib/cache/sqlite-cache-store.js b/lib/cache/sqlite-cache-store.js index afbdbc4771d..d579b650e34 100644 --- a/lib/cache/sqlite-cache-store.js +++ b/lib/cache/sqlite-cache-store.js @@ -4,7 +4,7 @@ const { DatabaseSync } = require('node:sqlite') const { Writable } = require('stream') const { assertCacheKey, assertCacheValue } = require('../util/cache.js') -const VERSION = 1 +const VERSION = 2 /** * @typedef {import('../../types/cache-interceptor.d.ts').default.CacheStore} CacheStore @@ -12,7 +12,7 @@ const VERSION = 1 * * @typedef {{ * id: Readonly - * rawHeaders?: string + * headers?: Record * vary?: string | object * body: string * } & import('../../types/cache-interceptor.d.ts').default.CacheValue} SqliteStoreValue @@ -107,7 +107,7 @@ class SqliteCacheStore { deleteAt INTEGER NOT NULL, statusCode INTEGER NOT NULL, statusMessage TEXT NOT NULL, - rawHeaders TEXT NULL, + headers TEXT NULL, etag TEXT NULL, vary TEXT NULL, cachedAt INTEGER NOT NULL, @@ -126,7 +126,7 @@ class SqliteCacheStore { deleteAt, statusCode, statusMessage, - rawHeaders, + headers, etag, vary, cachedAt, @@ -145,7 +145,7 @@ class SqliteCacheStore { deleteAt = ?, statusCode = ?, statusMessage = ?, - rawHeaders = ?, + headers = ?, etag = ?, cachedAt = ?, staleAt = ?, @@ -162,7 +162,7 @@ class SqliteCacheStore { deleteAt, statusCode, statusMessage, - rawHeaders, + headers, etag, vary, cachedAt, @@ -221,7 +221,7 @@ class SqliteCacheStore { body: value.body ? parseBufferArray(JSON.parse(value.body)) : null, statusCode: value.statusCode, statusMessage: value.statusMessage, - rawHeaders: value.rawHeaders ? parseBufferArray(JSON.parse(value.rawHeaders)) : undefined, + headers: value.headers ? JSON.parse(value.headers) : undefined, etag: value.etag ? value.etag : undefined, cachedAt: value.cachedAt, staleAt: value.staleAt, @@ -275,7 +275,7 @@ class SqliteCacheStore { value.deleteAt, value.statusCode, value.statusMessage, - value.rawHeaders ? JSON.stringify(stringifyBufferArray(value.rawHeaders)) : null, + value.headers ? JSON.stringify(value.headers) : null, value.etag, value.cachedAt, value.staleAt, @@ -291,7 +291,7 @@ class SqliteCacheStore { value.deleteAt, value.statusCode, value.statusMessage, - value.rawHeaders ? JSON.stringify(stringifyBufferArray(value.rawHeaders)) : null, + value.headers ? JSON.stringify(value.headers) : null, value.etag ? value.etag : null, value.vary ? JSON.stringify(value.vary) : null, value.cachedAt, diff --git a/lib/dispatcher/dispatcher.js b/lib/dispatcher/dispatcher.js index 3019ad0e1da..a74cf06af6f 100644 --- a/lib/dispatcher/dispatcher.js +++ b/lib/dispatcher/dispatcher.js @@ -33,6 +33,7 @@ class Dispatcher extends EventEmitter { dispatch = wrapInterceptor(dispatch) dispatch = interceptor(dispatch) + dispatch = wrapInterceptor(dispatch) if (dispatch == null || typeof dispatch !== 'function' || dispatch.length !== 2) { throw new TypeError('invalid interceptor') diff --git a/lib/handler/cache-handler.js b/lib/handler/cache-handler.js index 03039bcf54c..82ca34760c3 100644 --- a/lib/handler/cache-handler.js +++ b/lib/handler/cache-handler.js @@ -1,7 +1,6 @@ 'use strict' const util = require('../core/util') -const DecoratorHandler = require('../handler/decorator-handler') const { parseCacheControlHeader, parseVaryHeader, @@ -13,7 +12,7 @@ function noop () {} /** * Writes a response to a CacheStore and then passes it on to the next handler */ -class CacheHandler extends DecoratorHandler { +class CacheHandler { /** * @type {import('../../types/cache-interceptor.d.ts').default.CacheKey} */ @@ -42,51 +41,30 @@ class CacheHandler extends DecoratorHandler { constructor (opts, cacheKey, handler) { const { store } = opts - super(handler) - this.#store = store this.#cacheKey = cacheKey this.#handler = handler } - onConnect (...args) { - if (this.#writeStream) { - this.#writeStream.destroy() - this.#writeStream = undefined - } - - if (typeof this.#handler.onConnect === 'function') { - this.#handler.onConnect(...args) - } + onRequestStart (controller, context) { + this.#writeStream?.destroy() + this.#writeStream = undefined + this.#handler.onRequestStart?.(controller, context) } - /** - * @see {DispatchHandlers.onHeaders} - * - * @param {number} statusCode - * @param {Buffer[]} rawHeaders - * @param {() => void} resume - * @param {string} statusMessage - * @returns {boolean} - */ - onHeaders ( + onResponseStart ( + controller, statusCode, - rawHeaders, - resume, - statusMessage + statusMessage, + headers ) { - const downstreamOnHeaders = () => { - if (typeof this.#handler.onHeaders === 'function') { - return this.#handler.onHeaders( - statusCode, - rawHeaders, - resume, - statusMessage - ) - } else { - return true - } - } + const downstreamOnHeaders = () => + this.#handler.onResponseStart?.( + controller, + statusCode, + statusMessage, + headers + ) if ( !util.safeHTTPMethods.includes(this.#cacheKey.method) && @@ -102,9 +80,6 @@ class CacheHandler extends DecoratorHandler { return downstreamOnHeaders() } - const parsedRawHeaders = util.parseRawHeaders(rawHeaders) - const headers = util.parseHeaders(parsedRawHeaders) - const cacheControlHeader = headers['cache-control'] if (!cacheControlHeader) { // Don't have the cache control header or the cache is full @@ -124,11 +99,7 @@ class CacheHandler extends DecoratorHandler { : undefined const deleteAt = determineDeleteAt(now, cacheControlDirectives, staleAt) - const strippedHeaders = stripNecessaryHeaders( - rawHeaders, - parsedRawHeaders, - cacheControlDirectives - ) + const strippedHeaders = stripNecessaryHeaders(headers, cacheControlDirectives) /** * @type {import('../../types/cache-interceptor.d.ts').default.CacheValue} @@ -136,7 +107,7 @@ class CacheHandler extends DecoratorHandler { const value = { statusCode, statusMessage, - rawHeaders: strippedHeaders, + headers: strippedHeaders, vary: varyDirectives, cachedAt: now, staleAt, @@ -152,7 +123,7 @@ class CacheHandler extends DecoratorHandler { if (this.#writeStream) { const handler = this this.#writeStream - .on('drain', resume) + .on('drain', () => controller.resume()) .on('error', function () { // TODO (fix): Make error somehow observable? }) @@ -162,7 +133,7 @@ class CacheHandler extends DecoratorHandler { } // TODO (fix): Should we resume even if was paused downstream? - resume() + controller.resume() }) } } @@ -170,39 +141,17 @@ class CacheHandler extends DecoratorHandler { return downstreamOnHeaders() } - /** - * @see {DispatchHandlers.onData} - * - * @param {Buffer} chunk - * @returns {boolean} - */ - onData (chunk) { - let paused = false - - if (this.#writeStream) { - paused ||= this.#writeStream.write(chunk) === false - } - - if (typeof this.#handler.onData === 'function') { - paused ||= this.#handler.onData(chunk) === false + onResponseData (controller, chunk) { + if (this.#writeStream?.write(chunk) === false) { + controller.pause() } - return !paused + this.#handler.onResponseData?.(controller, chunk) } - /** - * @see {DispatchHandlers.onComplete} - * - * @param {string[] | null} rawTrailers - */ - onComplete (rawTrailers) { - if (this.#writeStream) { - this.#writeStream.end() - } - - if (typeof this.#handler.onComplete === 'function') { - return this.#handler.onComplete(rawTrailers) - } + onResponseEnd (controller, trailers) { + this.#writeStream?.end() + this.#handler.onResponseEnd?.(controller, trailers) } /** @@ -210,15 +159,10 @@ class CacheHandler extends DecoratorHandler { * * @param {Error} err */ - onError (err) { - if (this.#writeStream) { - this.#writeStream.destroy(err) - this.#writeStream = undefined - } - - if (typeof this.#handler.onError === 'function') { - this.#handler.onError(err) - } + onResponseError (controller, err) { + this.#writeStream?.destroy(err) + this.#writeStream = undefined + this.#handler?.onResponseError(controller, err) } } @@ -323,12 +267,11 @@ function determineDeleteAt (now, cacheControlDirectives, staleAt) { /** * Strips headers required to be removed in cached responses - * @param {Buffer[]} rawHeaders - * @param {string[]} parsedRawHeaders + * @param {Record} headers * @param {import('../util/cache.js').CacheControlDirectives} cacheControlDirectives - * @returns {Buffer[]} + * @returns {Record} */ -function stripNecessaryHeaders (rawHeaders, parsedRawHeaders, cacheControlDirectives) { +function stripNecessaryHeaders (headers, cacheControlDirectives) { const headersToRemove = ['connection'] if (Array.isArray(cacheControlDirectives['no-cache'])) { @@ -340,49 +283,13 @@ function stripNecessaryHeaders (rawHeaders, parsedRawHeaders, cacheControlDirect } let strippedHeaders - - let offset = 0 - for (let i = 0; i < parsedRawHeaders.length; i += 2) { - const headerName = parsedRawHeaders[i] - + for (const headerName of Object.keys(headers)) { if (headersToRemove.includes(headerName)) { - // We have at least one header we want to remove - if (!strippedHeaders) { - // This is the first header we want to remove, let's create the array - // Since we're stripping headers, this will over allocate. We'll trim - // it later. - strippedHeaders = new Array(parsedRawHeaders.length) - - // Backfill the previous headers into it - for (let j = 0; j < i; j += 2) { - strippedHeaders[j] = parsedRawHeaders[j] - strippedHeaders[j + 1] = parsedRawHeaders[j + 1] - } - } - - // We can't map indices 1:1 from stripped headers to rawHeaders without - // creating holes (if we skip a header, we now have two holes where at - // element should be). So, let's keep an offset to keep strippedHeaders - // flattened. We can also use this at the end for trimming the empty - // elements off of strippedHeaders. - offset += 2 - - continue - } - - // We want to keep this header. Let's add it to strippedHeaders if it exists - if (strippedHeaders) { - strippedHeaders[i - offset] = parsedRawHeaders[i] - strippedHeaders[i + 1 - offset] = parsedRawHeaders[i + 1] + strippedHeaders ??= { ...headers } + delete headers[headerName] } } - - if (strippedHeaders) { - // Trim off the empty values at the end - strippedHeaders.length -= offset - } - - return strippedHeaders ? util.encodeRawHeaders(strippedHeaders) : rawHeaders + return strippedHeaders ?? headers } module.exports = CacheHandler diff --git a/lib/handler/cache-revalidation-handler.js b/lib/handler/cache-revalidation-handler.js index 9e0a7f288bc..956e4c2045e 100644 --- a/lib/handler/cache-revalidation-handler.js +++ b/lib/handler/cache-revalidation-handler.js @@ -1,7 +1,6 @@ 'use strict' const assert = require('node:assert') -const DecoratorHandler = require('../handler/decorator-handler') /** * This takes care of revalidation requests we send to the origin. If we get @@ -15,9 +14,8 @@ const DecoratorHandler = require('../handler/decorator-handler') * @see https://www.rfc-editor.org/rfc/rfc9111.html#name-validation * * @typedef {import('../../types/dispatcher.d.ts').default.DispatchHandlers} DispatchHandlers - * @implements {DispatchHandlers} */ -class CacheRevalidationHandler extends DecoratorHandler { +class CacheRevalidationHandler { #successful = false /** * @type {((boolean) => void) | null} @@ -28,7 +26,7 @@ class CacheRevalidationHandler extends DecoratorHandler { */ #handler - #abort + #context /** * @param {(boolean) => void} callback Function to call if the cached value is valid @@ -39,31 +37,20 @@ class CacheRevalidationHandler extends DecoratorHandler { throw new TypeError('callback must be a function') } - super(handler) - this.#callback = callback this.#handler = handler } - onConnect (abort) { + onRequestStart (controller, context) { this.#successful = false - this.#abort = abort + this.#context = context } - /** - * @see {DispatchHandlers.onHeaders} - * - * @param {number} statusCode - * @param {Buffer[]} rawHeaders - * @param {() => void} resume - * @param {string} statusMessage - * @returns {boolean} - */ - onHeaders ( + onResponseStart ( + controller, statusCode, - rawHeaders, - resume, - statusMessage + statusMessage, + headers ) { assert(this.#callback != null) @@ -76,61 +63,32 @@ class CacheRevalidationHandler extends DecoratorHandler { return true } - if (typeof this.#handler.onConnect === 'function') { - this.#handler.onConnect(this.#abort) - } - - if (typeof this.#handler.onHeaders === 'function') { - return this.#handler.onHeaders( - statusCode, - rawHeaders, - resume, - statusMessage - ) - } - - return true + this.#handler.onRequestStart?.(controller, this.#context) + this.#handler.onResponseStart?.( + controller, + statusCode, + statusMessage, + headers + ) } - /** - * @see {DispatchHandlers.onData} - * - * @param {Buffer} chunk - * @returns {boolean} - */ - onData (chunk) { + onResponseData (controller, chunk) { if (this.#successful) { - return true - } - - if (typeof this.#handler.onData === 'function') { - return this.#handler.onData(chunk) + return } - return true + return this.#handler.onResponseData(controller, chunk) } - /** - * @see {DispatchHandlers.onComplete} - * - * @param {string[] | null} rawTrailers - */ - onComplete (rawTrailers) { + onResponseEnd (controller, trailers) { if (this.#successful) { return } - if (typeof this.#handler.onComplete === 'function') { - this.#handler.onComplete(rawTrailers) - } + this.#handler.onResponseEnd?.(controller, trailers) } - /** - * @see {DispatchHandlers.onError} - * - * @param {Error} err - */ - onError (err) { + onResponseError (controller, err) { if (this.#successful) { return } @@ -140,8 +98,8 @@ class CacheRevalidationHandler extends DecoratorHandler { this.#callback = null } - if (typeof this.#handler.onError === 'function') { - this.#handler.onError(err) + if (typeof this.#handler.onResponseError === 'function') { + this.#handler.onResponseError(controller, err) } else { throw err } diff --git a/lib/handler/unwrap-handler.js b/lib/handler/unwrap-handler.js index 5fc46b004d0..4b5d87a5d7d 100644 --- a/lib/handler/unwrap-handler.js +++ b/lib/handler/unwrap-handler.js @@ -59,7 +59,7 @@ module.exports = class UnwrapHandler { static unwrap (handler) { // TODO (fix): More checks... - return handler.onConnect ? handler : new UnwrapHandler(handler) + return !handler.onRequestStart ? handler : new UnwrapHandler(handler) } onConnect (abort, context) { @@ -73,7 +73,7 @@ module.exports = class UnwrapHandler { onHeaders (statusCode, rawHeaders, resume, statusMessage) { this.#controller[kResume] = resume - this.#handler.onResponseStart?.(this.#controller, statusCode, parseHeaders(rawHeaders)) + this.#handler.onResponseStart?.(this.#controller, statusCode, statusMessage, parseHeaders(rawHeaders)) return !this.#controller.paused } diff --git a/lib/handler/wrap-handler.js b/lib/handler/wrap-handler.js index 036d9271b52..baa202dc479 100644 --- a/lib/handler/wrap-handler.js +++ b/lib/handler/wrap-handler.js @@ -60,7 +60,7 @@ module.exports = class WrapHandler { this.#handler.onUpgrade?.(statusCode, rawHeaders, socket) } - onResponseHeaders (controller, statusCode, statusMessage, headers) { + onResponseStart (controller, statusCode, statusMessage, headers) { const rawHeaders = [] for (const [key, val] of Object.entries(headers)) { // TODO (fix): What if val is Array @@ -88,11 +88,11 @@ module.exports = class WrapHandler { this.#handler.onComplete?.(rawTrailers) } - onResponseError (controller, error) { + onResponseError (controller, err) { if (!this.#handler.onError) { throw new InvalidArgumentError('invalid onError method') } - this.#handler.onError?.(error) + this.#handler.onError?.(err) } } diff --git a/lib/interceptor/cache.js b/lib/interceptor/cache.js index 90c92300844..f8474add775 100644 --- a/lib/interceptor/cache.js +++ b/lib/interceptor/cache.js @@ -8,8 +8,6 @@ const MemoryCacheStore = require('../cache/memory-cache-store') const CacheRevalidationHandler = require('../handler/cache-revalidation-handler') const { assertCacheStore, assertCacheMethods, makeCacheKey, parseCacheControlHeader } = require('../util/cache.js') -const AGE_HEADER = Buffer.from('age') - /** * @param {import('../../types/dispatcher.d.ts').default.DispatchHandlers} handler */ @@ -148,7 +146,7 @@ module.exports = (opts = {}) => { * @param {import('../../types/cache-interceptor.d.ts').default.GetResult} result * @param {number} age */ - const respondWithCachedValue = ({ rawHeaders, statusCode, statusMessage, body }, age) => { + const respondWithCachedValue = ({ headers, statusCode, statusMessage, body }, age) => { const stream = util.isStream(body) ? body : Readable.from(body ?? []) @@ -156,52 +154,60 @@ module.exports = (opts = {}) => { assert(!stream.destroyed, 'stream should not be destroyed') assert(!stream.readableDidRead, 'stream should not be readableDidRead') + const controller = { + resume () { + stream.resume() + }, + pause () { + stream.pause() + }, + get aborted () { + return stream.destroyed + }, + get reason () { + return stream.errored + }, + abort (reason) { + stream.destroy(reason) + } + } + stream .on('error', function (err) { if (!this.readableEnded) { - if (typeof handler.onError === 'function') { - handler.onError(err) + if (typeof handler.onResponseError === 'function') { + handler.onResponseError(controller, err) } else { throw err } } }) .on('close', function () { - if (!this.errored && typeof handler.onComplete === 'function') { - handler.onComplete([]) + if (!this.errored) { + handler.onResponseEnd?.(controller, {}) } }) - if (typeof handler.onConnect === 'function') { - handler.onConnect((err) => { - stream.destroy(err) - }) + handler.onRequestStart?.(controller) - if (stream.destroyed) { - return - } + if (stream.destroyed) { + return } - if (typeof handler.onHeaders === 'function') { - // Add the age header - // https://www.rfc-editor.org/rfc/rfc9111.html#name-age - const age = Math.round((Date.now() - result.cachedAt) / 1000) - - // TODO (fix): What if rawHeaders already contains age header? - rawHeaders = [...rawHeaders, AGE_HEADER, Buffer.from(`${age}`)] - - if (handler.onHeaders(statusCode, rawHeaders, () => stream?.resume(), statusMessage) === false) { - stream.pause() - } + // Add the age header + // https://www.rfc-editor.org/rfc/rfc9111.html#name-age + headers = { + ...headers, + age: String(Math.round((Date.now() - result.cachedAt) / 1000)) } + handler.onResponseStart?.(controller, statusCode, statusMessage, headers) + if (opts.method === 'HEAD') { stream.destroy() } else { stream.on('data', function (chunk) { - if (typeof handler.onData === 'function' && !handler.onData(chunk)) { - stream.pause() - } + handler.onResponseData?.(controller, chunk) }) } } diff --git a/lib/util/cache.js b/lib/util/cache.js index b316e7e86e7..97dda5a8058 100644 --- a/lib/util/cache.js +++ b/lib/util/cache.js @@ -80,8 +80,8 @@ function assertCacheValue (value) { throw new TypeError(`expected value.statusMessage to be string, got ${typeof value.statusMessage}`) } - if (!Array.isArray(value.rawHeaders)) { - throw new TypeError(`expected value.rawHeaders to be array, got ${typeof value.rawHeaders}`) + if (value.headers != null && typeof value.headers !== 'object') { + throw new TypeError(`expected value.rawHeaders to be object, got ${typeof value.headers}`) } if (value.vary !== undefined && typeof value.vary !== 'object') { diff --git a/types/cache-interceptor.d.ts b/types/cache-interceptor.d.ts index 20fac41ee23..9302a5a7d2a 100644 --- a/types/cache-interceptor.d.ts +++ b/types/cache-interceptor.d.ts @@ -32,7 +32,7 @@ declare namespace CacheHandler { export interface CacheValue { statusCode: number statusMessage: string - rawHeaders: Buffer[] + headers: Record vary?: Record etag?: string cachedAt: number @@ -49,7 +49,7 @@ declare namespace CacheHandler { type GetResult = { statusCode: number statusMessage: string - rawHeaders: Buffer[] + headers: Record etag?: string body: null | Readable | Iterable | AsyncIterable | Buffer | Iterable | AsyncIterable | string cachedAt: number From 5ad623806587ab010f6cb2c016c6f945350d046d Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Mon, 25 Nov 2024 09:52:34 +0100 Subject: [PATCH 3/6] fixup --- lib/cache/memory-cache-store.js | 2 +- lib/dispatcher/dispatcher.js | 1 - lib/handler/cache-revalidation-handler.js | 6 +++--- lib/interceptor/cache.js | 18 ++++++++++-------- .../cache-store-test-utils.js | 13 ++++++------- test/types/cache-interceptor.test-d.ts | 4 ++-- 6 files changed, 22 insertions(+), 22 deletions(-) diff --git a/lib/cache/memory-cache-store.js b/lib/cache/memory-cache-store.js index aa552da2abd..78c1c90439c 100644 --- a/lib/cache/memory-cache-store.js +++ b/lib/cache/memory-cache-store.js @@ -87,7 +87,7 @@ class MemoryCacheStore { : { statusMessage: entry.statusMessage, statusCode: entry.statusCode, - rawHeaders: entry.rawHeaders, + headers: entry.headers, body: entry.body, etag: entry.etag, cachedAt: entry.cachedAt, diff --git a/lib/dispatcher/dispatcher.js b/lib/dispatcher/dispatcher.js index a74cf06af6f..824dfb6d822 100644 --- a/lib/dispatcher/dispatcher.js +++ b/lib/dispatcher/dispatcher.js @@ -31,7 +31,6 @@ class Dispatcher extends EventEmitter { throw new TypeError(`invalid interceptor, expected function received ${typeof interceptor}`) } - dispatch = wrapInterceptor(dispatch) dispatch = interceptor(dispatch) dispatch = wrapInterceptor(dispatch) diff --git a/lib/handler/cache-revalidation-handler.js b/lib/handler/cache-revalidation-handler.js index 956e4c2045e..1175a2c4f91 100644 --- a/lib/handler/cache-revalidation-handler.js +++ b/lib/handler/cache-revalidation-handler.js @@ -18,7 +18,7 @@ const assert = require('node:assert') class CacheRevalidationHandler { #successful = false /** - * @type {((boolean) => void) | null} + * @type {((boolean, any) => void) | null} */ #callback /** @@ -29,7 +29,7 @@ class CacheRevalidationHandler { #context /** - * @param {(boolean) => void} callback Function to call if the cached value is valid + * @param {(boolean, any) => void} callback Function to call if the cached value is valid * @param {import('../../types/dispatcher.d.ts').default.DispatchHandlers} handler */ constructor (callback, handler) { @@ -56,7 +56,7 @@ class CacheRevalidationHandler { // https://www.rfc-editor.org/rfc/rfc9111.html#name-handling-a-validation-respo this.#successful = statusCode === 304 - this.#callback(this.#successful) + this.#callback(this.#successful, this.#context) this.#callback = null if (this.#successful) { diff --git a/lib/interceptor/cache.js b/lib/interceptor/cache.js index f8474add775..4ceb2bc8f4e 100644 --- a/lib/interceptor/cache.js +++ b/lib/interceptor/cache.js @@ -146,7 +146,7 @@ module.exports = (opts = {}) => { * @param {import('../../types/cache-interceptor.d.ts').default.GetResult} result * @param {number} age */ - const respondWithCachedValue = ({ headers, statusCode, statusMessage, body }, age) => { + const respondWithCachedValue = ({ headers, statusCode, statusMessage, body }, age, context) => { const stream = util.isStream(body) ? body : Readable.from(body ?? []) @@ -161,6 +161,9 @@ module.exports = (opts = {}) => { pause () { stream.pause() }, + get paused () { + return stream.isPaused() + }, get aborted () { return stream.destroyed }, @@ -188,7 +191,7 @@ module.exports = (opts = {}) => { } }) - handler.onRequestStart?.(controller) + handler.onRequestStart?.(controller, context) if (stream.destroyed) { return @@ -196,10 +199,8 @@ module.exports = (opts = {}) => { // Add the age header // https://www.rfc-editor.org/rfc/rfc9111.html#name-age - headers = { - ...headers, - age: String(Math.round((Date.now() - result.cachedAt) / 1000)) - } + // TODO (fix): What if headers.age already exists? + headers = age != null ? { ...headers, age: String(age) } : headers handler.onResponseStart?.(controller, statusCode, statusMessage, headers) @@ -248,9 +249,9 @@ module.exports = (opts = {}) => { } }, new CacheRevalidationHandler( - (success) => { + (success, context) => { if (success) { - respondWithCachedValue(result, age) + respondWithCachedValue(result, age, context) } else if (util.isStream(result.body)) { result.body.on('error', () => {}).destroy() } @@ -264,6 +265,7 @@ module.exports = (opts = {}) => { if (util.isStream(opts.body)) { opts.body.on('error', () => {}).destroy() } + respondWithCachedValue(result, age) } diff --git a/test/cache-interceptor/cache-store-test-utils.js b/test/cache-interceptor/cache-store-test-utils.js index 09cbb1be08d..e04d899f575 100644 --- a/test/cache-interceptor/cache-store-test-utils.js +++ b/test/cache-interceptor/cache-store-test-utils.js @@ -1,7 +1,7 @@ 'use strict' const { describe, test } = require('node:test') -const { deepStrictEqual, notEqual, equal, ok } = require('node:assert') +const { deepStrictEqual, notEqual, equal } = require('node:assert') const { Readable } = require('node:stream') const { once } = require('node:events') @@ -14,7 +14,6 @@ function cacheStoreTests (CacheStore) { describe(CacheStore.prototype.constructor.name, () => { test('matches interface', async () => { const store = new CacheStore() - ok(['boolean', 'undefined'].includes(typeof store.isFull)) equal(typeof store.get, 'function') equal(typeof store.createWriteStream, 'function') equal(typeof store.delete, 'function') @@ -31,7 +30,7 @@ function cacheStoreTests (CacheStore) { const requestValue = { statusCode: 200, statusMessage: '', - rawHeaders: [Buffer.from('1'), Buffer.from('2'), Buffer.from('3')], + headers: { foo: 'bar' }, cachedAt: Date.now(), staleAt: Date.now() + 10000, deleteAt: Date.now() + 20000 @@ -71,7 +70,7 @@ function cacheStoreTests (CacheStore) { const anotherValue = { statusCode: 200, statusMessage: '', - rawHeaders: [Buffer.from('1'), Buffer.from('2'), Buffer.from('3')], + headers: { foo: 'bar' }, cachedAt: Date.now(), staleAt: Date.now() + 10000, deleteAt: Date.now() + 20000 @@ -109,7 +108,7 @@ function cacheStoreTests (CacheStore) { const requestValue = { statusCode: 200, statusMessage: '', - rawHeaders: [Buffer.from('1'), Buffer.from('2'), Buffer.from('3')], + headers: { foo: 'bar' }, cachedAt: Date.now() - 10000, staleAt: Date.now() - 1, deleteAt: Date.now() + 20000 @@ -144,7 +143,7 @@ function cacheStoreTests (CacheStore) { statusCode: 200, statusMessage: '', cachedAt: Date.now() - 20000, - rawHeaders: [], + headers: {}, staleAt: Date.now() - 10000, deleteAt: Date.now() - 5 } @@ -174,7 +173,7 @@ function cacheStoreTests (CacheStore) { const requestValue = { statusCode: 200, statusMessage: '', - rawHeaders: [Buffer.from('1'), Buffer.from('2'), Buffer.from('3')], + headers: { foo: 'bar' }, vary: { 'some-header': 'hello world' }, diff --git a/test/types/cache-interceptor.test-d.ts b/test/types/cache-interceptor.test-d.ts index 3bbd35a3163..b219a528750 100644 --- a/test/types/cache-interceptor.test-d.ts +++ b/test/types/cache-interceptor.test-d.ts @@ -24,7 +24,7 @@ expectAssignable({ store, methods: ['GET'] }) expectAssignable({ statusCode: 200, statusMessage: 'OK', - rawHeaders: [], + headers: {}, cachedAt: 0, staleAt: 0, deleteAt: 0 @@ -33,7 +33,7 @@ expectAssignable({ expectAssignable({ statusCode: 200, statusMessage: 'OK', - rawHeaders: [], + headers: {}, vary: {}, cachedAt: 0, staleAt: 0, From ef233af58906b9aa90873ffe718766cda8465ee0 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Mon, 25 Nov 2024 10:08:49 +0100 Subject: [PATCH 4/6] fixup --- lib/handler/cache-handler.js | 13 ++++------- lib/handler/cache-revalidation-handler.js | 6 ++--- lib/interceptor/cache.js | 5 ++-- types/dispatcher.d.ts | 28 +++++++++++++++++++++-- 4 files changed, 36 insertions(+), 16 deletions(-) diff --git a/lib/handler/cache-handler.js b/lib/handler/cache-handler.js index 82ca34760c3..3097ab124c9 100644 --- a/lib/handler/cache-handler.js +++ b/lib/handler/cache-handler.js @@ -10,7 +10,7 @@ const { function noop () {} /** - * Writes a response to a CacheStore and then passes it on to the next handler + * @implements {import('../../types/dispatcher.d.ts').default.DispatchHandler} */ class CacheHandler { /** @@ -24,7 +24,7 @@ class CacheHandler { #store /** - * @type {import('../../types/dispatcher.d.ts').default.DispatchHandlers} + * @type {import('../../types/dispatcher.d.ts').default.DispatchHandler} */ #handler @@ -36,7 +36,7 @@ class CacheHandler { /** * @param {import('../../types/cache-interceptor.d.ts').default.CacheHandlerOptions} opts * @param {import('../../types/cache-interceptor.d.ts').default.CacheKey} cacheKey - * @param {import('../../types/dispatcher.d.ts').default.DispatchHandlers} handler + * @param {import('../../types/dispatcher.d.ts').default.DispatchHandler} handler */ constructor (opts, cacheKey, handler) { const { store } = opts @@ -154,15 +154,10 @@ class CacheHandler { this.#handler.onResponseEnd?.(controller, trailers) } - /** - * @see {DispatchHandlers.onError} - * - * @param {Error} err - */ onResponseError (controller, err) { this.#writeStream?.destroy(err) this.#writeStream = undefined - this.#handler?.onResponseError(controller, err) + this.#handler.onResponseError?.(controller, err) } } diff --git a/lib/handler/cache-revalidation-handler.js b/lib/handler/cache-revalidation-handler.js index 1175a2c4f91..59d6109e5c7 100644 --- a/lib/handler/cache-revalidation-handler.js +++ b/lib/handler/cache-revalidation-handler.js @@ -13,7 +13,7 @@ const assert = require('node:assert') * * @see https://www.rfc-editor.org/rfc/rfc9111.html#name-validation * - * @typedef {import('../../types/dispatcher.d.ts').default.DispatchHandlers} DispatchHandlers + * @implements {import('../../types/dispatcher.d.ts').default.DispatchHandler} */ class CacheRevalidationHandler { #successful = false @@ -22,7 +22,7 @@ class CacheRevalidationHandler { */ #callback /** - * @type {(import('../../types/dispatcher.d.ts').default.DispatchHandlers)} + * @type {(import('../../types/dispatcher.d.ts').default.DispatchHandler)} */ #handler @@ -30,7 +30,7 @@ class CacheRevalidationHandler { /** * @param {(boolean, any) => void} callback Function to call if the cached value is valid - * @param {import('../../types/dispatcher.d.ts').default.DispatchHandlers} handler + * @param {import('../../types/dispatcher.d.ts').default.DispatchHandler} handler */ constructor (callback, handler) { if (typeof callback !== 'function') { diff --git a/lib/interceptor/cache.js b/lib/interceptor/cache.js index 4ceb2bc8f4e..91d5496f0c7 100644 --- a/lib/interceptor/cache.js +++ b/lib/interceptor/cache.js @@ -7,9 +7,10 @@ const CacheHandler = require('../handler/cache-handler') const MemoryCacheStore = require('../cache/memory-cache-store') const CacheRevalidationHandler = require('../handler/cache-revalidation-handler') const { assertCacheStore, assertCacheMethods, makeCacheKey, parseCacheControlHeader } = require('../util/cache.js') +const { AbortError } = require('../core/errors.js') /** - * @param {import('../../types/dispatcher.d.ts').default.DispatchHandlers} handler + * @param {import('../../types/dispatcher.d.ts').default.DispatchHandler} handler */ function sendGatewayTimeout (handler) { let aborted = false @@ -171,7 +172,7 @@ module.exports = (opts = {}) => { return stream.errored }, abort (reason) { - stream.destroy(reason) + stream.destroy(reason ?? new AbortError()) } } diff --git a/types/dispatcher.d.ts b/types/dispatcher.d.ts index 069fed63ac4..06d2d5d8651 100644 --- a/types/dispatcher.d.ts +++ b/types/dispatcher.d.ts @@ -15,7 +15,7 @@ export default Dispatcher /** Dispatcher is the core API used to dispatch requests. */ declare class Dispatcher extends EventEmitter { /** Dispatches a request. This API is expected to evolve through semver-major versions and is less stable than the preceding higher level APIs. It is primarily intended for library developers who implement higher level APIs on top of this. */ - dispatch (options: Dispatcher.DispatchOptions, handler: Dispatcher.DispatchHandlers): boolean + dispatch (options: Dispatcher.DispatchOptions, handler: Dispatcher.DispatchHandler): boolean /** Starts two-way communications with the requested resource. */ connect(options: Dispatcher.ConnectOptions): Promise> connect(options: Dispatcher.ConnectOptions, callback: (err: Error | null, data: Dispatcher.ConnectData) => void): void @@ -213,22 +213,46 @@ declare namespace Dispatcher { context: object; } export type StreamFactory = (data: StreamFactoryData) => Writable - export interface DispatchHandlers { + + export interface DispatchController { + get aborted () : boolean + get paused () : boolean + get reason () : Error | null + abort (reason: Error): void + pause(): void + resume(): void + } + + export interface DispatchHandler { + onRequestStart?(controller: DispatchController, context: any) + onResponseStart?(controller: DispatchController, statusCode: number, statusMessage?: string, headers: IncomingHttpHeaders) + onResponseData?(controller: DispatchController, chunk: Buffer) + onResponseEnd?(controller: DispatchController, trailers: IncomingHttpHeaders) + onResponseError?(controller: DispatchController, error: Error) + /** Invoked before request is dispatched on socket. May be invoked multiple times when a request is retried when the request at the head of the pipeline fails. */ + /** @deprecated */ onConnect?(abort: (err?: Error) => void): void; /** Invoked when an error has occurred. */ + /** @deprecated */ onError?(err: Error): void; /** Invoked when request is upgraded either due to a `Upgrade` header or `CONNECT` method. */ + /** @deprecated */ onUpgrade?(statusCode: number, headers: Buffer[] | string[] | null, socket: Duplex): void; /** Invoked when response is received, before headers have been read. **/ + /** @deprecated */ onResponseStarted?(): void; /** Invoked when statusCode and headers have been received. May be invoked multiple times due to 1xx informational headers. */ + /** @deprecated */ onHeaders?(statusCode: number, headers: Buffer[], resume: () => void, statusText: string): boolean; /** Invoked when response payload data is received. */ + /** @deprecated */ onData?(chunk: Buffer): boolean; /** Invoked when response payload and trailers have been received and the request has completed. */ + /** @deprecated */ onComplete?(trailers: string[] | null): void; /** Invoked when a body chunk is sent to the server. May be invoked multiple times for chunked requests */ + /** @deprecated */ onBodySent?(chunkSize: number, totalBytesSent: number): void; } export type PipelineHandler = (data: PipelineHandlerData) => Readable From a3094ac652f2067821c6de9346b1b7aab855c949 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Mon, 25 Nov 2024 10:48:19 +0100 Subject: [PATCH 5/6] fixup --- docs/docs/api/Dispatcher.md | 14 ++++++-------- lib/handler/unwrap-handler.js | 2 +- lib/handler/wrap-handler.js | 2 +- types/dispatcher.d.ts | 11 ++++++----- 4 files changed, 14 insertions(+), 15 deletions(-) diff --git a/docs/docs/api/Dispatcher.md b/docs/docs/api/Dispatcher.md index b85008eb14c..4bfbfd36d35 100644 --- a/docs/docs/api/Dispatcher.md +++ b/docs/docs/api/Dispatcher.md @@ -205,14 +205,12 @@ Returns: `Boolean` - `false` if dispatcher is busy and further dispatch calls wo #### Parameter: `DispatchHandler` -* **onConnect** `(abort: () => void, context: object) => void` - Invoked before request is dispatched on socket. May be invoked multiple times when a request is retried when the request at the head of the pipeline fails. -* **onError** `(error: Error) => void` - Invoked when an error has occurred. May not throw. -* **onUpgrade** `(statusCode: number, headers: Buffer[], socket: Duplex) => void` (optional) - Invoked when request is upgraded. Required if `DispatchOptions.upgrade` is defined or `DispatchOptions.method === 'CONNECT'`. -* **onResponseStarted** `() => void` (optional) - Invoked when response is received, before headers have been read. -* **onHeaders** `(statusCode: number, headers: Buffer[], resume: () => void, statusText: string) => boolean` - Invoked when statusCode and headers have been received. May be invoked multiple times due to 1xx informational headers. Not required for `upgrade` requests. -* **onData** `(chunk: Buffer) => boolean` - Invoked when response payload data is received. Not required for `upgrade` requests. -* **onComplete** `(trailers: Buffer[]) => void` - Invoked when response payload and trailers have been received and the request has completed. Not required for `upgrade` requests. -* **onBodySent** `(chunk: string | Buffer | Uint8Array) => void` - Invoked when a body chunk is sent to the server. Not required. For a stream or iterable body this will be invoked for every chunk. For other body types, it will be invoked once after the body is sent. +* **onRequestStart** `(controller: DispatchController, context: object) => void` - Invoked before request is dispatched on socket. May be invoked multiple times when a request is retried when the request at the head of the pipeline fails. +* **onRequestUpgrade** `(controller: DispatchController, statusCode: number, headers: Record, socket: Duplex) => void` (optional) - Invoked when request is upgraded. Required if `DispatchOptions.upgrade` is defined or `DispatchOptions.method === 'CONNECT'`. +* **onResponseStart** `(controller: DispatchController, statusCode: number, statusMessage?: string, headers: Record) => void` - Invoked when statusCode and headers have been received. May be invoked multiple times due to 1xx informational headers. Not required for `upgrade` requests. +* **onResponseData** `(controller: DispatchController, chunk: Buffer) => void` - Invoked when response payload data is received. Not required for `upgrade` requests. +* **onResponseEnd** `(controller: DispatchController, trailers: Record) => void` - Invoked when response payload and trailers have been received and the request has completed. Not required for `upgrade` requests. +* **onResponseError** `(error: Error) => void` - Invoked when an error has occurred. May not throw. #### Example 1 - Dispatch GET request diff --git a/lib/handler/unwrap-handler.js b/lib/handler/unwrap-handler.js index 4b5d87a5d7d..3b38ac264d0 100644 --- a/lib/handler/unwrap-handler.js +++ b/lib/handler/unwrap-handler.js @@ -68,7 +68,7 @@ module.exports = class UnwrapHandler { } onUpgrade (statusCode, rawHeaders, socket) { - this.#handler.onRequestUpgrade?.(statusCode, parseHeaders(rawHeaders), socket) + this.#handler.onRequestUpgrade?.(this.#controller, statusCode, parseHeaders(rawHeaders), socket) } onHeaders (statusCode, rawHeaders, resume, statusMessage) { diff --git a/lib/handler/wrap-handler.js b/lib/handler/wrap-handler.js index baa202dc479..3bf49f1be16 100644 --- a/lib/handler/wrap-handler.js +++ b/lib/handler/wrap-handler.js @@ -50,7 +50,7 @@ module.exports = class WrapHandler { this.#handler.onConnect?.((reason) => controller.abort(reason), context) } - onRequestUpgrade (statusCode, headers, socket) { + onRequestUpgrade (controller, statusCode, headers, socket) { const rawHeaders = [] for (const [key, val] of Object.entries(headers)) { // TODO (fix): What if val is Array diff --git a/types/dispatcher.d.ts b/types/dispatcher.d.ts index 06d2d5d8651..6b7c7e31b67 100644 --- a/types/dispatcher.d.ts +++ b/types/dispatcher.d.ts @@ -224,11 +224,12 @@ declare namespace Dispatcher { } export interface DispatchHandler { - onRequestStart?(controller: DispatchController, context: any) - onResponseStart?(controller: DispatchController, statusCode: number, statusMessage?: string, headers: IncomingHttpHeaders) - onResponseData?(controller: DispatchController, chunk: Buffer) - onResponseEnd?(controller: DispatchController, trailers: IncomingHttpHeaders) - onResponseError?(controller: DispatchController, error: Error) + onRequestStart?(controller: DispatchController, context: any): void; + onRequestUpgrade?(controller: DispatchController, statusCode: number, headers: IncomingHttpHeaders, socket: Duplex): void; + onResponseStart?(controller: DispatchController, statusCode: number, statusMessage?: string, headers: IncomingHttpHeaders): void; + onResponseData?(controller: DispatchController, chunk: Buffer): void; + onResponseEnd?(controller: DispatchController, trailers: IncomingHttpHeaders): void; + onResponseError?(controller: DispatchController, error: Error): void; /** Invoked before request is dispatched on socket. May be invoked multiple times when a request is retried when the request at the head of the pipeline fails. */ /** @deprecated */ From f294bd2b52065a80ebe51e90560c75b87bc7577f Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Mon, 25 Nov 2024 11:08:57 +0100 Subject: [PATCH 6/6] fixup --- docs/docs/api/RedirectHandler.md | 2 +- docs/docs/api/RetryHandler.md | 4 ++-- lib/handler/cache-handler.js | 4 ++++ lib/handler/cache-revalidation-handler.js | 4 ++++ lib/interceptor/cache.js | 2 +- test/types/client.test-d.ts | 2 +- test/types/dispatcher.test-d.ts | 6 +++--- test/types/index.test-d.ts | 2 +- types/agent.d.ts | 2 +- types/dispatcher.d.ts | 2 +- types/env-http-proxy-agent.d.ts | 2 +- types/handlers.d.ts | 8 ++++---- types/mock-agent.d.ts | 2 +- types/mock-client.d.ts | 2 +- types/mock-pool.d.ts | 2 +- types/proxy-agent.d.ts | 2 +- types/retry-handler.d.ts | 4 ++-- 17 files changed, 30 insertions(+), 22 deletions(-) diff --git a/docs/docs/api/RedirectHandler.md b/docs/docs/api/RedirectHandler.md index 90a937e7c13..bb16284fff4 100644 --- a/docs/docs/api/RedirectHandler.md +++ b/docs/docs/api/RedirectHandler.md @@ -16,7 +16,7 @@ Returns: `RedirectHandler` ### Parameters -- **dispatch** `(options: Dispatch.DispatchOptions, handlers: Dispatch.DispatchHandlers) => Promise` (required) - Dispatch function to be called after every redirection. +- **dispatch** `(options: Dispatch.DispatchOptions, handlers: Dispatch.DispatchHandler) => Promise` (required) - Dispatch function to be called after every redirection. - **maxRedirections** `number` (required) - Maximum number of redirections allowed. - **opts** `object` (required) - Options for handling redirection. - **handler** `object` (required) - Handlers for different stages of the request lifecycle. diff --git a/docs/docs/api/RetryHandler.md b/docs/docs/api/RetryHandler.md index c24da75645c..04a621de33c 100644 --- a/docs/docs/api/RetryHandler.md +++ b/docs/docs/api/RetryHandler.md @@ -43,8 +43,8 @@ It represents the retry state for a given request. ### Parameter `RetryHandlers` -- **dispatch** `(options: Dispatch.DispatchOptions, handlers: Dispatch.DispatchHandlers) => Promise` (required) - Dispatch function to be called after every retry. -- **handler** Extends [`Dispatch.DispatchHandlers`](/docs/docs/api/Dispatcher.md#dispatcherdispatchoptions-handler) (required) - Handler function to be called after the request is successful or the retries are exhausted. +- **dispatch** `(options: Dispatch.DispatchOptions, handlers: Dispatch.DispatchHandler) => Promise` (required) - Dispatch function to be called after every retry. +- **handler** Extends [`Dispatch.DispatchHandler`](/docs/docs/api/Dispatcher.md#dispatcherdispatchoptions-handler) (required) - Handler function to be called after the request is successful or the retries are exhausted. >__Note__: The `RetryHandler` does not retry over stateful bodies (e.g. streams, AsyncIterable) as those, once consumed, are left in a state that cannot be reutilized. For these situations the `RetryHandler` will identify >the body as stateful and will not retry the request rejecting with the error `UND_ERR_REQ_RETRY`. diff --git a/lib/handler/cache-handler.js b/lib/handler/cache-handler.js index 3097ab124c9..7c3a7da0bb6 100644 --- a/lib/handler/cache-handler.js +++ b/lib/handler/cache-handler.js @@ -52,6 +52,10 @@ class CacheHandler { this.#handler.onRequestStart?.(controller, context) } + onRequestUpgrade (controller, statusCode, headers, socket) { + this.#handler.onRequestUpgrade?.(controller, statusCode, headers, socket) + } + onResponseStart ( controller, statusCode, diff --git a/lib/handler/cache-revalidation-handler.js b/lib/handler/cache-revalidation-handler.js index 59d6109e5c7..9e45268bfec 100644 --- a/lib/handler/cache-revalidation-handler.js +++ b/lib/handler/cache-revalidation-handler.js @@ -46,6 +46,10 @@ class CacheRevalidationHandler { this.#context = context } + onRequestUpgrade (controller, statusCode, headers, socket) { + this.#handler.onRequestUpgrade?.(controller, statusCode, headers, socket) + } + onResponseStart ( controller, statusCode, diff --git a/lib/interceptor/cache.js b/lib/interceptor/cache.js index 91d5496f0c7..8da8468b04b 100644 --- a/lib/interceptor/cache.js +++ b/lib/interceptor/cache.js @@ -267,7 +267,7 @@ module.exports = (opts = {}) => { opts.body.on('error', () => {}).destroy() } - respondWithCachedValue(result, age) + respondWithCachedValue(result, age, null) } if (typeof result.then === 'function') { diff --git a/test/types/client.test-d.ts b/test/types/client.test-d.ts index 1c0f558a790..3e6d9c060f8 100644 --- a/test/types/client.test-d.ts +++ b/test/types/client.test-d.ts @@ -73,7 +73,7 @@ expectAssignable(new Client('', { expectAssignable(dispatcher) return (opts, handlers) => { expectAssignable(opts) - expectAssignable(handlers) + expectAssignable(handlers) return dispatcher(opts, handlers) } }] diff --git a/test/types/dispatcher.test-d.ts b/test/types/dispatcher.test-d.ts index 58dda692fdc..77f5320c1c4 100644 --- a/test/types/dispatcher.test-d.ts +++ b/test/types/dispatcher.test-d.ts @@ -178,7 +178,7 @@ expectAssignable(new Dispatcher().compose( expectAssignable(dispatcher) return (opts, handlers) => { expectAssignable(opts) - expectAssignable(handlers) + expectAssignable(handlers) return dispatcher(opts, handlers) } } @@ -188,7 +188,7 @@ expectAssignable(new Dispatcher().compose([ expectAssignable(dispatcher) return (opts, handlers) => { expectAssignable(opts) - expectAssignable(handlers) + expectAssignable(handlers) return dispatcher(opts, handlers) } }, @@ -196,7 +196,7 @@ expectAssignable(new Dispatcher().compose([ expectAssignable(dispatcher) return (opts, handlers) => { expectAssignable(opts) - expectAssignable(handlers) + expectAssignable(handlers) return dispatcher(opts, handlers) } } diff --git a/test/types/index.test-d.ts b/test/types/index.test-d.ts index 3dbfcf48fe5..fa33864641d 100644 --- a/test/types/index.test-d.ts +++ b/test/types/index.test-d.ts @@ -17,7 +17,7 @@ expectAssignable(Undici.interceptors.re expectAssignable(Undici.interceptors.cache()) const client = new Undici.Client('', {}) -const handler: Dispatcher.DispatchHandlers = {} +const handler: Dispatcher.DispatchHandler = {} const redirectHandler = new Undici.RedirectHandler(client, 10, { path: '/', method: 'GET' diff --git a/types/agent.d.ts b/types/agent.d.ts index 74d9d5493a4..ee313b5209b 100644 --- a/types/agent.d.ts +++ b/types/agent.d.ts @@ -11,7 +11,7 @@ declare class Agent extends Dispatcher { /** `true` after `dispatcher.destroyed()` has been called or `dispatcher.close()` has been called and the dispatcher shutdown has completed. */ destroyed: boolean /** Dispatches a request. */ - dispatch (options: Agent.DispatchOptions, handler: Dispatcher.DispatchHandlers): boolean + dispatch (options: Agent.DispatchOptions, handler: Dispatcher.DispatchHandler): boolean } declare namespace Agent { diff --git a/types/dispatcher.d.ts b/types/dispatcher.d.ts index 6b7c7e31b67..3c3a32c47f0 100644 --- a/types/dispatcher.d.ts +++ b/types/dispatcher.d.ts @@ -226,7 +226,7 @@ declare namespace Dispatcher { export interface DispatchHandler { onRequestStart?(controller: DispatchController, context: any): void; onRequestUpgrade?(controller: DispatchController, statusCode: number, headers: IncomingHttpHeaders, socket: Duplex): void; - onResponseStart?(controller: DispatchController, statusCode: number, statusMessage?: string, headers: IncomingHttpHeaders): void; + onResponseStart?(controller: DispatchController, statusCode: number, statusMessage: string | null, headers: IncomingHttpHeaders): void; onResponseData?(controller: DispatchController, chunk: Buffer): void; onResponseEnd?(controller: DispatchController, trailers: IncomingHttpHeaders): void; onResponseError?(controller: DispatchController, error: Error): void; diff --git a/types/env-http-proxy-agent.d.ts b/types/env-http-proxy-agent.d.ts index f5b1dc43060..28fbb846a35 100644 --- a/types/env-http-proxy-agent.d.ts +++ b/types/env-http-proxy-agent.d.ts @@ -6,7 +6,7 @@ export default EnvHttpProxyAgent declare class EnvHttpProxyAgent extends Dispatcher { constructor (opts?: EnvHttpProxyAgent.Options) - dispatch (options: Agent.DispatchOptions, handler: Dispatcher.DispatchHandlers): boolean + dispatch (options: Agent.DispatchOptions, handler: Dispatcher.DispatchHandler): boolean } declare namespace EnvHttpProxyAgent { diff --git a/types/handlers.d.ts b/types/handlers.d.ts index e95451540c9..a165f26c7d7 100644 --- a/types/handlers.d.ts +++ b/types/handlers.d.ts @@ -1,15 +1,15 @@ import Dispatcher from './dispatcher' -export declare class RedirectHandler implements Dispatcher.DispatchHandlers { +export declare class RedirectHandler implements Dispatcher.DispatchHandler { constructor ( dispatch: Dispatcher, maxRedirections: number, opts: Dispatcher.DispatchOptions, - handler: Dispatcher.DispatchHandlers, + handler: Dispatcher.DispatchHandler, redirectionLimitReached: boolean ) } -export declare class DecoratorHandler implements Dispatcher.DispatchHandlers { - constructor (handler: Dispatcher.DispatchHandlers) +export declare class DecoratorHandler implements Dispatcher.DispatchHandler { + constructor (handler: Dispatcher.DispatchHandler) } diff --git a/types/mock-agent.d.ts b/types/mock-agent.d.ts index 311b28b2db0..e92c7bea860 100644 --- a/types/mock-agent.d.ts +++ b/types/mock-agent.d.ts @@ -17,7 +17,7 @@ declare class MockAgent(origin: RegExp): TInterceptable get(origin: ((origin: string) => boolean)): TInterceptable /** Dispatches a mocked request. */ - dispatch (options: Agent.DispatchOptions, handler: Dispatcher.DispatchHandlers): boolean + dispatch (options: Agent.DispatchOptions, handler: Dispatcher.DispatchHandler): boolean /** Closes the mock agent and waits for registered mock pools and clients to also close before resolving. */ close (): Promise /** Disables mocking in MockAgent. */ diff --git a/types/mock-client.d.ts b/types/mock-client.d.ts index 704e48a6eec..88e16d9fb4f 100644 --- a/types/mock-client.d.ts +++ b/types/mock-client.d.ts @@ -11,7 +11,7 @@ declare class MockClient extends Client implements Interceptable { /** Intercepts any matching requests that use the same origin as this mock client. */ intercept (options: MockInterceptor.Options): MockInterceptor /** Dispatches a mocked request. */ - dispatch (options: Dispatcher.DispatchOptions, handlers: Dispatcher.DispatchHandlers): boolean + dispatch (options: Dispatcher.DispatchOptions, handlers: Dispatcher.DispatchHandler): boolean /** Closes the mock client and gracefully waits for enqueued requests to complete. */ close (): Promise } diff --git a/types/mock-pool.d.ts b/types/mock-pool.d.ts index 7ef52767366..5a9d9cb274f 100644 --- a/types/mock-pool.d.ts +++ b/types/mock-pool.d.ts @@ -11,7 +11,7 @@ declare class MockPool extends Pool implements Interceptable { /** Intercepts any matching requests that use the same origin as this mock pool. */ intercept (options: MockInterceptor.Options): MockInterceptor /** Dispatches a mocked request. */ - dispatch (options: Dispatcher.DispatchOptions, handlers: Dispatcher.DispatchHandlers): boolean + dispatch (options: Dispatcher.DispatchOptions, handlers: Dispatcher.DispatchHandler): boolean /** Closes the mock pool and gracefully waits for enqueued requests to complete. */ close (): Promise } diff --git a/types/proxy-agent.d.ts b/types/proxy-agent.d.ts index 5350dbc2e60..7d39f971c90 100644 --- a/types/proxy-agent.d.ts +++ b/types/proxy-agent.d.ts @@ -8,7 +8,7 @@ export default ProxyAgent declare class ProxyAgent extends Dispatcher { constructor (options: ProxyAgent.Options | string) - dispatch (options: Agent.DispatchOptions, handler: Dispatcher.DispatchHandlers): boolean + dispatch (options: Agent.DispatchOptions, handler: Dispatcher.DispatchHandler): boolean close (): Promise } diff --git a/types/retry-handler.d.ts b/types/retry-handler.d.ts index c4471cd6189..988e74b2b02 100644 --- a/types/retry-handler.d.ts +++ b/types/retry-handler.d.ts @@ -2,7 +2,7 @@ import Dispatcher from './dispatcher' export default RetryHandler -declare class RetryHandler implements Dispatcher.DispatchHandlers { +declare class RetryHandler implements Dispatcher.DispatchHandler { constructor ( options: Dispatcher.DispatchOptions & { retryOptions?: RetryHandler.RetryOptions; @@ -111,6 +111,6 @@ declare namespace RetryHandler { export interface RetryHandlers { dispatch: Dispatcher['dispatch']; - handler: Dispatcher.DispatchHandlers; + handler: Dispatcher.DispatchHandler; } }