diff --git a/x-pack/plugins/ingest_manager/common/types/models/epm.ts b/x-pack/plugins/ingest_manager/common/types/models/epm.ts index ea7fd60d1fa3f..2ec9d7be6c882 100644 --- a/x-pack/plugins/ingest_manager/common/types/models/epm.ts +++ b/x-pack/plugins/ingest_manager/common/types/models/epm.ts @@ -44,6 +44,11 @@ export enum ElasticsearchAssetType { transform = 'transform', } +export enum DataType { + logs = 'logs', + metrics = 'metrics', +} + export enum AgentAssetType { input = 'input', } diff --git a/x-pack/plugins/ingest_manager/server/services/epm/elasticsearch/template/template.ts b/x-pack/plugins/ingest_manager/server/services/epm/elasticsearch/template/template.ts index e0fea59107c26..8d33180d6262d 100644 --- a/x-pack/plugins/ingest_manager/server/services/epm/elasticsearch/template/template.ts +++ b/x-pack/plugins/ingest_manager/server/services/epm/elasticsearch/template/template.ts @@ -11,6 +11,7 @@ import { TemplateRef, IndexTemplate, IndexTemplateMappings, + DataType, } from '../../../../types'; import { getRegistryDataStreamAssetBaseName } from '../index'; @@ -400,13 +401,6 @@ const updateExistingIndex = async ({ delete mappings.properties.stream; delete mappings.properties.data_stream; - // get the data_stream values from the index template to compose data stream name - const indexMappings = await getIndexMappings(indexName, callCluster); - const dataStream = indexMappings[indexName].mappings.properties.data_stream.properties; - if (!dataStream.type.value || !dataStream.dataset.value || !dataStream.namespace.value) - throw new Error(`data_stream values are missing from the index template ${indexName}`); - const dataStreamName = `${dataStream.type.value}-${dataStream.dataset.value}-${dataStream.namespace.value}`; - // try to update the mappings first try { await callCluster('indices.putMapping', { @@ -416,13 +410,54 @@ const updateExistingIndex = async ({ // if update fails, rollover data stream } catch (err) { try { + // get the data_stream values to compose datastream name + const searchDataStreamFieldsResponse = await callCluster('search', { + index: indexTemplate.index_patterns[0], + body: { + size: 1, + _source: ['data_stream.namespace', 'data_stream.type', 'data_stream.dataset'], + query: { + bool: { + filter: [ + { + exists: { + field: 'data_stream.type', + }, + }, + { + exists: { + field: 'data_stream.dataset', + }, + }, + { + exists: { + field: 'data_stream.namespace', + }, + }, + ], + }, + }, + }, + }); + if (searchDataStreamFieldsResponse.hits.total.value === 0) + throw new Error('data_stream fields are missing from datastream indices'); + const { + dataset, + namespace, + type, + }: { + dataset: string; + namespace: string; + type: DataType; + } = searchDataStreamFieldsResponse.hits.hits[0]._source.data_stream; + const dataStreamName = `${type}-${dataset}-${namespace}`; const path = `/${dataStreamName}/_rollover`; await callCluster('transport.request', { method: 'POST', path, }); } catch (error) { - throw new Error(`cannot rollover data stream ${dataStreamName}`); + throw new Error(`cannot rollover data stream ${error}`); } } // update settings after mappings was successful to ensure @@ -438,14 +473,3 @@ const updateExistingIndex = async ({ throw new Error(`could not update index template settings for ${indexName}`); } }; - -const getIndexMappings = async (indexName: string, callCluster: CallESAsCurrentUser) => { - try { - const indexMappings = await callCluster('indices.getMapping', { - index: indexName, - }); - return indexMappings; - } catch (err) { - throw new Error(`could not get mapping from ${indexName}`); - } -}; diff --git a/x-pack/plugins/ingest_manager/server/services/epm/kibana/index_pattern/install.ts b/x-pack/plugins/ingest_manager/server/services/epm/kibana/index_pattern/install.ts index 4804701df23a8..4e307e1ac6880 100644 --- a/x-pack/plugins/ingest_manager/server/services/epm/kibana/index_pattern/install.ts +++ b/x-pack/plugins/ingest_manager/server/services/epm/kibana/index_pattern/install.ts @@ -12,7 +12,12 @@ import { import * as Registry from '../../registry'; import { loadFieldsFromYaml, Fields, Field } from '../../fields/field'; import { getPackageKeysByStatus } from '../../packages/get'; -import { InstallationStatus, RegistryPackage, CallESAsCurrentUser } from '../../../../types'; +import { + InstallationStatus, + RegistryPackage, + CallESAsCurrentUser, + DataType, +} from '../../../../types'; import { appContextService } from '../../../../services'; interface FieldFormatMap { @@ -69,10 +74,7 @@ export interface IndexPatternField { lang?: string; readFromDocValues: boolean; } -export enum IndexPatternType { - logs = 'logs', - metrics = 'metrics', -} + // TODO: use a function overload and make pkgName and pkgVersion required for install/update // and not for an update removal. or separate out the functions export async function installIndexPatterns( @@ -117,7 +119,7 @@ export async function installIndexPatterns( const packageVersionsInfo = await Promise.all(packageVersionsFetchInfoPromise); // for each index pattern type, create an index pattern - const indexPatternTypes = [IndexPatternType.logs, IndexPatternType.metrics]; + const indexPatternTypes = [DataType.logs, DataType.metrics]; indexPatternTypes.forEach(async (indexPatternType) => { // if this is an update because a package is being uninstalled (no pkgkey argument passed) and no other packages are installed, remove the index pattern if (!pkgName && installedPackages.length === 0) { @@ -144,7 +146,7 @@ export async function installIndexPatterns( // of all fields from all data streams matching data stream type export const getAllDataStreamFieldsByType = async ( packages: RegistryPackage[], - dataStreamType: IndexPatternType + dataStreamType: DataType ): Promise => { const dataStreamsPromises = packages.reduce>>((acc, pkg) => { if (pkg.data_streams) { @@ -389,7 +391,7 @@ export const ensureDefaultIndices = async (callCluster: CallESAsCurrentUser) => // that no matching indices exist https://github.com/elastic/kibana/issues/62343 const logger = appContextService.getLogger(); return Promise.all( - Object.keys(IndexPatternType).map(async (indexPattern) => { + Object.keys(DataType).map(async (indexPattern) => { const defaultIndexPatternName = indexPattern + INDEX_PATTERN_PLACEHOLDER_SUFFIX; const indexExists = await callCluster('indices.exists', { index: defaultIndexPatternName }); if (!indexExists) { diff --git a/x-pack/plugins/ingest_manager/server/types/index.tsx b/x-pack/plugins/ingest_manager/server/types/index.tsx index 0c070959e3b93..7d841ed024ce5 100644 --- a/x-pack/plugins/ingest_manager/server/types/index.tsx +++ b/x-pack/plugins/ingest_manager/server/types/index.tsx @@ -73,6 +73,7 @@ export { // Agent Request types PostAgentEnrollRequest, PostAgentCheckinRequest, + DataType, } from '../../common'; export type CallESAsCurrentUser = LegacyScopedClusterClient['callAsCurrentUser']; diff --git a/x-pack/test/ingest_manager_api_integration/apis/epm/data_stream.ts b/x-pack/test/ingest_manager_api_integration/apis/epm/data_stream.ts index 5da9b5e3031b2..b9558240ca007 100644 --- a/x-pack/test/ingest_manager_api_integration/apis/epm/data_stream.ts +++ b/x-pack/test/ingest_manager_api_integration/apis/epm/data_stream.ts @@ -24,15 +24,16 @@ export default function (providerContext: FtrProviderContext) { await supertest.delete(`/api/fleet/epm/packages/${pkg}`).set('kbn-xsrf', 'xxxx'); }; const installPackage = async (pkg: string) => { - await supertest + return await supertest .post(`/api/fleet/epm/packages/${pkg}`) .set('kbn-xsrf', 'xxxx') - .send({ force: true }); + .send({ force: true }) + .expect(200); }; describe('datastreams', async () => { skipIfNoDockerRegistry(providerContext); - before(async () => { + beforeEach(async () => { await installPackage(pkgKey); await es.transport.request({ method: 'POST', @@ -61,8 +62,7 @@ export default function (providerContext: FtrProviderContext) { }, }); }); - after(async () => { - await uninstallPackage(pkgUpdateKey); + afterEach(async () => { await es.transport.request({ method: 'DELETE', path: `/_data_stream/${logsTemplateName}-default`, @@ -71,60 +71,57 @@ export default function (providerContext: FtrProviderContext) { method: 'DELETE', path: `/_data_stream/${metricsTemplateName}-default`, }); + await uninstallPackage(pkgKey); + await uninstallPackage(pkgUpdateKey); }); - describe('get datastreams after data sent', async () => { - skipIfNoDockerRegistry(providerContext); - let resLogsDatastream: any; - let resMetricsDatastream: any; - before(async () => { - resLogsDatastream = await es.transport.request({ - method: 'GET', - path: `/_data_stream/${logsTemplateName}-default`, - }); - resMetricsDatastream = await es.transport.request({ - method: 'GET', - path: `/_data_stream/${metricsTemplateName}-default`, - }); - }); - it('should list the logs datastream', async function () { - expect(resLogsDatastream.body.data_streams.length).equal(1); - expect(resLogsDatastream.body.data_streams[0].indices.length).equal(1); - expect(resLogsDatastream.body.data_streams[0].indices[0].index_name).equal( - `.ds-${logsTemplateName}-default-000001` - ); + it('should list the logs and metrics datastream', async function () { + const resLogsDatastream = await es.transport.request({ + method: 'GET', + path: `/_data_stream/${logsTemplateName}-default`, }); - it('should list the metrics datastream', async function () { - expect(resMetricsDatastream.body.data_streams.length).equal(1); - expect(resMetricsDatastream.body.data_streams[0].indices.length).equal(1); - expect(resMetricsDatastream.body.data_streams[0].indices[0].index_name).equal( - `.ds-${metricsTemplateName}-default-000001` - ); + const resMetricsDatastream = await es.transport.request({ + method: 'GET', + path: `/_data_stream/${metricsTemplateName}-default`, }); + expect(resLogsDatastream.body.data_streams.length).equal(1); + expect(resLogsDatastream.body.data_streams[0].indices.length).equal(1); + expect(resLogsDatastream.body.data_streams[0].indices[0].index_name).equal( + `.ds-${logsTemplateName}-default-000001` + ); + expect(resMetricsDatastream.body.data_streams.length).equal(1); + expect(resMetricsDatastream.body.data_streams[0].indices.length).equal(1); + expect(resMetricsDatastream.body.data_streams[0].indices[0].index_name).equal( + `.ds-${metricsTemplateName}-default-000001` + ); }); - describe('rollover datastream when mappings are not compatible', async () => { - skipIfNoDockerRegistry(providerContext); - let resLogsDatastream: any; - let resMetricsDatastream: any; - before(async () => { - await installPackage(pkgUpdateKey); - resLogsDatastream = await es.transport.request({ - method: 'GET', - path: `/_data_stream/${logsTemplateName}-default`, - }); - resMetricsDatastream = await es.transport.request({ - method: 'GET', - path: `/_data_stream/${metricsTemplateName}-default`, - }); + + it('after update, it should have rolled over logs datastream because mappings are not compatible and not metrics', async function () { + await installPackage(pkgUpdateKey); + const resLogsDatastream = await es.transport.request({ + method: 'GET', + path: `/_data_stream/${logsTemplateName}-default`, }); - it('should have rolled over logs datastream', async function () { - expect(resLogsDatastream.body.data_streams[0].indices.length).equal(2); - expect(resLogsDatastream.body.data_streams[0].indices[1].index_name).equal( - `.ds-${logsTemplateName}-default-000002` - ); + const resMetricsDatastream = await es.transport.request({ + method: 'GET', + path: `/_data_stream/${metricsTemplateName}-default`, + }); + expect(resLogsDatastream.body.data_streams[0].indices.length).equal(2); + expect(resLogsDatastream.body.data_streams[0].indices[1].index_name).equal( + `.ds-${logsTemplateName}-default-000002` + ); + expect(resMetricsDatastream.body.data_streams[0].indices.length).equal(1); + }); + it('should be able to upgrade a package after a rollover', async function () { + await es.transport.request({ + method: 'POST', + path: `/${logsTemplateName}-default/_rollover`, }); - it('should have not rolled over metrics datastream', async function () { - expect(resMetricsDatastream.body.data_streams[0].indices.length).equal(1); + const resLogsDatastream = await es.transport.request({ + method: 'GET', + path: `/_data_stream/${logsTemplateName}-default`, }); + expect(resLogsDatastream.body.data_streams[0].indices.length).equal(2); + await installPackage(pkgUpdateKey); }); }); }