From 3ffcedaf2bb3c4ab61d0b2e49571aa9e9fec735c Mon Sep 17 00:00:00 2001 From: Mark Hopkin Date: Fri, 13 Jan 2023 12:44:11 +0000 Subject: [PATCH 1/4] use optimistic locking when updating es assets for upgrade package policy --- .../server/services/epm/packages/install.ts | 88 +++++++++++++++---- 1 file changed, 69 insertions(+), 19 deletions(-) diff --git a/x-pack/plugins/fleet/server/services/epm/packages/install.ts b/x-pack/plugins/fleet/server/services/epm/packages/install.ts index 1c5f18c4a45e8..38bfa54d823e6 100644 --- a/x-pack/plugins/fleet/server/services/epm/packages/install.ts +++ b/x-pack/plugins/fleet/server/services/epm/packages/install.ts @@ -20,6 +20,8 @@ import { DEFAULT_SPACE_ID } from '@kbn/spaces-plugin/common/constants'; import pRetry from 'p-retry'; +import { uniqBy } from 'lodash'; + import { isPackagePrerelease, getNormalizedDataStreams } from '../../../../common/services'; import { FLEET_INSTALL_FORMAT_VERSION } from '../../../constants/fleet_es_assets'; @@ -783,15 +785,10 @@ export const updateEsAssetReferences = async ( return true; }); - const deduplicatedAssets = - [...withAssetsRemoved, ...assetsToAdd].reduce((acc, currentAsset) => { - const foundAsset = acc.find((asset: EsAssetReference) => asset.id === currentAsset.id); - if (!foundAsset) { - return acc.concat([currentAsset]); - } else { - return acc; - } - }, [] as EsAssetReference[]) || []; + const deduplicatedAssets = uniqBy( + [...withAssetsRemoved, ...assetsToAdd], + ({ type, id }) => `${type}-${id}` + ); const { attributes: { installed_es: updatedAssets }, @@ -813,11 +810,48 @@ export const updateEsAssetReferences = async ( ), // Use a lower number of retries for ES assets since they're installed in serial and can only conflict with // the single Kibana update call. - { retries: 5 } + { retries: 10 } ); return updatedAssets ?? []; }; +/** + * Utility function for adding assets the installed_es field of a package + * uses optimistic concurrency control to prevent missed updates + */ +export const optimisticallyAddEsAssetReferences = async ( + savedObjectsClient: SavedObjectsClientContract, + pkgName: string, + assetsToAdd: EsAssetReference[] +): Promise => { + const addEsAsstes = async () => { + const so = await savedObjectsClient.get(PACKAGES_SAVED_OBJECT_TYPE, pkgName); + + const installedEs = so.attributes.installed_es ?? []; + + const deduplicatedAssets = uniqBy( + [...installedEs, ...assetsToAdd], + ({ type, id }) => `${type}-${id}` + ); + + const { + attributes: { installed_es: updatedAssets }, + } = await savedObjectsClient.update( + PACKAGES_SAVED_OBJECT_TYPE, + pkgName, + { + installed_es: deduplicatedAssets, + }, + { + version: so.version, + } + ); + + return updatedAssets ?? []; + }; + + return pRetry(addEsAsstes, { retries: 20 }); +}; export async function ensurePackagesCompletedInstall( savedObjectsClient: SavedObjectsClientContract, @@ -896,15 +930,31 @@ export async function installIndexTemplatesAndPipelines({ // conditions on updating the installed_es field at the same time // These must be saved before we actually attempt to install the templates or pipelines so that we know what to // cleanup in the case that a single asset fails to install. - const newEsReferences = await updateEsAssetReferences( - savedObjectsClient, - packageInfo.name, - esReferences, - { - assetsToRemove: onlyForDataStreams ? [] : preparedIndexTemplates.assetsToRemove, - assetsToAdd: [...preparedIngestPipelines.assetsToAdd, ...preparedIndexTemplates.assetsToAdd], - } - ); + let newEsReferences: EsAssetReference[] = []; + + if (onlyForDataStreams) { + // if onlyForDataStreams is present that means we are in create package policy flow + // not install flow, meaning we do not have a lock on the installation SO + // so we need to use optimistic concurrency control + newEsReferences = await optimisticallyAddEsAssetReferences( + savedObjectsClient, + packageInfo.name, + [...preparedIngestPipelines.assetsToAdd, ...preparedIndexTemplates.assetsToAdd] + ); + } else { + newEsReferences = await updateEsAssetReferences( + savedObjectsClient, + packageInfo.name, + esReferences, + { + assetsToRemove: preparedIndexTemplates.assetsToRemove, + assetsToAdd: [ + ...preparedIngestPipelines.assetsToAdd, + ...preparedIndexTemplates.assetsToAdd, + ], + } + ); + } // Install index templates and ingest pipelines in parallel since they typically take the longest const [installedTemplates] = await Promise.all([ From d3301f3acb56ece16146b718f3b7608f1ea0360d Mon Sep 17 00:00:00 2001 From: Mark Hopkin Date: Thu, 19 Jan 2023 10:09:30 +0000 Subject: [PATCH 2/4] reduce number of retries --- x-pack/plugins/fleet/server/services/epm/packages/install.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugins/fleet/server/services/epm/packages/install.ts b/x-pack/plugins/fleet/server/services/epm/packages/install.ts index 38bfa54d823e6..fdf3a504e46dd 100644 --- a/x-pack/plugins/fleet/server/services/epm/packages/install.ts +++ b/x-pack/plugins/fleet/server/services/epm/packages/install.ts @@ -850,7 +850,7 @@ export const optimisticallyAddEsAssetReferences = async ( return updatedAssets ?? []; }; - return pRetry(addEsAsstes, { retries: 20 }); + return pRetry(addEsAsstes, { retries: 10 }); }; export async function ensurePackagesCompletedInstall( From 9842559d269ef2c47f38ab265d00cfbf0c3d92e9 Mon Sep 17 00:00:00 2001 From: Mark Hopkin Date: Thu, 19 Jan 2023 10:11:55 +0000 Subject: [PATCH 3/4] reduce standard update retries --- x-pack/plugins/fleet/server/services/epm/packages/install.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugins/fleet/server/services/epm/packages/install.ts b/x-pack/plugins/fleet/server/services/epm/packages/install.ts index fdf3a504e46dd..c38bc77afd606 100644 --- a/x-pack/plugins/fleet/server/services/epm/packages/install.ts +++ b/x-pack/plugins/fleet/server/services/epm/packages/install.ts @@ -810,7 +810,7 @@ export const updateEsAssetReferences = async ( ), // Use a lower number of retries for ES assets since they're installed in serial and can only conflict with // the single Kibana update call. - { retries: 10 } + { retries: 5 } ); return updatedAssets ?? []; From 2a970954ba1687702c2c1a16e43e0fec365a73a0 Mon Sep 17 00:00:00 2001 From: Mark Hopkin Date: Thu, 19 Jan 2023 17:07:53 +0000 Subject: [PATCH 4/4] only retry on conflict errors --- .../fleet/server/services/epm/packages/install.ts | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/x-pack/plugins/fleet/server/services/epm/packages/install.ts b/x-pack/plugins/fleet/server/services/epm/packages/install.ts index c38bc77afd606..36bde4d4d0cbc 100644 --- a/x-pack/plugins/fleet/server/services/epm/packages/install.ts +++ b/x-pack/plugins/fleet/server/services/epm/packages/install.ts @@ -15,6 +15,7 @@ import type { SavedObjectsClientContract, Logger, } from '@kbn/core/server'; +import { SavedObjectsErrorHelpers } from '@kbn/core/server'; import { DEFAULT_SPACE_ID } from '@kbn/spaces-plugin/common/constants'; @@ -824,7 +825,7 @@ export const optimisticallyAddEsAssetReferences = async ( pkgName: string, assetsToAdd: EsAssetReference[] ): Promise => { - const addEsAsstes = async () => { + const addEsAssets = async () => { const so = await savedObjectsClient.get(PACKAGES_SAVED_OBJECT_TYPE, pkgName); const installedEs = so.attributes.installed_es ?? []; @@ -850,7 +851,13 @@ export const optimisticallyAddEsAssetReferences = async ( return updatedAssets ?? []; }; - return pRetry(addEsAsstes, { retries: 10 }); + const onlyRetryConflictErrors = (err: Error) => { + if (!SavedObjectsErrorHelpers.isConflictError(err)) { + throw err; + } + }; + + return pRetry(addEsAssets, { retries: 10, onFailedAttempt: onlyRetryConflictErrors }); }; export async function ensurePackagesCompletedInstall(