Skip to content

Commit

Permalink
[Fleet] parse from manifest and small refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
hop-dev committed Nov 3, 2022
1 parent 360d5d7 commit c0c6c7a
Show file tree
Hide file tree
Showing 3 changed files with 171 additions and 39 deletions.
1 change: 1 addition & 0 deletions x-pack/plugins/fleet/common/types/models/epm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,7 @@ export interface RegistryElasticsearch {
'index_template.settings'?: estypes.IndicesIndexSettings;
'index_template.mappings'?: estypes.MappingTypeMapping;
'ingest_pipeline.name'?: string;
source_mode?: 'default' | 'synthetic';
}

export interface RegistryDataStreamPrivileges {
Expand Down
111 changes: 111 additions & 0 deletions x-pack/plugins/fleet/server/services/epm/archive/parse.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { parseDefaultIngestPipeline, parseDataStreamElasticsearchEntry } from './parse';
describe('parseDefaultIngestPipeline', () => {
it('Should return undefined for stream without any elasticsearch dir', () => {
expect(
parseDefaultIngestPipeline({
pkgKey: 'pkg-1.0.0',
paths: ['pkg-1.0.0/data_stream/stream1/manifest.yml'],
dataStreamPath: 'stream1',
})
).toEqual(undefined);
});
it('Should return undefined for stream with non default ingest pipeline', () => {
expect(
parseDefaultIngestPipeline({
pkgKey: 'pkg-1.0.0',
paths: [
'pkg-1.0.0/data_stream/stream1/manifest.yml',
'pkg-1.0.0/data_stream/stream1/elasticsearch/ingest_pipeline/someotherpipeline.yml',
],
dataStreamPath: 'stream1',
})
).toEqual(undefined);
});
it('Should return default for yml ingest pipeline', () => {
expect(
parseDefaultIngestPipeline({
pkgKey: 'pkg-1.0.0',
paths: [
'pkg-1.0.0/data_stream/stream1/manifest.yml',
'pkg-1.0.0/data_stream/stream1/elasticsearch/ingest_pipeline/default.yml',
],
dataStreamPath: 'stream1',
})
).toEqual('default');
});
it('Should return default for json ingest pipeline', () => {
expect(
parseDefaultIngestPipeline({
pkgKey: 'pkg-1.0.0',
paths: [
'pkg-1.0.0/data_stream/stream1/manifest.yml',
'pkg-1.0.0/data_stream/stream1/elasticsearch/ingest_pipeline/default.json',
],
dataStreamPath: 'stream1',
})
).toEqual('default');
});
});

describe('parseDataStreamElasticsearchEntry', () => {
it('Should handle empty elasticsearch', () => {
expect(parseDataStreamElasticsearchEntry({})).toEqual({});
});
it('Should not include junk keys', () => {
expect(parseDataStreamElasticsearchEntry({ a: 1, b: 2 })).toEqual({});
});
it('Should add index pipeline', () => {
expect(parseDataStreamElasticsearchEntry({}, 'default')).toEqual({
'ingest_pipeline.name': 'default',
});
});
it('Should add privileges', () => {
expect(
parseDataStreamElasticsearchEntry({ privileges: { index: ['priv1'], cluster: ['priv2'] } })
).toEqual({ privileges: { index: ['priv1'], cluster: ['priv2'] } });
});
it('Should add source_mode', () => {
expect(parseDataStreamElasticsearchEntry({ source_mode: 'default' })).toEqual({
source_mode: 'default',
});
expect(parseDataStreamElasticsearchEntry({ source_mode: 'synthetic' })).toEqual({
source_mode: 'synthetic',
});
});
it('Should add index_template mappings and expand dots', () => {
expect(
parseDataStreamElasticsearchEntry({
index_template: { mappings: { dynamic: false, something: { 'dot.somethingelse': 'val' } } },
})
).toEqual({
'index_template.mappings': { dynamic: false, something: { dot: { somethingelse: 'val' } } },
});
});
it('Should add index_template settings and expand dots', () => {
expect(
parseDataStreamElasticsearchEntry({
index_template: {
settings: {
index: {
codec: 'best_compression',
'sort.field': 'monitor.id',
},
},
},
})
).toEqual({
'index_template.settings': {
index: {
codec: 'best_compression',
sort: { field: 'monitor.id' },
},
},
});
});
});
98 changes: 59 additions & 39 deletions x-pack/plugins/fleet/server/services/epm/archive/parse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -295,52 +295,19 @@ export function parseAndVerifyDataStreams(
elasticsearch,
...restOfProps
} = manifest;

if (!(dataStreamTitle && type)) {
throw new PackageInvalidArchiveError(
`Invalid manifest for data stream '${dataStreamPath}': one or more fields missing of 'title', 'type'`
);
}

let ingestPipeline;
const ingestPipelinePaths = paths.filter((filePath) =>
filePath.startsWith(`${pkgKey}/data_stream/${dataStreamPath}/elasticsearch/ingest_pipeline`)
);

if (
ingestPipelinePaths.length &&
(ingestPipelinePaths.some((ingestPipelinePath) =>
ingestPipelinePath.endsWith(DEFAULT_INGEST_PIPELINE_FILE_NAME_YML)
) ||
ingestPipelinePaths.some((ingestPipelinePath) =>
ingestPipelinePath.endsWith(DEFAULT_INGEST_PIPELINE_FILE_NAME_JSON)
))
) {
ingestPipeline = DEFAULT_INGEST_PIPELINE_VALUE;
}

const ingestPipeline = parseDefaultIngestPipeline({ pkgKey, dataStreamPath, paths });
const streams = parseAndVerifyStreams(manifestStreams, dataStreamPath);

const parsedElasticsearchEntry: Record<string, any> = {};

if (ingestPipeline) {
parsedElasticsearchEntry['ingest_pipeline.name'] = DEFAULT_INGEST_PIPELINE_VALUE;
}

if (elasticsearch?.privileges) {
parsedElasticsearchEntry.privileges = elasticsearch.privileges;
}

if (elasticsearch?.index_template?.mappings) {
parsedElasticsearchEntry['index_template.mappings'] = expandDottedEntries(
elasticsearch.index_template.mappings
);
}

if (elasticsearch?.index_template?.settings) {
parsedElasticsearchEntry['index_template.settings'] = expandDottedEntries(
elasticsearch.index_template.settings
);
}
const parsedElasticsearchEntry = parseDataStreamElasticsearchEntry(
elasticsearch,
ingestPipeline
);

// Build up the stream object here so we can conditionally insert nullable fields. The package registry omits undefined
// fields, so we're mimicking that behavior here.
Expand Down Expand Up @@ -534,3 +501,56 @@ export function parseAndVerifyInputs(manifestInputs: any, location: string): Reg
}
return inputs;
}

