diff --git a/x-pack/plugins/fleet/server/services/epm/packages/install.ts b/x-pack/plugins/fleet/server/services/epm/packages/install.ts index a16b0c28cff81..cf00ec4e3aed1 100644 --- a/x-pack/plugins/fleet/server/services/epm/packages/install.ts +++ b/x-pack/plugins/fleet/server/services/epm/packages/install.ts @@ -15,11 +15,14 @@ import type { SavedObjectsClientContract, Logger, } from '@kbn/core/server'; +import { SavedObjectsErrorHelpers } from '@kbn/core/server'; import { DEFAULT_SPACE_ID } from '@kbn/spaces-plugin/common/constants'; import pRetry from 'p-retry'; +import { uniqBy } from 'lodash'; + import { isPackagePrerelease, getNormalizedDataStreams } from '../../../../common/services'; import { FLEET_INSTALL_FORMAT_VERSION } from '../../../constants/fleet_es_assets'; @@ -797,15 +800,10 @@ export const updateEsAssetReferences = async ( return true; }); - const deduplicatedAssets = - [...withAssetsRemoved, ...assetsToAdd].reduce((acc, currentAsset) => { - const foundAsset = acc.find((asset: EsAssetReference) => asset.id === currentAsset.id); - if (!foundAsset) { - return acc.concat([currentAsset]); - } else { - return acc; - } - }, [] as EsAssetReference[]) || []; + const deduplicatedAssets = uniqBy( + [...withAssetsRemoved, ...assetsToAdd], + ({ type, id }) => `${type}-${id}` + ); const { attributes: { installed_es: updatedAssets }, @@ -832,6 +830,49 @@ export const updateEsAssetReferences = async ( return updatedAssets ?? []; }; +/** + * Utility function for adding assets the installed_es field of a package + * uses optimistic concurrency control to prevent missed updates + */ +export const optimisticallyAddEsAssetReferences = async ( + savedObjectsClient: SavedObjectsClientContract, + pkgName: string, + assetsToAdd: EsAssetReference[] +): Promise => { + const addEsAssets = async () => { + const so = await savedObjectsClient.get(PACKAGES_SAVED_OBJECT_TYPE, pkgName); + + const installedEs = so.attributes.installed_es ?? []; + + const deduplicatedAssets = uniqBy( + [...installedEs, ...assetsToAdd], + ({ type, id }) => `${type}-${id}` + ); + + const { + attributes: { installed_es: updatedAssets }, + } = await savedObjectsClient.update( + PACKAGES_SAVED_OBJECT_TYPE, + pkgName, + { + installed_es: deduplicatedAssets, + }, + { + version: so.version, + } + ); + + return updatedAssets ?? []; + }; + + const onlyRetryConflictErrors = (err: Error) => { + if (!SavedObjectsErrorHelpers.isConflictError(err)) { + throw err; + } + }; + + return pRetry(addEsAssets, { retries: 10, onFailedAttempt: onlyRetryConflictErrors }); +}; export async function ensurePackagesCompletedInstall( savedObjectsClient: SavedObjectsClientContract, @@ -910,15 +951,31 @@ export async function installIndexTemplatesAndPipelines({ // conditions on updating the installed_es field at the same time // These must be saved before we actually attempt to install the templates or pipelines so that we know what to // cleanup in the case that a single asset fails to install. - const newEsReferences = await updateEsAssetReferences( - savedObjectsClient, - packageInfo.name, - esReferences, - { - assetsToRemove: onlyForDataStreams ? [] : preparedIndexTemplates.assetsToRemove, - assetsToAdd: [...preparedIngestPipelines.assetsToAdd, ...preparedIndexTemplates.assetsToAdd], - } - ); + let newEsReferences: EsAssetReference[] = []; + + if (onlyForDataStreams) { + // if onlyForDataStreams is present that means we are in create package policy flow + // not install flow, meaning we do not have a lock on the installation SO + // so we need to use optimistic concurrency control + newEsReferences = await optimisticallyAddEsAssetReferences( + savedObjectsClient, + packageInfo.name, + [...preparedIngestPipelines.assetsToAdd, ...preparedIndexTemplates.assetsToAdd] + ); + } else { + newEsReferences = await updateEsAssetReferences( + savedObjectsClient, + packageInfo.name, + esReferences, + { + assetsToRemove: preparedIndexTemplates.assetsToRemove, + assetsToAdd: [ + ...preparedIngestPipelines.assetsToAdd, + ...preparedIndexTemplates.assetsToAdd, + ], + } + ); + } // Install index templates and ingest pipelines in parallel since they typically take the longest const [installedTemplates] = await Promise.all([