diff --git a/x-pack/plugins/fleet/common/services/policy_template.ts b/x-pack/plugins/fleet/common/services/policy_template.ts index 912ec96f6091f..4265767a9be25 100644 --- a/x-pack/plugins/fleet/common/services/policy_template.ts +++ b/x-pack/plugins/fleet/common/services/policy_template.ts @@ -13,6 +13,7 @@ import type { PackageInfo, RegistryVarsEntry, RegistryDataStream, + InstallablePackage, } from '../types'; const DATA_STREAM_DATASET_VAR: RegistryVarsEntry = { @@ -52,7 +53,10 @@ export const getNormalizedInputs = (policyTemplate: RegistryPolicyTemplate): Reg return [input]; }; -export const getNormalizedDataStreams = (packageInfo: PackageInfo): RegistryDataStream[] => { +export const getNormalizedDataStreams = ( + packageInfo: PackageInfo | InstallablePackage, + datasetName?: string +): RegistryDataStream[] => { if (packageInfo.type !== 'input') { return packageInfo.data_streams || []; } @@ -66,11 +70,12 @@ export const getNormalizedDataStreams = (packageInfo: PackageInfo): RegistryData return policyTemplates.map((policyTemplate) => { const dataStream: RegistryDataStream = { type: policyTemplate.type, - dataset: createDefaultDatasetName(packageInfo, policyTemplate), + dataset: datasetName || createDefaultDatasetName(packageInfo, policyTemplate), title: policyTemplate.title + ' Dataset', release: packageInfo.release || 'ga', package: packageInfo.name, path: packageInfo.name, + elasticsearch: packageInfo.elasticsearch, streams: [ { input: policyTemplate.input, @@ -104,6 +109,6 @@ const addDatasetVarIfNotPresent = (vars?: RegistryVarsEntry[]): RegistryVarsEntr }; const createDefaultDatasetName = ( - packageInfo: PackageInfo, - policyTemplate: RegistryPolicyInputOnlyTemplate + packageInfo: { name: string }, + policyTemplate: { name: string } ): string => packageInfo.name + '.' 
+ policyTemplate.name; diff --git a/x-pack/plugins/fleet/common/types/models/package_spec.ts b/x-pack/plugins/fleet/common/types/models/package_spec.ts index 52f993499ea4e..48dd619ae3aea 100644 --- a/x-pack/plugins/fleet/common/types/models/package_spec.ts +++ b/x-pack/plugins/fleet/common/types/models/package_spec.ts @@ -5,7 +5,7 @@ * 2.0. */ -import type { RegistryPolicyTemplate, RegistryVarsEntry } from './epm'; +import type { RegistryElasticsearch, RegistryPolicyTemplate, RegistryVarsEntry } from './epm'; // Based on https://github.com/elastic/package-spec/blob/master/versions/1/manifest.spec.yml#L8 export interface PackageSpecManifest { @@ -27,6 +27,10 @@ export interface PackageSpecManifest { policy_templates?: RegistryPolicyTemplate[]; vars?: RegistryVarsEntry[]; owner: { github: string }; + elasticsearch?: Pick< + RegistryElasticsearch, + 'index_template.settings' | 'index_template.mappings' + >; } export type PackageSpecPackageType = 'integration' | 'input'; diff --git a/x-pack/plugins/fleet/public/applications/fleet/sections/agent_policy/create_package_policy_page/components/steps/components/dataset_combo.tsx b/x-pack/plugins/fleet/public/applications/fleet/sections/agent_policy/create_package_policy_page/components/steps/components/dataset_combo.tsx index 15e1a18f26dcd..6908f8fea130f 100644 --- a/x-pack/plugins/fleet/public/applications/fleet/sections/agent_policy/create_package_policy_page/components/steps/components/dataset_combo.tsx +++ b/x-pack/plugins/fleet/public/applications/fleet/sections/agent_policy/create_package_policy_page/components/steps/components/dataset_combo.tsx @@ -9,27 +9,52 @@ import React, { useEffect, useState } from 'react'; import { EuiComboBox } from '@elastic/eui'; import { i18n } from '@kbn/i18n'; +import type { DataStream } from '../../../../../../../../../common/types'; + +interface SelectedDataset { + dataset: string; + package: string; +} + +const GENERIC_DATASET_NAME = 'generic'; + export const DatasetComboBox: 
React.FC<{ - value: any; - onChange: (newValue: any) => void; - datasets: string[]; + value?: SelectedDataset | string; + onChange: (newValue: SelectedDataset) => void; + datastreams: DataStream[]; + pkgName?: string; isDisabled?: boolean; -}> = ({ value, onChange, datasets, isDisabled }) => { - const datasetOptions = datasets.map((dataset: string) => ({ label: dataset })) ?? []; - const defaultOption = 'generic'; - const [selectedOptions, setSelectedOptions] = useState>([ - { - label: value ?? defaultOption, - }, - ]); +}> = ({ value, onChange, datastreams, isDisabled, pkgName = '' }) => { + const datasetOptions = + datastreams.map((datastream: DataStream) => ({ + label: datastream.dataset, + value: datastream, + })) ?? []; + const existingGenericStream = datasetOptions.find((ds) => ds.label === GENERIC_DATASET_NAME); + const valueAsOption = value + ? typeof value === 'string' + ? { label: value, value: { dataset: value, package: pkgName } } + : { label: value.dataset, value: { dataset: value.dataset, package: value.package } } + : undefined; + const defaultOption = valueAsOption || + existingGenericStream || { + label: GENERIC_DATASET_NAME, + value: { dataset: GENERIC_DATASET_NAME, package: pkgName }, + }; + + const [selectedOptions, setSelectedOptions] = useState>([defaultOption]); useEffect(() => { - if (!value) onChange(defaultOption); - }, [value, defaultOption, onChange]); + if (!value || typeof value === 'string') onChange(defaultOption.value as SelectedDataset); + }, [value, defaultOption.value, onChange, pkgName]); - const onDatasetChange = (newSelectedOptions: Array<{ label: string }>) => { + const onDatasetChange = (newSelectedOptions: Array<{ label: string; value?: DataStream }>) => { setSelectedOptions(newSelectedOptions); - onChange(newSelectedOptions[0]?.label); + const dataStream = newSelectedOptions[0].value; + onChange({ + dataset: newSelectedOptions[0].label, + package: !dataStream || typeof dataStream === 'string' ? 
pkgName : dataStream.package, + }); }; const onCreateOption = (searchValue: string = '') => { @@ -39,9 +64,13 @@ export const DatasetComboBox: React.FC<{ } const newOption = { label: searchValue, + value: { dataset: searchValue, package: pkgName }, }; setSelectedOptions([newOption]); - onChange(searchValue); + onChange({ + dataset: searchValue, + package: pkgName, + }); }; return ( { - it('should move datasets up that match name', () => { - const datasets = orderDatasets( - ['system.memory', 'elastic_agent', 'elastic_agent.filebeat', 'system.cpu'], - 'elastic_agent' - ); - - expect(datasets).toEqual([ - 'elastic_agent', - 'elastic_agent.filebeat', - 'system.cpu', - 'system.memory', - ]); - }); - - it('should order alphabetically if name does not match', () => { - const datasets = orderDatasets(['system.memory', 'elastic_agent'], 'nginx'); - - expect(datasets).toEqual(['elastic_agent', 'system.memory']); - }); -}); diff --git a/x-pack/plugins/fleet/public/applications/fleet/sections/agent_policy/create_package_policy_page/components/steps/components/order_datasets.ts b/x-pack/plugins/fleet/public/applications/fleet/sections/agent_policy/create_package_policy_page/components/steps/components/order_datasets.ts deleted file mode 100644 index 8262af7064142..0000000000000 --- a/x-pack/plugins/fleet/public/applications/fleet/sections/agent_policy/create_package_policy_page/components/steps/components/order_datasets.ts +++ /dev/null @@ -1,16 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. 
- */ - -import { partition } from 'lodash'; - -export function orderDatasets(datasetList: string[], name: string): string[] { - const [relevantDatasets, otherDatasets] = partition(datasetList.sort(), (record) => - record.startsWith(name) - ); - const datasets = relevantDatasets.concat(otherDatasets); - return datasets; -} diff --git a/x-pack/plugins/fleet/public/applications/fleet/sections/agent_policy/create_package_policy_page/components/steps/components/package_policy_input_stream.tsx b/x-pack/plugins/fleet/public/applications/fleet/sections/agent_policy/create_package_policy_page/components/steps/components/package_policy_input_stream.tsx index 585385f1574c9..50b585cac9c89 100644 --- a/x-pack/plugins/fleet/public/applications/fleet/sections/agent_policy/create_package_policy_page/components/steps/components/package_policy_input_stream.tsx +++ b/x-pack/plugins/fleet/public/applications/fleet/sections/agent_policy/create_package_policy_page/components/steps/components/package_policy_input_stream.tsx @@ -42,7 +42,7 @@ import { PackagePolicyEditorDatastreamMappings } from '../../datastream_mappings import { ExperimentDatastreamSettings } from './experimental_datastream_settings'; import { PackagePolicyInputVarField } from './package_policy_input_var_field'; import { useDataStreamId } from './hooks'; -import { orderDatasets } from './order_datasets'; +import { sortDatastreamsByDataset } from './sort_datastreams'; const ScrollAnchor = styled.div` display: none; @@ -144,9 +144,8 @@ export const PackagePolicyInputStreamConfig = memo( ); const { data: dataStreamsData } = useGetDataStreams(); - const datasetList = - uniq(dataStreamsData?.data_streams.map((dataStream) => dataStream.dataset)) ?? []; - const datasets = orderDatasets(datasetList, packageInfo.name); + const datasetList = uniq(dataStreamsData?.data_streams) ?? 
[]; + const datastreams = sortDatastreamsByDataset(datasetList, packageInfo.name); return ( <> @@ -227,7 +226,8 @@ export const PackagePolicyInputStreamConfig = memo( errors={inputStreamValidationResults?.vars![varName]} forceShowErrors={forceShowErrors} packageType={packageInfo.type} - datasets={datasets} + packageName={packageInfo.name} + datastreams={datastreams} isEditPage={isEditPage} /> @@ -289,7 +289,8 @@ export const PackagePolicyInputStreamConfig = memo( errors={inputStreamValidationResults?.vars![varName]} forceShowErrors={forceShowErrors} packageType={packageInfo.type} - datasets={datasets} + packageName={packageInfo.name} + datastreams={datastreams} isEditPage={isEditPage} /> diff --git a/x-pack/plugins/fleet/public/applications/fleet/sections/agent_policy/create_package_policy_page/components/steps/components/package_policy_input_var_field.tsx b/x-pack/plugins/fleet/public/applications/fleet/sections/agent_policy/create_package_policy_page/components/steps/components/package_policy_input_var_field.tsx index 48b67d3078336..3c5115442f44f 100644 --- a/x-pack/plugins/fleet/public/applications/fleet/sections/agent_policy/create_package_policy_page/components/steps/components/package_policy_input_var_field.tsx +++ b/x-pack/plugins/fleet/public/applications/fleet/sections/agent_policy/create_package_policy_page/components/steps/components/package_policy_input_var_field.tsx @@ -22,7 +22,7 @@ import styled from 'styled-components'; import { CodeEditor } from '@kbn/kibana-react-plugin/public'; -import type { RegistryVarsEntry } from '../../../../../../types'; +import type { DataStream, RegistryVarsEntry } from '../../../../../../types'; import { MultiTextInput } from './multi_text_input'; import { DatasetComboBox } from './dataset_combo'; @@ -39,7 +39,8 @@ export const PackagePolicyInputVarField: React.FunctionComponent<{ forceShowErrors?: boolean; frozen?: boolean; packageType?: string; - datasets?: string[]; + packageName?: string; + datastreams?: 
DataStream[]; isEditPage?: boolean; }> = memo( ({ @@ -50,7 +51,8 @@ export const PackagePolicyInputVarField: React.FunctionComponent<{ forceShowErrors, frozen, packageType, - datasets = [], + packageName, + datastreams = [], isEditPage = false, }) => { const [isDirty, setIsDirty] = useState(false); @@ -73,7 +75,8 @@ export const PackagePolicyInputVarField: React.FunctionComponent<{ if (name === 'data_stream.dataset' && packageType === 'input') { return ( ({ dataset } as DataStream); +describe('orderDatasets', () => { + it('should move datasets up that match package name', () => { + const datasets = sortDatastreamsByDataset( + [ds('system.memory'), ds('elastic_agent'), ds('elastic_agent.filebeat'), ds('system.cpu')], + 'elastic_agent' + ); + + expect(datasets).toEqual([ + ds('elastic_agent'), + ds('elastic_agent.filebeat'), + ds('system.cpu'), + ds('system.memory'), + ]); + }); + + it('should order alphabetically if name does not match', () => { + const datasets = sortDatastreamsByDataset([ds('system.memory'), ds('elastic_agent')], 'nginx'); + + expect(datasets).toEqual([ds('elastic_agent'), ds('system.memory')]); + }); +}); diff --git a/x-pack/plugins/fleet/public/applications/fleet/sections/agent_policy/create_package_policy_page/components/steps/components/sort_datastreams.ts b/x-pack/plugins/fleet/public/applications/fleet/sections/agent_policy/create_package_policy_page/components/steps/components/sort_datastreams.ts new file mode 100644 index 0000000000000..71ea6ee4b44d6 --- /dev/null +++ b/x-pack/plugins/fleet/public/applications/fleet/sections/agent_policy/create_package_policy_page/components/steps/components/sort_datastreams.ts @@ -0,0 +1,19 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +import { partition, sortBy } from 'lodash'; + +import type { DataStream } from '../../../../../../../../../common/types'; + +// sort data streams by dataset name, but promote datastreams that are from this package to the start +export function sortDatastreamsByDataset(datasetList: DataStream[], name: string): DataStream[] { + const [relevantDatasets, otherDatasets] = partition(sortBy(datasetList, 'dataset'), (record) => + record.dataset.startsWith(name) + ); + const datasets = relevantDatasets.concat(otherDatasets); + return datasets; +} diff --git a/x-pack/plugins/fleet/public/applications/fleet/sections/agent_policy/create_package_policy_page/multi_page_layout/components/page_steps/add_integration.tsx b/x-pack/plugins/fleet/public/applications/fleet/sections/agent_policy/create_package_policy_page/multi_page_layout/components/page_steps/add_integration.tsx index cd0164124bfbc..f84d3fbfc2988 100644 --- a/x-pack/plugins/fleet/public/applications/fleet/sections/agent_policy/create_package_policy_page/multi_page_layout/components/page_steps/add_integration.tsx +++ b/x-pack/plugins/fleet/public/applications/fleet/sections/agent_policy/create_package_policy_page/multi_page_layout/components/page_steps/add_integration.tsx @@ -28,6 +28,7 @@ import type { PackagePolicyValidationResults } from '../../../services'; import { validatePackagePolicy, validationHasErrors } from '../../../services'; import { NotObscuredByBottomBar } from '..'; import { StepConfigurePackagePolicy, StepDefinePackagePolicy } from '../../../components'; +import { prepareInputPackagePolicyDataset } from '../../../services/prepare_input_pkg_policy_dataset'; const ExpandableAdvancedSettings: React.FC = ({ children }) => { const [isShowingAdvanced, setIsShowingAdvanced] = useState(false); @@ -147,7 +148,11 @@ export const AddIntegrationPageStep: React.FC = (props force?: boolean; }) => { setFormState('LOADING'); - const result = await sendCreatePackagePolicy({ ...newPackagePolicy, force }); + 
const { policy, forceCreateNeeded } = await prepareInputPackagePolicyDataset(newPackagePolicy); + const result = await sendCreatePackagePolicy({ + ...policy, + force: forceCreateNeeded || force, + }); setFormState('SUBMITTED'); return result; }; diff --git a/x-pack/plugins/fleet/public/applications/fleet/sections/agent_policy/create_package_policy_page/services/prepare_input_pkg_policy_dataset.test.ts b/x-pack/plugins/fleet/public/applications/fleet/sections/agent_policy/create_package_policy_page/services/prepare_input_pkg_policy_dataset.test.ts new file mode 100644 index 0000000000000..09e17b744e15c --- /dev/null +++ b/x-pack/plugins/fleet/public/applications/fleet/sections/agent_policy/create_package_policy_page/services/prepare_input_pkg_policy_dataset.test.ts @@ -0,0 +1,101 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +import type { NewPackagePolicy } from '../../../../types'; + +import { prepareInputPackagePolicyDataset } from './prepare_input_pkg_policy_dataset'; +const customLogsPackagePolicyWithDataset = ( + datasetValue: + | { + dataset: string; + package: string; + } + | string +) => ({ + name: 'custom_logs-1', + description: '', + namespace: 'default', + policy_id: '05034740-9285-11ed-87a3-4ff599bc9864', + enabled: true, + inputs: [ + { + type: 'logfile', + policy_template: 'first_policy_template', + enabled: true, + streams: [ + { + enabled: true, + data_stream: { + type: 'logs', + dataset: 'custom_logs.first_policy_template', + }, + vars: { + paths: { + type: 'text', + value: ['/tmp/test.log'], + }, + tags: { + type: 'text', + value: ['tag1'], + }, + ignore_older: { + value: '72h', + type: 'text', + }, + 'data_stream.dataset': { + type: 'text', + value: datasetValue, + }, + }, + }, + ], + }, + ], + package: { + name: 'custom_logs', + title: 'Custom Logs', + version: '1.1.0', + experimental_data_stream_features: [], + }, +}); + +const expectDatasetVarToEqual = (policy: NewPackagePolicy, expected: any) => + expect(policy?.inputs?.[0]?.streams?.[0]?.vars?.['data_stream.dataset']?.value).toEqual(expected); + +describe('prepareInputPackagePolicyDataset', function () { + it('should do nothing if dataset value is not an object', function () { + const newPolicy = customLogsPackagePolicyWithDataset('generic'); + const result = prepareInputPackagePolicyDataset(newPolicy); + expect(result.forceCreateNeeded).toEqual(false); + expect(result.policy).toEqual(newPolicy); + }); + it('should do nothing if no inputs', function () { + const newPolicy = customLogsPackagePolicyWithDataset('generic'); + newPolicy.inputs = []; + const result = prepareInputPackagePolicyDataset(newPolicy); + expect(result.forceCreateNeeded).toEqual(false); + expect(result.policy).toEqual(newPolicy); + }); + it('should not force create if dataset from same package', function () { + const newPolicy = 
customLogsPackagePolicyWithDataset({ + dataset: 'generic', + package: 'custom_logs', + }); + const result = prepareInputPackagePolicyDataset(newPolicy); + expect(result.forceCreateNeeded).toEqual(false); + expectDatasetVarToEqual(result.policy, 'generic'); + }); + it('should force create if dataset from different package', function () { + const newPolicy = customLogsPackagePolicyWithDataset({ + dataset: 'generic', + package: 'not_custom_logs', + }); + const result = prepareInputPackagePolicyDataset(newPolicy); + expect(result.forceCreateNeeded).toEqual(true); + expectDatasetVarToEqual(result.policy, 'generic'); + }); +}); diff --git a/x-pack/plugins/fleet/public/applications/fleet/sections/agent_policy/create_package_policy_page/services/prepare_input_pkg_policy_dataset.ts b/x-pack/plugins/fleet/public/applications/fleet/sections/agent_policy/create_package_policy_page/services/prepare_input_pkg_policy_dataset.ts new file mode 100644 index 0000000000000..e4f1fae4419c1 --- /dev/null +++ b/x-pack/plugins/fleet/public/applications/fleet/sections/agent_policy/create_package_policy_page/services/prepare_input_pkg_policy_dataset.ts @@ -0,0 +1,60 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +import type { NewPackagePolicy } from '../../../../types'; + +export function prepareInputPackagePolicyDataset(newPolicy: NewPackagePolicy): { + policy: NewPackagePolicy; + forceCreateNeeded: boolean; +} { + let forceCreateNeeded = false; + const { inputs } = newPolicy; + + if (!inputs || !inputs.length) { + return { policy: newPolicy, forceCreateNeeded: false }; + } + + const newInputs = inputs.map((input) => { + const { streams } = input; + if (!streams) { + return input; + } + + const newStreams = streams.map((stream) => { + if ( + !stream.vars || + !stream.vars['data_stream.dataset'] || + !stream.vars['data_stream.dataset'].value?.package + ) { + return stream; + } + + const datasetVar = stream.vars['data_stream.dataset']; + + forceCreateNeeded = datasetVar.value?.package !== newPolicy?.package?.name; + stream.vars['data_stream.dataset'] = { + ...datasetVar, + value: datasetVar.value?.dataset, + }; + + return stream; + }); + + return { + ...input, + streams: newStreams, + }; + }); + + return { + policy: { + ...newPolicy, + inputs: newInputs, + }, + forceCreateNeeded, + }; +} diff --git a/x-pack/plugins/fleet/public/applications/fleet/sections/agent_policy/create_package_policy_page/single_page_layout/hooks/form.tsx b/x-pack/plugins/fleet/public/applications/fleet/sections/agent_policy/create_package_policy_page/single_page_layout/hooks/form.tsx index a1b0e2065b3a5..24277e464582c 100644 --- a/x-pack/plugins/fleet/public/applications/fleet/sections/agent_policy/create_package_policy_page/single_page_layout/hooks/form.tsx +++ b/x-pack/plugins/fleet/public/applications/fleet/sections/agent_policy/create_package_policy_page/single_page_layout/hooks/form.tsx @@ -38,6 +38,7 @@ import type { PackagePolicyValidationResults } from '../../services'; import type { PackagePolicyFormState } from '../../types'; import { SelectedPolicyTab } from '../../components'; import { useOnSaveNavigate } from '../../hooks'; +import { prepareInputPackagePolicyDataset } from 
'../../services/prepare_input_pkg_policy_dataset'; async function createAgentPolicy({ packagePolicy, @@ -63,7 +64,11 @@ async function createAgentPolicy({ } async function savePackagePolicy(pkgPolicy: CreatePackagePolicyRequest['body']) { - const result = await sendCreatePackagePolicy(pkgPolicy); + const { policy, forceCreateNeeded } = await prepareInputPackagePolicyDataset(pkgPolicy); + const result = await sendCreatePackagePolicy({ + ...policy, + ...(forceCreateNeeded && { force: true }), + }); return result; } diff --git a/x-pack/plugins/fleet/public/applications/fleet/sections/agent_policy/services/devtools_request.tsx b/x-pack/plugins/fleet/public/applications/fleet/sections/agent_policy/services/devtools_request.tsx index 51831fdfa6b7c..b9c69d083d21d 100644 --- a/x-pack/plugins/fleet/public/applications/fleet/sections/agent_policy/services/devtools_request.tsx +++ b/x-pack/plugins/fleet/public/applications/fleet/sections/agent_policy/services/devtools_request.tsx @@ -105,7 +105,12 @@ function formatVars(vars: NewPackagePolicy['inputs'][number]['vars']) { } return Object.entries(vars).reduce((acc, [varKey, varRecord]) => { - acc[varKey] = varRecord?.value; + // the data_stream.dataset var uses an internal format before we send it + if (varKey === 'data_stream.dataset' && varRecord?.value?.dataset) { + acc[varKey] = varRecord?.value.dataset; + } else { + acc[varKey] = varRecord?.value; + } return acc; }, {} as SimplifiedVars); diff --git a/x-pack/plugins/fleet/server/routes/data_streams/handlers.ts b/x-pack/plugins/fleet/server/routes/data_streams/handlers.ts index 274ab22999ff8..e6e497991ca82 100644 --- a/x-pack/plugins/fleet/server/routes/data_streams/handlers.ts +++ b/x-pack/plugins/fleet/server/routes/data_streams/handlers.ts @@ -12,11 +12,10 @@ import { KibanaSavedObjectType } from '../../../common/types'; import type { GetDataStreamsResponse } from '../../../common/types'; import { getPackageSavedObjects } from '../../services/epm/packages/get'; import { 
defaultFleetErrorHandler } from '../../errors'; +import { dataStreamService } from '../../services/data_streams'; import { getDataStreamsQueryMetadata } from './get_data_streams_query_metadata'; -const DATA_STREAM_INDEX_PATTERN = 'logs-*-*,metrics-*-*,traces-*-*,synthetics-*-*'; - interface ESDataStreamInfo { name: string; timestamp_field: { @@ -49,13 +48,9 @@ export const getListHandler: RequestHandler = async (context, request, response) try { // Get matching data streams, their stats, and package SOs - const [ - { data_streams: dataStreamsInfo }, - { data_streams: dataStreamStats }, - packageSavedObjects, - ] = await Promise.all([ - esClient.indices.getDataStream({ name: DATA_STREAM_INDEX_PATTERN }), - esClient.indices.dataStreamsStats({ name: DATA_STREAM_INDEX_PATTERN, human: true }), + const [dataStreamsInfo, dataStreamStats, packageSavedObjects] = await Promise.all([ + dataStreamService.getAllFleetDataStreams(esClient), + dataStreamService.getAllFleetDataStreamsStats(esClient), getPackageSavedObjects(savedObjects.client), ]); diff --git a/x-pack/plugins/fleet/server/services/data_streams.ts b/x-pack/plugins/fleet/server/services/data_streams.ts new file mode 100644 index 0000000000000..aef5ee1e6570e --- /dev/null +++ b/x-pack/plugins/fleet/server/services/data_streams.ts @@ -0,0 +1,78 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +import type { IndicesDataStream, IndicesIndexTemplate } from '@elastic/elasticsearch/lib/api/types'; +import type { ElasticsearchClient } from '@kbn/core/server'; + +const DATA_STREAM_INDEX_PATTERN = 'logs-*-*,metrics-*-*,traces-*-*,synthetics-*-*'; + +class DataStreamService { + public async getAllFleetDataStreams(esClient: ElasticsearchClient) { + const { data_streams: dataStreamsInfo } = await esClient.indices.getDataStream({ + name: DATA_STREAM_INDEX_PATTERN, + }); + + return dataStreamsInfo; + } + + public async getAllFleetDataStreamsStats(esClient: ElasticsearchClient) { + const { data_streams: dataStreamStats } = await esClient.indices.dataStreamsStats({ + name: DATA_STREAM_INDEX_PATTERN, + human: true, + }); + + return dataStreamStats; + } + + public streamPartsToIndexPattern({ type, dataset }: { dataset: string; type: string }) { + return `${type}-${dataset}-*`; + } + + public async getMatchingDataStreams( + esClient: ElasticsearchClient, + dataStreamParts: { + dataset: string; + type: string; + } + ): Promise { + try { + const { data_streams: dataStreamsInfo } = await esClient.indices.getDataStream({ + name: this.streamPartsToIndexPattern(dataStreamParts), + }); + + return dataStreamsInfo; + } catch (e) { + if (e.statusCode === 404) { + return []; + } + throw e; + } + } + + public async getMatchingIndexTemplate( + esClient: ElasticsearchClient, + dataStreamParts: { + dataset: string; + type: string; + } + ): Promise { + try { + const { index_templates: indexTemplates } = await esClient.indices.getIndexTemplate({ + name: `${dataStreamParts.type}-${dataStreamParts.dataset}`, + }); + + return indexTemplates[0]?.index_template; + } catch (e) { + if (e.statusCode === 404) { + return null; + } + throw e; + } + } +} + +export const dataStreamService = new DataStreamService(); diff --git a/x-pack/plugins/fleet/server/services/epm/archive/parse.test.ts b/x-pack/plugins/fleet/server/services/epm/archive/parse.test.ts index 32393eebd12e5..27c2baf656791 
100644 --- a/x-pack/plugins/fleet/server/services/epm/archive/parse.test.ts +++ b/x-pack/plugins/fleet/server/services/epm/archive/parse.test.ts @@ -4,7 +4,11 @@ * 2.0; you may not use this file except in compliance with the Elastic License * 2.0. */ -import { parseDefaultIngestPipeline, parseDataStreamElasticsearchEntry } from './parse'; +import { + parseDefaultIngestPipeline, + parseDataStreamElasticsearchEntry, + parseTopLevelElasticsearchEntry, +} from './parse'; describe('parseDefaultIngestPipeline', () => { it('Should return undefined for stream without any elasticsearch dir', () => { expect( @@ -142,3 +146,89 @@ describe('parseDataStreamElasticsearchEntry', () => { }); }); }); + +describe('parseTopLevelElasticsearchEntry', () => { + it('Should handle undefined elasticsearch', () => { + expect(parseTopLevelElasticsearchEntry()).toEqual({}); + }); + it('Should handle empty elasticsearch', () => { + expect(parseTopLevelElasticsearchEntry({})).toEqual({}); + }); + it('Should not include junk keys', () => { + expect(parseTopLevelElasticsearchEntry({ a: 1, b: 2 })).toEqual({}); + }); + it('Should add privileges', () => { + expect( + parseTopLevelElasticsearchEntry({ privileges: { index: ['priv1'], cluster: ['priv2'] } }) + ).toEqual({ privileges: { index: ['priv1'], cluster: ['priv2'] } }); + }); + it('Should add index_template mappings and expand dots', () => { + expect( + parseTopLevelElasticsearchEntry({ + index_template: { mappings: { dynamic: false, something: { 'dot.somethingelse': 'val' } } }, + }) + ).toEqual({ + 'index_template.mappings': { dynamic: false, something: { dot: { somethingelse: 'val' } } }, + }); + }); + it('Should add index_template settings and expand dots', () => { + expect( + parseTopLevelElasticsearchEntry({ + index_template: { + settings: { + index: { + codec: 'best_compression', + 'sort.field': 'monitor.id', + }, + }, + }, + }) + ).toEqual({ + 'index_template.settings': { + index: { + codec: 'best_compression', + sort: { field: 
'monitor.id' }, + }, + }, + }); + }); + it('Should handle dotted values for mappings and settings', () => { + expect( + parseTopLevelElasticsearchEntry({ + 'index_template.mappings': { dynamic: false }, + 'index_template.settings': { 'index.lifecycle.name': 'reference' }, + }) + ).toEqual({ + 'index_template.mappings': { dynamic: false }, + 'index_template.settings': { 'index.lifecycle.name': 'reference' }, + }); + }); + it('Should handle non-dotted values for privileges', () => { + expect( + parseTopLevelElasticsearchEntry({ + privileges: { + indices: ['read'], + cluster: ['test'], + }, + }) + ).toEqual({ + privileges: { + indices: ['read'], + cluster: ['test'], + }, + }); + }); + it('Should handle dotted values for privileges', () => { + expect( + parseTopLevelElasticsearchEntry({ + 'privileges.indices': ['read'], + 'privileges.cluster': ['test'], + }) + ).toEqual({ + privileges: { + indices: ['read'], + cluster: ['test'], + }, + }); + }); +}); diff --git a/x-pack/plugins/fleet/server/services/epm/archive/parse.ts b/x-pack/plugins/fleet/server/services/epm/archive/parse.ts index 80642496c4fb8..3e3329e49a59a 100644 --- a/x-pack/plugins/fleet/server/services/epm/archive/parse.ts +++ b/x-pack/plugins/fleet/server/services/epm/archive/parse.ts @@ -117,6 +117,7 @@ const optionalArchivePackageProps: readonly OptionalPackageProp[] = [ 'icons', 'policy_templates', 'release', + 'elasticsearch', ] as const; const registryInputProps = Object.values(RegistryInputKeys); @@ -214,6 +215,9 @@ function parseAndVerifyArchive( // at least have all required properties // get optional values and combine into one object for the remaining operations const optGiven = pick(manifest, optionalArchivePackageProps); + if (optGiven.elasticsearch) { + optGiven.elasticsearch = parseTopLevelElasticsearchEntry(optGiven.elasticsearch); + } const parsed: ArchivePackage = { ...reqGiven, ...optGiven }; // Package name and version from the manifest must match those from the toplevel directory @@ 
-561,6 +565,28 @@ export function parseDataStreamElasticsearchEntry( return parsedElasticsearchEntry; } +export function parseTopLevelElasticsearchEntry(elasticsearch?: Record) { + const parsedElasticsearchEntry: Record = {}; + const expandedElasticsearch = expandDottedObject(elasticsearch); + + if (expandedElasticsearch?.privileges) { + parsedElasticsearchEntry.privileges = expandedElasticsearch.privileges; + } + + if (expandedElasticsearch?.index_template?.mappings) { + parsedElasticsearchEntry['index_template.mappings'] = expandDottedEntries( + expandedElasticsearch.index_template.mappings + ); + } + + if (expandedElasticsearch?.index_template?.settings) { + parsedElasticsearchEntry['index_template.settings'] = expandDottedEntries( + expandedElasticsearch.index_template.settings + ); + } + return parsedElasticsearchEntry; +} + const isDefaultPipelineFile = (pipelinePath: string) => pipelinePath.endsWith(DEFAULT_INGEST_PIPELINE_FILE_NAME_YML) || pipelinePath.endsWith(DEFAULT_INGEST_PIPELINE_FILE_NAME_JSON); diff --git a/x-pack/plugins/fleet/server/services/epm/archive/storage.ts b/x-pack/plugins/fleet/server/services/epm/archive/storage.ts index 3b6f38971eff7..338898d9d05d9 100644 --- a/x-pack/plugins/fleet/server/services/epm/archive/storage.ts +++ b/x-pack/plugins/fleet/server/services/epm/archive/storage.ts @@ -28,7 +28,11 @@ import { appContextService } from '../../app_context'; import { getArchiveEntry, setArchiveEntry, setArchiveFilelist, setPackageInfo } from '.'; import type { ArchiveEntry } from '.'; -import { parseAndVerifyPolicyTemplates, parseAndVerifyStreams } from './parse'; +import { + parseAndVerifyPolicyTemplates, + parseAndVerifyStreams, + parseTopLevelElasticsearchEntry, +} from './parse'; const ONE_BYTE = 1024 * 1024; // could be anything, picked this from https://github.com/elastic/elastic-agent-client/issues/17 @@ -230,7 +234,9 @@ export const getEsPackage = async ( assetPathToObjectId(manifestPath) ); const packageInfo = 
safeLoad(soResManifest.attributes.data_utf8); - + if (packageInfo.elasticsearch) { + packageInfo.elasticsearch = parseTopLevelElasticsearchEntry(packageInfo.elasticsearch); + } try { const readmePath = `docs/README.md`; await savedObjectsClient.get( diff --git a/x-pack/plugins/fleet/server/services/epm/elasticsearch/ingest_pipeline/install.ts b/x-pack/plugins/fleet/server/services/epm/elasticsearch/ingest_pipeline/install.ts index 7ada81c26c926..5e063e26c6a63 100644 --- a/x-pack/plugins/fleet/server/services/epm/elasticsearch/ingest_pipeline/install.ts +++ b/x-pack/plugins/fleet/server/services/epm/elasticsearch/ingest_pipeline/install.ts @@ -9,7 +9,12 @@ import type { TransportRequestOptions } from '@elastic/elasticsearch'; import type { ElasticsearchClient, Logger } from '@kbn/core/server'; import { ElasticsearchAssetType } from '../../../../types'; -import type { EsAssetReference, RegistryDataStream, InstallablePackage } from '../../../../types'; +import type { + EsAssetReference, + RegistryDataStream, + InstallablePackage, + PackageInfo, +} from '../../../../types'; import { getAsset, getPathParts } from '../../archive'; import type { ArchiveEntry } from '../../archive'; import { @@ -34,8 +39,9 @@ import { import type { PipelineInstall, RewriteSubstitution } from './types'; export const prepareToInstallPipelines = ( - installablePackage: InstallablePackage, - paths: string[] + installablePackage: InstallablePackage | PackageInfo, + paths: string[], + onlyForDataStreams?: RegistryDataStream[] ): { assetsToAdd: EsAssetReference[]; install: (esClient: ElasticsearchClient, logger: Logger) => Promise; @@ -43,7 +49,7 @@ export const prepareToInstallPipelines = ( // unlike other ES assets, pipeline names are versioned so after a template is updated // it can be created pointing to the new template, without removing the old one and effecting data // so do not remove the currently installed pipelines here - const dataStreams = installablePackage.data_streams; + const 
dataStreams = onlyForDataStreams || installablePackage.data_streams; const { version: pkgVersion } = installablePackage; const pipelinePaths = paths.filter((path) => isPipeline(path)); const topLevelPipelinePaths = paths.filter((path) => isTopLevelPipeline(path)); @@ -141,7 +147,7 @@ export async function installAllPipelines({ logger: Logger; paths: string[]; dataStream?: RegistryDataStream; - installablePackage: InstallablePackage; + installablePackage: InstallablePackage | PackageInfo; }): Promise { const pipelinePaths = dataStream ? paths.filter((path) => isDataStreamPipeline(path, dataStream.path)) @@ -219,7 +225,7 @@ async function installPipeline({ esClient: ElasticsearchClient; logger: Logger; pipeline: PipelineInstall; - installablePackage?: InstallablePackage; + installablePackage?: InstallablePackage | PackageInfo; shouldAddCustomPipelineProcessor?: boolean; }): Promise { let pipelineToInstall = appendMetadataToIngestPipeline({ diff --git a/x-pack/plugins/fleet/server/services/epm/elasticsearch/template/install.ts b/x-pack/plugins/fleet/server/services/epm/elasticsearch/template/install.ts index fd1369748c867..993f26c3a38e0 100644 --- a/x-pack/plugins/fleet/server/services/epm/elasticsearch/template/install.ts +++ b/x-pack/plugins/fleet/server/services/epm/elasticsearch/template/install.ts @@ -61,10 +61,11 @@ import { buildDefaultSettings } from './default_settings'; const FLEET_COMPONENT_TEMPLATE_NAMES = FLEET_COMPONENT_TEMPLATES.map((tmpl) => tmpl.name); export const prepareToInstallTemplates = ( - installablePackage: InstallablePackage, + installablePackage: InstallablePackage | PackageInfo, paths: string[], esReferences: EsAssetReference[], - experimentalDataStreamFeatures: ExperimentalDataStreamFeature[] = [] + experimentalDataStreamFeatures: ExperimentalDataStreamFeature[] = [], + onlyForDataStreams?: RegistryDataStream[] ): { assetsToAdd: EsAssetReference[]; assetsToRemove: EsAssetReference[]; @@ -78,7 +79,7 @@ export const 
prepareToInstallTemplates = ( ); // build templates per data stream from yml files - const dataStreams = installablePackage.data_streams; + const dataStreams = onlyForDataStreams || installablePackage.data_streams; if (!dataStreams) return { assetsToAdd: [], assetsToRemove, install: () => Promise.resolve([]) }; const templates = dataStreams.map((dataStream) => { @@ -343,7 +344,6 @@ async function installDataStreamComponentTemplates({ logger: Logger; componentTemplates: TemplateMap; }) { - // TODO: Check return values for errors await Promise.all( Object.entries(componentTemplates).map(async ([name, body]) => { if (isUserSettingsTemplate(name)) { diff --git a/x-pack/plugins/fleet/server/services/epm/packages/_install_package.test.ts b/x-pack/plugins/fleet/server/services/epm/packages/_install_package.test.ts index db9803ea70f3a..23cdd399344e7 100644 --- a/x-pack/plugins/fleet/server/services/epm/packages/_install_package.test.ts +++ b/x-pack/plugins/fleet/server/services/epm/packages/_install_package.test.ts @@ -23,8 +23,14 @@ jest.mock('./get'); import { updateCurrentWriteIndices } from '../elasticsearch/template/template'; import { installKibanaAssetsAndReferences } from '../kibana/assets/install'; +import { installIndexTemplatesAndPipelines } from './install'; + import { _installPackage } from './_install_package'; +const mockedInstallIndexTemplatesAndPipelines = + installIndexTemplatesAndPipelines as jest.MockedFunction< + typeof installIndexTemplatesAndPipelines + >; const mockedUpdateCurrentWriteIndices = updateCurrentWriteIndices as jest.MockedFunction< typeof updateCurrentWriteIndices >; @@ -57,7 +63,10 @@ describe('_installPackage', () => { // and force it to take long enough for the errors to occur // @ts-expect-error about call signature mockedUpdateCurrentWriteIndices.mockImplementation(async () => await sleep(1000)); - + mockedInstallIndexTemplatesAndPipelines.mockResolvedValue({ + installedTemplates: [], + esReferences: [], + }); const 
installationPromise = _installPackage({ savedObjectsClient: soClient, // @ts-ignore diff --git a/x-pack/plugins/fleet/server/services/epm/packages/_install_package.ts b/x-pack/plugins/fleet/server/services/epm/packages/_install_package.ts index 2109cb00599da..596ad94a067d8 100644 --- a/x-pack/plugins/fleet/server/services/epm/packages/_install_package.ts +++ b/x-pack/plugins/fleet/server/services/epm/packages/_install_package.ts @@ -16,6 +16,8 @@ import { SavedObjectsErrorHelpers } from '@kbn/core/server'; import type { IAssignmentService, ITagsClient } from '@kbn/saved-objects-tagging-plugin/server'; +import { getNormalizedDataStreams } from '../../../../common/services'; + import { MAX_TIME_COMPLETE_INSTALL, ASSETS_SAVED_OBJECT_TYPE, @@ -31,17 +33,11 @@ import type { InstallSource, PackageAssetReference, PackageVerificationResult, + IndexTemplateEntry, } from '../../../types'; -import { - ensureFileUploadWriteIndices, - prepareToInstallTemplates, -} from '../elasticsearch/template/install'; +import { ensureFileUploadWriteIndices } from '../elasticsearch/template/install'; import { removeLegacyTemplates } from '../elasticsearch/template/remove_legacy'; -import { - prepareToInstallPipelines, - isTopLevelPipeline, - deletePreviousPipelines, -} from '../elasticsearch/ingest_pipeline'; +import { isTopLevelPipeline, deletePreviousPipelines } from '../elasticsearch/ingest_pipeline'; import { installILMPolicy } from '../elasticsearch/ilm/install'; import { installKibanaAssetsAndReferences } from '../kibana/assets/install'; import { updateCurrentWriteIndices } from '../elasticsearch/template/template'; @@ -52,7 +48,11 @@ import { saveArchiveEntries } from '../archive/storage'; import { ConcurrentInstallOperationError } from '../../../errors'; import { appContextService, packagePolicyService } from '../..'; -import { createInstallation, updateEsAssetReferences, restartInstallation } from './install'; +import { + createInstallation, + restartInstallation, + 
installIndexTemplatesAndPipelines, +} from './install'; import { withPackageSpan } from './utils'; // this is only exported for testing @@ -171,53 +171,52 @@ export async function _installPackage({ installMlModel(packageInfo, paths, esClient, savedObjectsClient, logger, esReferences) ); - /** - * In order to install assets in parallel, we need to split the preparation step from the installation step. This - * allows us to know which asset references are going to be installed so that we can save them on the packages - * SO before installation begins. In the case of a failure during installing any individual asset, we'll have the - * references necessary to remove any assets in that were successfully installed during the rollback phase. - * - * This split of prepare/install could be extended to all asset types. Besides performance, it also allows us to - * more easily write unit tests against the asset generation code without needing to mock ES responses. - */ - const experimentalDataStreamFeatures = - installedPkg?.attributes?.experimental_data_stream_features ?? []; + let indexTemplates: IndexTemplateEntry[] = []; - const preparedIngestPipelines = prepareToInstallPipelines(packageInfo, paths); - const preparedIndexTemplates = prepareToInstallTemplates( - packageInfo, - paths, - esReferences, - experimentalDataStreamFeatures - ); + if (packageInfo.type === 'integration') { + const { installedTemplates, esReferences: templateEsReferences } = + await installIndexTemplatesAndPipelines({ + installedPkg: installedPkg ? installedPkg.attributes : undefined, + packageInfo, + paths, + esClient, + savedObjectsClient, + logger, + esReferences, + }); + esReferences = templateEsReferences; + indexTemplates = installedTemplates; + } - // Update the references for the templates and ingest pipelines together. 
Need to be done togther to avoid race - // conditions on updating the installed_es field at the same time - // These must be saved before we actually attempt to install the templates or pipelines so that we know what to - // cleanup in the case that a single asset fails to install. - esReferences = await updateEsAssetReferences( - savedObjectsClient, - packageInfo.name, - esReferences, - { - assetsToRemove: preparedIndexTemplates.assetsToRemove, - assetsToAdd: [ - ...preparedIngestPipelines.assetsToAdd, - ...preparedIndexTemplates.assetsToAdd, - ], - } - ); + if (packageInfo.type === 'input' && installedPkg) { + // input packages create their data streams during package policy creation + // we must use installed_es to infer which streams exist first then + // we can install the new index templates + const dataStreamNames = installedPkg.attributes.installed_es + .filter((ref) => ref.type === 'index_template') + // index templates are named {type}-{dataset}, remove everything before first hyphen + .map((ref) => ref.id.replace(/^[^-]+-/, '')); - // Install index templates and ingest pipelines in parallel since they typically take the longest - const [installedTemplates] = await Promise.all([ - withPackageSpan('Install index templates', () => - preparedIndexTemplates.install(esClient, logger) - ), - // installs versionized pipelines without removing currently installed ones - withPackageSpan('Install ingest pipelines', () => - preparedIngestPipelines.install(esClient, logger) - ), - ]); + const dataStreams = dataStreamNames.flatMap((dataStreamName) => + getNormalizedDataStreams(packageInfo, dataStreamName) + ); + + if (dataStreams.length) { + const { installedTemplates, esReferences: templateEsReferences } = + await installIndexTemplatesAndPipelines({ + installedPkg: installedPkg ? 
installedPkg.attributes : undefined, + packageInfo, + paths, + esClient, + savedObjectsClient, + logger, + esReferences, + onlyForDataStreams: dataStreams, + }); + esReferences = templateEsReferences; + indexTemplates = installedTemplates; + } + } try { await removeLegacyTemplates({ packageInfo, esClient, logger }); @@ -236,7 +235,7 @@ export async function _installPackage({ // update current backing indices of each data stream await withPackageSpan('Update write indices', () => - updateCurrentWriteIndices(esClient, logger, installedTemplates) + updateCurrentWriteIndices(esClient, logger, indexTemplates) ); ({ esReferences } = await withPackageSpan('Install transforms', () => diff --git a/x-pack/plugins/fleet/server/services/epm/packages/assets.test.ts b/x-pack/plugins/fleet/server/services/epm/packages/assets.test.ts index b019729b65eb1..1ed525a71b14f 100644 --- a/x-pack/plugins/fleet/server/services/epm/packages/assets.test.ts +++ b/x-pack/plugins/fleet/server/services/epm/packages/assets.test.ts @@ -18,52 +18,88 @@ jest.mock('../archive/cache', () => { }); const mockedGetArchiveFilelist = getArchiveFilelist as jest.Mock; -mockedGetArchiveFilelist.mockImplementation(() => [ - 'coredns-1.0.1/data_stream/log/elasticsearch/ingest-pipeline/pipeline-plaintext.json', - 'coredns-1.0.1/data_stream/log/elasticsearch/ingest-pipeline/pipeline-json.json', -]); -const tests = [ - { - package: { - name: 'coredns', - version: '1.0.1', +test('testGetAssets integration pkg', () => { + const tests = [ + { + package: { + name: 'coredns', + version: '1.0.1', + }, + dataset: 'log', + filter: (path: string) => { + return true; + }, + expected: [ + 'coredns-1.0.1/data_stream/log/elasticsearch/ingest-pipeline/pipeline-plaintext.json', + 'coredns-1.0.1/data_stream/log/elasticsearch/ingest-pipeline/pipeline-json.json', + ], }, - dataset: 'log', - filter: (path: string) => { - return true; + { + package: { + name: 'coredns', + version: '1.0.1', + }, + // Non existent dataset + dataset: 
'foo', + filter: (path: string) => { + return true; + }, + expected: [], }, - expected: [ - 'coredns-1.0.1/data_stream/log/elasticsearch/ingest-pipeline/pipeline-plaintext.json', - 'coredns-1.0.1/data_stream/log/elasticsearch/ingest-pipeline/pipeline-json.json', - ], - }, - { - package: { - name: 'coredns', - version: '1.0.1', + { + package: { + name: 'coredns', + version: '1.0.1', + }, + // Filter which does not exist + filter: (path: string) => { + return path.includes('foo'); + }, + expected: [], }, - // Non existent dataset - dataset: 'foo', - filter: (path: string) => { - return true; - }, - expected: [], - }, - { - package: { - name: 'coredns', - version: '1.0.1', - }, - // Filter which does not exist - filter: (path: string) => { - return path.includes('foo'); + ]; + + mockedGetArchiveFilelist.mockImplementation(() => [ + 'coredns-1.0.1/data_stream/log/elasticsearch/ingest-pipeline/pipeline-plaintext.json', + 'coredns-1.0.1/data_stream/log/elasticsearch/ingest-pipeline/pipeline-json.json', + ]); + for (const value of tests) { + // as needed to pretend it is an InstallablePackage + const assets = getAssets(value.package as PackageInfo, value.filter, value.dataset); + expect(assets).toStrictEqual(value.expected); + } +}); + +test('testGetAssets input pkg', () => { + mockedGetArchiveFilelist.mockImplementation(() => [ + 'input_package_upgrade-1.0.0/agent/input/input.yml.hbs', + 'input_package_upgrade-1.0.0/changelog.yml', + 'input_package_upgrade-1.0.0/docs/README.md', + 'input_package_upgrade-1.0.0/fields/input.yml', + 'input_package_upgrade-1.0.0/img/sample-logo.svg', + 'input_package_upgrade-1.0.0/img/sample-screenshot.png', + 'input_package_upgrade-1.0.0/manifest.yml', + ]); + + const tests = [ + { + package: { + name: 'input_package_upgrade', + version: '1.0.0', + type: 'input', + }, + dataset: 'log', + filter: (path: string) => { + return true; + }, + expected: [ + 'input_package_upgrade-1.0.0/agent/input/input.yml.hbs', + 
'input_package_upgrade-1.0.0/fields/input.yml', + ], }, - expected: [], - }, -]; + ]; -test('testGetAssets', () => { for (const value of tests) { // as needed to pretend it is an InstallablePackage const assets = getAssets(value.package as PackageInfo, value.filter, value.dataset); diff --git a/x-pack/plugins/fleet/server/services/epm/packages/assets.ts b/x-pack/plugins/fleet/server/services/epm/packages/assets.ts index 915b68e1173d3..89bd9b25af75a 100644 --- a/x-pack/plugins/fleet/server/services/epm/packages/assets.ts +++ b/x-pack/plugins/fleet/server/services/epm/packages/assets.ts @@ -9,47 +9,41 @@ import type { PackageInfo } from '../../../types'; import { getArchiveFilelist, getAsset } from '../archive'; import type { ArchiveEntry } from '../archive'; +const maybeFilterByDataset = + (packageInfo: Pick, datasetName: string) => + (path: string): boolean => { + const basePath = `${packageInfo.name}-${packageInfo.version}`; + const comparePaths = + packageInfo?.type === 'input' + ? [`${basePath}/agent/input/`, `${basePath}/fields/`] + : [`${basePath}/data_stream/${datasetName}/`]; + + return comparePaths.some((comparePath) => path.includes(comparePath)); + }; + // paths from RegistryPackage are routes to the assets on EPR // e.g. `/package/nginx/1.2.0/data_stream/access/fields/fields.yml` // paths for ArchiveEntry are routes to the assets in the archive // e.g. 
`nginx-1.2.0/data_stream/access/fields/fields.yml` // RegistryPackage paths have a `/package/` prefix compared to ArchiveEntry paths // and different package and version structure - export function getAssets( packageInfo: Pick, filter = (path: string): boolean => true, datasetName?: string ): string[] { - const assets: string[] = []; - const { name, version } = packageInfo; - const paths = getArchiveFilelist({ name, version }); - // TODO: might be better to throw a PackageCacheError here - if (!paths || paths.length === 0) return assets; + const paths = getArchiveFilelist(packageInfo); - // Skip directories - for (const path of paths) { - if (path.endsWith('/')) { - continue; - } + if (!paths || paths.length === 0) return []; - // if dataset, filter for them - if (datasetName) { - const comparePath = - packageInfo?.type === 'input' - ? `${packageInfo.name}-${packageInfo.version}/agent/input/` - : `${packageInfo.name}-${packageInfo.version}/data_stream/${datasetName}/`; - if (!path.includes(comparePath)) { - continue; - } - } - if (!filter(path)) { - continue; - } + // filter out directories + let assets: string[] = paths.filter((path) => !path.endsWith('/')); - assets.push(path); + if (datasetName) { + assets = paths.filter(maybeFilterByDataset(packageInfo, datasetName)); } - return assets; + + return assets.filter(filter); } export function getAssetsData( diff --git a/x-pack/plugins/fleet/server/services/epm/packages/install.ts b/x-pack/plugins/fleet/server/services/epm/packages/install.ts index e223fa490f6d7..1c5f18c4a45e8 100644 --- a/x-pack/plugins/fleet/server/services/epm/packages/install.ts +++ b/x-pack/plugins/fleet/server/services/epm/packages/install.ts @@ -13,13 +13,14 @@ import type { ElasticsearchClient, SavedObject, SavedObjectsClientContract, + Logger, } from '@kbn/core/server'; import { DEFAULT_SPACE_ID } from '@kbn/spaces-plugin/common/constants'; import pRetry from 'p-retry'; -import { isPackagePrerelease } from '../../../../common/services'; 
+import { isPackagePrerelease, getNormalizedDataStreams } from '../../../../common/services'; import { FLEET_INSTALL_FORMAT_VERSION } from '../../../constants/fleet_es_assets'; @@ -35,12 +36,15 @@ import type { InstallSource, InstallType, KibanaAssetType, + NewPackagePolicy, + PackageInfo, PackageVerificationResult, + RegistryDataStream, } from '../../../types'; import { AUTO_UPGRADE_POLICIES_PACKAGES } from '../../../../common/constants'; -import { FleetError, PackageOutdatedError } from '../../../errors'; +import { FleetError, PackageOutdatedError, PackagePolicyValidationError } from '../../../errors'; import { PACKAGES_SAVED_OBJECT_TYPE, MAX_TIME_COMPLETE_INSTALL } from '../../../constants'; -import { licenseService } from '../..'; +import { dataStreamService, licenseService } from '../..'; import { appContextService } from '../../app_context'; import * as Registry from '../registry'; import { @@ -48,6 +52,7 @@ import { generatePackageInfoFromArchiveBuffer, unpackBufferToCache, deleteVerificationResult, + getArchiveFilelist, } from '../archive'; import { toAssetReference } from '../kibana/assets/install'; import type { ArchiveAsset } from '../kibana/assets/install'; @@ -55,6 +60,10 @@ import type { ArchiveAsset } from '../kibana/assets/install'; import type { PackageUpdateEvent } from '../../upgrade_sender'; import { sendTelemetryEvents, UpdateEventType } from '../../upgrade_sender'; +import { prepareToInstallPipelines } from '../elasticsearch/ingest_pipeline'; + +import { prepareToInstallTemplates } from '../elasticsearch/template/install'; + import { formatVerificationResultForSO } from './package_verification'; import { getInstallation, getInstallationObject } from '.'; @@ -63,6 +72,7 @@ import { getPackageSavedObjects } from './get'; import { _installPackage } from './_install_package'; import { removeOldAssets } from './cleanup'; import { getBundledPackages } from './bundled_packages'; +import { withPackageSpan } from './utils'; export async function 
isPackageInstalled(options: { savedObjectsClient: SavedObjectsClientContract; @@ -843,6 +853,165 @@ export async function ensurePackagesCompletedInstall( return installingPackages; } +export async function installIndexTemplatesAndPipelines({ + installedPkg, + paths, + packageInfo, + esReferences, + savedObjectsClient, + esClient, + logger, + onlyForDataStreams, +}: { + installedPkg?: Installation; + paths: string[]; + packageInfo: PackageInfo | InstallablePackage; + esReferences: EsAssetReference[]; + savedObjectsClient: SavedObjectsClientContract; + esClient: ElasticsearchClient; + logger: Logger; + onlyForDataStreams?: RegistryDataStream[]; +}) { + /** + * In order to install assets in parallel, we need to split the preparation step from the installation step. This + * allows us to know which asset references are going to be installed so that we can save them on the packages + * SO before installation begins. In the case of a failure during installing any individual asset, we'll have the + * references necessary to remove any assets in that were successfully installed during the rollback phase. + * + * This split of prepare/install could be extended to all asset types. Besides performance, it also allows us to + * more easily write unit tests against the asset generation code without needing to mock ES responses. + */ + const experimentalDataStreamFeatures = installedPkg?.experimental_data_stream_features ?? []; + + const preparedIngestPipelines = prepareToInstallPipelines(packageInfo, paths, onlyForDataStreams); + const preparedIndexTemplates = prepareToInstallTemplates( + packageInfo, + paths, + esReferences, + experimentalDataStreamFeatures, + onlyForDataStreams + ); + + // Update the references for the templates and ingest pipelines together. 
Need to be done together to avoid race + // conditions on updating the installed_es field at the same time + // These must be saved before we actually attempt to install the templates or pipelines so that we know what to + // cleanup in the case that a single asset fails to install. + const newEsReferences = await updateEsAssetReferences( + savedObjectsClient, + packageInfo.name, + esReferences, + { + assetsToRemove: onlyForDataStreams ? [] : preparedIndexTemplates.assetsToRemove, + assetsToAdd: [...preparedIngestPipelines.assetsToAdd, ...preparedIndexTemplates.assetsToAdd], + } + ); + + // Install index templates and ingest pipelines in parallel since they typically take the longest + const [installedTemplates] = await Promise.all([ + withPackageSpan('Install index templates', () => + preparedIndexTemplates.install(esClient, logger) + ), + // installs versionized pipelines without removing currently installed ones + withPackageSpan('Install ingest pipelines', () => + preparedIngestPipelines.install(esClient, logger) + ), + ]); + + return { + esReferences: newEsReferences, + installedTemplates, + }; +} + +export async function installAssetsForInputPackagePolicy(opts: { + pkgInfo: PackageInfo; + logger: Logger; + packagePolicy: NewPackagePolicy; + esClient: ElasticsearchClient; + soClient: SavedObjectsClientContract; + force: boolean; +}) { + const { pkgInfo, logger, packagePolicy, esClient, soClient, force } = opts; + + if (pkgInfo.type !== 'input') return; + + const paths = await getArchiveFilelist(pkgInfo); + if (!paths) throw new Error('No paths found for '); + + const datasetName = packagePolicy.inputs[0].streams[0].vars?.['data_stream.dataset']?.value; + const [dataStream] = getNormalizedDataStreams(pkgInfo, datasetName); + const existingDataStreams = await dataStreamService.getMatchingDataStreams(esClient, { + type: dataStream.type, + dataset: datasetName, + }); + + if (existingDataStreams.length) { + const existingDataStreamsAreFromDifferentPackage = 
existingDataStreams.some( + (ds) => ds._meta?.package?.name !== pkgInfo.name + ); + if (existingDataStreamsAreFromDifferentPackage && !force) { + // user has opted to send data to an existing data stream which is managed by another + // package. This means certain custom settings such as elasticsearch settings + // defined by the package will not have been applied which could lead + // to unforeseen circumstances, so force flag must be used. + const streamIndexPattern = dataStreamService.streamPartsToIndexPattern({ + type: dataStream.type, + dataset: datasetName, + }); + + throw new PackagePolicyValidationError( + `Datastreams matching "${streamIndexPattern}" already exist and are not managed by this package, force flag is required` + ); + } else { + logger.info( + `Data stream ${dataStream.name} already exists, skipping index template creation for ${packagePolicy.id}` + ); + return; + } + } + + const existingIndexTemplate = await dataStreamService.getMatchingIndexTemplate(esClient, { + type: dataStream.type, + dataset: datasetName, + }); + + if (existingIndexTemplate) { + const indexTemplateOwnedByDifferentPackage = + existingIndexTemplate._meta?.package?.name !== pkgInfo.name; + if (indexTemplateOwnedByDifferentPackage && !force) { + // index template already exists but there is no data stream yet + // we do not want to override the index template + + throw new PackagePolicyValidationError( + `Index template "${dataStream.type}-${datasetName}" already exists and is not managed by this package, force flag is required` + ); + } else { + logger.info( + `Index template "${dataStream.type}-${datasetName}" already exists, skipping index template creation for ${packagePolicy.id}` + ); + return; + } + } + + const installedPkg = await getInstallation({ + savedObjectsClient: soClient, + pkgName: pkgInfo.name, + logger, + }); + if (!installedPkg) + throw new Error('Unable to find installed package while creating index templates'); + await installIndexTemplatesAndPipelines({
+ installedPkg, + paths, + packageInfo: pkgInfo, + esReferences: installedPkg.installed_es || [], + savedObjectsClient: soClient, + esClient, + logger, + onlyForDataStreams: [dataStream], + }); +} + interface NoPkgArgs { pkgVersion: string; installedPkg?: undefined; diff --git a/x-pack/plugins/fleet/server/services/index.ts b/x-pack/plugins/fleet/server/services/index.ts index a5078471d934d..cfd8eb95ceb4d 100644 --- a/x-pack/plugins/fleet/server/services/index.ts +++ b/x-pack/plugins/fleet/server/services/index.ts @@ -45,6 +45,7 @@ export { packagePolicyService } from './package_policy'; export { outputService } from './output'; export { downloadSourceService } from './download_source'; export { settingsService }; +export { dataStreamService } from './data_streams'; // Plugin services export { appContextService } from './app_context'; diff --git a/x-pack/plugins/fleet/server/services/package_policy.ts b/x-pack/plugins/fleet/server/services/package_policy.ts index 73fda5ac50aff..4e25f7343f372 100644 --- a/x-pack/plugins/fleet/server/services/package_policy.ts +++ b/x-pack/plugins/fleet/server/services/package_policy.ts @@ -101,6 +101,7 @@ import { sendTelemetryEvents } from './upgrade_sender'; import { handleExperimentalDatastreamFeatureOptIn } from './package_policies'; import { updateDatastreamExperimentalFeatures } from './epm/packages/update'; import type { PackagePolicyClient, PackagePolicyService } from './package_policy_service'; +import { installAssetsForInputPackagePolicy } from './epm/packages/install'; export type InputsOverride = Partial & { vars?: Array; @@ -134,6 +135,7 @@ class PackagePolicyClientImpl implements PackagePolicyClient { packageInfo?: PackageInfo; } ): Promise { + const logger = appContextService.getLogger(); const agentPolicy = await agentPolicyService.get(soClient, packagePolicy.policy_id, true); if (agentPolicy && packagePolicy.package?.name === FLEET_APM_PACKAGE) { @@ -160,7 +162,7 @@ class PackagePolicyClientImpl implements 
PackagePolicyClient { } } - let elasticsearch: PackagePolicy['elasticsearch']; + let elasticsearchPrivileges: NonNullable['privileges']; // Add ids to stream const packagePolicyId = options?.id || uuid.v4(); let inputs: PackagePolicyInput[] = packagePolicy.inputs.map((input) => @@ -205,7 +207,18 @@ class PackagePolicyClientImpl implements PackagePolicyClient { inputs = await _compilePackagePolicyInputs(pkgInfo, packagePolicy.vars || {}, inputs); - elasticsearch = pkgInfo.elasticsearch; + elasticsearchPrivileges = pkgInfo.elasticsearch?.privileges; + + if (pkgInfo.type === 'input') { + await installAssetsForInputPackagePolicy({ + soClient, + esClient, + pkgInfo, + packagePolicy, + force: !!options?.force, + logger, + }); + } } const isoDate = new Date().toISOString(); @@ -217,7 +230,7 @@ class PackagePolicyClientImpl implements PackagePolicyClient { ? { package: omit(packagePolicy.package, 'experimental_data_stream_features') } : {}), inputs, - elasticsearch, + ...(elasticsearchPrivileges && { elasticsearch: { privileges: elasticsearchPrivileges } }), revision: 1, created_at: isoDate, created_by: options?.user?.username ?? 'system', @@ -511,7 +524,7 @@ class PackagePolicyClientImpl implements PackagePolicyClient { ); inputs = enforceFrozenInputs(oldPackagePolicy.inputs, inputs, options?.force); - let elasticsearch: PackagePolicy['elasticsearch']; + let elasticsearchPrivileges: NonNullable['privileges']; if (packagePolicy.package?.name) { const pkgInfo = await getPackageInfo({ savedObjectsClient: soClient, @@ -527,7 +540,7 @@ class PackagePolicyClientImpl implements PackagePolicyClient { validatePackagePolicyOrThrow(packagePolicy, pkgInfo); inputs = await _compilePackagePolicyInputs(pkgInfo, packagePolicy.vars || {}, inputs); - elasticsearch = pkgInfo.elasticsearch; + elasticsearchPrivileges = pkgInfo.elasticsearch?.privileges; } // Handle component template/mappings updates for experimental features, e.g. 
synthetic source @@ -542,7 +555,7 @@ class PackagePolicyClientImpl implements PackagePolicyClient { ? { package: omit(restOfPackagePolicy.package, 'experimental_data_stream_features') } : {}), inputs, - elasticsearch, + ...(elasticsearchPrivileges && { elasticsearch: { privileges: elasticsearchPrivileges } }), revision: oldPackagePolicy.revision + 1, updated_at: new Date().toISOString(), updated_by: options?.user?.username ?? 'system', @@ -614,7 +627,7 @@ class PackagePolicyClientImpl implements PackagePolicyClient { ); inputs = enforceFrozenInputs(oldPackagePolicy.inputs, inputs, options?.force); - let elasticsearch: PackagePolicy['elasticsearch']; + let elasticsearchPrivileges: NonNullable['privileges']; if (packagePolicy.package?.name) { const pkgInfo = packageInfos.get( `${packagePolicy.package.name}-${packagePolicy.package.version}` @@ -623,7 +636,7 @@ class PackagePolicyClientImpl implements PackagePolicyClient { validatePackagePolicyOrThrow(packagePolicy, pkgInfo); inputs = await _compilePackagePolicyInputs(pkgInfo, packagePolicy.vars || {}, inputs); - elasticsearch = pkgInfo.elasticsearch; + elasticsearchPrivileges = pkgInfo.elasticsearch?.privileges; } } @@ -639,7 +652,9 @@ class PackagePolicyClientImpl implements PackagePolicyClient { ? { package: omit(restOfPackagePolicy.package, 'experimental_data_stream_features') } : {}), inputs, - elasticsearch, + ...(elasticsearchPrivileges && { + elasticsearch: { privileges: elasticsearchPrivileges }, + }), revision: oldPackagePolicy.revision + 1, updated_at: new Date().toISOString(), updated_by: options?.user?.username ?? 
'system', @@ -1985,7 +2000,8 @@ export function _validateRestrictedFieldsNotModifiedOrThrow(opts: { if ( oldStream && oldStream?.vars?.['data_stream.dataset'] && - oldStream?.vars['data_stream.dataset'] !== stream?.vars?.['data_stream.dataset'] + oldStream?.vars['data_stream.dataset']?.value !== + stream?.vars?.['data_stream.dataset']?.value ) { throw new PackagePolicyValidationError( i18n.translate('xpack.fleet.updatePackagePolicy.datasetCannotBeModified', { diff --git a/x-pack/test/fleet_api_integration/apis/fixtures/test_packages/input_package_upgrade/1.0.0/agent/input/input.yml.hbs b/x-pack/test/fleet_api_integration/apis/fixtures/test_packages/input_package_upgrade/1.0.0/agent/input/input.yml.hbs new file mode 100644 index 0000000000000..1ba86fa98a2f8 --- /dev/null +++ b/x-pack/test/fleet_api_integration/apis/fixtures/test_packages/input_package_upgrade/1.0.0/agent/input/input.yml.hbs @@ -0,0 +1,18 @@ +paths: +{{#each paths}} + - {{this}} +{{/each}} + +{{#if tags}} +tags: +{{#each tags as |tag i|}} + - {{tag}} +{{/each}} +{{/if}} + +{{#if pipeline}} +pipeline: {{pipeline}} +{{/if}} + +data_stream: + dataset: {{data_stream.dataset}} \ No newline at end of file diff --git a/x-pack/test/fleet_api_integration/apis/fixtures/test_packages/input_package_upgrade/1.0.0/changelog.yml b/x-pack/test/fleet_api_integration/apis/fixtures/test_packages/input_package_upgrade/1.0.0/changelog.yml new file mode 100644 index 0000000000000..20fdaba3bd1c3 --- /dev/null +++ b/x-pack/test/fleet_api_integration/apis/fixtures/test_packages/input_package_upgrade/1.0.0/changelog.yml @@ -0,0 +1,6 @@ +# newer versions go on top +- version: "1.2.0-beta" + changes: + - description: Initial draft of the package + type: enhancement + link: https://github.com/elastic/package-spec/pull/325 diff --git a/x-pack/test/fleet_api_integration/apis/fixtures/test_packages/input_package_upgrade/1.0.0/docs/README.md 
b/x-pack/test/fleet_api_integration/apis/fixtures/test_packages/input_package_upgrade/1.0.0/docs/README.md new file mode 100644 index 0000000000000..9f29c89e0f5ef --- /dev/null +++ b/x-pack/test/fleet_api_integration/apis/fixtures/test_packages/input_package_upgrade/1.0.0/docs/README.md @@ -0,0 +1 @@ +# Custom Logs \ No newline at end of file diff --git a/x-pack/test/fleet_api_integration/apis/fixtures/test_packages/input_package_upgrade/1.0.0/fields/input.yml b/x-pack/test/fleet_api_integration/apis/fixtures/test_packages/input_package_upgrade/1.0.0/fields/input.yml new file mode 100644 index 0000000000000..f5851c64b6b3a --- /dev/null +++ b/x-pack/test/fleet_api_integration/apis/fixtures/test_packages/input_package_upgrade/1.0.0/fields/input.yml @@ -0,0 +1,4 @@ +- name: input.name + type: constant_keyword + description: Sample field to be added. + value: logs \ No newline at end of file diff --git a/x-pack/test/fleet_api_integration/apis/fixtures/test_packages/input_package_upgrade/1.0.0/img/sample-logo.svg b/x-pack/test/fleet_api_integration/apis/fixtures/test_packages/input_package_upgrade/1.0.0/img/sample-logo.svg new file mode 100644 index 0000000000000..6268dd88f3b3d --- /dev/null +++ b/x-pack/test/fleet_api_integration/apis/fixtures/test_packages/input_package_upgrade/1.0.0/img/sample-logo.svg @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/x-pack/test/fleet_api_integration/apis/fixtures/test_packages/input_package_upgrade/1.0.0/img/sample-screenshot.png b/x-pack/test/fleet_api_integration/apis/fixtures/test_packages/input_package_upgrade/1.0.0/img/sample-screenshot.png new file mode 100644 index 0000000000000..d7a56a3ecc078 Binary files /dev/null and b/x-pack/test/fleet_api_integration/apis/fixtures/test_packages/input_package_upgrade/1.0.0/img/sample-screenshot.png differ diff --git a/x-pack/test/fleet_api_integration/apis/fixtures/test_packages/input_package_upgrade/1.0.0/manifest.yml 
b/x-pack/test/fleet_api_integration/apis/fixtures/test_packages/input_package_upgrade/1.0.0/manifest.yml new file mode 100644 index 0000000000000..5c0811e80e65c --- /dev/null +++ b/x-pack/test/fleet_api_integration/apis/fixtures/test_packages/input_package_upgrade/1.0.0/manifest.yml @@ -0,0 +1,45 @@ +format_version: 1.0.0 +name: input_package_upgrade +title: Custom Logs +description: >- + A version of the custom_logs input pkg designed to test upgrade behaviour +type: input +version: 1.0.0 +license: basic +categories: + - custom +policy_templates: + - name: logs + type: logs + title: Custom log file + description: Collect your custom log files. + input: logfile + template_path: input.yml.hbs + vars: + - name: paths + type: text + title: Paths + multi: true + required: true + show_user: true + - name: tags + type: text + title: Tags + multi: true + required: true + show_user: false + - name: ignore_older + type: text + title: Ignore events older than + required: false + default: 72h +icons: + - src: "/img/sample-logo.svg" + type: "image/svg+xml" +screenshots: + - src: "/img/sample-screenshot.png" + title: "Sample screenshot" + size: "600x600" + type: "image/png" +owner: + github: elastic/integrations \ No newline at end of file diff --git a/x-pack/test/fleet_api_integration/apis/fixtures/test_packages/input_package_upgrade/1.1.0/agent/input/input.yml.hbs b/x-pack/test/fleet_api_integration/apis/fixtures/test_packages/input_package_upgrade/1.1.0/agent/input/input.yml.hbs new file mode 100644 index 0000000000000..1ba86fa98a2f8 --- /dev/null +++ b/x-pack/test/fleet_api_integration/apis/fixtures/test_packages/input_package_upgrade/1.1.0/agent/input/input.yml.hbs @@ -0,0 +1,18 @@ +paths: +{{#each paths}} + - {{this}} +{{/each}} + +{{#if tags}} +tags: +{{#each tags as |tag i|}} + - {{tag}} +{{/each}} +{{/if}} + +{{#if pipeline}} +pipeline: {{pipeline}} +{{/if}} + +data_stream: + dataset: {{data_stream.dataset}} \ No newline at end of file diff --git 
a/x-pack/test/fleet_api_integration/apis/fixtures/test_packages/input_package_upgrade/1.1.0/changelog.yml b/x-pack/test/fleet_api_integration/apis/fixtures/test_packages/input_package_upgrade/1.1.0/changelog.yml new file mode 100644 index 0000000000000..53a8cc6ad57bb --- /dev/null +++ b/x-pack/test/fleet_api_integration/apis/fixtures/test_packages/input_package_upgrade/1.1.0/changelog.yml @@ -0,0 +1,12 @@ +# newer versions go on top +- version: "1.1.0" + changes: + - description: Add elasticsearch key to manifest + type: enhancement + link: https://github.com/elastic/package-spec/pull/325 +# newer versions go on top +- version: "1.0.0" + changes: + - description: Initial draft of the package + type: enhancement + link: https://github.com/elastic/package-spec/pull/325 diff --git a/x-pack/test/fleet_api_integration/apis/fixtures/test_packages/input_package_upgrade/1.1.0/docs/README.md b/x-pack/test/fleet_api_integration/apis/fixtures/test_packages/input_package_upgrade/1.1.0/docs/README.md new file mode 100644 index 0000000000000..9f29c89e0f5ef --- /dev/null +++ b/x-pack/test/fleet_api_integration/apis/fixtures/test_packages/input_package_upgrade/1.1.0/docs/README.md @@ -0,0 +1 @@ +# Custom Logs \ No newline at end of file diff --git a/x-pack/test/fleet_api_integration/apis/fixtures/test_packages/input_package_upgrade/1.1.0/fields/input.yml b/x-pack/test/fleet_api_integration/apis/fixtures/test_packages/input_package_upgrade/1.1.0/fields/input.yml new file mode 100644 index 0000000000000..f5851c64b6b3a --- /dev/null +++ b/x-pack/test/fleet_api_integration/apis/fixtures/test_packages/input_package_upgrade/1.1.0/fields/input.yml @@ -0,0 +1,4 @@ +- name: input.name + type: constant_keyword + description: Sample field to be added. 
+ value: logs \ No newline at end of file diff --git a/x-pack/test/fleet_api_integration/apis/fixtures/test_packages/input_package_upgrade/1.1.0/img/sample-logo.svg b/x-pack/test/fleet_api_integration/apis/fixtures/test_packages/input_package_upgrade/1.1.0/img/sample-logo.svg new file mode 100644 index 0000000000000..6268dd88f3b3d --- /dev/null +++ b/x-pack/test/fleet_api_integration/apis/fixtures/test_packages/input_package_upgrade/1.1.0/img/sample-logo.svg @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/x-pack/test/fleet_api_integration/apis/fixtures/test_packages/input_package_upgrade/1.1.0/img/sample-screenshot.png b/x-pack/test/fleet_api_integration/apis/fixtures/test_packages/input_package_upgrade/1.1.0/img/sample-screenshot.png new file mode 100644 index 0000000000000..d7a56a3ecc078 Binary files /dev/null and b/x-pack/test/fleet_api_integration/apis/fixtures/test_packages/input_package_upgrade/1.1.0/img/sample-screenshot.png differ diff --git a/x-pack/test/fleet_api_integration/apis/fixtures/test_packages/input_package_upgrade/1.1.0/manifest.yml b/x-pack/test/fleet_api_integration/apis/fixtures/test_packages/input_package_upgrade/1.1.0/manifest.yml new file mode 100644 index 0000000000000..af623137f15cd --- /dev/null +++ b/x-pack/test/fleet_api_integration/apis/fixtures/test_packages/input_package_upgrade/1.1.0/manifest.yml @@ -0,0 +1,57 @@ +format_version: 1.0.0 +name: input_package_upgrade +title: Custom Logs +description: >- + A version of the custom_logs input pkg designed to test upgrade behaviour +type: input +version: 1.1.0 +license: basic +categories: + - custom +policy_templates: + - name: logs + type: logs + title: Custom log file + description: Collect your custom log files. 
+ input: logfile + template_path: input.yml.hbs + vars: + - name: paths + type: text + title: Paths + multi: true + required: true + show_user: true + - name: tags + type: text + title: Tags + multi: true + required: true + show_user: false + - name: ignore_older + type: text + title: Ignore events older than + required: false + default: 72h +icons: + - src: "/img/sample-logo.svg" + type: "image/svg+xml" +screenshots: + - src: "/img/sample-screenshot.png" + title: "Sample screenshot" + size: "600x600" + type: "image/png" +owner: + github: elastic/integrations +elasticsearch: + index_template: + mappings: + properties: + '@timestamp': + ignore_malformed: false + type: date + dynamic_templates: + - data_stream_to_constant: + mapping: + type: constant_keyword + path_match: data_stream.* \ No newline at end of file diff --git a/x-pack/test/fleet_api_integration/apis/index.js b/x-pack/test/fleet_api_integration/apis/index.js index 60235bb6fc482..99be7518fd882 100644 --- a/x-pack/test/fleet_api_integration/apis/index.js +++ b/x-pack/test/fleet_api_integration/apis/index.js @@ -31,6 +31,7 @@ export default function ({ loadTestFile, getService }) { loadTestFile(require.resolve('./package_policy/get')); loadTestFile(require.resolve('./package_policy/delete')); loadTestFile(require.resolve('./package_policy/upgrade')); + loadTestFile(require.resolve('./package_policy/input_package_create_upgrade')); // Agent policies loadTestFile(require.resolve('./agent_policy')); diff --git a/x-pack/test/fleet_api_integration/apis/package_policy/input_package_create_upgrade.ts b/x-pack/test/fleet_api_integration/apis/package_policy/input_package_create_upgrade.ts new file mode 100644 index 0000000000000..5765fc3832099 --- /dev/null +++ b/x-pack/test/fleet_api_integration/apis/package_policy/input_package_create_upgrade.ts @@ -0,0 +1,388 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +import expect from '@kbn/expect'; +import { sortBy } from 'lodash'; +import { FtrProviderContext } from '../../../api_integration/ftr_provider_context'; +import { skipIfNoDockerRegistry } from '../../helpers'; +import { setupFleetAndAgents } from '../agents/services'; +const PACKAGE_NAME = 'input_package_upgrade'; +const START_VERSION = '1.0.0'; +const UPGRADE_VERSION = '1.1.0'; + +const expectIdArraysEqual = (arr1: any[], arr2: any[]) => { + expect(sortBy(arr1, 'id')).to.eql(sortBy(arr2, 'id')); +}; +export default function (providerContext: FtrProviderContext) { + const { getService } = providerContext; + const supertest = getService('supertest'); + const es = getService('es'); + const uninstallPackage = async (name: string, version: string) => { + await supertest.delete(`/api/fleet/epm/packages/${name}/${version}`).set('kbn-xsrf', 'xxxx'); + }; + + const installPackage = async (name: string, version: string) => { + return await supertest + .post(`/api/fleet/epm/packages/${name}/${version}`) + .set('kbn-xsrf', 'xxxx') + .send({ force: true }) + .expect(200); + }; + + const getInstallationSavedObject = async (name: string, version: string) => { + const res = await supertest.get(`/api/fleet/epm/packages/${name}-${version}`).expect(200); + return res.body.item.savedObject.attributes; + }; + + const createPackagePolicyWithDataset = async ( + agentPolicyId: string, + dataset: string, + expectStatusCode = 200, + force = false + ) => { + const policy = { + force, + policy_id: agentPolicyId, + package: { + name: PACKAGE_NAME, + version: START_VERSION, + }, + name: 'test-policy-' + dataset, + description: '', + namespace: 'default', + inputs: { + 'logs-logfile': { + enabled: true, + streams: { + 'input_package_upgrade.logs': { + enabled: true, + vars: { + paths: ['/tmp/test/log'], + tags: ['tag1'], + ignore_older: '72h', + 
'data_stream.dataset': dataset, + }, + }, + }, + }, + }, + }; + const res = await supertest + .post(`/api/fleet/package_policies`) + .set('kbn-xsrf', 'xxxx') + .send(policy) + .expect(expectStatusCode); + + return res.body.item; + }; + + const deletePackagePolicy = (id: string) => { + return supertest + .delete(`/api/fleet/package_policies/${id}`) + .set('kbn-xsrf', 'xxxx') + .expect(200); + }; + + const createAgentPolicy = async (name = 'Input Package Test 3') => { + const res = await supertest + .post(`/api/fleet/agent_policies`) + .set('kbn-xsrf', 'xxxx') + .send({ + name, + namespace: 'default', + }) + .expect(200); + return res.body.item; + }; + + const deleteAgentPolicy = async (agentPolicyId: string) => { + if (!agentPolicyId) return; + return supertest + .post(`/api/fleet/agent_policies/delete`) + .set('kbn-xsrf', 'xxxx') + .send({ agentPolicyId }); + }; + + const getComponentTemplate = async (name: string) => { + try { + const { component_templates: templates } = await es.cluster.getComponentTemplate({ name }); + + return templates?.[0] || null; + } catch (e) { + if (e.statusCode === 404) { + return null; + } + + throw e; + } + }; + + const getIndexTemplate = async (name: string) => { + try { + const { index_templates: templates } = await es.indices.getIndexTemplate({ name }); + + return templates?.[0]?.index_template || null; + } catch (e) { + if (e.statusCode === 404) { + return null; + } + } + }; + + const createFakeFleetIndexTemplate = async (dataset: string, pkgName = PACKAGE_NAME) => { + const templateName = `logs-${dataset}`; + const template = { + name: templateName, + index_patterns: [`${templateName}-*`], + priority: 200, + _meta: { + package: { + name: pkgName, + }, + managed_by: 'fleet', + managed: true, + }, + data_stream: {}, + template: { + settings: { + number_of_shards: 1, + }, + mappings: { + properties: {}, + }, + }, + }; + await es.indices.putIndexTemplate(template); + }; + + const createFakeFleetDataStream = async (dataset: string, 
pkgName = PACKAGE_NAME) => { + const indexName = `logs-${dataset}-default`; + await createFakeFleetIndexTemplate(dataset, pkgName); + + await es.index({ + index: indexName, + body: { + '@timestamp': new Date().toISOString(), + message: 'test', + }, + }); + }; + + const deleteDataStream = async (templateName: string) => { + await es.indices.deleteDataStream({ name: templateName + '-default' }); + await deleteIndexTemplate(templateName); + }; + + const deleteIndexTemplate = async (templateName: string) => { + await es.indices.deleteIndexTemplate({ name: templateName }); + }; + + describe('Package Policy - input package behavior', async function () { + skipIfNoDockerRegistry(providerContext); + + let agentPolicyId: string; + const packagePolicyIds: string[] = []; + before(async () => { + installPackage(PACKAGE_NAME, START_VERSION); + const agentPolicy = await createAgentPolicy(); + agentPolicyId = agentPolicy.id; + }); + + after(async () => { + await deleteAgentPolicy(agentPolicyId); + }); + setupFleetAndAgents(providerContext); + + it('should not have created any ES assets on install', async () => { + const installation = await getInstallationSavedObject(PACKAGE_NAME, START_VERSION); + expect(installation.installed_es).to.eql([]); + }); + + it('should create index templates and update installed_es on package policy creation', async () => { + const packagePolicy = await createPackagePolicyWithDataset(agentPolicyId, 'dataset1'); + packagePolicyIds.push(packagePolicy.id); + const installation = await getInstallationSavedObject(PACKAGE_NAME, START_VERSION); + expectIdArraysEqual(installation.installed_es, [ + { id: 'logs-dataset1-1.0.0', type: 'ingest_pipeline' }, + { id: 'logs-dataset1', type: 'index_template' }, + { id: 'logs-dataset1@package', type: 'component_template' }, + { id: 'logs-dataset1@custom', type: 'component_template' }, + ]); + + // now check the package component template was created correctly + const packageComponentTemplate = await 
getComponentTemplate('logs-dataset1@package'); + expect(packageComponentTemplate).eql({ + name: 'logs-dataset1@package', + component_template: { + template: { + settings: { + index: { + lifecycle: { name: 'logs' }, + codec: 'best_compression', + default_pipeline: 'logs-dataset1-1.0.0', + mapping: { total_fields: { limit: '10000' } }, + }, + }, + mappings: { + properties: { + input: { + properties: { + name: { + type: 'constant_keyword', + value: 'logs', + }, + }, + }, + }, + }, + }, + _meta: { + package: { name: 'input_package_upgrade' }, + managed_by: 'fleet', + managed: true, + }, + }, + }); + }); + + it('should create index templates and update installed_es on second package policy creation', async () => { + const packagePolicy = await createPackagePolicyWithDataset(agentPolicyId, 'dataset2'); + packagePolicyIds.push(packagePolicy.id); + const installation = await getInstallationSavedObject(PACKAGE_NAME, START_VERSION); + expectIdArraysEqual(installation.installed_es, [ + { id: 'logs-dataset1-1.0.0', type: 'ingest_pipeline' }, + { id: 'logs-dataset1', type: 'index_template' }, + { id: 'logs-dataset1@package', type: 'component_template' }, + { id: 'logs-dataset1@custom', type: 'component_template' }, + { id: 'logs-dataset2-1.0.0', type: 'ingest_pipeline' }, + { id: 'logs-dataset2', type: 'index_template' }, + { id: 'logs-dataset2@package', type: 'component_template' }, + { id: 'logs-dataset2@custom', type: 'component_template' }, + ]); + }); + + it('should allow data to be sent to existing stream if owned by package and should not create templates', async () => { + await createFakeFleetDataStream('dataset3'); + + const packagePolicy = await createPackagePolicyWithDataset(agentPolicyId, 'dataset3'); + packagePolicyIds.push(packagePolicy.id); + const installation = await getInstallationSavedObject(PACKAGE_NAME, START_VERSION); + expectIdArraysEqual(installation.installed_es, [ + { id: 'logs-dataset1-1.0.0', type: 'ingest_pipeline' }, + { id: 'logs-dataset1', type: 
'index_template' }, + { id: 'logs-dataset1@package', type: 'component_template' }, + { id: 'logs-dataset1@custom', type: 'component_template' }, + { id: 'logs-dataset2-1.0.0', type: 'ingest_pipeline' }, + { id: 'logs-dataset2', type: 'index_template' }, + { id: 'logs-dataset2@package', type: 'component_template' }, + { id: 'logs-dataset2@custom', type: 'component_template' }, + ]); + + const dataset3PkgComponentTemplate = await getComponentTemplate('logs-dataset3@package'); + expect(dataset3PkgComponentTemplate).eql(null); + + await deleteDataStream('logs-dataset3'); + }); + + it('should not allow data to be sent to existing stream if not owned by package', async () => { + await createFakeFleetDataStream('dataset4', 'different_package'); + + const expectedStatusCode = 400; + await createPackagePolicyWithDataset(agentPolicyId, 'dataset4', expectedStatusCode); + + const dataset4PkgComponentTemplate = await getComponentTemplate('logs-dataset4@package'); + expect(dataset4PkgComponentTemplate).eql(null); + + await deleteDataStream('logs-dataset4'); + }); + + it('should not allow existing index template to be overwritten if not owned by package', async () => { + await createFakeFleetIndexTemplate('dataset4', 'different_package'); + + const expectedStatusCode = 400; + await createPackagePolicyWithDataset(agentPolicyId, 'dataset4', expectedStatusCode); + + const dataset4PkgComponentTemplate = await getComponentTemplate('logs-dataset4@package'); + expect(dataset4PkgComponentTemplate).eql(null); + + await deleteIndexTemplate('logs-dataset4'); + }); + + it('should allow data to be sent to existing stream if not owned by package if force flag provided', async () => { + await createFakeFleetDataStream('dataset5', 'different_package'); + + const expectedStatusCode = 200; + const force = true; + await createPackagePolicyWithDataset(agentPolicyId, 'dataset5', expectedStatusCode, force); + + const dataset5PkgComponentTemplate = await getComponentTemplate('logs-dataset5@package'); + 
expect(dataset5PkgComponentTemplate).eql(null); + + const dataset5IndexTemplate = await getIndexTemplate('logs-dataset5'); + expect(dataset5IndexTemplate?._meta?.package?.name).eql('different_package'); + + await deleteDataStream('logs-dataset5'); + }); + + it('should not override existing index template with no associated streams ', async () => { + await createFakeFleetIndexTemplate('dataset6', 'different_package'); + + const expectedStatusCode = 200; + const force = true; + await createPackagePolicyWithDataset(agentPolicyId, 'dataset6', expectedStatusCode, force); + + const dataset6PkgComponentTemplate = await getComponentTemplate('logs-dataset6@package'); + expect(dataset6PkgComponentTemplate).eql(null); + + const dataset6IndexTemplate = await getIndexTemplate('logs-dataset6'); + expect(dataset6IndexTemplate?._meta?.package?.name).eql('different_package'); + + await deleteIndexTemplate('logs-dataset6'); + }); + + it('should update all index templates created by package policies when the package is upgraded', async () => { + // version 1.1.0 of the test package introduces elasticsearch mappings to the index + // templates, upgrading the package should add this field to both package component templates + await installPackage(PACKAGE_NAME, UPGRADE_VERSION); + + const dataset1PkgComponentTemplate = await getComponentTemplate('logs-dataset1@package'); + expect(dataset1PkgComponentTemplate).not.eql(null); + const mappingsWithTimestamp = { + '@timestamp': { ignore_malformed: false, type: 'date' }, + input: { + properties: { + name: { + type: 'constant_keyword', + value: 'logs', + }, + }, + }, + }; + expect(dataset1PkgComponentTemplate!.component_template.template?.mappings?.properties).eql( + mappingsWithTimestamp + ); + + const dataset2PkgComponentTemplate = await getComponentTemplate('logs-dataset2@package'); + expect(dataset2PkgComponentTemplate).not.eql(null); + expect(dataset2PkgComponentTemplate!.component_template.template?.mappings?.properties).eql( + 
mappingsWithTimestamp + ); + }); + it('should delete all index templates created by package policies when the package is uninstalled', async () => { + for (const packagePolicyId of packagePolicyIds) { + await deletePackagePolicy(packagePolicyId); + } + await deleteAgentPolicy(agentPolicyId); + await uninstallPackage(PACKAGE_NAME, UPGRADE_VERSION); + + const dataset1PkgComponentTemplate = await getComponentTemplate('logs-dataset1@package'); + expect(dataset1PkgComponentTemplate).eql(null); + + const dataset2PkgComponentTemplate = await getComponentTemplate('logs-dataset2@package'); + expect(dataset2PkgComponentTemplate).eql(null); + }); + }); +} diff --git a/x-pack/test/fleet_api_integration/apis/package_policy/update.ts b/x-pack/test/fleet_api_integration/apis/package_policy/update.ts index 7f128071d4008..9c4b35030ced3 100644 --- a/x-pack/test/fleet_api_integration/apis/package_policy/update.ts +++ b/x-pack/test/fleet_api_integration/apis/package_policy/update.ts @@ -95,7 +95,7 @@ export default function (providerContext: FtrProviderContext) { paths: { type: 'text', value: ['/tmp/test.log'] }, tags: { type: 'text', value: ['tag1'] }, ignore_older: { value: '72h', type: 'text' }, - 'data_stream.dataset': { type: 'text', value: 'generic' }, + 'data_stream.dataset': { type: 'text', value: 'input_package_test' }, }, }, ],