export function parseDataStreamElasticsearchEntry(
elasticsearch: Record<string, any>,
ingestPipeline?: string
) {
const parsedElasticsearchEntry: Record<string, any> = {};

if (ingestPipeline) {
parsedElasticsearchEntry['ingest_pipeline.name'] = ingestPipeline;
}

if (elasticsearch?.privileges) {
parsedElasticsearchEntry.privileges = elasticsearch.privileges;
}

if (elasticsearch?.source_mode) {
parsedElasticsearchEntry.source_mode = elasticsearch.source_mode;
}

if (elasticsearch?.index_template?.mappings) {
parsedElasticsearchEntry['index_template.mappings'] = expandDottedEntries(
elasticsearch.index_template.mappings
);
}

if (elasticsearch?.index_template?.settings) {
parsedElasticsearchEntry['index_template.settings'] = expandDottedEntries(
elasticsearch.index_template.settings
);
}

return parsedElasticsearchEntry;
}

const isDefaultPipelineFile = (pipelinePath: string) =>
pipelinePath.endsWith(DEFAULT_INGEST_PIPELINE_FILE_NAME_YML) ||
pipelinePath.endsWith(DEFAULT_INGEST_PIPELINE_FILE_NAME_JSON);

export function parseDefaultIngestPipeline(opts: {
pkgKey: string;
paths: string[];
dataStreamPath: string;
}) {
const { pkgKey, paths, dataStreamPath } = opts;
const ingestPipelineDirPath = `${pkgKey}/data_stream/${dataStreamPath}/elasticsearch/ingest_pipeline`;
const defaultIngestPipelinePaths = paths.filter(
(path) => path.startsWith(ingestPipelineDirPath) && isDefaultPipelineFile(path)
);

if (!defaultIngestPipelinePaths.length) return undefined;

return DEFAULT_INGEST_PIPELINE_VALUE;
}

0 comments on commit c0c6c7a

Please sign in to comment.