Skip to content

Commit

Permalink
fix: cache fixes (#3830)
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag authored Nov 13, 2024
1 parent 51836bb commit 28b10fa
Show file tree
Hide file tree
Showing 8 changed files with 121 additions and 151 deletions.
23 changes: 3 additions & 20 deletions lib/cache/memory-cache-store.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'use strict'

const { Writable, Readable } = require('node:stream')
const { Writable } = require('node:stream')

/**
* @typedef {import('../../types/cache-interceptor.d.ts').default.CacheStore} CacheStore
Expand Down Expand Up @@ -81,24 +81,7 @@ class MemoryCacheStore {
return undefined
}

/**
* @type {Readable | undefined}
*/
let readable
if (value.body) {
readable = new Readable()

for (const chunk of value.body) {
readable.push(chunk)
}

readable.push(null)
}

return {
response: value.opts,
body: readable
}
return { ...value.opts, body: value.body }
}

/**
Expand Down Expand Up @@ -242,7 +225,7 @@ class MemoryCacheStore {
/**
* @param {import('../../types/cache-interceptor.d.ts').default.CacheKey} key
*/
deleteByKey (key) {
delete (key) {
this.#data.delete(`${key.origin}:${key.path}`)
}

Expand Down
62 changes: 25 additions & 37 deletions lib/handler/cache-handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class CacheHandler extends DecoratorHandler {
) {
// https://www.rfc-editor.org/rfc/rfc9111.html#name-invalidating-stored-response
try {
this.#store.deleteByKey(this.#cacheKey).catch?.(noop)
this.#store.delete(this.#cacheKey).catch?.(noop)
} catch {
// Fail silently
}
Expand Down Expand Up @@ -135,43 +135,31 @@ class CacheHandler extends DecoratorHandler {
cacheControlDirectives
)

if (this.#cacheKey.method === 'HEAD') {
this.#store.createWriteStream(this.#cacheKey, {
statusCode,
statusMessage,
rawHeaders: strippedHeaders,
vary: varyDirectives,
cachedAt: now,
staleAt,
deleteAt
})
} else {
this.#writeStream = this.#store.createWriteStream(this.#cacheKey, {
statusCode,
statusMessage,
rawHeaders: strippedHeaders,
vary: varyDirectives,
cachedAt: now,
staleAt,
deleteAt
})

if (this.#writeStream) {
const handler = this
this.#writeStream
.on('drain', resume)
.on('error', function () {
this.#writeStream = this.#store.createWriteStream(this.#cacheKey, {
statusCode,
statusMessage,
rawHeaders: strippedHeaders,
vary: varyDirectives,
cachedAt: now,
staleAt,
deleteAt
})

if (this.#writeStream) {
const handler = this
this.#writeStream
.on('drain', resume)
.on('error', function () {
// TODO (fix): Make error somehow observable?
})
.on('close', function () {
if (handler.#writeStream === this) {
handler.#writeStream = undefined
}

// TODO (fix): Should we resume even if was paused downstream?
resume()
})
}
})
.on('close', function () {
if (handler.#writeStream === this) {
handler.#writeStream = undefined
}

// TODO (fix): Should we resume even if was paused downstream?
resume()
})
}
}

Expand Down
135 changes: 67 additions & 68 deletions lib/interceptor/cache.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'use strict'

