Skip to content

Commit

Permalink
[Ingest Manager] Fix package upgrade breaking after first rollover be…
Browse files Browse the repository at this point in the history
…fore new data has arrived (#79887)

* build datastream name from index name

* query for data_stream constants to create data stream name

* simply datastream tests and add a test to upgrade after a datastream rolls over

* improve query

* remove dup
  • Loading branch information
neptunian committed Oct 12, 2020
1 parent 4e68bc7 commit 10bbbc8
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 79 deletions.
5 changes: 5 additions & 0 deletions x-pack/plugins/ingest_manager/common/types/models/epm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ export enum ElasticsearchAssetType {
transform = 'transform',
}

export enum DataType {
logs = 'logs',
metrics = 'metrics',
}

export enum AgentAssetType {
input = 'input',
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
TemplateRef,
IndexTemplate,
IndexTemplateMappings,
DataType,
} from '../../../../types';
import { getRegistryDataStreamAssetBaseName } from '../index';

Expand Down Expand Up @@ -400,13 +401,6 @@ const updateExistingIndex = async ({
delete mappings.properties.stream;
delete mappings.properties.data_stream;

// get the data_stream values from the index template to compose data stream name
const indexMappings = await getIndexMappings(indexName, callCluster);
const dataStream = indexMappings[indexName].mappings.properties.data_stream.properties;
if (!dataStream.type.value || !dataStream.dataset.value || !dataStream.namespace.value)
throw new Error(`data_stream values are missing from the index template ${indexName}`);
const dataStreamName = `${dataStream.type.value}-${dataStream.dataset.value}-${dataStream.namespace.value}`;

// try to update the mappings first
try {
await callCluster('indices.putMapping', {
Expand All @@ -416,13 +410,54 @@ const updateExistingIndex = async ({
// if update fails, rollover data stream
} catch (err) {
try {
// get the data_stream values to compose datastream name
const searchDataStreamFieldsResponse = await callCluster('search', {
index: indexTemplate.index_patterns[0],
body: {
size: 1,
_source: ['data_stream.namespace', 'data_stream.type', 'data_stream.dataset'],
query: {
bool: {
filter: [
{
exists: {
field: 'data_stream.type',
},
},
{
exists: {
field: 'data_stream.dataset',
},
},
{
exists: {
field: 'data_stream.namespace',
},
},
],
},
},
},
});
if (searchDataStreamFieldsResponse.hits.total.value === 0)
throw new Error('data_stream fields are missing from datastream indices');
const {
dataset,
namespace,
type,
}: {
dataset: string;
namespace: string;
type: DataType;
} = searchDataStreamFieldsResponse.hits.hits[0]._source.data_stream;
const dataStreamName = `${type}-${dataset}-${namespace}`;
const path = `/${dataStreamName}/_rollover`;
await callCluster('transport.request', {
method: 'POST',
path,
});
} catch (error) {
throw new Error(`cannot rollover data stream ${dataStreamName}`);
throw new Error(`cannot rollover data stream ${error}`);
}
}
// update settings after mappings was successful to ensure
Expand All @@ -438,14 +473,3 @@ const updateExistingIndex = async ({
throw new Error(`could not update index template settings for ${indexName}`);
}
};

const getIndexMappings = async (indexName: string, callCluster: CallESAsCurrentUser) => {
try {
const indexMappings = await callCluster('indices.getMapping', {
index: indexName,
});
return indexMappings;
} catch (err) {
throw new Error(`could not get mapping from ${indexName}`);
}
};
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,12 @@ import {
import * as Registry from '../../registry';
import { loadFieldsFromYaml, Fields, Field } from '../../fields/field';
import { getPackageKeysByStatus } from '../../packages/get';
import { InstallationStatus, RegistryPackage, CallESAsCurrentUser } from '../../../../types';
import {
InstallationStatus,
RegistryPackage,
CallESAsCurrentUser,
DataType,
} from '../../../../types';
import { appContextService } from '../../../../services';

interface FieldFormatMap {
Expand Down Expand Up @@ -69,10 +74,7 @@ export interface IndexPatternField {
lang?: string;
readFromDocValues: boolean;
}
export enum IndexPatternType {
logs = 'logs',
metrics = 'metrics',
}

// TODO: use a function overload and make pkgName and pkgVersion required for install/update
// and not for an update removal. or separate out the functions
export async function installIndexPatterns(
Expand Down Expand Up @@ -117,7 +119,7 @@ export async function installIndexPatterns(
const packageVersionsInfo = await Promise.all(packageVersionsFetchInfoPromise);

// for each index pattern type, create an index pattern
const indexPatternTypes = [IndexPatternType.logs, IndexPatternType.metrics];
const indexPatternTypes = [DataType.logs, DataType.metrics];
indexPatternTypes.forEach(async (indexPatternType) => {
// if this is an update because a package is being uninstalled (no pkgkey argument passed) and no other packages are installed, remove the index pattern
if (!pkgName && installedPackages.length === 0) {
Expand All @@ -144,7 +146,7 @@ export async function installIndexPatterns(
// of all fields from all data streams matching data stream type
export const getAllDataStreamFieldsByType = async (
packages: RegistryPackage[],
dataStreamType: IndexPatternType
dataStreamType: DataType
): Promise<Fields> => {
const dataStreamsPromises = packages.reduce<Array<Promise<Field[]>>>((acc, pkg) => {
if (pkg.data_streams) {
Expand Down Expand Up @@ -389,7 +391,7 @@ export const ensureDefaultIndices = async (callCluster: CallESAsCurrentUser) =>
// that no matching indices exist https://github.com/elastic/kibana/issues/62343
const logger = appContextService.getLogger();
return Promise.all(
Object.keys(IndexPatternType).map(async (indexPattern) => {
Object.keys(DataType).map(async (indexPattern) => {
const defaultIndexPatternName = indexPattern + INDEX_PATTERN_PLACEHOLDER_SUFFIX;
const indexExists = await callCluster('indices.exists', { index: defaultIndexPatternName });
if (!indexExists) {
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugins/ingest_manager/server/types/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ export {
// Agent Request types
PostAgentEnrollRequest,
PostAgentCheckinRequest,
DataType,
} from '../../common';

export type CallESAsCurrentUser = LegacyScopedClusterClient['callAsCurrentUser'];
Expand Down
101 changes: 49 additions & 52 deletions x-pack/test/ingest_manager_api_integration/apis/epm/data_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,16 @@ export default function (providerContext: FtrProviderContext) {
await supertest.delete(`/api/fleet/epm/packages/${pkg}`).set('kbn-xsrf', 'xxxx');
};
const installPackage = async (pkg: string) => {
await supertest
return await supertest
.post(`/api/fleet/epm/packages/${pkg}`)
.set('kbn-xsrf', 'xxxx')
.send({ force: true });
.send({ force: true })
.expect(200);
};

describe('datastreams', async () => {
skipIfNoDockerRegistry(providerContext);
before(async () => {
beforeEach(async () => {
await installPackage(pkgKey);
await es.transport.request({
method: 'POST',
Expand Down Expand Up @@ -61,8 +62,7 @@ export default function (providerContext: FtrProviderContext) {
},
});
});
after(async () => {
await uninstallPackage(pkgUpdateKey);
afterEach(async () => {
await es.transport.request({
method: 'DELETE',
path: `/_data_stream/${logsTemplateName}-default`,
Expand All @@ -71,60 +71,57 @@ export default function (providerContext: FtrProviderContext) {
method: 'DELETE',
path: `/_data_stream/${metricsTemplateName}-default`,
});
await uninstallPackage(pkgKey);
await uninstallPackage(pkgUpdateKey);
});
describe('get datastreams after data sent', async () => {
skipIfNoDockerRegistry(providerContext);
let resLogsDatastream: any;
let resMetricsDatastream: any;
before(async () => {
resLogsDatastream = await es.transport.request({
method: 'GET',
path: `/_data_stream/${logsTemplateName}-default`,
});
resMetricsDatastream = await es.transport.request({
method: 'GET',
path: `/_data_stream/${metricsTemplateName}-default`,
});
});
it('should list the logs datastream', async function () {
expect(resLogsDatastream.body.data_streams.length).equal(1);
expect(resLogsDatastream.body.data_streams[0].indices.length).equal(1);
expect(resLogsDatastream.body.data_streams[0].indices[0].index_name).equal(
`.ds-${logsTemplateName}-default-000001`
);
it('should list the logs and metrics datastream', async function () {
const resLogsDatastream = await es.transport.request({
method: 'GET',
path: `/_data_stream/${logsTemplateName}-default`,
});
it('should list the metrics datastream', async function () {
expect(resMetricsDatastream.body.data_streams.length).equal(1);
expect(resMetricsDatastream.body.data_streams[0].indices.length).equal(1);
expect(resMetricsDatastream.body.data_streams[0].indices[0].index_name).equal(
`.ds-${metricsTemplateName}-default-000001`
);
const resMetricsDatastream = await es.transport.request({
method: 'GET',
path: `/_data_stream/${metricsTemplateName}-default`,
});
expect(resLogsDatastream.body.data_streams.length).equal(1);
expect(resLogsDatastream.body.data_streams[0].indices.length).equal(1);
expect(resLogsDatastream.body.data_streams[0].indices[0].index_name).equal(
`.ds-${logsTemplateName}-default-000001`
);
expect(resMetricsDatastream.body.data_streams.length).equal(1);
expect(resMetricsDatastream.body.data_streams[0].indices.length).equal(1);
expect(resMetricsDatastream.body.data_streams[0].indices[0].index_name).equal(
`.ds-${metricsTemplateName}-default-000001`
);
});
describe('rollover datastream when mappings are not compatible', async () => {
skipIfNoDockerRegistry(providerContext);
let resLogsDatastream: any;
let resMetricsDatastream: any;
before(async () => {
await installPackage(pkgUpdateKey);
resLogsDatastream = await es.transport.request({
method: 'GET',
path: `/_data_stream/${logsTemplateName}-default`,
});
resMetricsDatastream = await es.transport.request({
method: 'GET',
path: `/_data_stream/${metricsTemplateName}-default`,
});

it('after update, it should have rolled over logs datastream because mappings are not compatible and not metrics', async function () {
await installPackage(pkgUpdateKey);
const resLogsDatastream = await es.transport.request({
method: 'GET',
path: `/_data_stream/${logsTemplateName}-default`,
});
it('should have rolled over logs datastream', async function () {
expect(resLogsDatastream.body.data_streams[0].indices.length).equal(2);
expect(resLogsDatastream.body.data_streams[0].indices[1].index_name).equal(
`.ds-${logsTemplateName}-default-000002`
);
const resMetricsDatastream = await es.transport.request({
method: 'GET',
path: `/_data_stream/${metricsTemplateName}-default`,
});
expect(resLogsDatastream.body.data_streams[0].indices.length).equal(2);
expect(resLogsDatastream.body.data_streams[0].indices[1].index_name).equal(
`.ds-${logsTemplateName}-default-000002`
);
expect(resMetricsDatastream.body.data_streams[0].indices.length).equal(1);
});
it('should be able to upgrade a package after a rollover', async function () {
await es.transport.request({
method: 'POST',
path: `/${logsTemplateName}-default/_rollover`,
});
it('should have not rolled over metrics datastream', async function () {
expect(resMetricsDatastream.body.data_streams[0].indices.length).equal(1);
const resLogsDatastream = await es.transport.request({
method: 'GET',
path: `/_data_stream/${logsTemplateName}-default`,
});
expect(resLogsDatastream.body.data_streams[0].indices.length).equal(2);
await installPackage(pkgUpdateKey);
});
});
}

0 comments on commit 10bbbc8

Please sign in to comment.