diff --git a/lib/cache/memory-cache-store.js b/lib/cache/memory-cache-store.js index dfd74b12b5a..6a00568ca22 100644 --- a/lib/cache/memory-cache-store.js +++ b/lib/cache/memory-cache-store.js @@ -1,6 +1,6 @@ 'use strict' -const { Writable, Readable } = require('node:stream') +const { Writable } = require('node:stream') /** * @typedef {import('../../types/cache-interceptor.d.ts').default.CacheStore} CacheStore @@ -81,24 +81,7 @@ class MemoryCacheStore { return undefined } - /** - * @type {Readable | undefined} - */ - let readable - if (value.body) { - readable = new Readable() - - for (const chunk of value.body) { - readable.push(chunk) - } - - readable.push(null) - } - - return { - response: value.opts, - body: readable - } + return { ...value.opts, body: value.body } } /** @@ -242,7 +225,7 @@ class MemoryCacheStore { /** * @param {import('../../types/cache-interceptor.d.ts').default.CacheKey} key */ - deleteByKey (key) { + delete (key) { this.#data.delete(`${key.origin}:${key.path}`) } diff --git a/lib/handler/cache-handler.js b/lib/handler/cache-handler.js index 94f1ce99873..bea9e775a50 100644 --- a/lib/handler/cache-handler.js +++ b/lib/handler/cache-handler.js @@ -94,7 +94,7 @@ class CacheHandler extends DecoratorHandler { ) { // https://www.rfc-editor.org/rfc/rfc9111.html#name-invalidating-stored-response try { - this.#store.deleteByKey(this.#cacheKey).catch?.(noop) + this.#store.delete(this.#cacheKey).catch?.(noop) } catch { // Fail silently } @@ -135,43 +135,31 @@ class CacheHandler extends DecoratorHandler { cacheControlDirectives ) - if (this.#cacheKey.method === 'HEAD') { - this.#store.createWriteStream(this.#cacheKey, { - statusCode, - statusMessage, - rawHeaders: strippedHeaders, - vary: varyDirectives, - cachedAt: now, - staleAt, - deleteAt - }) - } else { - this.#writeStream = this.#store.createWriteStream(this.#cacheKey, { - statusCode, - statusMessage, - rawHeaders: strippedHeaders, - vary: varyDirectives, - cachedAt: now, - staleAt, - deleteAt - }) - - if (this.#writeStream) { - const handler = this - this.#writeStream - .on('drain', resume) - .on('error', function () { + this.#writeStream = this.#store.createWriteStream(this.#cacheKey, { + statusCode, + statusMessage, + rawHeaders: strippedHeaders, + vary: varyDirectives, + cachedAt: now, + staleAt, + deleteAt + }) + + if (this.#writeStream) { + const handler = this + this.#writeStream + .on('drain', resume) + .on('error', function () { // TODO (fix): Make error somehow observable? - }) - .on('close', function () { - if (handler.#writeStream === this) { - handler.#writeStream = undefined - } - - // TODO (fix): Should we resume even if was paused downstream? - resume() - }) - } + }) + .on('close', function () { + if (handler.#writeStream === this) { + handler.#writeStream = undefined + } + + // TODO (fix): Should we resume even if was paused downstream? + resume() + }) } } diff --git a/lib/interceptor/cache.js b/lib/interceptor/cache.js index fca9ee831e1..cb0bc6fa650 100644 --- a/lib/interceptor/cache.js +++ b/lib/interceptor/cache.js @@ -1,6 +1,7 @@ 'use strict' const assert = require('node:assert') +const { Readable } = require('node:stream') const util = require('../core/util') const CacheHandler = require('../handler/cache-handler') const MemoryCacheStore = require('../cache/memory-cache-store') @@ -57,95 +58,88 @@ module.exports = (opts = {}) => { // Where body can be a Buffer, string, stream or blob? const result = store.get(cacheKey) if (!result) { - // Request isn't cached return dispatch(opts, new CacheHandler(globalOpts, cacheKey, handler)) } /** - * @param {import('node:stream').Readable | undefined} stream - * @param {import('../../types/cache-interceptor.d.ts').default.CachedResponse} value + * @param {import('../../types/cache-interceptor.d.ts').default.GetResult} result */ - const respondWithCachedValue = (stream, value) => { - assert(!stream || !stream.destroyed, 'stream should not be destroyed') - assert(!stream || !stream.readableDidRead, 'stream should not be readableDidRead') - try { - stream - ?.on('error', function (err) { - if (!this.readableEnded) { - if (typeof handler.onError === 'function') { - handler.onError(err) - } else { - process.nextTick(() => { - throw err - }) - } - } - }) - .on('close', function () { - if (!this.errored && typeof handler.onComplete === 'function') { - handler.onComplete([]) + const respondWithCachedValue = ({ cachedAt, rawHeaders, statusCode, statusMessage, body }) => { + const stream = util.isStream(body) + ? body + : Readable.from(body ?? []) + + assert(!stream.destroyed, 'stream should not be destroyed') + assert(!stream.readableDidRead, 'stream should not be readableDidRead') + + stream + .on('error', function (err) { + if (!this.readableEnded) { + if (typeof handler.onError === 'function') { + handler.onError(err) + } else { + throw err } - }) + } + }) + .on('close', function () { + if (!this.errored && typeof handler.onComplete === 'function') { + handler.onComplete([]) + } + }) - if (typeof handler.onConnect === 'function') { - handler.onConnect((err) => { - stream?.destroy(err) - }) + if (typeof handler.onConnect === 'function') { + handler.onConnect((err) => { + stream.destroy(err) + }) - if (stream?.destroyed) { - return - } + if (stream.destroyed) { + return } + } - if (typeof handler.onHeaders === 'function') { - // Add the age header - // https://www.rfc-editor.org/rfc/rfc9111.html#name-age - const age = Math.round((Date.now() - value.cachedAt) / 1000) + if (typeof handler.onHeaders === 'function') { + // Add the age header + // https://www.rfc-editor.org/rfc/rfc9111.html#name-age + const age = Math.round((Date.now() - cachedAt) / 1000) - // TODO (fix): What if rawHeaders already contains age header? - const rawHeaders = [...value.rawHeaders, AGE_HEADER, Buffer.from(`${age}`)] + // TODO (fix): What if rawHeaders already contains age header? + rawHeaders = [...rawHeaders, AGE_HEADER, Buffer.from(`${age}`)] - if (handler.onHeaders(value.statusCode, rawHeaders, () => stream?.resume(), value.statusMessage) === false) { - stream?.pause() - } + if (handler.onHeaders(statusCode, rawHeaders, () => stream?.resume(), statusMessage) === false) { + stream.pause() } + } - if (opts.method === 'HEAD') { - if (typeof handler.onComplete === 'function') { - handler.onComplete([]) + if (opts.method === 'HEAD') { + stream.destroy() + } else { + stream.on('data', function (chunk) { + if (typeof handler.onData === 'function' && !handler.onData(chunk)) { + stream.pause() } - - stream?.destroy() - } else { - stream.on('data', function (chunk) { - if (typeof handler.onData === 'function' && !handler.onData(chunk)) { - stream.pause() - } - }) - } - } catch (err) { - stream?.destroy(err) + }) } } /** * @param {import('../../types/cache-interceptor.d.ts').default.GetResult} result */ - const handleStream = (result) => { - const { response: value, body: stream } = result + const handleResult = (result) => { + // TODO (perf): Readable.from path can be optimized... - if (!stream && opts.method !== 'HEAD') { + if (!result.body && opts.method !== 'HEAD') { throw new Error('stream is undefined but method isn\'t HEAD') } // Check if the response is stale const now = Date.now() - if (now < value.staleAt) { + if (now < result.staleAt) { // Dump request body. if (util.isStream(opts.body)) { opts.body.on('error', () => {}).destroy() } - respondWithCachedValue(stream, value) + respondWithCachedValue(result) } else if (util.isStream(opts.body) && util.bodyLength(opts.body) !== 0) { // If body is is stream we can't revalidate... // TODO (fix): This could be less strict... @@ -157,15 +151,15 @@ module.exports = (opts = {}) => { ...opts, headers: { ...opts.headers, - 'if-modified-since': new Date(value.cachedAt).toUTCString() + 'if-modified-since': new Date(result.cachedAt).toUTCString() } }, new CacheRevalidationHandler( (success) => { if (success) { - respondWithCachedValue(stream, value) - } else { - stream.on('error', () => {}).destroy() + respondWithCachedValue(result) + } else if (util.isStream(result.body)) { + result.body.on('error', () => {}).destroy() } }, new CacheHandler(globalOpts, cacheKey, handler) @@ -177,14 +171,19 @@ module.exports = (opts = {}) => { if (typeof result.then === 'function') { result.then((result) => { if (!result) { - // Request isn't cached - return dispatch(opts, new CacheHandler(globalOpts, cacheKey, handler)) + dispatch(opts, new CacheHandler(globalOpts, cacheKey, handler)) + } else { + handleResult(result) } - - handleStream(result) - }).catch(err => handler.onError(err)) + }, err => { + if (typeof handler.onError === 'function') { + handler.onError(err) + } else { + throw err + } + }) } else { - handleStream(result) + handleResult(result) } return true diff --git a/lib/util/cache.js b/lib/util/cache.js index 0386a450a27..cb5e84051b8 100644 --- a/lib/util/cache.js +++ b/lib/util/cache.js @@ -210,7 +210,7 @@ function assertCacheStore (store, name = 'CacheStore') { throw new TypeError(`expected type of ${name} to be a CacheStore, got ${store === null ? 'null' : typeof store}`) } - for (const fn of ['get', 'createWriteStream', 'deleteByKey']) { + for (const fn of ['get', 'createWriteStream', 'delete']) { if (typeof store[fn] !== 'function') { throw new TypeError(`${name} needs to have a \`${fn}()\` function`) } diff --git a/test/cache-interceptor/cache-stores.js b/test/cache-interceptor/cache-stores.js index d5d6b7f5239..3fd4d3cc0f3 100644 --- a/test/cache-interceptor/cache-stores.js +++ b/test/cache-interceptor/cache-stores.js @@ -2,6 +2,7 @@ const { describe, test } = require('node:test') const { deepStrictEqual, notEqual, equal } = require('node:assert') +const { Readable } = require('node:stream') const { once } = require('node:events') const MemoryCacheStore = require('../../lib/cache/memory-cache-store') @@ -17,7 +18,7 @@ function cacheStoreTests (CacheStore) { equal(typeof store.isFull, 'boolean') equal(typeof store.get, 'function') equal(typeof store.createWriteStream, 'function') - equal(typeof store.deleteByKey, 'function') + equal(typeof store.delete, 'function') }) // Checks that it can store & fetch different responses @@ -268,9 +269,11 @@ function writeResponse (stream, body) { * @param {import('../../types/cache-interceptor.d.ts').default.GetResult} result * @returns {Promise} */ -async function readResponse ({ response, body: stream }) { +async function readResponse ({ body: src, ...response }) { notEqual(response, undefined) - notEqual(stream, undefined) + notEqual(src, undefined) + + const stream = Readable.from(src ?? []) /** * @type {Buffer[]} diff --git a/test/interceptors/cache.js b/test/interceptors/cache.js index a116b9af13c..5e7b3baedb9 100644 --- a/test/interceptors/cache.js +++ b/test/interceptors/cache.js @@ -251,7 +251,7 @@ describe('Cache Interceptor', () => { }) }) - test('unsafe methods call the store\'s deleteByKey function', async () => { + test('unsafe methods call the store\'s delete function', async () => { const server = createServer((_, res) => { res.end('asd') }).listen(0) @@ -259,13 +259,13 @@ describe('Cache Interceptor', () => { after(() => server.close()) await once(server, 'listening') - let deleteByKeyCalled = false + let deleteCalled = false const store = new cacheStores.MemoryCacheStore() - const originalDeleteByKey = store.deleteByKey.bind(store) - store.deleteByKey = (key) => { - deleteByKeyCalled = true - originalDeleteByKey(key) + const originaldelete = store.delete.bind(store) + store.delete = (key) => { + deleteCalled = true + originaldelete(key) } const client = new Client(`http://localhost:${server.address().port}`) @@ -281,7 +281,7 @@ describe('Cache Interceptor', () => { path: '/' }) - equal(deleteByKeyCalled, false) + equal(deleteCalled, false) // Make sure other safe methods that we don't want to cache don't cause a cache purge await client.request({ @@ -290,11 +290,11 @@ describe('Cache Interceptor', () => { path: '/' }) - strictEqual(deleteByKeyCalled, false) + strictEqual(deleteCalled, false) // Make sure the common unsafe methods cause cache purges for (const method of ['POST', 'PUT', 'PATCH', 'DELETE']) { - deleteByKeyCalled = false + deleteCalled = false await client.request({ origin: 'localhost', @@ -302,7 +302,7 @@ describe('Cache Interceptor', () => { path: '/' }) - equal(deleteByKeyCalled, true, method) + equal(deleteCalled, true, method) } }) diff --git a/test/types/cache-interceptor.test-d.ts b/test/types/cache-interceptor.test-d.ts index e51b42395cc..4d9ad4b99a2 100644 --- a/test/types/cache-interceptor.test-d.ts +++ b/test/types/cache-interceptor.test-d.ts @@ -13,7 +13,7 @@ const store: CacheInterceptor.CacheStore = { throw new Error('stub') }, - deleteByKey (_: CacheInterceptor.CacheKey): void | Promise { + delete (_: CacheInterceptor.CacheKey): void | Promise { throw new Error('stub') } } diff --git a/types/cache-interceptor.d.ts b/types/cache-interceptor.d.ts index 4e49814783a..1c30b42d359 100644 --- a/types/cache-interceptor.d.ts +++ b/types/cache-interceptor.d.ts @@ -31,10 +31,7 @@ declare namespace CacheHandler { path: string } - export interface GetResult { - response: CachedResponse - body?: Readable - } + type GetResult = CachedResponse & { body: null | Readable | Iterable | Buffer | Iterable | string } /** * Underlying storage provider for cached responses @@ -49,7 +46,7 @@ declare namespace CacheHandler { createWriteStream(key: CacheKey, value: CachedResponse): Writable | undefined - deleteByKey(key: CacheKey): void | Promise; + delete(key: CacheKey): void | Promise } export interface CachedResponse { @@ -60,20 +57,20 @@ declare namespace CacheHandler { * Headers defined by the Vary header and their respective values for * later comparison */ - vary?: Record; + vary?: Record /** * Time in millis that this value was cached */ - cachedAt: number; + cachedAt: number /** * Time in millis that this value is considered stale */ - staleAt: number; + staleAt: number /** * Time in millis that this value is to be deleted from the cache. This is * either the same as staleAt or the `max-stale` caching directive. */ - deleteAt: number; + deleteAt: number } export interface MemoryCacheStoreOpts { @@ -91,12 +88,12 @@ declare namespace CacheHandler { export class MemoryCacheStore implements CacheStore { constructor (opts?: MemoryCacheStoreOpts) - get isFull (): boolean + get isFull (): boolean | undefined get (key: CacheKey): GetResult | Promise | undefined createWriteStream (key: CacheKey, value: CachedResponse): Writable | undefined - deleteByKey (uri: DeleteByUri): void + delete (key: CacheKey): void | Promise } }