From b7af0ef996b681f221792dde14a484e425bb454a Mon Sep 17 00:00:00 2001 From: Josh Mock Date: Tue, 4 Jun 2024 15:36:41 -0500 Subject: [PATCH 1/5] Add exponential backoff to request retries --- src/Transport.ts | 27 ++++++++++++++++++++++++++- src/connection/HttpConnection.ts | 5 +---- src/symbols.ts | 1 + src/util.ts | 22 ++++++++++++++++++++++ test/unit/transport.test.ts | 10 ++++++++-- 5 files changed, 58 insertions(+), 7 deletions(-) create mode 100644 src/util.ts diff --git a/src/Transport.ts b/src/Transport.ts index 6340533..9e4ff06 100644 --- a/src/Transport.ts +++ b/src/Transport.ts @@ -75,8 +75,10 @@ import { kJsonContentType, kNdjsonContentType, kAcceptHeader, - kRedaction + kRedaction, + kRetryBackoff } from './symbols' +import { sleep } from './util' const { version: clientVersion } = require('../package.json') // eslint-disable-line const debug = Debug('elasticsearch') @@ -216,6 +218,7 @@ export default class Transport { [kNdjsonContentType]: string [kAcceptHeader]: string [kRedaction]: RedactionOptions + [kRetryBackoff]: (min: number, max: number, attempt: number) => number static sniffReasons = { SNIFF_ON_START: 'sniff-on-start', @@ -277,6 +280,7 @@ export default class Transport { this[kNdjsonContentType] = opts.vendoredHeaders?.ndjsonContentType ?? 'application/x-ndjson' this[kAcceptHeader] = opts.vendoredHeaders?.accept ?? 'application/json, text/plain' this[kRedaction] = opts.redaction ?? { type: 'replace', additionalKeys: [] } + this[kRetryBackoff] = retryBackoff if (opts.sniffOnStart === true) { this.sniff({ @@ -607,6 +611,13 @@ export default class Transport { if (meta.attempts < maxRetries) { meta.attempts++ debug(`Retrying request, there are still ${maxRetries - meta.attempts} attempts`, params) + + // exponential backoff on retries, with jitter + const backoffWait = this[kRetryBackoff](0, 4, meta.attempts) + if (backoffWait > 0) { + await sleep(backoffWait * 1000) + } + continue } @@ -701,3 +712,17 @@ export function lowerCaseHeaders (oldHeaders?: http.IncomingHttpHeaders): http.I } return newHeaders } + +/** + * Function for calculating how long to sleep, in seconds, before the next request retry + * Uses the AWS "equal jitter" algorithm noted in this post: + * https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/ + * @param min The minimum number of seconds to wait + * @param max The maximum number of seconds to wait + * @param attempt How many retry attempts have been made + * @returns The number of seconds to wait before the next retry + */ +function retryBackoff (min: number, max: number, attempt: number): number { + const ceiling = Math.min(max, 2 ** attempt) / 2 + return ceiling + ((Math.random() * (ceiling - min)) - min) +} diff --git a/src/connection/HttpConnection.ts b/src/connection/HttpConnection.ts index 57fdd12..a5f055f 100644 --- a/src/connection/HttpConnection.ts +++ b/src/connection/HttpConnection.ts @@ -42,6 +42,7 @@ import { RequestAbortedError, TimeoutError } from '../errors' +import { sleep } from '../util' import { HttpAgentOptions } from '../types' const debug = Debug('elasticsearch') @@ -387,7 +388,3 @@ function isHttpAgentOptions (opts: Record): opts is HttpAgentOption if (opts.connections != null) return false return true } - -async function sleep (ms: number): Promise { - return await new Promise((resolve) => setTimeout(resolve, ms)) -} diff --git a/src/symbols.ts b/src/symbols.ts index c8de226..a6ae316 100644 --- a/src/symbols.ts +++ b/src/symbols.ts @@ -47,3 +47,4 @@ export const kJsonContentType = Symbol('json content type') export const kNdjsonContentType = Symbol('ndjson content type') export const kAcceptHeader = Symbol('accept header') export const kRedaction = Symbol('redaction') +export const kRetryBackoff = Symbol('retry backoff') diff --git a/src/util.ts b/src/util.ts new file mode 100644 index 0000000..4e81460 --- /dev/null +++ b/src/util.ts @@ -0,0 +1,22 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +export async function sleep (ms: number): Promise { + return await new Promise((resolve) => setTimeout(resolve, ms)) +} diff --git a/test/unit/transport.test.ts b/test/unit/transport.test.ts index 87234cb..8b64600 100644 --- a/test/unit/transport.test.ts +++ b/test/unit/transport.test.ts @@ -19,8 +19,6 @@ import { test } from 'tap' import buffer from 'buffer' -// import { URL } from 'url' -// import FakeTimers from '@sinonjs/fake-timers' import { promisify } from 'util' import { Readable as ReadableStream } from 'stream' import { gzipSync, deflateSync } from 'zlib' @@ -43,6 +41,7 @@ import { errors } from '../..' import { connection, buildServer } from '../utils' +import { kRetryBackoff } from '../../src/symbols' const { version: transportVersion } = require('../../package.json') // eslint-disable-line const sleep = promisify(setTimeout) @@ -109,6 +108,7 @@ test('Basic error (TimeoutError)', async t => { pool.addConnection('http://localhost:9200') const transport = new Transport({ connectionPool: pool, maxRetries: 0, retryOnTimeout: true }) + transport[kRetryBackoff] = () => 0 try { await transport.request({ @@ -138,6 +138,7 @@ test('Basic error (ConnectionError)', async t => { pool.addConnection('http://localhost:9200') const transport = new Transport({ connectionPool: pool, maxRetries: 0 }) + transport[kRetryBackoff] = () => 0 try { await transport.request({ @@ -729,6 +730,7 @@ test('Retry on timeout error if retryOnTimeout is true', async t => { pool.addConnection('http://localhost:9200') const transport = new Transport({ connectionPool: pool, retryOnTimeout: true }) + transport[kRetryBackoff] = () => 0 try { await transport.request({ @@ -1518,6 +1520,8 @@ test('Calls the sniff method on connection error', async t => { connectionPool: pool, sniffOnConnectionFault: true }) + // skip sleep between retries + transport[kRetryBackoff] = () => 0 try { await transport.request({ @@ -1546,6 +1550,8 @@ test('Calls the sniff method on timeout error if retryOnTimeout is true', async sniffOnConnectionFault: true, retryOnTimeout: true }) + // skip sleep between retries + transport[kRetryBackoff] = () => 0 try { await transport.request({ From e8ce49c34b6cedc026302d52a475ef037cb674eb Mon Sep 17 00:00:00 2001 From: Josh Mock Date: Wed, 5 Jun 2024 14:09:24 -0500 Subject: [PATCH 2/5] Switch from sleep to setTimeout --- src/Transport.ts | 4 ++-- src/connection/HttpConnection.ts | 4 ++-- src/util.ts | 22 ---------------------- 3 files changed, 4 insertions(+), 26 deletions(-) delete mode 100644 src/util.ts diff --git a/src/Transport.ts b/src/Transport.ts index 9e4ff06..cd82f21 100644 --- a/src/Transport.ts +++ b/src/Transport.ts @@ -78,7 +78,7 @@ import { kRedaction, kRetryBackoff } from './symbols' -import { sleep } from './util' +import { setTimeout as setTimeoutPromise } from 'node:timers/promises' const { version: clientVersion } = require('../package.json') // eslint-disable-line const debug = Debug('elasticsearch') @@ -615,7 +615,7 @@ export default class Transport { // exponential backoff on retries, with jitter const backoffWait = this[kRetryBackoff](0, 4, meta.attempts) if (backoffWait > 0) { - await sleep(backoffWait * 1000) + await setTimeoutPromise(backoffWait * 1000) } continue diff --git a/src/connection/HttpConnection.ts b/src/connection/HttpConnection.ts index a5f055f..6e67b48 100644 --- a/src/connection/HttpConnection.ts +++ b/src/connection/HttpConnection.ts @@ -42,7 +42,7 @@ import { RequestAbortedError, TimeoutError } from '../errors' -import { sleep } from '../util' +import { setTimeout as setTimeoutPromise } from 'timers/promises' import { HttpAgentOptions } from '../types' const debug = Debug('elasticsearch') @@ -311,7 +311,7 @@ export default class HttpConnection extends BaseConnection { async close (): Promise { debug('Closing connection', this.id) while (this._openRequests > 0) { - await sleep(1000) + await setTimeoutPromise(1000) } /* istanbul ignore else */ if (this.agent !== undefined) { diff --git a/src/util.ts b/src/util.ts deleted file mode 100644 index 4e81460..0000000 --- a/src/util.ts +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to Elasticsearch B.V. under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch B.V. licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -export async function sleep (ms: number): Promise { - return await new Promise((resolve) => setTimeout(resolve, ms)) -} From 3863134867fe8529419caa4c41b2912bff03b7e9 Mon Sep 17 00:00:00 2001 From: Josh Mock Date: Wed, 5 Jun 2024 14:23:11 -0500 Subject: [PATCH 3/5] Expose retryBackoff to make testing easier Very hard to mock a retry that uses setTimeout with random jitter --- package.json | 6 ++- src/Transport.ts | 7 +++- test/unit/transport.test.ts | 81 ++++++++++++++++++++++++++----------- 3 files changed, 68 insertions(+), 26 deletions(-) diff --git a/package.json b/package.json index dd9daf1..9b37c32 100644 --- a/package.json +++ b/package.json @@ -57,6 +57,7 @@ "workq": "^3.0.0" }, "dependencies": { + "@tapjs/clock": "^1.1.24", "debug": "^4.3.4", "hpagent": "^1.0.0", "ms": "^2.1.3", @@ -65,6 +66,9 @@ "undici": "^6.12.0" }, "tap": { - "allow-incomplete-coverage": true + "allow-incomplete-coverage": true, + "plugin": [ + "@tapjs/clock" + ] } } diff --git a/src/Transport.ts b/src/Transport.ts index cd82f21..4dd2b14 100644 --- a/src/Transport.ts +++ b/src/Transport.ts @@ -116,6 +116,7 @@ export interface TransportOptions { accept?: string } redaction?: RedactionOptions + retryBackoff?: (min: number, max: number, attempt: number) => number } export interface TransportRequestParams { @@ -164,6 +165,7 @@ export interface TransportRequestOptions { */ meta?: boolean redaction?: RedactionOptions + retryBackoff?: (min: number, max: number, attempt: number) => number } export interface TransportRequestOptionsWithMeta extends TransportRequestOptions { @@ -280,7 +282,7 @@ export default class Transport { this[kNdjsonContentType] = opts.vendoredHeaders?.ndjsonContentType ?? 'application/x-ndjson' this[kAcceptHeader] = opts.vendoredHeaders?.accept ?? 'application/json, text/plain' this[kRedaction] = opts.redaction ?? { type: 'replace', additionalKeys: [] } - this[kRetryBackoff] = retryBackoff + this[kRetryBackoff] = opts.retryBackoff ?? retryBackoff if (opts.sniffOnStart === true) { this.sniff({ @@ -613,7 +615,8 @@ export default class Transport { debug(`Retrying request, there are still ${maxRetries - meta.attempts} attempts`, params) // exponential backoff on retries, with jitter - const backoffWait = this[kRetryBackoff](0, 4, meta.attempts) + const backoff = options.retryBackoff ?? this[kRetryBackoff] + const backoffWait = backoff(0, 4, meta.attempts) if (backoffWait > 0) { await setTimeoutPromise(backoffWait * 1000) } diff --git a/test/unit/transport.test.ts b/test/unit/transport.test.ts index 8b64600..b6a668b 100644 --- a/test/unit/transport.test.ts +++ b/test/unit/transport.test.ts @@ -41,7 +41,6 @@ import { errors } from '../..' import { connection, buildServer } from '../utils' -import { kRetryBackoff } from '../../src/symbols' const { version: transportVersion } = require('../../package.json') // eslint-disable-line const sleep = promisify(setTimeout) @@ -107,8 +106,12 @@ test('Basic error (TimeoutError)', async t => { const pool = new MyPool({ Connection: MockConnectionTimeout }) pool.addConnection('http://localhost:9200') - const transport = new Transport({ connectionPool: pool, maxRetries: 0, retryOnTimeout: true }) - transport[kRetryBackoff] = () => 0 + const transport = new Transport({ + connectionPool: pool, + maxRetries: 0, + retryOnTimeout: true, + retryBackoff: () => 0, + }) try { await transport.request({ @@ -137,8 +140,11 @@ test('Basic error (ConnectionError)', async t => { const pool = new MyPool({ Connection: MockConnectionError }) pool.addConnection('http://localhost:9200') - const transport = new Transport({ connectionPool: pool, maxRetries: 0 }) - transport[kRetryBackoff] = () => 0 + const transport = new Transport({ + connectionPool: pool, + maxRetries: 0, + retryBackoff: () => 0 + }) try { await transport.request({ @@ -710,13 +716,17 @@ test('Retry on connection error', async t => { const pool = new WeightedConnectionPool({ Connection: MockConnectionError }) pool.addConnection('http://localhost:9200') - const transport = new Transport({ connectionPool: pool }) + const transport = new Transport({ + connectionPool: pool, + retryBackoff: () => 0, + }) try { - await transport.request({ + const res = transport.request({ method: 'GET', path: '/hello' }) + await res } catch (err: any) { t.ok(err instanceof ConnectionError) t.equal(err.meta.meta.attempts, 3) @@ -725,18 +735,25 @@ test('Retry on connection error', async t => { test('Retry on timeout error if retryOnTimeout is true', async t => { t.plan(2) + t.clock.enter() + t.teardown(() => t.clock.exit()) const pool = new WeightedConnectionPool({ Connection: MockConnectionTimeout }) pool.addConnection('http://localhost:9200') - const transport = new Transport({ connectionPool: pool, retryOnTimeout: true }) - transport[kRetryBackoff] = () => 0 + const transport = new Transport({ + connectionPool: pool, + retryOnTimeout: true, + retryBackoff: () => 0 + }) try { - await transport.request({ + const res = transport.request({ method: 'GET', path: '/hello' }) + t.clock.advance(4000) + await res } catch (err: any) { t.ok(err instanceof TimeoutError) t.equal(err.meta.meta.attempts, 3) @@ -1518,16 +1535,16 @@ test('Calls the sniff method on connection error', async t => { const transport = new MyTransport({ connectionPool: pool, - sniffOnConnectionFault: true + sniffOnConnectionFault: true, + retryBackoff: () => 0 }) - // skip sleep between retries - transport[kRetryBackoff] = () => 0 try { - await transport.request({ + const res = transport.request({ method: 'GET', path: '/hello' }) + await res } catch (err: any) { t.ok(err instanceof ConnectionError) t.equal(err.meta.meta.attempts, 3) @@ -1537,6 +1554,9 @@ test('Calls the sniff method on connection error', async t => { test('Calls the sniff method on timeout error if retryOnTimeout is true', async t => { t.plan(6) + t.clock.enter() + t.teardown(() => t.clock.exit()) + class MyTransport extends Transport { sniff (opts: SniffOptions): void { t.equal(opts.reason, Transport.sniffReasons.SNIFF_ON_CONNECTION_FAULT) @@ -1548,16 +1568,17 @@ test('Calls the sniff method on timeout error if retryOnTimeout is true', async const transport = new MyTransport({ connectionPool: pool, sniffOnConnectionFault: true, - retryOnTimeout: true + retryOnTimeout: true, + retryBackoff: () => 0, }) - // skip sleep between retries - transport[kRetryBackoff] = () => 0 try { - await transport.request({ + const res = transport.request({ method: 'GET', path: '/hello' }) + t.clock.advance(4000) + await res } catch (err: any) { t.ok(err instanceof TimeoutError) t.equal(err.meta.meta.attempts, 3) @@ -1583,6 +1604,8 @@ test('Sniff on start', async t => { test('Sniff interval', async t => { t.plan(5) + t.clock.enter() + t.teardown(() => t.clock.exit()) class MyTransport extends Transport { sniff (opts: SniffOptions): void { @@ -1597,26 +1620,38 @@ test('Sniff interval', async t => { sniffInterval: 50 }) - let res = await transport.request({ + let promise = transport.request({ method: 'GET', path: '/hello' }, { meta: true }) + + t.clock.advance(4000) + + let res = await promise t.equal(res.statusCode, 200) - await sleep(80) + promise = sleep(80) + t.clock.advance(80) + await promise - res = await transport.request({ + promise = transport.request({ method: 'GET', path: '/hello' }, { meta: true }) + t.clock.advance(4000) + res = await promise t.equal(res.statusCode, 200) - await sleep(80) + promise = sleep(80) + t.clock.advance(80) + await promise - res = await transport.request({ + promise = transport.request({ method: 'GET', path: '/hello' }, { meta: true }) + t.clock.advance(4000) + res = await promise t.equal(res.statusCode, 200) }) From 003353e16566db51905abbc8d6b2355361d8d17e Mon Sep 17 00:00:00 2001 From: Josh Mock Date: Thu, 6 Jun 2024 12:29:53 -0500 Subject: [PATCH 4/5] Fix backoff calculation Accidentally let the minimum be `min * -1` instead of `min`. Whoops! --- src/Transport.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Transport.ts b/src/Transport.ts index 4dd2b14..8433de2 100644 --- a/src/Transport.ts +++ b/src/Transport.ts @@ -727,5 +727,5 @@ export function lowerCaseHeaders (oldHeaders?: http.IncomingHttpHeaders): http.I */ function retryBackoff (min: number, max: number, attempt: number): number { const ceiling = Math.min(max, 2 ** attempt) / 2 - return ceiling + ((Math.random() * (ceiling - min)) - min) + return ceiling + ((Math.random() * (ceiling - min)) + min) } From 6f790a1c822f7864ed6737fcf76dd7f9c1270a0e Mon Sep 17 00:00:00 2001 From: Josh Mock Date: Thu, 6 Jun 2024 12:46:16 -0500 Subject: [PATCH 5/5] Move @tapjs/clock to dev dependencies --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 9b37c32..14437b7 100644 --- a/package.json +++ b/package.json @@ -38,6 +38,7 @@ }, "devDependencies": { "@sinonjs/fake-timers": "github:sinonjs/fake-timers#0bfffc1", + "@tapjs/clock": "^1.1.24", "@types/debug": "^4.1.7", "@types/ms": "^0.7.31", "@types/node": "^18.19.21", @@ -57,7 +58,6 @@ "workq": "^3.0.0" }, "dependencies": { - "@tapjs/clock": "^1.1.24", "debug": "^4.3.4", "hpagent": "^1.0.0", "ms": "^2.1.3",