diff --git a/x-pack/plugins/observability/server/services/slo/transform_manager.test.ts b/x-pack/plugins/observability/server/services/slo/transform_manager.test.ts index dafa4bff18a52..1badb6b08e49f 100644 --- a/x-pack/plugins/observability/server/services/slo/transform_manager.test.ts +++ b/x-pack/plugins/observability/server/services/slo/transform_manager.test.ts @@ -6,10 +6,14 @@ */ /* eslint-disable max-classes-per-file */ -import { elasticsearchServiceMock, loggingSystemMock } from '@kbn/core/server/mocks'; -import { ElasticsearchClient } from '@kbn/core/server'; +import { + ElasticsearchClientMock, + elasticsearchServiceMock, + loggingSystemMock, +} from '@kbn/core/server/mocks'; import { MockedLogger } from '@kbn/logging-mocks'; import { TransformPutTransformRequest } from '@elastic/elasticsearch/lib/api/types'; +import { errors as EsErrors } from '@elastic/elasticsearch'; import { DefaultTransformManager } from './transform_manager'; import { @@ -22,7 +26,7 @@ import { createAPMTransactionErrorRateIndicator, createSLO } from './fixtures/sl const SPACE_ID = 'space-id'; describe('TransformManager', () => { - let esClientMock: jest.Mocked; + let esClientMock: ElasticsearchClientMock; let loggerMock: jest.Mocked; beforeEach(() => { @@ -158,6 +162,26 @@ describe('TransformManager', () => { expect(esClientMock.transform.deleteTransform).toHaveBeenCalledTimes(1); }); + + it('retries on transient error', async () => { + esClientMock.transform.deleteTransform.mockRejectedValueOnce( + new EsErrors.ConnectionError('irrelevant') + ); + // @ts-ignore defining only a subset of the possible SLI + const generators: Record = { + 'slo.apm.transaction_error_rate': new ApmTransactionErrorRateTransformGenerator(), + }; + const transformManager = new DefaultTransformManager( + generators, + esClientMock, + loggerMock, + SPACE_ID + ); + + await transformManager.uninstall('slo-transform-id'); + + expect(esClientMock.transform.deleteTransform).toHaveBeenCalledTimes(2); + }); }); }); diff --git a/x-pack/plugins/observability/server/services/slo/transform_manager.ts b/x-pack/plugins/observability/server/services/slo/transform_manager.ts index 178d6dacaa433..ab7799a4a00c6 100644 --- a/x-pack/plugins/observability/server/services/slo/transform_manager.ts +++ b/x-pack/plugins/observability/server/services/slo/transform_manager.ts @@ -8,6 +8,7 @@ import { ElasticsearchClient, Logger } from '@kbn/core/server'; import { SLO, SLITypes } from '../../types/models'; +import { retryTransientEsErrors } from '../../utils/retry'; import { TransformGenerator } from './transform_generators'; type TransformId = string; @@ -36,7 +37,9 @@ export class DefaultTransformManager implements TransformManager { const transformParams = generator.getTransformParams(slo, this.spaceId); try { - await this.esClient.transform.putTransform(transformParams); + await retryTransientEsErrors(() => this.esClient.transform.putTransform(transformParams), { + logger: this.logger, + }); } catch (err) { this.logger.error(`Cannot create transform for ${slo.indicator.type} SLO type: ${err}`); throw err; @@ -47,9 +50,10 @@ export class DefaultTransformManager implements TransformManager { async start(transformId: TransformId): Promise { try { - await this.esClient.transform.startTransform( - { transform_id: transformId }, - { ignore: [409] } + await retryTransientEsErrors( + () => + this.esClient.transform.startTransform({ transform_id: transformId }, { ignore: [409] }), + { logger: this.logger } ); } catch (err) { this.logger.error(`Cannot start transform id ${transformId}: ${err}`); @@ -59,9 +63,13 @@ export class DefaultTransformManager implements TransformManager { async stop(transformId: TransformId): Promise { try { - await this.esClient.transform.stopTransform( - { transform_id: transformId, wait_for_completion: true }, - { ignore: [404] } + await retryTransientEsErrors( + () => + this.esClient.transform.stopTransform( + { transform_id: transformId, wait_for_completion: true }, + { ignore: [404] } + ), + { logger: this.logger } ); } catch (err) { this.logger.error(`Cannot stop transform id ${transformId}: ${err}`); @@ -71,9 +79,13 @@ export class DefaultTransformManager implements TransformManager { async uninstall(transformId: TransformId): Promise { try { - await this.esClient.transform.deleteTransform( - { transform_id: transformId, force: true }, - { ignore: [404] } + await retryTransientEsErrors( + () => + this.esClient.transform.deleteTransform( + { transform_id: transformId, force: true }, + { ignore: [404] } + ), + { logger: this.logger } ); } catch (err) { this.logger.error(`Cannot delete transform id ${transformId}: ${err}`); diff --git a/x-pack/plugins/observability/server/utils/retry.test.ts b/x-pack/plugins/observability/server/utils/retry.test.ts new file mode 100644 index 0000000000000..25870fd24aa73 --- /dev/null +++ b/x-pack/plugins/observability/server/utils/retry.test.ts @@ -0,0 +1,87 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +jest.mock('timers/promises'); +import { setTimeout } from 'timers/promises'; +import { loggerMock } from '@kbn/logging-mocks'; +import { errors as EsErrors } from '@elastic/elasticsearch'; + +import { retryTransientEsErrors } from './retry'; + +const setTimeoutMock = setTimeout as jest.Mock< + ReturnType, + Parameters +>; + +describe('retryTransientErrors', () => { + beforeEach(() => { + setTimeoutMock.mockClear(); + }); + + it("doesn't retry if operation is successful", async () => { + const esCallMock = jest.fn().mockResolvedValue('success'); + expect(await retryTransientEsErrors(esCallMock)).toEqual('success'); + expect(esCallMock).toHaveBeenCalledTimes(1); + }); + + it('logs an warning message on retry', async () => { + const logger = loggerMock.create(); + const esCallMock = jest + .fn() + .mockRejectedValueOnce(new EsErrors.ConnectionError('foo')) + .mockResolvedValue('success'); + + await retryTransientEsErrors(esCallMock, { logger }); + expect(logger.warn).toHaveBeenCalledTimes(1); + expect(logger.warn.mock.calls[0][0]).toMatch( + `Retrying Elasticsearch operation after [2s] due to error: ConnectionError: foo ConnectionError: foo` + ); + }); + + it('retries with an exponential backoff', async () => { + let attempt = 0; + const esCallMock = jest.fn(async () => { + attempt++; + if (attempt < 5) { + throw new EsErrors.ConnectionError('foo'); + } else { + return 'success'; + } + }); + + expect(await retryTransientEsErrors(esCallMock)).toEqual('success'); + expect(setTimeoutMock.mock.calls).toEqual([[2000], [4000], [8000], [16000]]); + expect(esCallMock).toHaveBeenCalledTimes(5); + }); + + it('retries each supported error type', async () => { + const errors = [ + new EsErrors.NoLivingConnectionsError('no living connection', { + warnings: [], + meta: {} as any, + }), + new EsErrors.ConnectionError('no connection'), + new EsErrors.TimeoutError('timeout'), + new EsErrors.ResponseError({ statusCode: 503, meta: {} as any, warnings: [] }), + new EsErrors.ResponseError({ statusCode: 408, meta: {} as any, warnings: [] }), + new EsErrors.ResponseError({ statusCode: 410, meta: {} as any, warnings: [] }), + ]; + + for (const error of errors) { + const esCallMock = jest.fn().mockRejectedValueOnce(error).mockResolvedValue('success'); + expect(await retryTransientEsErrors(esCallMock)).toEqual('success'); + expect(esCallMock).toHaveBeenCalledTimes(2); + } + }); + + it('does not retry unsupported errors', async () => { + const error = new Error('foo!'); + const esCallMock = jest.fn().mockRejectedValueOnce(error).mockResolvedValue('success'); + await expect(retryTransientEsErrors(esCallMock)).rejects.toThrow(error); + expect(esCallMock).toHaveBeenCalledTimes(1); + }); +}); diff --git a/x-pack/plugins/observability/server/utils/retry.ts b/x-pack/plugins/observability/server/utils/retry.ts new file mode 100644 index 0000000000000..421289d1c0479 --- /dev/null +++ b/x-pack/plugins/observability/server/utils/retry.ts @@ -0,0 +1,53 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { setTimeout } from 'timers/promises'; +import { errors as EsErrors } from '@elastic/elasticsearch'; +import type { Logger } from '@kbn/logging'; + +const MAX_ATTEMPTS = 5; + +const retryResponseStatuses = [ + 503, // ServiceUnavailable + 408, // RequestTimeout + 410, // Gone +]; + +const isRetryableError = (e: any) => + e instanceof EsErrors.NoLivingConnectionsError || + e instanceof EsErrors.ConnectionError || + e instanceof EsErrors.TimeoutError || + (e instanceof EsErrors.ResponseError && retryResponseStatuses.includes(e?.statusCode!)); + +/** + * Retries any transient network or configuration issues encountered from Elasticsearch with an exponential backoff. + * Should only be used to wrap operations that are idempotent and can be safely executed more than once. + */ +export const retryTransientEsErrors = async ( + esCall: () => Promise, + { logger, attempt = 0 }: { logger?: Logger; attempt?: number } = {} +): Promise => { + try { + return await esCall(); + } catch (e) { + if (attempt < MAX_ATTEMPTS && isRetryableError(e)) { + const retryCount = attempt + 1; + const retryDelaySec = Math.min(Math.pow(2, retryCount), 64); // 2s, 4s, 8s, 16s, 32s, 64s, 64s, 64s ... + + logger?.warn( + `Retrying Elasticsearch operation after [${retryDelaySec}s] due to error: ${e.toString()} ${ + e.stack + }` + ); + + await setTimeout(retryDelaySec * 1000); + return retryTransientEsErrors(esCall, { logger, attempt: retryCount }); + } + + throw e; + } +};