Skip to content

Commit

Permalink
Shameless copy of the retryTransientEsErrors from fleet
Browse files Browse the repository at this point in the history
  • Loading branch information
kdelemme committed Sep 21, 2022
1 parent 11588b3 commit f936fb6
Show file tree
Hide file tree
Showing 4 changed files with 189 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,14 @@
*/
/* eslint-disable max-classes-per-file */

import { elasticsearchServiceMock, loggingSystemMock } from '@kbn/core/server/mocks';
import { ElasticsearchClient } from '@kbn/core/server';
import {
ElasticsearchClientMock,
elasticsearchServiceMock,
loggingSystemMock,
} from '@kbn/core/server/mocks';
import { MockedLogger } from '@kbn/logging-mocks';
import { TransformPutTransformRequest } from '@elastic/elasticsearch/lib/api/types';
import { errors as EsErrors } from '@elastic/elasticsearch';

import { DefaultTransformManager } from './transform_manager';
import {
Expand All @@ -22,7 +26,7 @@ import { createAPMTransactionErrorRateIndicator, createSLO } from './fixtures/sl
const SPACE_ID = 'space-id';

describe('TransformManager', () => {
let esClientMock: jest.Mocked<ElasticsearchClient>;
let esClientMock: ElasticsearchClientMock;
let loggerMock: jest.Mocked<MockedLogger>;

beforeEach(() => {
Expand Down Expand Up @@ -158,6 +162,26 @@ describe('TransformManager', () => {

expect(esClientMock.transform.deleteTransform).toHaveBeenCalledTimes(1);
});

it('retries on transient error', async () => {
esClientMock.transform.deleteTransform.mockRejectedValueOnce(
new EsErrors.ConnectionError('irrelevant')
);
// @ts-ignore defining only a subset of the possible SLI
const generators: Record<SLITypes, TransformGenerator> = {
'slo.apm.transaction_error_rate': new ApmTransactionErrorRateTransformGenerator(),
};
const transformManager = new DefaultTransformManager(
generators,
esClientMock,
loggerMock,
SPACE_ID
);

await transformManager.uninstall('slo-transform-id');

expect(esClientMock.transform.deleteTransform).toHaveBeenCalledTimes(2);
});
});
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import { ElasticsearchClient, Logger } from '@kbn/core/server';

import { SLO, SLITypes } from '../../types/models';
import { retryTransientEsErrors } from '../../utils/retry';
import { TransformGenerator } from './transform_generators';

type TransformId = string;
Expand Down Expand Up @@ -36,7 +37,9 @@ export class DefaultTransformManager implements TransformManager {

const transformParams = generator.getTransformParams(slo, this.spaceId);
try {
await this.esClient.transform.putTransform(transformParams);
await retryTransientEsErrors(() => this.esClient.transform.putTransform(transformParams), {
logger: this.logger,
});
} catch (err) {
this.logger.error(`Cannot create transform for ${slo.indicator.type} SLO type: ${err}`);
throw err;
Expand All @@ -47,9 +50,10 @@ export class DefaultTransformManager implements TransformManager {

async start(transformId: TransformId): Promise<void> {
try {
await this.esClient.transform.startTransform(
{ transform_id: transformId },
{ ignore: [409] }
await retryTransientEsErrors(
() =>
this.esClient.transform.startTransform({ transform_id: transformId }, { ignore: [409] }),
{ logger: this.logger }
);
} catch (err) {
this.logger.error(`Cannot start transform id ${transformId}: ${err}`);
Expand All @@ -59,9 +63,13 @@ export class DefaultTransformManager implements TransformManager {

async stop(transformId: TransformId): Promise<void> {
try {
await this.esClient.transform.stopTransform(
{ transform_id: transformId, wait_for_completion: true },
{ ignore: [404] }
await retryTransientEsErrors(
() =>
this.esClient.transform.stopTransform(
{ transform_id: transformId, wait_for_completion: true },
{ ignore: [404] }
),
{ logger: this.logger }
);
} catch (err) {
this.logger.error(`Cannot stop transform id ${transformId}: ${err}`);
Expand All @@ -71,9 +79,13 @@ export class DefaultTransformManager implements TransformManager {

async uninstall(transformId: TransformId): Promise<void> {
try {
await this.esClient.transform.deleteTransform(
{ transform_id: transformId, force: true },
{ ignore: [404] }
await retryTransientEsErrors(
() =>
this.esClient.transform.deleteTransform(
{ transform_id: transformId, force: true },
{ ignore: [404] }
),
{ logger: this.logger }
);
} catch (err) {
this.logger.error(`Cannot delete transform id ${transformId}: ${err}`);
Expand Down
87 changes: 87 additions & 0 deletions x-pack/plugins/observability/server/utils/retry.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

jest.mock('timers/promises');
import { setTimeout } from 'timers/promises';
import { loggerMock } from '@kbn/logging-mocks';
import { errors as EsErrors } from '@elastic/elasticsearch';

import { retryTransientEsErrors } from './retry';

const setTimeoutMock = setTimeout as jest.Mock<
ReturnType<typeof setTimeout>,
Parameters<typeof setTimeout>
>;

describe('retryTransientErrors', () => {
beforeEach(() => {
setTimeoutMock.mockClear();
});

it("doesn't retry if operation is successful", async () => {
const esCallMock = jest.fn().mockResolvedValue('success');
expect(await retryTransientEsErrors(esCallMock)).toEqual('success');
expect(esCallMock).toHaveBeenCalledTimes(1);
});

it('logs an warning message on retry', async () => {
const logger = loggerMock.create();
const esCallMock = jest
.fn()
.mockRejectedValueOnce(new EsErrors.ConnectionError('foo'))
.mockResolvedValue('success');

await retryTransientEsErrors(esCallMock, { logger });
expect(logger.warn).toHaveBeenCalledTimes(1);
expect(logger.warn.mock.calls[0][0]).toMatch(
`Retrying Elasticsearch operation after [2s] due to error: ConnectionError: foo ConnectionError: foo`
);
});

it('retries with an exponential backoff', async () => {
let attempt = 0;
const esCallMock = jest.fn(async () => {
attempt++;
if (attempt < 5) {
throw new EsErrors.ConnectionError('foo');
} else {
return 'success';
}
});

expect(await retryTransientEsErrors(esCallMock)).toEqual('success');
expect(setTimeoutMock.mock.calls).toEqual([[2000], [4000], [8000], [16000]]);
expect(esCallMock).toHaveBeenCalledTimes(5);
});

it('retries each supported error type', async () => {
const errors = [
new EsErrors.NoLivingConnectionsError('no living connection', {
warnings: [],
meta: {} as any,
}),
new EsErrors.ConnectionError('no connection'),
new EsErrors.TimeoutError('timeout'),
new EsErrors.ResponseError({ statusCode: 503, meta: {} as any, warnings: [] }),
new EsErrors.ResponseError({ statusCode: 408, meta: {} as any, warnings: [] }),
new EsErrors.ResponseError({ statusCode: 410, meta: {} as any, warnings: [] }),
];

for (const error of errors) {
const esCallMock = jest.fn().mockRejectedValueOnce(error).mockResolvedValue('success');
expect(await retryTransientEsErrors(esCallMock)).toEqual('success');
expect(esCallMock).toHaveBeenCalledTimes(2);
}
});

it('does not retry unsupported errors', async () => {
const error = new Error('foo!');
const esCallMock = jest.fn().mockRejectedValueOnce(error).mockResolvedValue('success');
await expect(retryTransientEsErrors(esCallMock)).rejects.toThrow(error);
expect(esCallMock).toHaveBeenCalledTimes(1);
});
});
53 changes: 53 additions & 0 deletions x-pack/plugins/observability/server/utils/retry.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { setTimeout } from 'timers/promises';
import { errors as EsErrors } from '@elastic/elasticsearch';
import type { Logger } from '@kbn/logging';

const MAX_ATTEMPTS = 5;

const retryResponseStatuses = [
503, // ServiceUnavailable
408, // RequestTimeout
410, // Gone
];

const isRetryableError = (e: any) =>
e instanceof EsErrors.NoLivingConnectionsError ||
e instanceof EsErrors.ConnectionError ||
e instanceof EsErrors.TimeoutError ||
(e instanceof EsErrors.ResponseError && retryResponseStatuses.includes(e?.statusCode!));

/**
* Retries any transient network or configuration issues encountered from Elasticsearch with an exponential backoff.
* Should only be used to wrap operations that are idempotent and can be safely executed more than once.
*/
export const retryTransientEsErrors = async <T>(
esCall: () => Promise<T>,
{ logger, attempt = 0 }: { logger?: Logger; attempt?: number } = {}
): Promise<T> => {
try {
return await esCall();
} catch (e) {
if (attempt < MAX_ATTEMPTS && isRetryableError(e)) {
const retryCount = attempt + 1;
const retryDelaySec = Math.min(Math.pow(2, retryCount), 64); // 2s, 4s, 8s, 16s, 32s, 64s, 64s, 64s ...

logger?.warn(
`Retrying Elasticsearch operation after [${retryDelaySec}s] due to error: ${e.toString()} ${
e.stack
}`
);

await setTimeout(retryDelaySec * 1000);
return retryTransientEsErrors(esCall, { logger, attempt: retryCount });
}

throw e;
}
};

0 comments on commit f936fb6

Please sign in to comment.