Skip to content

Commit

Permalink
fix: port retry to new hooks (#3883)
Browse files Browse the repository at this point in the history
* fix: port retry to new hooks

* fixup

* fixup

* fixup

* fixup: unref

* fixup

* fixup
  • Loading branch information
ronag authored Nov 27, 2024
1 parent 3029bd8 commit d006e90
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 178 deletions.
6 changes: 4 additions & 2 deletions lib/dispatcher/dispatcher-base.js
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ class DispatcherBase extends Dispatcher {
throw new InvalidArgumentError('handler must be an object')
}

handler = UnwrapHandler.unwrap(handler)

try {
if (!opts || typeof opts !== 'object') {
throw new InvalidArgumentError('opts must be an object.')
Expand All @@ -143,10 +145,10 @@ class DispatcherBase extends Dispatcher {
throw new ClientClosedError()
}

return this[kDispatch](opts, UnwrapHandler.unwrap(handler))
return this[kDispatch](opts, handler)
} catch (err) {
if (typeof handler.onError !== 'function') {
throw new InvalidArgumentError('invalid onError method')
throw err
}

handler.onError(err)
Expand Down
165 changes: 65 additions & 100 deletions lib/handler/retry-handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ const assert = require('node:assert')

const { kRetryHandlerDefaultRetry } = require('../core/symbols')
const { RequestRetryError } = require('../core/errors')
const WrapHandler = require('./wrap-handler')
const {
isDisturbed,
parseHeaders,
parseRangeHeader,
wrapRequestBody
} = require('../core/util')
Expand All @@ -16,7 +16,7 @@ function calculateRetryAfterHeader (retryAfter) {
}

class RetryHandler {
constructor (opts, handlers) {
constructor (opts, { dispatch, handler }) {
const { retryOptions, ...dispatchOpts } = opts
const {
// Retry scoped
Expand All @@ -32,12 +32,9 @@ class RetryHandler {
statusCodes
} = retryOptions ?? {}

this.dispatch = handlers.dispatch
this.handler = handlers.handler
this.dispatch = dispatch
this.handler = WrapHandler.wrap(handler)
this.opts = { ...dispatchOpts, body: wrapRequestBody(opts.body) }
this.abort = null
this.aborted = false
this.connectCalled = false
this.retryOpts = {
retry: retryFn ?? RetryHandler[kRetryHandlerDefaultRetry],
retryAfter: retryAfter ?? true,
Expand Down Expand Up @@ -65,37 +62,20 @@ class RetryHandler {

this.retryCount = 0
this.retryCountCheckpoint = 0
this.headersSent = false
this.start = 0
this.end = null
this.etag = null
this.resume = null
}

onRequestSent () {
if (this.handler.onRequestSent) {
this.handler.onRequestSent()
onRequestStart (controller, context) {
if (!this.headersSent) {
this.handler.onRequestStart?.(controller, context)
}
}

onUpgrade (statusCode, headers, socket) {
if (this.handler.onUpgrade) {
this.handler.onUpgrade(statusCode, headers, socket)
}
}

onConnect (abort, context) {
this.abort = abort
if (!this.connectCalled) {
this.connectCalled = true
this.handler.onConnect(reason => {
this.aborted = true
this.abort(reason)
}, context)
}
}

onBodySent (chunk) {
if (this.handler.onBodySent) return this.handler.onBodySent(chunk)
onRequestUpgrade (controller, statusCode, headers, socket) {
this.handler.onRequestUpgrade?.(controller, statusCode, headers, socket)
}

static [kRetryHandlerDefaultRetry] (err, { state, opts }, cb) {
Expand Down Expand Up @@ -153,83 +133,68 @@ class RetryHandler {
? Math.min(retryAfterHeader, maxTimeout)
: Math.min(minTimeout * timeoutFactor ** (counter - 1), maxTimeout)

setTimeout(() => cb(null), retryTimeout)
setTimeout(() => cb(null), retryTimeout).unref()
}

onHeaders (statusCode, rawHeaders, resume, statusMessage) {
const headers = parseHeaders(rawHeaders)

onResponseStart (controller, statusCode, headers, statusMessage) {
this.retryCount += 1

if (statusCode >= 300) {
if (this.retryOpts.statusCodes.includes(statusCode) === false) {
return this.handler.onHeaders(
this.headersSent = true
this.handler.onResponseStart?.(
controller,
statusCode,
rawHeaders,
resume,
headers,
statusMessage
)
return
} else {
this.abort(
new RequestRetryError('Request failed', statusCode, {
headers,
data: {
count: this.retryCount
}
})
)
return false
throw new RequestRetryError('Request failed', statusCode, {
headers,
data: {
count: this.retryCount
}
})
}
}

// Checkpoint for resume from where we left it
if (this.resume != null) {
this.resume = null

if (this.headersSent) {
// Only Partial Content 206 supposed to provide Content-Range,
// any other status code that partially consumed the payload
// should not be retried because it would result in downstream
// wrongly concatenate multiple responses.
if (statusCode !== 206 && (this.start > 0 || statusCode !== 200)) {
this.abort(
new RequestRetryError('server does not support the range header and the payload was partially consumed', statusCode, {
headers,
data: { count: this.retryCount }
})
)
return false
throw new RequestRetryError('server does not support the range header and the payload was partially consumed', statusCode, {
headers,
data: { count: this.retryCount }
})
}

const contentRange = parseRangeHeader(headers['content-range'])
// If no content range
if (!contentRange) {
this.abort(
new RequestRetryError('Content-Range mismatch', statusCode, {
headers,
data: { count: this.retryCount }
})
)
return false
throw new RequestRetryError('Content-Range mismatch', statusCode, {
headers,
data: { count: this.retryCount }
})
}

// Let's start with a weak etag check
if (this.etag != null && this.etag !== headers.etag) {
this.abort(
new RequestRetryError('ETag mismatch', statusCode, {
headers,
data: { count: this.retryCount }
})
)
return false
throw new RequestRetryError('ETag mismatch', statusCode, {
headers,
data: { count: this.retryCount }
})
}

const { start, size, end = size - 1 } = contentRange
const { start, size, end = size ? size - 1 : null } = contentRange

assert(this.start === start, 'content-range mismatch')
assert(this.end == null || this.end === end, 'content-range mismatch')

this.resume = resume
return true
return
}

if (this.end == null) {
Expand All @@ -238,15 +203,17 @@ class RetryHandler {
const range = parseRangeHeader(headers['content-range'])

if (range == null) {
return this.handler.onHeaders(
this.headersSent = true
this.handler.onResponseStart?.(
controller,
statusCode,
rawHeaders,
resume,
headers,
statusMessage
)
return
}

const { start, size, end = size - 1 } = range
const { start, size, end = size ? size - 1 : null } = range
assert(
start != null && Number.isFinite(start),
'content-range mismatch'
Expand All @@ -269,7 +236,7 @@ class RetryHandler {
'invalid content-length'
)

this.resume = resume
this.resume = true
this.etag = headers.etag != null ? headers.etag : null

// Weak etags are not useful for comparison nor cache
Expand All @@ -283,38 +250,36 @@ class RetryHandler {
this.etag = null
}

return this.handler.onHeaders(
this.headersSent = true
this.handler.onResponseStart?.(
controller,
statusCode,
rawHeaders,
resume,
headers,
statusMessage
)
} else {
throw new RequestRetryError('Request failed', statusCode, {
headers,
data: { count: this.retryCount }
})
}

const err = new RequestRetryError('Request failed', statusCode, {
headers,
data: { count: this.retryCount }
})

this.abort(err)

return false
}

onData (chunk) {
onResponseData (controller, chunk) {
this.start += chunk.length

return this.handler.onData(chunk)
this.handler.onResponseData?.(controller, chunk)
}

onComplete (rawTrailers) {
onResponseEnd (controller, trailers) {
this.retryCount = 0
return this.handler.onComplete(rawTrailers)
return this.handler.onResponseEnd?.(controller, trailers)
}

onError (err) {
if (this.aborted || isDisturbed(this.opts.body)) {
return this.handler.onError(err)
onResponseError (controller, err) {
if (!controller || controller.aborted || isDisturbed(this.opts.body)) {
this.handler.onResponseError?.(controller, err)
return
}

// We reconcile in case of a mix between network errors
Expand Down Expand Up @@ -343,8 +308,8 @@ class RetryHandler {
* @returns
*/
function onRetry (err) {
if (err != null || this.aborted || isDisturbed(this.opts.body)) {
return this.handler.onError(err)
if (err != null || controller?.aborted || isDisturbed(this.opts.body)) {
return this.handler.onResponseError?.(controller, err)
}

if (this.start !== 0) {
Expand All @@ -368,7 +333,7 @@ class RetryHandler {
this.retryCountCheckpoint = this.retryCount
this.dispatch(this.opts, this)
} catch (err) {
this.handler.onError(err)
this.handler.onResponseError?.(controller, err)
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion lib/handler/wrap-handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ module.exports = class WrapHandler {

onError (err) {
if (!this.#handler.onError) {
throw new InvalidArgumentError('invalid onError method')
throw err
}

return this.#handler.onError?.(err)
Expand Down
2 changes: 1 addition & 1 deletion test/node-test/client-dispatch.js
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,7 @@ test('dispatch pool onError missing', async (t) => {
})
} catch (err) {
p.strictEqual(err.code, 'UND_ERR_INVALID_ARG')
p.strictEqual(err.message, 'invalid onError method')
p.strictEqual(err.message, 'upgrade must be a string')
}
})

Expand Down
Loading

0 comments on commit d006e90

Please sign in to comment.