const assert = require('node:assert')
const { Readable } = require('node:stream')
const util = require('../core/util')
const CacheHandler = require('../handler/cache-handler')
const MemoryCacheStore = require('../cache/memory-cache-store')
Expand Down Expand Up @@ -57,95 +58,88 @@ module.exports = (opts = {}) => {
// Where body can be a Buffer, string, stream or blob?
const result = store.get(cacheKey)
if (!result) {
// Request isn't cached
return dispatch(opts, new CacheHandler(globalOpts, cacheKey, handler))
}

/**
* @param {import('node:stream').Readable | undefined} stream
* @param {import('../../types/cache-interceptor.d.ts').default.CachedResponse} value
* @param {import('../../types/cache-interceptor.d.ts').default.GetResult} result
*/
const respondWithCachedValue = (stream, value) => {
assert(!stream || !stream.destroyed, 'stream should not be destroyed')
assert(!stream || !stream.readableDidRead, 'stream should not be readableDidRead')
try {
stream
?.on('error', function (err) {
if (!this.readableEnded) {
if (typeof handler.onError === 'function') {
handler.onError(err)
} else {
process.nextTick(() => {
throw err
})
}
}
})
.on('close', function () {
if (!this.errored && typeof handler.onComplete === 'function') {
handler.onComplete([])
const respondWithCachedValue = ({ cachedAt, rawHeaders, statusCode, statusMessage, body }) => {
const stream = util.isStream(body)
? body
: Readable.from(body ?? [])

assert(!stream.destroyed, 'stream should not be destroyed')
assert(!stream.readableDidRead, 'stream should not be readableDidRead')

stream
.on('error', function (err) {
if (!this.readableEnded) {
if (typeof handler.onError === 'function') {
handler.onError(err)
} else {
throw err
}
})
}
})
.on('close', function () {
if (!this.errored && typeof handler.onComplete === 'function') {
handler.onComplete([])
}
})

if (typeof handler.onConnect === 'function') {
handler.onConnect((err) => {
stream?.destroy(err)
})
if (typeof handler.onConnect === 'function') {
handler.onConnect((err) => {
stream.destroy(err)
})

if (stream?.destroyed) {
return
}
if (stream.destroyed) {
return
}
}

if (typeof handler.onHeaders === 'function') {
// Add the age header
// https://www.rfc-editor.org/rfc/rfc9111.html#name-age
const age = Math.round((Date.now() - value.cachedAt) / 1000)
if (typeof handler.onHeaders === 'function') {
// Add the age header
// https://www.rfc-editor.org/rfc/rfc9111.html#name-age
const age = Math.round((Date.now() - cachedAt) / 1000)

// TODO (fix): What if rawHeaders already contains age header?
const rawHeaders = [...value.rawHeaders, AGE_HEADER, Buffer.from(`${age}`)]
// TODO (fix): What if rawHeaders already contains age header?
rawHeaders = [...rawHeaders, AGE_HEADER, Buffer.from(`${age}`)]

if (handler.onHeaders(value.statusCode, rawHeaders, () => stream?.resume(), value.statusMessage) === false) {
stream?.pause()
}
if (handler.onHeaders(statusCode, rawHeaders, () => stream?.resume(), statusMessage) === false) {
stream.pause()
}
}

if (opts.method === 'HEAD') {
if (typeof handler.onComplete === 'function') {
handler.onComplete([])
if (opts.method === 'HEAD') {
stream.destroy()
} else {
stream.on('data', function (chunk) {
if (typeof handler.onData === 'function' && !handler.onData(chunk)) {
stream.pause()
}

stream?.destroy()
} else {
stream.on('data', function (chunk) {
if (typeof handler.onData === 'function' && !handler.onData(chunk)) {
stream.pause()
}
})
}
} catch (err) {
stream?.destroy(err)
})
}
}

/**
* @param {import('../../types/cache-interceptor.d.ts').default.GetResult} result
*/
const handleStream = (result) => {
const { response: value, body: stream } = result
const handleResult = (result) => {
// TODO (perf): Readable.from path can be optimized...

if (!stream && opts.method !== 'HEAD') {
if (!result.body && opts.method !== 'HEAD') {
throw new Error('stream is undefined but method isn\'t HEAD')
}

// Check if the response is stale
const now = Date.now()
if (now < value.staleAt) {
if (now < result.staleAt) {
// Dump request body.
if (util.isStream(opts.body)) {
opts.body.on('error', () => {}).destroy()
}
respondWithCachedValue(stream, value)
respondWithCachedValue(result)
} else if (util.isStream(opts.body) && util.bodyLength(opts.body) !== 0) {
// If body is is stream we can't revalidate...
// TODO (fix): This could be less strict...
Expand All @@ -157,15 +151,15 @@ module.exports = (opts = {}) => {
...opts,
headers: {
...opts.headers,
'if-modified-since': new Date(value.cachedAt).toUTCString()
'if-modified-since': new Date(result.cachedAt).toUTCString()
}
},
new CacheRevalidationHandler(
(success) => {
if (success) {
respondWithCachedValue(stream, value)
} else {
stream.on('error', () => {}).destroy()
respondWithCachedValue(result)
} else if (util.isStream(result.body)) {
result.body.on('error', () => {}).destroy()
}
},
new CacheHandler(globalOpts, cacheKey, handler)
Expand All @@ -177,14 +171,19 @@ module.exports = (opts = {}) => {
if (typeof result.then === 'function') {
result.then((result) => {
if (!result) {
// Request isn't cached
return dispatch(opts, new CacheHandler(globalOpts, cacheKey, handler))
dispatch(opts, new CacheHandler(globalOpts, cacheKey, handler))
} else {
handleResult(result)
}

handleStream(result)
}).catch(err => handler.onError(err))
}, err => {
if (typeof handler.onError === 'function') {
handler.onError(err)
} else {
throw err
}
})
} else {
handleStream(result)
handleResult(result)
}

return true
Expand Down
2 changes: 1 addition & 1 deletion lib/util/cache.js
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ function assertCacheStore (store, name = 'CacheStore') {
throw new TypeError(`expected type of ${name} to be a CacheStore, got ${store === null ? 'null' : typeof store}`)
}

for (const fn of ['get', 'createWriteStream', 'deleteByKey']) {
for (const fn of ['get', 'createWriteStream', 'delete']) {
if (typeof store[fn] !== 'function') {
throw new TypeError(`${name} needs to have a \`${fn}()\` function`)
}
Expand Down
9 changes: 6 additions & 3 deletions test/cache-interceptor/cache-stores.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

const { describe, test } = require('node:test')
const { deepStrictEqual, notEqual, equal } = require('node:assert')
const { Readable } = require('node:stream')
const { once } = require('node:events')
const MemoryCacheStore = require('../../lib/cache/memory-cache-store')

Expand All @@ -17,7 +18,7 @@ function cacheStoreTests (CacheStore) {
equal(typeof store.isFull, 'boolean')
equal(typeof store.get, 'function')
equal(typeof store.createWriteStream, 'function')
equal(typeof store.deleteByKey, 'function')
equal(typeof store.delete, 'function')
})

// Checks that it can store & fetch different responses
Expand Down Expand Up @@ -268,9 +269,11 @@ function writeResponse (stream, body) {
* @param {import('../../types/cache-interceptor.d.ts').default.GetResult} result
* @returns {Promise<import('../../types/cache-interceptor.d.ts').default.GetResult | { body: Buffer[] }>}
*/
async function readResponse ({ response, body: stream }) {
async function readResponse ({ body: src, ...response }) {
notEqual(response, undefined)
notEqual(stream, undefined)
notEqual(src, undefined)

const stream = Readable.from(src ?? [])

/**
* @type {Buffer[]}
Expand Down
Loading

0 comments on commit 28b10fa

Please sign in to comment.