From 0d777fd2851701a50ed51fb8d9e132ba69182051 Mon Sep 17 00:00:00 2001 From: Josh Dover Date: Mon, 15 Nov 2021 18:50:43 +0100 Subject: [PATCH 1/2] Add retryTransientEsErrors utility --- .../services/epm/elasticsearch/retry.test.ts | 88 +++++++++++++++++++ .../services/epm/elasticsearch/retry.ts | 53 +++++++++++ 2 files changed, 141 insertions(+) create mode 100644 x-pack/plugins/fleet/server/services/epm/elasticsearch/retry.test.ts create mode 100644 x-pack/plugins/fleet/server/services/epm/elasticsearch/retry.ts diff --git a/x-pack/plugins/fleet/server/services/epm/elasticsearch/retry.test.ts b/x-pack/plugins/fleet/server/services/epm/elasticsearch/retry.test.ts new file mode 100644 index 0000000000000..5b9a1bf1539f0 --- /dev/null +++ b/x-pack/plugins/fleet/server/services/epm/elasticsearch/retry.test.ts @@ -0,0 +1,88 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +jest.mock('timers/promises'); +import { setTimeout } from 'timers/promises'; + +import { loggerMock } from '@kbn/logging/mocks'; +import { errors as EsErrors } from '@elastic/elasticsearch'; + +import { retryTransientEsErrors } from './retry'; + +const setTimeoutMock = setTimeout as jest.Mock< + ReturnType, + Parameters +>; + +describe('retryTransientErrors', () => { + beforeEach(() => { + setTimeoutMock.mockClear(); + }); + + it("doesn't retry if operation is successful", async () => { + const esCallMock = jest.fn().mockResolvedValue('success'); + expect(await retryTransientEsErrors(esCallMock)).toEqual('success'); + expect(esCallMock).toHaveBeenCalledTimes(1); + }); + + it('logs an warning message on retry', async () => { + const logger = loggerMock.create(); + const esCallMock = jest + .fn() + .mockRejectedValueOnce(new EsErrors.ConnectionError('foo')) + .mockResolvedValue('success'); + + await retryTransientEsErrors(esCallMock, { logger }); + expect(logger.warn).toHaveBeenCalledTimes(1); + expect(logger.warn.mock.calls[0][0]).toMatch( + `Retrying Elasticsearch operation after [2s] due to error: ConnectionError: foo ConnectionError: foo` + ); + }); + + it('retries with an exponential backoff', async () => { + let attempt = 0; + const esCallMock = jest.fn(async () => { + attempt++; + if (attempt < 5) { + throw new EsErrors.ConnectionError('foo'); + } else { + return 'success'; + } + }); + + expect(await retryTransientEsErrors(esCallMock)).toEqual('success'); + expect(setTimeoutMock.mock.calls).toEqual([[2000], [4000], [8000], [16000]]); + expect(esCallMock).toHaveBeenCalledTimes(5); + }); + + it('retries each supported error type', async () => { + const errors = [ + new EsErrors.NoLivingConnectionsError('no living connection', { + warnings: [], + meta: {} as any, + }), + new EsErrors.ConnectionError('no connection'), + new EsErrors.TimeoutError('timeout'), + new EsErrors.ResponseError({ statusCode: 503, meta: {} as any, warnings: [] }), + new 
EsErrors.ResponseError({ statusCode: 408, meta: {} as any, warnings: [] }), + new EsErrors.ResponseError({ statusCode: 410, meta: {} as any, warnings: [] }), + ]; + + for (const error of errors) { + const esCallMock = jest.fn().mockRejectedValueOnce(error).mockResolvedValue('success'); + expect(await retryTransientEsErrors(esCallMock)).toEqual('success'); + expect(esCallMock).toHaveBeenCalledTimes(2); + } + }); + + it('does not retry unsupported errors', async () => { + const error = new Error('foo!'); + const esCallMock = jest.fn().mockRejectedValueOnce(error).mockResolvedValue('success'); + await expect(retryTransientEsErrors(esCallMock)).rejects.toThrow(error); + expect(esCallMock).toHaveBeenCalledTimes(1); + }); +}); diff --git a/x-pack/plugins/fleet/server/services/epm/elasticsearch/retry.ts b/x-pack/plugins/fleet/server/services/epm/elasticsearch/retry.ts new file mode 100644 index 0000000000000..c8ea36a4addec --- /dev/null +++ b/x-pack/plugins/fleet/server/services/epm/elasticsearch/retry.ts @@ -0,0 +1,53 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +import { setTimeout } from 'timers/promises'; + +import { errors as EsErrors } from '@elastic/elasticsearch'; +import type { Logger } from '@kbn/logging'; + +const MAX_ATTEMPTS = 5; + +const retryResponseStatuses = [ + 503, // ServiceUnavailable + 408, // RequestTimeout + 410, // Gone +]; + +const isRetryableError = (e: any) => + e instanceof EsErrors.NoLivingConnectionsError || + e instanceof EsErrors.ConnectionError || + e instanceof EsErrors.TimeoutError || + (e instanceof EsErrors.ResponseError && retryResponseStatuses.includes(e?.statusCode!)); + +/** + * Retries any transient network or configuration issues encountered from Elasticsearch with an exponential backoff. 
+ * Should only be used to wrap operations that are idempotent and can be safely executed more than once. + */ +export const retryTransientEsErrors = async ( + esCall: () => Promise, + { logger, attempt = 0 }: { logger?: Logger; attempt?: number } = {} +): Promise => { + try { + return await esCall(); + } catch (e) { + if (attempt < MAX_ATTEMPTS && isRetryableError(e)) { + const retryCount = attempt + 1; + const retryDelaySec = Math.min(Math.pow(2, retryCount), 64); // 2s, 4s, 8s, 16s, 32s (MAX_ATTEMPTS stops retries before the 64s cap is reached) + + logger?.warn( + `Retrying Elasticsearch operation after [${retryDelaySec}s] due to error: ${e.toString()} ${ + e.stack + }` + ); + + await setTimeout(retryDelaySec * 1000); + return retryTransientEsErrors(esCall, { logger, attempt: retryCount }); + } + + throw e; + } +}; From e95e73f05d179877a6164d80a9044fe1f37583bd Mon Sep 17 00:00:00 2001 From: Josh Dover Date: Mon, 29 Nov 2021 16:47:06 +0100 Subject: [PATCH 2/2] Add retry logic to idempotent install operations --- .../elasticsearch/datastream_ilm/install.ts | 24 ++-- .../services/epm/elasticsearch/ilm/install.ts | 20 +-- .../elasticsearch/ingest_pipeline/install.ts | 26 +++- .../epm/elasticsearch/ml_model/install.ts | 29 +++-- .../elasticsearch/template/install.test.ts | 5 + .../epm/elasticsearch/template/install.ts | 116 +++++++++++++----- .../epm/elasticsearch/template/template.ts | 40 ++++-- .../epm/elasticsearch/transform/install.ts | 29 +++-- .../elasticsearch/transform/transform.test.ts | 13 +- .../epm/packages/_install_package.test.ts | 2 + .../services/epm/packages/_install_package.ts | 31 +++-- .../server/services/epm/packages/install.ts | 3 + x-pack/plugins/fleet/server/services/setup.ts | 4 +- 13 files changed, 242 insertions(+), 100 deletions(-) diff --git a/x-pack/plugins/fleet/server/services/epm/elasticsearch/datastream_ilm/install.ts b/x-pack/plugins/fleet/server/services/epm/elasticsearch/datastream_ilm/install.ts index 2e43fe44527b7..a1075b15a6462 100644 --- 
a/x-pack/plugins/fleet/server/services/epm/elasticsearch/datastream_ilm/install.ts +++ b/x-pack/plugins/fleet/server/services/epm/elasticsearch/datastream_ilm/install.ts @@ -5,7 +5,7 @@ * 2.0. */ -import type { ElasticsearchClient, SavedObjectsClientContract } from 'kibana/server'; +import type { ElasticsearchClient, Logger, SavedObjectsClientContract } from 'kibana/server'; import { ElasticsearchAssetType } from '../../../../../common/types/models'; import type { @@ -18,6 +18,7 @@ import { saveInstalledEsRefs } from '../../packages/install'; import { getAsset } from '../transform/common'; import { getESAssetMetadata } from '../meta'; +import { retryTransientEsErrors } from '../retry'; import { deleteIlmRefs, deleteIlms } from './remove'; @@ -35,7 +36,8 @@ export const installIlmForDataStream = async ( registryPackage: InstallablePackage, paths: string[], esClient: ElasticsearchClient, - savedObjectsClient: SavedObjectsClientContract + savedObjectsClient: SavedObjectsClientContract, + logger: Logger ) => { const installation = await getInstallation({ savedObjectsClient, pkgName: registryPackage.name }); let previousInstalledIlmEsAssets: EsAssetReference[] = []; @@ -90,7 +92,7 @@ export const installIlmForDataStream = async ( ); const installationPromises = ilmInstallations.map(async (ilmInstallation) => { - return handleIlmInstall({ esClient, ilmInstallation }); + return handleIlmInstall({ esClient, ilmInstallation, logger }); }); installedIlms = await Promise.all(installationPromises).then((results) => results.flat()); @@ -117,15 +119,21 @@ export const installIlmForDataStream = async ( async function handleIlmInstall({ esClient, ilmInstallation, + logger, }: { esClient: ElasticsearchClient; ilmInstallation: IlmInstallation; + logger: Logger; }): Promise { - await esClient.transport.request({ - method: 'PUT', - path: `/_ilm/policy/${ilmInstallation.installationName}`, - body: ilmInstallation.content, - }); + await retryTransientEsErrors( + () => + 
esClient.transport.request({ + method: 'PUT', + path: `/_ilm/policy/${ilmInstallation.installationName}`, + body: ilmInstallation.content, + }), + { logger } + ); return { id: ilmInstallation.installationName, type: ElasticsearchAssetType.dataStreamIlmPolicy }; } diff --git a/x-pack/plugins/fleet/server/services/epm/elasticsearch/ilm/install.ts b/x-pack/plugins/fleet/server/services/epm/elasticsearch/ilm/install.ts index 380bd0e913d6d..b77a787090ed6 100644 --- a/x-pack/plugins/fleet/server/services/epm/elasticsearch/ilm/install.ts +++ b/x-pack/plugins/fleet/server/services/epm/elasticsearch/ilm/install.ts @@ -5,18 +5,20 @@ * 2.0. */ -import type { ElasticsearchClient } from 'kibana/server'; +import type { ElasticsearchClient, Logger } from 'kibana/server'; import type { InstallablePackage } from '../../../../types'; import { ElasticsearchAssetType } from '../../../../types'; import { getAsset, getPathParts } from '../../archive'; import { getESAssetMetadata } from '../meta'; +import { retryTransientEsErrors } from '../retry'; export async function installILMPolicy( packageInfo: InstallablePackage, paths: string[], - esClient: ElasticsearchClient + esClient: ElasticsearchClient, + logger: Logger ) { const ilmPaths = paths.filter((path) => isILMPolicy(path)); if (!ilmPaths.length) return; @@ -29,11 +31,15 @@ export async function installILMPolicy( const { file } = getPathParts(path); const name = file.substr(0, file.lastIndexOf('.')); try { - await esClient.transport.request({ - method: 'PUT', - path: '/_ilm/policy/' + name, - body, - }); + await retryTransientEsErrors( + () => + esClient.transport.request({ + method: 'PUT', + path: '/_ilm/policy/' + name, + body, + }), + { logger } + ); } catch (err) { throw new Error(err.message); } diff --git a/x-pack/plugins/fleet/server/services/epm/elasticsearch/ingest_pipeline/install.ts b/x-pack/plugins/fleet/server/services/epm/elasticsearch/ingest_pipeline/install.ts index 560ff08331044..d857d7c6bc2fb 100644 --- 
a/x-pack/plugins/fleet/server/services/epm/elasticsearch/ingest_pipeline/install.ts +++ b/x-pack/plugins/fleet/server/services/epm/elasticsearch/ingest_pipeline/install.ts @@ -6,7 +6,7 @@ */ import type { TransportRequestOptions } from '@elastic/elasticsearch'; -import type { ElasticsearchClient, SavedObjectsClientContract } from 'src/core/server'; +import type { ElasticsearchClient, Logger, SavedObjectsClientContract } from 'src/core/server'; import { ElasticsearchAssetType } from '../../../../types'; import type { EsAssetReference, RegistryDataStream, InstallablePackage } from '../../../../types'; @@ -22,6 +22,8 @@ import { import { appendMetadataToIngestPipeline } from '../meta'; +import { retryTransientEsErrors } from '../retry'; + import { deletePipelineRefs } from './remove'; interface RewriteSubstitution { @@ -41,7 +43,8 @@ export const installPipelines = async ( installablePackage: InstallablePackage, paths: string[], esClient: ElasticsearchClient, - savedObjectsClient: SavedObjectsClientContract + savedObjectsClient: SavedObjectsClientContract, + logger: Logger ) => { // unlike other ES assets, pipeline names are versioned so after a template is updated // it can be created pointing to the new template, without removing the old one and effecting data @@ -105,6 +108,7 @@ export const installPipelines = async ( installAllPipelines({ dataStream, esClient, + logger, paths: pipelinePaths, installablePackage, }) @@ -119,6 +123,7 @@ export const installPipelines = async ( installAllPipelines({ dataStream: undefined, esClient, + logger, paths: topLevelPipelinePaths, installablePackage, }) @@ -151,11 +156,13 @@ export function rewriteIngestPipeline( export async function installAllPipelines({ esClient, + logger, paths, dataStream, installablePackage, }: { esClient: ElasticsearchClient; + logger: Logger; paths: string[]; dataStream?: RegistryDataStream; installablePackage: InstallablePackage; @@ -195,7 +202,7 @@ export async function installAllPipelines({ }); const 
installationPromises = pipelines.map(async (pipeline) => { - return installPipeline({ esClient, pipeline, installablePackage }); + return installPipeline({ esClient, pipeline, installablePackage, logger }); }); return Promise.all(installationPromises); @@ -203,10 +210,12 @@ export async function installAllPipelines({ async function installPipeline({ esClient, + logger, pipeline, installablePackage, }: { esClient: ElasticsearchClient; + logger: Logger; pipeline: any; installablePackage?: InstallablePackage; }): Promise { @@ -233,7 +242,10 @@ async function installPipeline({ }; } - await esClient.ingest.putPipeline(esClientParams, esClientRequestOptions); + await retryTransientEsErrors( + () => esClient.ingest.putPipeline(esClientParams, esClientRequestOptions), + { logger } + ); return { id: pipelineWithMetadata.nameForInstallation, @@ -241,7 +253,10 @@ async function installPipeline({ }; } -export async function ensureFleetFinalPipelineIsInstalled(esClient: ElasticsearchClient) { +export async function ensureFleetFinalPipelineIsInstalled( + esClient: ElasticsearchClient, + logger: Logger +) { const esClientRequestOptions: TransportRequestOptions = { ignore: [404], }; @@ -258,6 +273,7 @@ export async function ensureFleetFinalPipelineIsInstalled(esClient: Elasticsearc ) { await installPipeline({ esClient, + logger, pipeline: { nameForInstallation: FLEET_FINAL_PIPELINE_ID, contentForInstallation: FLEET_FINAL_PIPELINE_CONTENT, diff --git a/x-pack/plugins/fleet/server/services/epm/elasticsearch/ml_model/install.ts b/x-pack/plugins/fleet/server/services/epm/elasticsearch/ml_model/install.ts index d97081f15aca3..e5c96bea87181 100644 --- a/x-pack/plugins/fleet/server/services/epm/elasticsearch/ml_model/install.ts +++ b/x-pack/plugins/fleet/server/services/epm/elasticsearch/ml_model/install.ts @@ -5,7 +5,7 @@ * 2.0. 
*/ -import type { ElasticsearchClient, SavedObjectsClientContract } from 'kibana/server'; +import type { ElasticsearchClient, Logger, SavedObjectsClientContract } from 'kibana/server'; import { errors } from '@elastic/elasticsearch'; import { saveInstalledEsRefs } from '../../packages/install'; @@ -13,6 +13,8 @@ import { getPathParts } from '../../archive'; import { ElasticsearchAssetType } from '../../../../../common/types/models'; import type { EsAssetReference, InstallablePackage } from '../../../../../common/types/models'; +import { retryTransientEsErrors } from '../retry'; + import { getAsset } from './common'; interface MlModelInstallation { @@ -24,7 +26,8 @@ export const installMlModel = async ( installablePackage: InstallablePackage, paths: string[], esClient: ElasticsearchClient, - savedObjectsClient: SavedObjectsClientContract + savedObjectsClient: SavedObjectsClientContract, + logger: Logger ) => { const mlModelPath = paths.find((path) => isMlModel(path)); @@ -47,7 +50,7 @@ export const installMlModel = async ( content, }; - const result = await handleMlModelInstall({ esClient, mlModel }); + const result = await handleMlModelInstall({ esClient, logger, mlModel }); installedMlModels.push(result); } return installedMlModels; @@ -61,19 +64,25 @@ const isMlModel = (path: string) => { async function handleMlModelInstall({ esClient, + logger, mlModel, }: { esClient: ElasticsearchClient; + logger: Logger; mlModel: MlModelInstallation; }): Promise { try { - await esClient.ml.putTrainedModel({ - model_id: mlModel.installationName, - defer_definition_decompression: true, - timeout: '45s', - // @ts-expect-error expects an object not a string - body: mlModel.content, - }); + await retryTransientEsErrors( + () => + esClient.ml.putTrainedModel({ + model_id: mlModel.installationName, + defer_definition_decompression: true, + timeout: '45s', + // @ts-expect-error expects an object not a string + body: mlModel.content, + }), + { logger } + ); } catch (err) { // swallow 
the error if the ml model already exists. const isAlreadyExistError = diff --git a/x-pack/plugins/fleet/server/services/epm/elasticsearch/template/install.test.ts b/x-pack/plugins/fleet/server/services/epm/elasticsearch/template/install.test.ts index 2e6365a9913e4..eba645ae1aae4 100644 --- a/x-pack/plugins/fleet/server/services/epm/elasticsearch/template/install.test.ts +++ b/x-pack/plugins/fleet/server/services/epm/elasticsearch/template/install.test.ts @@ -6,6 +6,7 @@ */ import { elasticsearchServiceMock } from 'src/core/server/mocks'; import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey'; +import { loggerMock } from '@kbn/logging/mocks'; import { createAppContextStartContractMock } from '../../../../mocks'; import { appContextService } from '../../../../services'; @@ -44,6 +45,7 @@ describe('EPM install', () => { const templatePriorityDatasetIsPrefixUnset = 200; await installTemplate({ esClient, + logger: loggerMock.create(), fields, dataStream: dataStreamDatasetIsPrefixUnset, packageVersion: pkg.version, @@ -84,6 +86,7 @@ describe('EPM install', () => { const templatePriorityDatasetIsPrefixFalse = 200; await installTemplate({ esClient, + logger: loggerMock.create(), fields, dataStream: dataStreamDatasetIsPrefixFalse, packageVersion: pkg.version, @@ -124,6 +127,7 @@ describe('EPM install', () => { const templatePriorityDatasetIsPrefixTrue = 150; await installTemplate({ esClient, + logger: loggerMock.create(), fields, dataStream: dataStreamDatasetIsPrefixTrue, packageVersion: pkg.version, @@ -174,6 +178,7 @@ describe('EPM install', () => { const templatePriorityDatasetIsPrefixUnset = 200; await installTemplate({ esClient, + logger: loggerMock.create(), fields, dataStream: dataStreamDatasetIsPrefixUnset, packageVersion: pkg.version, diff --git a/x-pack/plugins/fleet/server/services/epm/elasticsearch/template/install.ts b/x-pack/plugins/fleet/server/services/epm/elasticsearch/template/install.ts index de64b99c787ad..eb5b43650dad7 100644 
--- a/x-pack/plugins/fleet/server/services/epm/elasticsearch/template/install.ts +++ b/x-pack/plugins/fleet/server/services/epm/elasticsearch/template/install.ts @@ -7,7 +7,7 @@ import { merge } from 'lodash'; import Boom from '@hapi/boom'; -import type { ElasticsearchClient, SavedObjectsClientContract } from 'src/core/server'; +import type { ElasticsearchClient, Logger, SavedObjectsClientContract } from 'src/core/server'; import { ElasticsearchAssetType } from '../../../../types'; import type { @@ -29,6 +29,7 @@ import { import type { ESAssetMetadata } from '../meta'; import { getESAssetMetadata } from '../meta'; +import { retryTransientEsErrors } from '../retry'; import { generateMappings, @@ -42,14 +43,15 @@ import { buildDefaultSettings } from './default_settings'; export const installTemplates = async ( installablePackage: InstallablePackage, esClient: ElasticsearchClient, + logger: Logger, paths: string[], savedObjectsClient: SavedObjectsClientContract ): Promise => { // install any pre-built index template assets, // atm, this is only the base package's global index templates // Install component templates first, as they are used by the index templates - await installPreBuiltComponentTemplates(paths, esClient); - await installPreBuiltTemplates(paths, esClient); + await installPreBuiltComponentTemplates(paths, esClient, logger); + await installPreBuiltTemplates(paths, esClient, logger); // remove package installation's references to index templates await removeAssetTypesFromInstalledEs(savedObjectsClient, installablePackage.name, [ @@ -65,6 +67,7 @@ export const installTemplates = async ( installTemplateForDataStream({ pkg: installablePackage, esClient, + logger, dataStream, }) ) @@ -84,7 +87,11 @@ export const installTemplates = async ( return installedTemplates; }; -const installPreBuiltTemplates = async (paths: string[], esClient: ElasticsearchClient) => { +const installPreBuiltTemplates = async ( + paths: string[], + esClient: ElasticsearchClient, + 
logger: Logger +) => { const templatePaths = paths.filter((path) => isTemplate(path)); const templateInstallPromises = templatePaths.map(async (path) => { const { file } = getPathParts(path); @@ -96,10 +103,16 @@ const installPreBuiltTemplates = async (paths: string[], esClient: Elasticsearch if (content.hasOwnProperty('template') || content.hasOwnProperty('composed_of')) { // Template is v2 - return esClient.indices.putIndexTemplate(esClientParams, esClientRequestOptions); + return retryTransientEsErrors( + () => esClient.indices.putIndexTemplate(esClientParams, esClientRequestOptions), + { logger } + ); } else { // template is V1 - return esClient.indices.putTemplate(esClientParams, esClientRequestOptions); + return retryTransientEsErrors( + () => esClient.indices.putTemplate(esClientParams, esClientRequestOptions), + { logger } + ); } }); try { @@ -113,7 +126,8 @@ const installPreBuiltTemplates = async (paths: string[], esClient: Elasticsearch const installPreBuiltComponentTemplates = async ( paths: string[], - esClient: ElasticsearchClient + esClient: ElasticsearchClient, + logger: Logger ) => { const templatePaths = paths.filter((path) => isComponentTemplate(path)); const templateInstallPromises = templatePaths.map(async (path) => { @@ -126,7 +140,10 @@ const installPreBuiltComponentTemplates = async ( body: content, }; - return esClient.cluster.putComponentTemplate(esClientParams, { ignore: [404] }); + return retryTransientEsErrors( + () => esClient.cluster.putComponentTemplate(esClientParams, { ignore: [404] }), + { logger } + ); }); try { @@ -157,15 +174,18 @@ const isComponentTemplate = (path: string) => { export async function installTemplateForDataStream({ pkg, esClient, + logger, dataStream, }: { pkg: InstallablePackage; esClient: ElasticsearchClient; + logger: Logger; dataStream: RegistryDataStream; }): Promise { const fields = await loadFieldsFromYaml(pkg, dataStream.path); return installTemplate({ esClient, + logger, fields, dataStream, 
packageVersion: pkg.version, @@ -186,6 +206,7 @@ interface TemplateMapEntry { type TemplateMap = Record; function putComponentTemplate( esClient: ElasticsearchClient, + logger: Logger, params: { body: TemplateMapEntry; name: string; @@ -194,9 +215,9 @@ function putComponentTemplate( ): { clusterPromise: Promise; name: string } { const { name, body, create = false } = params; return { - clusterPromise: esClient.cluster.putComponentTemplate( - { name, body, create }, - { ignore: [404] } + clusterPromise: retryTransientEsErrors( + () => esClient.cluster.putComponentTemplate({ name, body, create }, { ignore: [404] }), + { logger } ), name, }; @@ -256,10 +277,12 @@ async function installDataStreamComponentTemplates(params: { templateName: string; registryElasticsearch: RegistryElasticsearch | undefined; esClient: ElasticsearchClient; + logger: Logger; packageName: string; defaultSettings: IndexTemplate['template']['settings']; }) { - const { templateName, registryElasticsearch, esClient, packageName, defaultSettings } = params; + const { templateName, registryElasticsearch, esClient, packageName, defaultSettings, logger } = + params; const templates = buildComponentTemplates({ templateName, registryElasticsearch, @@ -274,15 +297,22 @@ async function installDataStreamComponentTemplates(params: { templateEntries.map(async ([name, body]) => { if (isUserSettingsTemplate(name)) { // look for existing user_settings template - const result = await esClient.cluster.getComponentTemplate({ name }, { ignore: [404] }); + const result = await retryTransientEsErrors( + () => esClient.cluster.getComponentTemplate({ name }, { ignore: [404] }), + { logger } + ); const hasUserSettingsTemplate = result.body.component_templates?.length === 1; if (!hasUserSettingsTemplate) { // only add if one isn't already present - const { clusterPromise } = putComponentTemplate(esClient, { body, name, create: true }); + const { clusterPromise } = putComponentTemplate(esClient, logger, { + body, + name, + 
create: true, + }); return clusterPromise; } } else { - const { clusterPromise } = putComponentTemplate(esClient, { body, name }); + const { clusterPromise } = putComponentTemplate(esClient, logger, { body, name }); return clusterPromise; } }) @@ -291,19 +321,26 @@ async function installDataStreamComponentTemplates(params: { return templateNames; } -export async function ensureDefaultComponentTemplate(esClient: ElasticsearchClient) { - const { body: getTemplateRes } = await esClient.cluster.getComponentTemplate( - { - name: FLEET_GLOBAL_COMPONENT_TEMPLATE_NAME, - }, - { - ignore: [404], - } +export async function ensureDefaultComponentTemplate( + esClient: ElasticsearchClient, + logger: Logger +) { + const { body: getTemplateRes } = await retryTransientEsErrors( + () => + esClient.cluster.getComponentTemplate( + { + name: FLEET_GLOBAL_COMPONENT_TEMPLATE_NAME, + }, + { + ignore: [404], + } + ), + { logger } ); const existingTemplate = getTemplateRes?.component_templates?.[0]; if (!existingTemplate) { - await putComponentTemplate(esClient, { + await putComponentTemplate(esClient, logger, { name: FLEET_GLOBAL_COMPONENT_TEMPLATE_NAME, body: FLEET_GLOBAL_COMPONENT_TEMPLATE_CONTENT, create: true, @@ -315,12 +352,14 @@ export async function ensureDefaultComponentTemplate(esClient: ElasticsearchClie export async function installTemplate({ esClient, + logger, fields, dataStream, packageVersion, packageName, }: { esClient: ElasticsearchClient; + logger: Logger; fields: Field[]; dataStream: RegistryDataStream; packageVersion: string; @@ -342,13 +381,17 @@ export async function installTemplate({ } // Datastream now throw an error if the aliases field is present so ensure that we remove that field. 
- const { body: getTemplateRes } = await esClient.indices.getIndexTemplate( - { - name: templateName, - }, - { - ignore: [404], - } + const { body: getTemplateRes } = await retryTransientEsErrors( + () => + esClient.indices.getIndexTemplate( + { + name: templateName, + }, + { + ignore: [404], + } + ), + { logger } ); const existingIndexTemplate = getTemplateRes?.index_templates?.[0]; @@ -369,7 +412,10 @@ export async function installTemplate({ }, }; - await esClient.indices.putIndexTemplate(updateIndexTemplateParams, { ignore: [404] }); + await retryTransientEsErrors( + () => esClient.indices.putIndexTemplate(updateIndexTemplateParams, { ignore: [404] }), + { logger } + ); } const defaultSettings = buildDefaultSettings({ @@ -384,6 +430,7 @@ export async function installTemplate({ templateName, registryElasticsearch: dataStream.elasticsearch, esClient, + logger, packageName, defaultSettings, }); @@ -406,7 +453,10 @@ export async function installTemplate({ body: template, }; - await esClient.indices.putIndexTemplate(esClientParams, { ignore: [404] }); + await retryTransientEsErrors( + () => esClient.indices.putIndexTemplate(esClientParams, { ignore: [404] }), + { logger } + ); return { templateName, diff --git a/x-pack/plugins/fleet/server/services/epm/elasticsearch/template/template.ts b/x-pack/plugins/fleet/server/services/epm/elasticsearch/template/template.ts index 5bad33defc578..05f7b744f4db9 100644 --- a/x-pack/plugins/fleet/server/services/epm/elasticsearch/template/template.ts +++ b/x-pack/plugins/fleet/server/services/epm/elasticsearch/template/template.ts @@ -5,7 +5,7 @@ * 2.0. 
*/ -import type { ElasticsearchClient } from 'kibana/server'; +import type { ElasticsearchClient, Logger } from 'kibana/server'; import type { Field, Fields } from '../../fields/field'; import type { @@ -18,6 +18,7 @@ import { appContextService } from '../../../'; import { getRegistryDataStreamAssetBaseName } from '../index'; import { FLEET_GLOBAL_COMPONENT_TEMPLATE_NAME } from '../../../../constants'; import { getESAssetMetadata } from '../meta'; +import { retryTransientEsErrors } from '../retry'; interface Properties { [key: string]: any; @@ -408,13 +409,14 @@ function getBaseTemplate( export const updateCurrentWriteIndices = async ( esClient: ElasticsearchClient, + logger: Logger, templates: IndexTemplateEntry[] ): Promise => { if (!templates.length) return; const allIndices = await queryDataStreamsFromTemplates(esClient, templates); if (!allIndices.length) return; - return updateAllDataStreams(allIndices, esClient); + return updateAllDataStreams(allIndices, esClient, logger); }; function isCurrentDataStream(item: CurrentDataStream[] | undefined): item is CurrentDataStream[] { @@ -448,11 +450,12 @@ const getDataStreams = async ( const updateAllDataStreams = async ( indexNameWithTemplates: CurrentDataStream[], - esClient: ElasticsearchClient + esClient: ElasticsearchClient, + logger: Logger ): Promise => { const updatedataStreamPromises = indexNameWithTemplates.map( ({ dataStreamName, indexTemplate }) => { - return updateExistingDataStream({ dataStreamName, esClient, indexTemplate }); + return updateExistingDataStream({ dataStreamName, esClient, logger, indexTemplate }); } ); await Promise.all(updatedataStreamPromises); @@ -460,10 +463,12 @@ const updateAllDataStreams = async ( const updateExistingDataStream = async ({ dataStreamName, esClient, + logger, indexTemplate, }: { dataStreamName: string; esClient: ElasticsearchClient; + logger: Logger; indexTemplate: IndexTemplate; }) => { const { settings, mappings } = indexTemplate.template; @@ -476,14 +481,19 @@ 
const updateExistingDataStream = async ({ // try to update the mappings first try { - await esClient.indices.putMapping({ - index: dataStreamName, - body: mappings, - write_index_only: true, - }); + await retryTransientEsErrors( + () => + esClient.indices.putMapping({ + index: dataStreamName, + body: mappings, + write_index_only: true, + }), + { logger } + ); // if update fails, rollover data stream } catch (err) { try { + // Do not wrap rollovers in retryTransientEsErrors since it is not idempotent const path = `/${dataStreamName}/_rollover`; await esClient.transport.request({ method: 'POST', @@ -498,10 +508,14 @@ const updateExistingDataStream = async ({ // for now, only update the pipeline if (!settings.index.default_pipeline) return; try { - await esClient.indices.putSettings({ - index: dataStreamName, - body: { default_pipeline: settings.index.default_pipeline }, - }); + await retryTransientEsErrors( + () => + esClient.indices.putSettings({ + index: dataStreamName, + body: { default_pipeline: settings.index.default_pipeline }, + }), + { logger } + ); } catch (err) { throw new Error(`could not update index template settings for ${dataStreamName}`); } diff --git a/x-pack/plugins/fleet/server/services/epm/elasticsearch/transform/install.ts b/x-pack/plugins/fleet/server/services/epm/elasticsearch/transform/install.ts index 8b76b5a026fc0..197d463797cac 100644 --- a/x-pack/plugins/fleet/server/services/epm/elasticsearch/transform/install.ts +++ b/x-pack/plugins/fleet/server/services/epm/elasticsearch/transform/install.ts @@ -5,7 +5,7 @@ * 2.0.
*/ -import type { ElasticsearchClient, SavedObjectsClientContract } from 'kibana/server'; +import type { ElasticsearchClient, Logger, SavedObjectsClientContract } from 'kibana/server'; import { errors } from '@elastic/elasticsearch'; import { saveInstalledEsRefs } from '../../packages/install'; @@ -13,10 +13,11 @@ import { getPathParts } from '../../archive'; import { ElasticsearchAssetType } from '../../../../../common/types/models'; import type { EsAssetReference, InstallablePackage } from '../../../../../common/types/models'; import { getInstallation } from '../../packages'; -import { appContextService } from '../../../app_context'; import { getESAssetMetadata } from '../meta'; +import { retryTransientEsErrors } from '../retry'; + import { deleteTransforms, deleteTransformRefs } from './remove'; import { getAsset } from './common'; @@ -29,9 +30,9 @@ export const installTransform = async ( installablePackage: InstallablePackage, paths: string[], esClient: ElasticsearchClient, - savedObjectsClient: SavedObjectsClientContract + savedObjectsClient: SavedObjectsClientContract, + logger: Logger ) => { - const logger = appContextService.getLogger(); const installation = await getInstallation({ savedObjectsClient, pkgName: installablePackage.name, @@ -87,7 +88,7 @@ export const installTransform = async ( }); const installationPromises = transforms.map(async (transform) => { - return handleTransformInstall({ esClient, transform }); + return handleTransformInstall({ esClient, logger, transform }); }); installedTransforms = await Promise.all(installationPromises).then((results) => results.flat()); @@ -118,18 +119,24 @@ const isTransform = (path: string) => { async function handleTransformInstall({ esClient, + logger, transform, }: { esClient: ElasticsearchClient; + logger: Logger; transform: TransformInstallation; }): Promise { try { - // defer validation on put if the source index is not available - await esClient.transform.putTransform({ - transform_id: 
transform.installationName, - defer_validation: true, - body: transform.content, - }); + await retryTransientEsErrors( + () => + // defer validation on put if the source index is not available + esClient.transform.putTransform({ + transform_id: transform.installationName, + defer_validation: true, + body: transform.content, + }), + { logger } + ); } catch (err) { // swallow the error if the transform already exists. const isAlreadyExistError = diff --git a/x-pack/plugins/fleet/server/services/epm/elasticsearch/transform/transform.test.ts b/x-pack/plugins/fleet/server/services/epm/elasticsearch/transform/transform.test.ts index 1aef95a49fdcb..94e2e3f6d73af 100644 --- a/x-pack/plugins/fleet/server/services/epm/elasticsearch/transform/transform.test.ts +++ b/x-pack/plugins/fleet/server/services/epm/elasticsearch/transform/transform.test.ts @@ -21,6 +21,7 @@ jest.mock('./common', () => { import { errors } from '@elastic/elasticsearch'; import type { DeeplyMockedKeys } from '@kbn/utility-types/jest'; import type { ElasticsearchClient, SavedObject, SavedObjectsClientContract } from 'kibana/server'; +import { loggerMock } from '@kbn/logging/mocks'; import { ElasticsearchAssetType } from '../../../../types'; import type { Installation, RegistryPackage } from '../../../../types'; @@ -157,7 +158,8 @@ describe('test transform install', () => { 'endpoint-0.16.0-dev.0/elasticsearch/transform/metadata_current/default.json', ], esClient, - savedObjectsClient + savedObjectsClient, + loggerMock.create() ); expect(esClient.transform.getTransform.mock.calls).toEqual([ @@ -329,7 +331,8 @@ describe('test transform install', () => { } as unknown as RegistryPackage, ['endpoint-0.16.0-dev.0/elasticsearch/transform/metadata_current/default.json'], esClient, - savedObjectsClient + savedObjectsClient, + loggerMock.create() ); const meta = getESAssetMetadata({ packageName: 'endpoint' }); @@ -441,7 +444,8 @@ describe('test transform install', () => { } as unknown as RegistryPackage, [], 
esClient, - savedObjectsClient + savedObjectsClient, + loggerMock.create() ); expect(esClient.transform.getTransform.mock.calls).toEqual([ @@ -556,7 +560,8 @@ describe('test transform install', () => { } as unknown as RegistryPackage, ['endpoint-0.16.0-dev.0/elasticsearch/transform/metadata_current/default.json'], esClient, - savedObjectsClient + savedObjectsClient, + loggerMock.create() ); const meta = getESAssetMetadata({ packageName: 'endpoint' }); diff --git a/x-pack/plugins/fleet/server/services/epm/packages/_install_package.test.ts b/x-pack/plugins/fleet/server/services/epm/packages/_install_package.test.ts index 7996cbfb79ef8..5ee0f57b6e03a 100644 --- a/x-pack/plugins/fleet/server/services/epm/packages/_install_package.test.ts +++ b/x-pack/plugins/fleet/server/services/epm/packages/_install_package.test.ts @@ -7,6 +7,7 @@ import type { SavedObjectsClientContract, ElasticsearchClient } from 'src/core/server'; import { savedObjectsClientMock, elasticsearchServiceMock } from 'src/core/server/mocks'; +import { loggerMock } from '@kbn/logging/mocks'; import { appContextService } from '../../app_context'; import { createAppContextStartContractMock } from '../../../mocks'; @@ -66,6 +67,7 @@ describe('_installPackage', () => { const installationPromise = _installPackage({ savedObjectsClient: soClient, esClient, + logger: loggerMock.create(), paths: [], packageInfo: { title: 'title', diff --git a/x-pack/plugins/fleet/server/services/epm/packages/_install_package.ts b/x-pack/plugins/fleet/server/services/epm/packages/_install_package.ts index 776a3d3cd6bc1..e2027a99463fc 100644 --- a/x-pack/plugins/fleet/server/services/epm/packages/_install_package.ts +++ b/x-pack/plugins/fleet/server/services/epm/packages/_install_package.ts @@ -5,7 +5,12 @@ * 2.0. 
*/ -import type { ElasticsearchClient, SavedObject, SavedObjectsClientContract } from 'src/core/server'; +import type { + ElasticsearchClient, + Logger, + SavedObject, + SavedObjectsClientContract, +} from 'src/core/server'; import { MAX_TIME_COMPLETE_INSTALL, @@ -44,6 +49,7 @@ import { deleteKibanaSavedObjectsAssets } from './remove'; export async function _installPackage({ savedObjectsClient, esClient, + logger, installedPkg, paths, packageInfo, @@ -52,6 +58,7 @@ export async function _installPackage({ }: { savedObjectsClient: SavedObjectsClientContract; esClient: ElasticsearchClient; + logger: Logger; installedPkg?: SavedObject; paths: string[]; packageInfo: InstallablePackage; @@ -131,41 +138,51 @@ export async function _installPackage({ // currently only the base package has an ILM policy // at some point ILM policies can be installed/modified // per data stream and we should then save them - await installILMPolicy(packageInfo, paths, esClient); + await installILMPolicy(packageInfo, paths, esClient, logger); const installedDataStreamIlm = await installIlmForDataStream( packageInfo, paths, esClient, - savedObjectsClient + savedObjectsClient, + logger ); // installs ml models - const installedMlModel = await installMlModel(packageInfo, paths, esClient, savedObjectsClient); + const installedMlModel = await installMlModel( + packageInfo, + paths, + esClient, + savedObjectsClient, + logger + ); // installs versionized pipelines without removing currently installed ones const installedPipelines = await installPipelines( packageInfo, paths, esClient, - savedObjectsClient + savedObjectsClient, + logger ); // install or update the templates referencing the newly installed pipelines const installedTemplates = await installTemplates( packageInfo, esClient, + logger, paths, savedObjectsClient ); // update current backing indices of each data stream - await updateCurrentWriteIndices(esClient, installedTemplates); + await updateCurrentWriteIndices(esClient, logger, 
installedTemplates); const installedTransforms = await installTransform( packageInfo, paths, esClient, - savedObjectsClient + savedObjectsClient, + logger ); // If this is an update or retrying an update, delete the previous version's pipelines diff --git a/x-pack/plugins/fleet/server/services/epm/packages/install.ts b/x-pack/plugins/fleet/server/services/epm/packages/install.ts index db26dc3a20a80..330fd84e789b8 100644 --- a/x-pack/plugins/fleet/server/services/epm/packages/install.ts +++ b/x-pack/plugins/fleet/server/services/epm/packages/install.ts @@ -308,6 +308,7 @@ async function installPackageFromRegistry({ return _installPackage({ savedObjectsClient, esClient, + logger, installedPkg, paths, packageInfo, @@ -367,6 +368,7 @@ async function installPackageByUpload({ archiveBuffer, contentType, }: InstallUploadedArchiveParams): Promise { + const logger = appContextService.getLogger(); // if an error happens during getInstallType, report that we don't know let installType: InstallType = 'unknown'; const telemetryEvent: PackageUpdateEvent = getTelemetryEvent('', ''); @@ -409,6 +411,7 @@ async function installPackageByUpload({ return _installPackage({ savedObjectsClient, esClient, + logger, installedPkg, paths, packageInfo, diff --git a/x-pack/plugins/fleet/server/services/setup.ts b/x-pack/plugins/fleet/server/services/setup.ts index d39a5f4473199..18c66e8267468 100644 --- a/x-pack/plugins/fleet/server/services/setup.ts +++ b/x-pack/plugins/fleet/server/services/setup.ts @@ -139,8 +139,8 @@ export async function ensureFleetGlobalEsAssets( // Ensure Global Fleet ES assets are installed logger.debug('Creating Fleet component template and ingest pipeline'); const globalAssetsRes = await Promise.all([ - ensureDefaultComponentTemplate(esClient), - ensureFleetFinalPipelineIsInstalled(esClient), + ensureDefaultComponentTemplate(esClient, logger), + ensureFleetFinalPipelineIsInstalled(esClient, logger), ]); if (globalAssetsRes.some((asset) => asset.isCreated)) {