diff --git a/lib/cache/memory-cache-store.js b/lib/cache/memory-cache-store.js index dfd74b12b5a..d78d8bbf369 100644 --- a/lib/cache/memory-cache-store.js +++ b/lib/cache/memory-cache-store.js @@ -1,6 +1,6 @@ 'use strict' -const { Writable, Readable } = require('node:stream') +const { Writable } = require('node:stream') /** * @typedef {import('../../types/cache-interceptor.d.ts').default.CacheStore} CacheStore @@ -81,23 +81,9 @@ class MemoryCacheStore { return undefined } - /** - * @type {Readable | undefined} - */ - let readable - if (value.body) { - readable = new Readable() - - for (const chunk of value.body) { - readable.push(chunk) - } - - readable.push(null) - } - return { response: value.opts, - body: readable + body: value.body } } @@ -242,7 +228,7 @@ class MemoryCacheStore { /** * @param {import('../../types/cache-interceptor.d.ts').default.CacheKey} key */ - deleteByKey (key) { + delete (key) { this.#data.delete(`${key.origin}:${key.path}`) } diff --git a/lib/handler/cache-handler.js b/lib/handler/cache-handler.js index 94f1ce99873..bea9e775a50 100644 --- a/lib/handler/cache-handler.js +++ b/lib/handler/cache-handler.js @@ -94,7 +94,7 @@ class CacheHandler extends DecoratorHandler { ) { // https://www.rfc-editor.org/rfc/rfc9111.html#name-invalidating-stored-response try { - this.#store.deleteByKey(this.#cacheKey).catch?.(noop) + this.#store.delete(this.#cacheKey).catch?.(noop) } catch { // Fail silently } @@ -135,43 +135,31 @@ class CacheHandler extends DecoratorHandler { cacheControlDirectives ) - if (this.#cacheKey.method === 'HEAD') { - this.#store.createWriteStream(this.#cacheKey, { - statusCode, - statusMessage, - rawHeaders: strippedHeaders, - vary: varyDirectives, - cachedAt: now, - staleAt, - deleteAt - }) - } else { - this.#writeStream = this.#store.createWriteStream(this.#cacheKey, { - statusCode, - statusMessage, - rawHeaders: strippedHeaders, - vary: varyDirectives, - cachedAt: now, - staleAt, - deleteAt - }) - - if (this.#writeStream) { - const handler = this - this.#writeStream - .on('drain', resume) - .on('error', function () { + this.#writeStream = this.#store.createWriteStream(this.#cacheKey, { + statusCode, + statusMessage, + rawHeaders: strippedHeaders, + vary: varyDirectives, + cachedAt: now, + staleAt, + deleteAt + }) + + if (this.#writeStream) { + const handler = this + this.#writeStream + .on('drain', resume) + .on('error', function () { // TODO (fix): Make error somehow observable? - }) - .on('close', function () { - if (handler.#writeStream === this) { - handler.#writeStream = undefined - } - - // TODO (fix): Should we resume even if was paused downstream? - resume() - }) - } + }) + .on('close', function () { + if (handler.#writeStream === this) { + handler.#writeStream = undefined + } + + // TODO (fix): Should we resume even if was paused downstream? + resume() + }) } } diff --git a/lib/interceptor/cache.js b/lib/interceptor/cache.js index fca9ee831e1..90191003a1b 100644 --- a/lib/interceptor/cache.js +++ b/lib/interceptor/cache.js @@ -1,6 +1,7 @@ 'use strict' const assert = require('node:assert') +const { Readable } = require('node:stream') const util = require('../core/util') const CacheHandler = require('../handler/cache-handler') const MemoryCacheStore = require('../cache/memory-cache-store') @@ -57,27 +58,25 @@ module.exports = (opts = {}) => { // Where body can be a Buffer, string, stream or blob? const result = store.get(cacheKey) if (!result) { - // Request isn't cached return dispatch(opts, new CacheHandler(globalOpts, cacheKey, handler)) } /** - * @param {import('node:stream').Readable | undefined} stream + * @param {import('node:stream').Readable} stream * @param {import('../../types/cache-interceptor.d.ts').default.CachedResponse} value */ - const respondWithCachedValue = (stream, value) => { - assert(!stream || !stream.destroyed, 'stream should not be destroyed') - assert(!stream || !stream.readableDidRead, 'stream should not be readableDidRead') + const respondWithCachedValue = (stream, { cachedAt, rawHeaders, statusCode, statusMessage }) => { + assert(!stream.destroyed, 'stream should not be destroyed') + assert(!stream.readableDidRead, 'stream should not be readableDidRead') + try { stream - ?.on('error', function (err) { + .on('error', function (err) { if (!this.readableEnded) { if (typeof handler.onError === 'function') { handler.onError(err) } else { - process.nextTick(() => { - throw err - }) + throw err } } }) @@ -89,10 +88,10 @@ module.exports = (opts = {}) => { if (typeof handler.onConnect === 'function') { handler.onConnect((err) => { - stream?.destroy(err) + stream.destroy(err) }) - if (stream?.destroyed) { + if (stream.destroyed) { return } } @@ -100,22 +99,18 @@ module.exports = (opts = {}) => { if (typeof handler.onHeaders === 'function') { // Add the age header // https://www.rfc-editor.org/rfc/rfc9111.html#name-age - const age = Math.round((Date.now() - value.cachedAt) / 1000) + const age = Math.round((Date.now() - cachedAt) / 1000) // TODO (fix): What if rawHeaders already contains age header? - const rawHeaders = [...value.rawHeaders, AGE_HEADER, Buffer.from(`${age}`)] + rawHeaders = [...rawHeaders, AGE_HEADER, Buffer.from(`${age}`)] - if (handler.onHeaders(value.statusCode, rawHeaders, () => stream?.resume(), value.statusMessage) === false) { - stream?.pause() + if (handler.onHeaders(statusCode, rawHeaders, () => stream?.resume(), statusMessage) === false) { + stream.pause() } } if (opts.method === 'HEAD') { - if (typeof handler.onComplete === 'function') { - handler.onComplete([]) - } - - stream?.destroy() + stream.destroy() } else { stream.on('data', function (chunk) { if (typeof handler.onData === 'function' && !handler.onData(chunk)) { @@ -124,7 +119,7 @@ module.exports = (opts = {}) => { }) } } catch (err) { - stream?.destroy(err) + stream.destroy(err) } } @@ -132,7 +127,12 @@ module.exports = (opts = {}) => { * @param {import('../../types/cache-interceptor.d.ts').default.GetResult} result */ const handleStream = (result) => { - const { response: value, body: stream } = result + const { response: value, body } = result + + // TODO (perf): Readable.from path can be optimized... + const stream = util.isStream(body) + ? body + : Readable.from(body ?? []) if (!stream && opts.method !== 'HEAD') { throw new Error('stream is undefined but method isn\'t HEAD') @@ -177,12 +177,17 @@ module.exports = (opts = {}) => { if (typeof result.then === 'function') { result.then((result) => { if (!result) { - // Request isn't cached - return dispatch(opts, new CacheHandler(globalOpts, cacheKey, handler)) + dispatch(opts, new CacheHandler(globalOpts, cacheKey, handler)) + } else { + handleStream(result) } - - handleStream(result) - }).catch(err => handler.onError(err)) + }, err => { + if (typeof handler.onError === 'function') { + handler.onError(err) + } else { + throw err + } + }) } else { handleStream(result) } diff --git a/lib/util/cache.js b/lib/util/cache.js index 0386a450a27..cb5e84051b8 100644 --- a/lib/util/cache.js +++ b/lib/util/cache.js @@ -210,7 +210,7 @@ function assertCacheStore (store, name = 'CacheStore') { throw new TypeError(`expected type of ${name} to be a CacheStore, got ${store === null ? 'null' : typeof store}`) } - for (const fn of ['get', 'createWriteStream', 'deleteByKey']) { + for (const fn of ['get', 'createWriteStream', 'delete']) { if (typeof store[fn] !== 'function') { throw new TypeError(`${name} needs to have a \`${fn}()\` function`) } diff --git a/test/cache-interceptor/cache-stores.js b/test/cache-interceptor/cache-stores.js index d5d6b7f5239..88cf57951f9 100644 --- a/test/cache-interceptor/cache-stores.js +++ b/test/cache-interceptor/cache-stores.js @@ -2,6 +2,7 @@ const { describe, test } = require('node:test') const { deepStrictEqual, notEqual, equal } = require('node:assert') +const { Readable } = require('node:stream') const { once } = require('node:events') const MemoryCacheStore = require('../../lib/cache/memory-cache-store') @@ -17,7 +18,7 @@ function cacheStoreTests (CacheStore) { equal(typeof store.isFull, 'boolean') equal(typeof store.get, 'function') equal(typeof store.createWriteStream, 'function') - equal(typeof store.deleteByKey, 'function') + equal(typeof store.delete, 'function') }) // Checks that it can store & fetch different responses @@ -268,9 +269,11 @@ function writeResponse (stream, body) { * @param {import('../../types/cache-interceptor.d.ts').default.GetResult} result * @returns {Promise} */ -async function readResponse ({ response, body: stream }) { +async function readResponse ({ response, body: src }) { notEqual(response, undefined) - notEqual(stream, undefined) + notEqual(src, undefined) + + const stream = Readable.from(src) /** * @type {Buffer[]} diff --git a/test/interceptors/cache.js b/test/interceptors/cache.js index a116b9af13c..5e7b3baedb9 100644 --- a/test/interceptors/cache.js +++ b/test/interceptors/cache.js @@ -251,7 +251,7 @@ describe('Cache Interceptor', () => { }) }) - test('unsafe methods call the store\'s deleteByKey function', async () => { + test('unsafe methods call the store\'s delete function', async () => { const server = createServer((_, res) => { res.end('asd') }).listen(0) @@ -259,13 +259,13 @@ describe('Cache Interceptor', () => { after(() => server.close()) await once(server, 'listening') - let deleteByKeyCalled = false + let deleteCalled = false const store = new cacheStores.MemoryCacheStore() - const originalDeleteByKey = store.deleteByKey.bind(store) - store.deleteByKey = (key) => { - deleteByKeyCalled = true - originalDeleteByKey(key) + const originaldelete = store.delete.bind(store) + store.delete = (key) => { + deleteCalled = true + originaldelete(key) } const client = new Client(`http://localhost:${server.address().port}`) @@ -281,7 +281,7 @@ describe('Cache Interceptor', () => { path: '/' }) - equal(deleteByKeyCalled, false) + equal(deleteCalled, false) // Make sure other safe methods that we don't want to cache don't cause a cache purge await client.request({ @@ -290,11 +290,11 @@ describe('Cache Interceptor', () => { path: '/' }) - strictEqual(deleteByKeyCalled, false) + strictEqual(deleteCalled, false) // Make sure the common unsafe methods cause cache purges for (const method of ['POST', 'PUT', 'PATCH', 'DELETE']) { - deleteByKeyCalled = false + deleteCalled = false await client.request({ origin: 'localhost', @@ -302,7 +302,7 @@ describe('Cache Interceptor', () => { path: '/' }) - equal(deleteByKeyCalled, true, method) + equal(deleteCalled, true, method) } }) diff --git a/test/types/cache-interceptor.test-d.ts b/test/types/cache-interceptor.test-d.ts index e51b42395cc..4d9ad4b99a2 100644 --- a/test/types/cache-interceptor.test-d.ts +++ b/test/types/cache-interceptor.test-d.ts @@ -13,7 +13,7 @@ const store: CacheInterceptor.CacheStore = { throw new Error('stub') }, - deleteByKey (_: CacheInterceptor.CacheKey): void | Promise { + delete (_: CacheInterceptor.CacheKey): void | Promise { throw new Error('stub') } } diff --git a/types/cache-interceptor.d.ts b/types/cache-interceptor.d.ts index 4e49814783a..b21e1b7c3ad 100644 --- a/types/cache-interceptor.d.ts +++ b/types/cache-interceptor.d.ts @@ -33,7 +33,7 @@ declare namespace CacheHandler { export interface GetResult { response: CachedResponse - body?: Readable + body?: Readable | Iterable | Buffer | Iterable | string } /** @@ -49,7 +49,7 @@ declare namespace CacheHandler { createWriteStream(key: CacheKey, value: CachedResponse): Writable | undefined - deleteByKey(key: CacheKey): void | Promise; + delete(key: CacheKey): void | Promise } export interface CachedResponse { @@ -60,20 +60,20 @@ declare namespace CacheHandler { * Headers defined by the Vary header and their respective values for * later comparison */ - vary?: Record; + vary?: Record /** * Time in millis that this value was cached */ - cachedAt: number; + cachedAt: number /** * Time in millis that this value is considered stale */ - staleAt: number; + staleAt: number /** * Time in millis that this value is to be deleted from the cache. This is * either the same as staleAt or the `max-stale` caching directive. */ - deleteAt: number; + deleteAt: number } export interface MemoryCacheStoreOpts { @@ -91,12 +91,12 @@ declare namespace CacheHandler { export class MemoryCacheStore implements CacheStore { constructor (opts?: MemoryCacheStoreOpts) - get isFull (): boolean + get isFull (): boolean | undefined get (key: CacheKey): GetResult | Promise | undefined createWriteStream (key: CacheKey, value: CachedResponse): Writable | undefined - deleteByKey (uri: DeleteByUri): void + delete (key: CacheKey): void | Promise } }