From 53b55937d8eda3014c25ef5edb47749aa274a801 Mon Sep 17 00:00:00 2001 From: Sean Story Date: Tue, 30 Aug 2022 08:42:18 -0500 Subject: [PATCH] First pass at creating pipeline definitions (#139595) * First pass at creating pipeline definitions * Linting and added a real test * fix bad import * [CI] Auto-commit changed files from 'node scripts/eslint --no-cache --fix' Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com> --- .../routes/enterprise_search/indices.ts | 24 ++ .../utils/create_pipeline_definitions.test.ts | 36 +++ .../utils/create_pipeline_definitions.ts | 227 ++++++++++++++++++ 3 files changed, 287 insertions(+) create mode 100644 x-pack/plugins/enterprise_search/server/utils/create_pipeline_definitions.test.ts create mode 100644 x-pack/plugins/enterprise_search/server/utils/create_pipeline_definitions.ts diff --git a/x-pack/plugins/enterprise_search/server/routes/enterprise_search/indices.ts b/x-pack/plugins/enterprise_search/server/routes/enterprise_search/indices.ts index 6741721053b86..e481abea40a1a 100644 --- a/x-pack/plugins/enterprise_search/server/routes/enterprise_search/indices.ts +++ b/x-pack/plugins/enterprise_search/server/routes/enterprise_search/indices.ts @@ -6,6 +6,7 @@ */ import { schema } from '@kbn/config-schema'; + import { i18n } from '@kbn/i18n'; import { ErrorCode } from '../../../common/types/error_codes'; @@ -20,6 +21,7 @@ import { fetchIndices } from '../../lib/indices/fetch_indices'; import { generateApiKey } from '../../lib/indices/generate_api_key'; import { RouteDependencies } from '../../plugin'; import { createError } from '../../utils/create_error'; +import { createIndexPipelineDefinitions } from '../../utils/create_pipeline_definitions'; import { elasticsearchErrorHandler } from '../../utils/elasticsearch_error_handler'; import { isIndexNotFoundException } from '../../utils/identify_exceptions'; @@ -231,6 +233,28 @@ export function registerIndexRoutes({ router, log }: RouteDependencies) { }) ); + router.post( + { + path: '/internal/enterprise_search/indices/{indexName}/pipelines', + validate: { + params: schema.object({ + indexName: schema.string(), + }), + }, + }, + elasticsearchErrorHandler(log, async (context, request, response) => { + const indexName = decodeURIComponent(request.params.indexName); + const { client } = (await context.core).elasticsearch; + + const createResult = await createIndexPipelineDefinitions(indexName, client.asCurrentUser); + + return response.ok({ + body: createResult, + headers: { 'content-type': 'application/json' }, + }); + }) + ); + router.post( { path: '/internal/enterprise_search/indices', diff --git a/x-pack/plugins/enterprise_search/server/utils/create_pipeline_definitions.test.ts b/x-pack/plugins/enterprise_search/server/utils/create_pipeline_definitions.test.ts new file mode 100644 index 0000000000000..27208dbaed00e --- /dev/null +++ b/x-pack/plugins/enterprise_search/server/utils/create_pipeline_definitions.test.ts @@ -0,0 +1,36 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { ElasticsearchClient } from '@kbn/core/server'; + +import { createIndexPipelineDefinitions } from './create_pipeline_definitions'; + +describe('createIndexPipelineDefinitions util function', () => { + const indexName = 'my-index'; + + const mockClient = { + ingest: { + putPipeline: jest.fn(), + }, + }; + + const expectedResult = { + created: [indexName, `${indexName}@custom`, `${indexName}@ml-inference`], + }; + + beforeEach(() => { + jest.clearAllMocks(); + }); + + it('should create the pipelines', () => { + mockClient.ingest.putPipeline.mockImplementation(() => Promise.resolve({ acknowledged: true })); + expect( + createIndexPipelineDefinitions(indexName, mockClient as unknown as ElasticsearchClient) + ).toEqual(expectedResult); + expect(mockClient.ingest.putPipeline).toHaveBeenCalledTimes(3); + }); +}); diff --git a/x-pack/plugins/enterprise_search/server/utils/create_pipeline_definitions.ts b/x-pack/plugins/enterprise_search/server/utils/create_pipeline_definitions.ts new file mode 100644 index 0000000000000..377f12fd63208 --- /dev/null +++ b/x-pack/plugins/enterprise_search/server/utils/create_pipeline_definitions.ts @@ -0,0 +1,227 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { ElasticsearchClient } from '@kbn/core/server'; + +export interface CreatedPipelines { + created: string[]; +} + +/** + * Used to create index-specific Ingest Pipelines to be used in conjunction with Enterprise Search + * ingestion mechanisms. Three pipelines are created: + * 1. `@ml-inference` - empty by default. Any ML Inference processors that are enabled + * for this index are intended to be encapsulated in this pipeline. + * 2. `@custom` - empty by default. The customer is encouraged to edit this pipeline + * manually in order to meet their business needs. + * 3. `` - includes processors for binary content extraction, whitespace reduction, and nests + * the above pipelines in itself via pipeline processors. + * @param indexName the index for which the pipelines should be created. + * @param esClient the Elasticsearch Client with which to create the pipelines. + */ +export const createIndexPipelineDefinitions = ( + indexName: string, + esClient: ElasticsearchClient +): CreatedPipelines => { + // TODO: add back descriptions (see: https://github.com/elastic/elasticsearch-specification/issues/1827) + esClient.ingest.putPipeline({ + description: `Enterprise Search Machine Learning Inference pipeline for the '${indexName}' index`, + id: `${indexName}@ml-inference`, + processors: [], + version: 1, + }); + esClient.ingest.putPipeline({ + description: `Enterprise Search customizable ingest pipeline for the '${indexName}' index`, + id: `${indexName}@custom`, + processors: [], + version: 1, + }); + esClient.ingest.putPipeline({ + _meta: { + managed: true, + managed_by: 'Enterprise Search', + }, + description: `Enterprise Search ingest pipeline for the '${indexName}' index`, + id: `${indexName}`, + processors: [ + { + attachment: { + field: '_attachment', + if: 'ctx?._extract_binary_content == true', + ignore_missing: true, + indexed_chars_field: '_attachment_indexed_chars', + on_failure: [ + { + append: { + field: '_ingestion_errors', + value: [ + [ + "Processor 'attachment' in pipeline '{{ _ingest.on_failure_pipeline }}' failed with message '{{ _ingest.on_failure_message }}'", + ], + ], + }, + }, + ], + target_field: '_extracted_attachment', + }, + }, + { + set: { + field: 'body', + if: 'ctx?._extract_binary_content == true', + on_failure: [ + { + append: { + field: '_ingestion_errors', + value: [ + [ + "Processor 'set' with tag 'set_body' in pipeline '{{ _ingest.on_failure_pipeline }}' failed with message '{{ _ingest.on_failure_message }}'", + ], + ], + }, + }, + ], + tag: 'set_body', + value: '{{{_extracted_attachment.content}}}', + }, + }, + { + pipeline: { + if: 'ctx?._run_ml_inference == true', + name: `${indexName}@ml-inference`, + on_failure: [ + { + append: { + field: '_ingestion_errors', + value: [ + "Processor 'pipeline' with tag 'index_ml_inference_pipeline' in pipeline '{{ _ingest.on_failure_pipeline }}' failed with message '{{ _ingest.on_failure_message }}'", + ], + }, + }, + ], + tag: 'index_ml_inference_pipeline', + }, + }, + { + pipeline: { + name: `${indexName}@custom`, + on_failure: [ + { + append: { + field: '_ingestion_errors', + value: [ + "Processor 'pipeline' with tag 'index_custom_pipeline' in pipeline '{{ _ingest.on_failure_pipeline }}' failed with message '{{ _ingest.on_failure_message }}'", + ], + }, + }, + ], + tag: 'index_custom_pipeline', + }, + }, + { + gsub: { + field: 'body', + if: 'ctx?._extract_binary_content == true', + ignore_missing: true, + on_failure: [ + { + append: { + field: '_ingestion_errors', + value: [ + "Processor 'gsub' with tag 'remove_replacement_chars' in pipeline '{{ _ingest.on_failure_pipeline }}' failed with message '{{ _ingest.on_failure_message }}'", + ], + }, + }, + ], + pattern: '�', + replacement: '', + tag: 'remove_replacement_chars', + }, + }, + { + remove: { + field: [ + '_attachment', + '_attachment_indexed_chars', + '_extracted_attachment', + '_extract_binary_content', + ], + if: 'ctx?._extract_binary_content == true', + ignore_missing: true, + on_failure: [ + { + append: { + field: '_ingestion_errors', + value: [ + "Processor 'remove' with tag 'remove_attachment_fields' in pipeline '{{ _ingest.on_failure_pipeline }}' failed with message '{{ _ingest.on_failure_message }}'", + ], + }, + }, + ], + tag: 'remove_attachment_fields', + }, + }, + { + gsub: { + field: 'body', + if: 'ctx?._reduce_whitespace == true', + ignore_missing: true, + on_failure: [ + { + append: { + field: '_ingestion_errors', + value: [ + "Processor 'gsub' with tag 'remove_extra_whitespace' in pipeline '{{ _ingest.on_failure_pipeline }}' failed with message '{{ _ingest.on_failure_message }}'", + ], + }, + }, + ], + pattern: '\\s+', + replacement: ' ', + tag: 'remove_extra_whitespace', + }, + }, + { + trim: { + field: 'body', + if: 'ctx?._reduce_whitespace == true', + ignore_missing: true, + on_failure: [ + { + append: { + field: '_ingestion_errors', + value: [ + "Processor 'trim' in pipeline '{{ _ingest.on_failure_pipeline }}' failed with message '{{ _ingest.on_failure_message }}'", + ], + }, + }, + ], + }, + }, + { + remove: { + field: ['_reduce_whitespace'], + if: 'ctx?._reduce_whitespace == true', + ignore_missing: true, + on_failure: [ + { + append: { + field: '_ingestion_errors', + value: [ + "Processor 'remove' with tag 'remove_whitespace_fields' in pipeline '{{ _ingest.on_failure_pipeline }}' failed with message '{{ _ingest.on_failure_message }}'", + ], + }, + }, + ], + tag: 'remove_whitespace_fields', + }, + }, + ], + version: 1, + }); + return { created: [indexName, `${indexName}@custom`, `${indexName}@ml-inference`] }; +};