Skip to content

Commit

Permalink
[FLEET] Adding support for installing ML models (#107710)
Browse files Browse the repository at this point in the history
* adds support for saved object based ml models

* adds es asset type and ml model install handler

* wip: handle top level pipeline install

* remove unnecessary mlModel savedObject type

* add package manifest license check

* get modelid from model path

* add fleet api test for ml model

* replace test mlModel for api test with smaller test model

* cleanup install/remove and ensure pipelines are retained when upgrading

* fix types - update test model id

* fix types

* remove hard coded ml category and check top level pipeline on upgrade

* update ml model test file

* ensure deduplicated asset refs are saved

* Fix api integration update test

Co-authored-by: Kibana Machine <[email protected]>
Co-authored-by: Nicolas Chaulet <[email protected]>
  • Loading branch information
3 people authored Oct 15, 2021
1 parent 5fcc118 commit c240ccf
Show file tree
Hide file tree
Showing 21 changed files with 523 additions and 49 deletions.
9 changes: 8 additions & 1 deletion x-pack/plugins/fleet/common/services/license.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import type { Observable, Subscription } from 'rxjs';

import type { ILicense } from '../../../licensing/common/types';
import type { ILicense, LicenseType } from '../../../licensing/common/types';

// Generic license service class that works with the license observable
// Both server and client plugins instancates a singleton version of this class
Expand Down Expand Up @@ -53,4 +53,11 @@ export class LicenseService {
this.licenseInformation?.hasAtLeast('enterprise')
);
}
public hasAtLeast(licenseType: LicenseType) {
return (
this.licenseInformation?.isAvailable &&
this.licenseInformation?.isActive &&
this.licenseInformation?.hasAtLeast(licenseType)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ describe('Fleet - packageToPackagePolicy', () => {
transform: [],
ilm_policy: [],
data_stream_ilm_policy: [],
ml_model: [],
},
},
status: 'not_installed',
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugins/fleet/common/types/models/epm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ export enum ElasticsearchAssetType {
ilmPolicy = 'ilm_policy',
transform = 'transform',
dataStreamIlmPolicy = 'data_stream_ilm_policy',
mlModel = 'ml_model',
}

export type DataType = typeof dataTypes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@ export const AssetsFacetGroup = ({ width }: Args) => {
elasticsearch: {
component_template: [],
data_stream_ilm_policy: [],
data_stream: [],
ilm_policy: [],
index_template: [],
ingest_pipeline: [],
transform: [],
ml_model: [],
},
}}
/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ export const AssetTitleMap: Record<DisplayedAssetType, string> = {
ml_module: i18n.translate('xpack.fleet.epm.assetTitles.mlModules', {
defaultMessage: 'ML modules',
}),
ml_model: i18n.translate('xpack.fleet.epm.assetTitles.mlModels', {
defaultMessage: 'ML models',
}),
view: i18n.translate('xpack.fleet.epm.assetTitles.views', {
defaultMessage: 'Views',
}),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@
* 2.0.
*/

export { installPipelines } from './install';
export { installPipelines, isTopLevelPipeline } from './install';

export { deletePreviousPipelines, deletePipeline } from './remove';
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@ interface RewriteSubstitution {
templateFunction: string;
}

export const isTopLevelPipeline = (path: string) => {
const pathParts = getPathParts(path);
return (
pathParts.type === ElasticsearchAssetType.ingestPipeline && pathParts.dataset === undefined
);
};

export const installPipelines = async (
installablePackage: InstallablePackage,
paths: string[],
Expand All @@ -39,25 +46,41 @@ export const installPipelines = async (
// so do not remove the currently installed pipelines here
const dataStreams = installablePackage.data_streams;
const { name: pkgName, version: pkgVersion } = installablePackage;
if (!dataStreams?.length) return [];
const pipelinePaths = paths.filter((path) => isPipeline(path));
const topLevelPipelinePaths = paths.filter((path) => isTopLevelPipeline(path));

if (!dataStreams?.length && topLevelPipelinePaths.length === 0) return [];

// get and save pipeline refs before installing pipelines
const pipelineRefs = dataStreams.reduce<EsAssetReference[]>((acc, dataStream) => {
const filteredPaths = pipelinePaths.filter((path) =>
isDataStreamPipeline(path, dataStream.path)
);
const pipelineObjectRefs = filteredPaths.map((path) => {
const { name } = getNameAndExtension(path);
const nameForInstallation = getPipelineNameForInstallation({
pipelineName: name,
dataStream,
packageVersion: installablePackage.version,
});
return { id: nameForInstallation, type: ElasticsearchAssetType.ingestPipeline };
let pipelineRefs = dataStreams
? dataStreams.reduce<EsAssetReference[]>((acc, dataStream) => {
const filteredPaths = pipelinePaths.filter((path) =>
isDataStreamPipeline(path, dataStream.path)
);
const pipelineObjectRefs = filteredPaths.map((path) => {
const { name } = getNameAndExtension(path);
const nameForInstallation = getPipelineNameForInstallation({
pipelineName: name,
dataStream,
packageVersion: installablePackage.version,
});
return { id: nameForInstallation, type: ElasticsearchAssetType.ingestPipeline };
});
acc.push(...pipelineObjectRefs);
return acc;
}, [])
: [];

const topLevelPipelineRefs = topLevelPipelinePaths.map((path) => {
const { name } = getNameAndExtension(path);
const nameForInstallation = getPipelineNameForInstallation({
pipelineName: name,
packageVersion: installablePackage.version,
});
acc.push(...pipelineObjectRefs);
return acc;
}, []);
return { id: nameForInstallation, type: ElasticsearchAssetType.ingestPipeline };
});

pipelineRefs = [...pipelineRefs, ...topLevelPipelineRefs];

// check that we don't duplicate the pipeline refs if the user is reinstalling
const installedPkg = await getInstallationObject({
Expand All @@ -73,19 +96,33 @@ export const installPipelines = async (
pkgVersion
);
await saveInstalledEsRefs(savedObjectsClient, installablePackage.name, pipelineRefs);
const pipelines = dataStreams.reduce<Array<Promise<EsAssetReference[]>>>((acc, dataStream) => {
if (dataStream.ingest_pipeline) {
acc.push(
installPipelinesForDataStream({
dataStream,
esClient,
paths: pipelinePaths,
pkgVersion: installablePackage.version,
})
);
}
return acc;
}, []);
const pipelines = dataStreams
? dataStreams.reduce<Array<Promise<EsAssetReference[]>>>((acc, dataStream) => {
if (dataStream.ingest_pipeline) {
acc.push(
installAllPipelines({
dataStream,
esClient,
paths: pipelinePaths,
pkgVersion: installablePackage.version,
})
);
}
return acc;
}, [])
: [];

if (topLevelPipelinePaths) {
pipelines.push(
installAllPipelines({
dataStream: undefined,
esClient,
paths: topLevelPipelinePaths,
pkgVersion: installablePackage.version,
})
);
}

return await Promise.all(pipelines).then((results) => results.flat());
};

Expand All @@ -110,7 +147,7 @@ export function rewriteIngestPipeline(
return pipeline;
}

export async function installPipelinesForDataStream({
export async function installAllPipelines({
esClient,
pkgVersion,
paths,
Expand All @@ -119,9 +156,11 @@ export async function installPipelinesForDataStream({
esClient: ElasticsearchClient;
pkgVersion: string;
paths: string[];
dataStream: RegistryDataStream;
dataStream?: RegistryDataStream;
}): Promise<EsAssetReference[]> {
const pipelinePaths = paths.filter((path) => isDataStreamPipeline(path, dataStream.path));
const pipelinePaths = dataStream
? paths.filter((path) => isDataStreamPipeline(path, dataStream.path))
: paths;
let pipelines: any[] = [];
const substitutions: RewriteSubstitution[] = [];

Expand Down Expand Up @@ -256,11 +295,15 @@ export const getPipelineNameForInstallation = ({
packageVersion,
}: {
pipelineName: string;
dataStream: RegistryDataStream;
dataStream?: RegistryDataStream;
packageVersion: string;
}): string => {
const isPipelineEntry = pipelineName === dataStream.ingest_pipeline;
const suffix = isPipelineEntry ? '' : `-${pipelineName}`;
// if this is the pipeline entry, don't add a suffix
return `${dataStream.type}-${dataStream.dataset}-${packageVersion}${suffix}`;
if (dataStream !== undefined) {
const isPipelineEntry = pipelineName === dataStream.ingest_pipeline;
const suffix = isPipelineEntry ? '' : `-${pipelineName}`;
// if this is the pipeline entry, don't add a suffix
return `${dataStream.type}-${dataStream.dataset}-${packageVersion}${suffix}`;
}
// It's a top-level pipeline
return `${packageVersion}-${pipelineName}`;
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

export { getAsset } from '../../archive';
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

export { installMlModel } from './install';
export { deleteMlModel } from './remove';
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import type { ElasticsearchClient, SavedObjectsClientContract } from 'kibana/server';
import { ResponseError } from '@elastic/elasticsearch/lib/errors';

import { saveInstalledEsRefs } from '../../packages/install';
import { getPathParts } from '../../archive';
import { ElasticsearchAssetType } from '../../../../../common/types/models';
import type { EsAssetReference, InstallablePackage } from '../../../../../common/types/models';

import { getAsset } from './common';

interface MlModelInstallation {
installationName: string;
content: string;
}

export const installMlModel = async (
installablePackage: InstallablePackage,
paths: string[],
esClient: ElasticsearchClient,
savedObjectsClient: SavedObjectsClientContract
) => {
const mlModelPath = paths.find((path) => isMlModel(path));

const installedMlModels: EsAssetReference[] = [];
if (mlModelPath !== undefined) {
const content = getAsset(mlModelPath).toString('utf-8');
const pathParts = mlModelPath.split('/');
const modelId = pathParts[pathParts.length - 1].replace('.json', '');

const mlModelRef = {
id: modelId,
type: ElasticsearchAssetType.mlModel,
};

// get and save ml model refs before installing ml model
await saveInstalledEsRefs(savedObjectsClient, installablePackage.name, [mlModelRef]);

const mlModel: MlModelInstallation = {
installationName: modelId,
content,
};

const result = await handleMlModelInstall({ esClient, mlModel });
installedMlModels.push(result);
}
return installedMlModels;
};

const isMlModel = (path: string) => {
const pathParts = getPathParts(path);

return !path.endsWith('/') && pathParts.type === ElasticsearchAssetType.mlModel;
};

async function handleMlModelInstall({
esClient,
mlModel,
}: {
esClient: ElasticsearchClient;
mlModel: MlModelInstallation;
}): Promise<EsAssetReference> {
try {
await esClient.ml.putTrainedModel({
model_id: mlModel.installationName,
defer_definition_decompression: true,
timeout: '45s',
body: mlModel.content,
});
} catch (err) {
// swallow the error if the ml model already exists.
const isAlreadyExistError =
err instanceof ResponseError &&
err?.body?.error?.type === 'resource_already_exists_exception';
if (!isAlreadyExistError) {
throw err;
}
}

return { id: mlModel.installationName, type: ElasticsearchAssetType.mlModel };
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import type { ElasticsearchClient } from 'kibana/server';

import { appContextService } from '../../../app_context';

export const deleteMlModel = async (esClient: ElasticsearchClient, mlModelIds: string[]) => {
const logger = appContextService.getLogger();
if (mlModelIds.length) {
logger.info(`Deleting currently installed ml model ids ${mlModelIds}`);
}
await Promise.all(
mlModelIds.map(async (modelId) => {
await esClient.ml.deleteTrainedModel({ model_id: modelId }, { ignore: [404] });
logger.info(`Deleted: ${modelId}`);
})
);
};
Loading

0 comments on commit c240ccf

Please sign in to comment.