From 6378ff3ac977eb02b5967b34fd7d3cb4413b062f Mon Sep 17 00:00:00 2001 From: Kibana Machine <42973632+kibanamachine@users.noreply.github.com> Date: Mon, 14 Oct 2024 23:16:35 +1100 Subject: [PATCH] [8.x] [Auto Import] CSV format support (#194386) (#196090) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit # Backport This will backport the following commits from `main` to `8.x`: - [[Auto Import] CSV format support (#194386)](https://github.com/elastic/kibana/pull/194386) ### Questions ? Please refer to the [Backport tool documentation](https://github.com/sqren/backport) Co-authored-by: Ilya Nikokoshev --- .../__jest__/fixtures/log_type_detection.ts | 2 + .../analyze_logs/analyze_logs_route.gen.ts | 4 + .../analyze_logs_route.schema.yaml | 6 + .../common/api/generation_error.ts | 41 +++ .../common/api/model/api_test.mock.ts | 2 + .../common/api/model/common_attributes.gen.ts | 20 ++ .../api/model/common_attributes.schema.yaml | 18 ++ .../integration_assistant/common/constants.ts | 3 +- .../integration_assistant/common/index.ts | 3 +- .../generation_modal.test.tsx | 2 + .../data_stream_step/generation_modal.tsx | 2 +- .../data_stream_step/sample_logs_input.tsx | 3 - .../steps/data_stream_step/translations.ts | 39 ++- .../steps/data_stream_step/use_generation.tsx | 24 +- .../server/graphs/csv/columns.test.ts | 243 ++++++++++++++++++ .../server/graphs/csv/columns.ts | 115 +++++++++ .../server/graphs/csv/csv.ts | 100 +++++++ .../server/graphs/kv/constants.ts | 10 - .../server/graphs/kv/graph.test.ts | 6 +- .../server/graphs/kv/validate.ts | 19 +- .../graphs/log_type_detection/constants.ts | 9 +- .../log_type_detection/detection.test.ts | 15 +- .../graphs/log_type_detection/detection.ts | 19 +- .../graphs/log_type_detection/graph.test.ts | 6 +- .../server/graphs/log_type_detection/graph.ts | 22 +- .../graphs/log_type_detection/prompts.ts | 39 +-- .../server/graphs/log_type_detection/types.ts | 1 + .../server/graphs/unstructured/constants.ts | 8 - .../server/graphs/unstructured/graph.test.ts | 6 +- .../server/graphs/unstructured/validate.ts | 7 +- .../lib/errors/unparseable_csv_error.ts | 43 ++++ .../server/lib/errors/unsupported_error.ts | 4 +- .../server/routes/analyze_logs_routes.ts | 19 +- .../server/routes/build_integration_routes.ts | 4 +- .../server/routes/categorization_routes.ts | 4 +- .../server/routes/ecs_routes.ts | 4 +- .../server/routes/pipeline_routes.ts | 4 +- .../server/routes/related_routes.ts | 4 +- .../server/routes/routes_util.test.ts | 6 +- .../server/routes/routes_util.ts | 6 +- .../integration_assistant/server/types.ts | 4 +- .../server/util/index.ts | 2 +- .../server/util/pipeline.ts | 23 +- .../server/util/processors.ts | 61 +++++ .../translations/translations/fr-FR.json | 1 - .../translations/translations/ja-JP.json | 1 - .../translations/translations/zh-CN.json | 1 - 47 files changed, 853 insertions(+), 132 deletions(-) create mode 100644 x-pack/plugins/integration_assistant/common/api/generation_error.ts create mode 100644 x-pack/plugins/integration_assistant/server/graphs/csv/columns.test.ts create mode 100644 x-pack/plugins/integration_assistant/server/graphs/csv/columns.ts create mode 100644 x-pack/plugins/integration_assistant/server/graphs/csv/csv.ts create mode 100644 x-pack/plugins/integration_assistant/server/lib/errors/unparseable_csv_error.ts diff --git a/x-pack/plugins/integration_assistant/__jest__/fixtures/log_type_detection.ts b/x-pack/plugins/integration_assistant/__jest__/fixtures/log_type_detection.ts index b3c1e4c05ebd9..799dc45f8aadc 100644 --- a/x-pack/plugins/integration_assistant/__jest__/fixtures/log_type_detection.ts +++ b/x-pack/plugins/integration_assistant/__jest__/fixtures/log_type_detection.ts @@ -14,6 +14,8 @@ export const logFormatDetectionTestState = { exAnswer: 'testanswer', packageName: 'testPackage', dataStreamName: 'testDatastream', + packageTitle: 'Test Title', + dataStreamTitle: 'Test Datastream Title', finalized: false, samplesFormat: { name: SamplesFormatName.Values.structured }, header: true, diff --git a/x-pack/plugins/integration_assistant/common/api/analyze_logs/analyze_logs_route.gen.ts b/x-pack/plugins/integration_assistant/common/api/analyze_logs/analyze_logs_route.gen.ts index a224bb3cbe241..5e3c09c5fc74e 100644 --- a/x-pack/plugins/integration_assistant/common/api/analyze_logs/analyze_logs_route.gen.ts +++ b/x-pack/plugins/integration_assistant/common/api/analyze_logs/analyze_logs_route.gen.ts @@ -19,6 +19,8 @@ import { z } from '@kbn/zod'; import { PackageName, DataStreamName, + PackageTitle, + DataStreamTitle, LogSamples, Connector, LangSmithOptions, @@ -29,6 +31,8 @@ export type AnalyzeLogsRequestBody = z.infer; export const AnalyzeLogsRequestBody = z.object({ packageName: PackageName, dataStreamName: DataStreamName, + packageTitle: PackageTitle, + dataStreamTitle: DataStreamTitle, logSamples: LogSamples, connectorId: Connector, langSmithOptions: LangSmithOptions.optional(), diff --git a/x-pack/plugins/integration_assistant/common/api/analyze_logs/analyze_logs_route.schema.yaml b/x-pack/plugins/integration_assistant/common/api/analyze_logs/analyze_logs_route.schema.yaml index 165a2cff91a06..298abbc1201ea 100644 --- a/x-pack/plugins/integration_assistant/common/api/analyze_logs/analyze_logs_route.schema.yaml +++ b/x-pack/plugins/integration_assistant/common/api/analyze_logs/analyze_logs_route.schema.yaml @@ -22,11 +22,17 @@ paths: - connectorId - packageName - dataStreamName + - packageTitle + - dataStreamTitle properties: packageName: $ref: "../model/common_attributes.schema.yaml#/components/schemas/PackageName" dataStreamName: $ref: "../model/common_attributes.schema.yaml#/components/schemas/DataStreamName" + packageTitle: + $ref: "../model/common_attributes.schema.yaml#/components/schemas/PackageTitle" + dataStreamTitle: + $ref: "../model/common_attributes.schema.yaml#/components/schemas/DataStreamTitle" logSamples: $ref: "../model/common_attributes.schema.yaml#/components/schemas/LogSamples" connectorId: diff --git a/x-pack/plugins/integration_assistant/common/api/generation_error.ts b/x-pack/plugins/integration_assistant/common/api/generation_error.ts new file mode 100644 index 0000000000000..e96ad8bff9b59 --- /dev/null +++ b/x-pack/plugins/integration_assistant/common/api/generation_error.ts @@ -0,0 +1,41 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { GenerationErrorCode } from '../constants'; + +// Errors raised by the generation process should provide information through this interface. +export interface GenerationErrorBody { + message: string; + attributes: GenerationErrorAttributes; +} + +export function isGenerationErrorBody(obj: unknown | undefined): obj is GenerationErrorBody { + return ( + typeof obj === 'object' && + obj !== null && + 'message' in obj && + typeof obj.message === 'string' && + 'attributes' in obj && + obj.attributes !== undefined && + isGenerationErrorAttributes(obj.attributes) + ); +} + +export interface GenerationErrorAttributes { + errorCode: GenerationErrorCode; + underlyingMessages: string[] | undefined; +} + +export function isGenerationErrorAttributes(obj: unknown): obj is GenerationErrorAttributes { + return ( + typeof obj === 'object' && + obj !== null && + 'errorCode' in obj && + typeof obj.errorCode === 'string' && + (!('underlyingMessages' in obj) || Array.isArray(obj.underlyingMessages)) + ); +} diff --git a/x-pack/plugins/integration_assistant/common/api/model/api_test.mock.ts b/x-pack/plugins/integration_assistant/common/api/model/api_test.mock.ts index ea2aa61417526..c8f6967503ac3 100644 --- a/x-pack/plugins/integration_assistant/common/api/model/api_test.mock.ts +++ b/x-pack/plugins/integration_assistant/common/api/model/api_test.mock.ts @@ -96,6 +96,8 @@ export const getRelatedRequestMock = (): RelatedRequestBody => ({ export const getAnalyzeLogsRequestBody = (): AnalyzeLogsRequestBody => ({ dataStreamName: 'test-data-stream-name', packageName: 'test-package-name', + packageTitle: 'Test package title', + dataStreamTitle: 'Test data stream title', connectorId: 'test-connector-id', logSamples: rawSamples, }); diff --git a/x-pack/plugins/integration_assistant/common/api/model/common_attributes.gen.ts b/x-pack/plugins/integration_assistant/common/api/model/common_attributes.gen.ts index 7b64b4f8a88d8..803f9b8a6c3af 100644 --- a/x-pack/plugins/integration_assistant/common/api/model/common_attributes.gen.ts +++ b/x-pack/plugins/integration_assistant/common/api/model/common_attributes.gen.ts @@ -31,6 +31,18 @@ export const PackageName = z.string().min(1); export type DataStreamName = z.infer; export const DataStreamName = z.string().min(1); +/** + * Package title for the integration to be built. + */ +export type PackageTitle = z.infer; +export const PackageTitle = z.string().min(1); + +/** + * DataStream title for the integration to be built. + */ +export type DataStreamTitle = z.infer; +export const DataStreamTitle = z.string().min(1); + /** * String form of the input logsamples. */ @@ -86,6 +98,14 @@ export const SamplesFormat = z.object({ * For some formats, specifies whether the samples can be multiline. */ multiline: z.boolean().optional(), + /** + * For CSV format, specifies whether the samples have a header row. For other formats, specifies the presence of header in each row. + */ + header: z.boolean().optional(), + /** + * For CSV format, specifies the column names proposed by the LLM. + */ + columns: z.array(z.string()).optional(), /** * For a JSON format, describes how to get to the sample array from the root of the JSON. */ diff --git a/x-pack/plugins/integration_assistant/common/api/model/common_attributes.schema.yaml b/x-pack/plugins/integration_assistant/common/api/model/common_attributes.schema.yaml index aba43d0174bb8..900b6e362a754 100644 --- a/x-pack/plugins/integration_assistant/common/api/model/common_attributes.schema.yaml +++ b/x-pack/plugins/integration_assistant/common/api/model/common_attributes.schema.yaml @@ -16,6 +16,16 @@ components: minLength: 1 description: DataStream name for the integration to be built. + PackageTitle: + type: string + minLength: 1 + description: Package title for the integration to be built. + + DataStreamTitle: + type: string + minLength: 1 + description: DataStream title for the integration to be built. + LogSamples: type: array items: @@ -66,6 +76,14 @@ components: multiline: type: boolean description: For some formats, specifies whether the samples can be multiline. + header: + type: boolean + description: For CSV format, specifies whether the samples have a header row. For other formats, specifies the presence of header in each row. + columns: + type: array + description: For CSV format, specifies the column names proposed by the LLM. + items: + type: string json_path: type: array description: For a JSON format, describes how to get to the sample array from the root of the JSON. diff --git a/x-pack/plugins/integration_assistant/common/constants.ts b/x-pack/plugins/integration_assistant/common/constants.ts index 1472a260fadf0..d652f661f10bb 100644 --- a/x-pack/plugins/integration_assistant/common/constants.ts +++ b/x-pack/plugins/integration_assistant/common/constants.ts @@ -30,8 +30,9 @@ export const MINIMUM_LICENSE_TYPE: LicenseType = 'enterprise'; // ErrorCodes -export enum ErrorCode { +export enum GenerationErrorCode { RECURSION_LIMIT = 'recursion-limit', RECURSION_LIMIT_ANALYZE_LOGS = 'recursion-limit-analyze-logs', UNSUPPORTED_LOG_SAMPLES_FORMAT = 'unsupported-log-samples-format', + UNPARSEABLE_CSV_DATA = 'unparseable-csv-data', } diff --git a/x-pack/plugins/integration_assistant/common/index.ts b/x-pack/plugins/integration_assistant/common/index.ts index 5e90e6e6a6217..b16254f9e11e2 100644 --- a/x-pack/plugins/integration_assistant/common/index.ts +++ b/x-pack/plugins/integration_assistant/common/index.ts @@ -27,10 +27,9 @@ export type { Integration, Pipeline, Docs, - SamplesFormat, LangSmithOptions, } from './api/model/common_attributes.gen'; -export { SamplesFormatName } from './api/model/common_attributes.gen'; +export { SamplesFormat, SamplesFormatName } from './api/model/common_attributes.gen'; export type { ESProcessorItem } from './api/model/processor_attributes.gen'; export type { CelInput } from './api/model/cel_input_attributes.gen'; diff --git a/x-pack/plugins/integration_assistant/public/components/create_integration/create_integration_assistant/steps/data_stream_step/generation_modal.test.tsx b/x-pack/plugins/integration_assistant/public/components/create_integration/create_integration_assistant/steps/data_stream_step/generation_modal.test.tsx index a8e6a30ca5dfa..25ed031129777 100644 --- a/x-pack/plugins/integration_assistant/public/components/create_integration/create_integration_assistant/steps/data_stream_step/generation_modal.test.tsx +++ b/x-pack/plugins/integration_assistant/public/components/create_integration/create_integration_assistant/steps/data_stream_step/generation_modal.test.tsx @@ -105,6 +105,8 @@ describe('GenerationModal', () => { it('should call runAnalyzeLogsGraph with correct parameters', () => { expect(mockRunAnalyzeLogsGraph).toHaveBeenCalledWith({ ...defaultRequest, + packageTitle: 'Mocked Integration title', + dataStreamTitle: 'Mocked Data Stream Title', logSamples: integrationSettingsNonJSON.logSamples ?? [], }); }); diff --git a/x-pack/plugins/integration_assistant/public/components/create_integration/create_integration_assistant/steps/data_stream_step/generation_modal.tsx b/x-pack/plugins/integration_assistant/public/components/create_integration/create_integration_assistant/steps/data_stream_step/generation_modal.tsx index 21f82532dc21c..ba57d83618c13 100644 --- a/x-pack/plugins/integration_assistant/public/components/create_integration/create_integration_assistant/steps/data_stream_step/generation_modal.tsx +++ b/x-pack/plugins/integration_assistant/public/components/create_integration/create_integration_assistant/steps/data_stream_step/generation_modal.tsx @@ -82,7 +82,7 @@ export const GenerationModal = React.memo( {error ? ( (({ integrationSe {i18n.LOGS_SAMPLE_DESCRIPTION} - - {i18n.LOGS_SAMPLE_DESCRIPTION_2} - } onChange={onChangeLogsSample} diff --git a/x-pack/plugins/integration_assistant/public/components/create_integration/create_integration_assistant/steps/data_stream_step/translations.ts b/x-pack/plugins/integration_assistant/public/components/create_integration/create_integration_assistant/steps/data_stream_step/translations.ts index 017a1a9c29caa..48793d20496d6 100644 --- a/x-pack/plugins/integration_assistant/public/components/create_integration/create_integration_assistant/steps/data_stream_step/translations.ts +++ b/x-pack/plugins/integration_assistant/public/components/create_integration/create_integration_assistant/steps/data_stream_step/translations.ts @@ -6,7 +6,8 @@ */ import { i18n } from '@kbn/i18n'; -import { ErrorCode } from '../../../../../../common/constants'; +import { GenerationErrorCode } from '../../../../../../common/constants'; +import type { GenerationErrorAttributes } from '../../../../../../common/api/generation_error'; export const INTEGRATION_NAME_TITLE = i18n.translate( 'xpack.integrationAssistant.step.dataStream.integrationNameTitle', @@ -109,12 +110,6 @@ export const LOGS_SAMPLE_DESCRIPTION = i18n.translate( defaultMessage: 'Drag and drop a file or Browse files.', } ); -export const LOGS_SAMPLE_DESCRIPTION_2 = i18n.translate( - 'xpack.integrationAssistant.step.dataStream.logsSample.description2', - { - defaultMessage: 'JSON/NDJSON format', - } -); export const LOGS_SAMPLE_TRUNCATED = (maxRows: number) => i18n.translate('xpack.integrationAssistant.step.dataStream.logsSample.truncatedWarning', { values: { maxRows }, @@ -188,7 +183,7 @@ export const PROGRESS_RELATED_GRAPH = i18n.translate( defaultMessage: 'Generating related fields', } ); -export const GENERATION_ERROR = (progressStep: string) => +export const GENERATION_ERROR_TITLE = (progressStep: string) => i18n.translate('xpack.integrationAssistant.step.dataStream.generationError', { values: { progressStep }, defaultMessage: 'An error occurred during: {progressStep}', @@ -198,24 +193,44 @@ export const RETRY = i18n.translate('xpack.integrationAssistant.step.dataStream. defaultMessage: 'Retry', }); -export const ERROR_TRANSLATION: Record = { - [ErrorCode.RECURSION_LIMIT_ANALYZE_LOGS]: i18n.translate( +export const GENERATION_ERROR_TRANSLATION: Record< + GenerationErrorCode, + string | ((attributes: GenerationErrorAttributes) => string) +> = { + [GenerationErrorCode.RECURSION_LIMIT_ANALYZE_LOGS]: i18n.translate( 'xpack.integrationAssistant.errors.recursionLimitAnalyzeLogsErrorMessage', { defaultMessage: 'Please verify the format of log samples is correct and try again. Try with a fewer samples if error persists.', } ), - [ErrorCode.RECURSION_LIMIT]: i18n.translate( + [GenerationErrorCode.RECURSION_LIMIT]: i18n.translate( 'xpack.integrationAssistant.errors.recursionLimitReached', { defaultMessage: 'Max attempts exceeded. Please try again.', } ), - [ErrorCode.UNSUPPORTED_LOG_SAMPLES_FORMAT]: i18n.translate( + [GenerationErrorCode.UNSUPPORTED_LOG_SAMPLES_FORMAT]: i18n.translate( 'xpack.integrationAssistant.errors.unsupportedLogSamples', { defaultMessage: 'Unsupported log format in the samples.', } ), + [GenerationErrorCode.UNPARSEABLE_CSV_DATA]: (attributes) => { + if ( + attributes.underlyingMessages !== undefined && + attributes.underlyingMessages?.length !== 0 + ) { + return i18n.translate('xpack.integrationAssistant.errors.uparseableCSV.withReason', { + values: { + reason: attributes.underlyingMessages[0], + }, + defaultMessage: `Cannot parse the samples as the CSV data (reason: {reason}). Please check the provided samples.`, + }); + } else { + return i18n.translate('xpack.integrationAssistant.errors.uparseableCSV.withoutReason', { + defaultMessage: `Cannot parse the samples as the CSV data. Please check the provided samples.`, + }); + } + }, }; diff --git a/x-pack/plugins/integration_assistant/public/components/create_integration/create_integration_assistant/steps/data_stream_step/use_generation.tsx b/x-pack/plugins/integration_assistant/public/components/create_integration/create_integration_assistant/steps/data_stream_step/use_generation.tsx index d062a0ff8b836..566451d624c5e 100644 --- a/x-pack/plugins/integration_assistant/public/components/create_integration/create_integration_assistant/steps/data_stream_step/use_generation.tsx +++ b/x-pack/plugins/integration_assistant/public/components/create_integration/create_integration_assistant/steps/data_stream_step/use_generation.tsx @@ -16,6 +16,7 @@ import { type EcsMappingRequestBody, type RelatedRequestBody, } from '../../../../../../common'; +import { isGenerationErrorBody } from '../../../../../../common/api/generation_error'; import { runCategorizationGraph, runEcsGraph, @@ -26,7 +27,6 @@ import { useKibana } from '../../../../../common/hooks/use_kibana'; import type { State } from '../../state'; import * as i18n from './translations'; import { useTelemetry } from '../../../telemetry'; -import type { ErrorCode } from '../../../../../../common/constants'; import type { AIConnector, IntegrationSettings } from '../../types'; export type OnComplete = (result: State['result']) => void; @@ -46,6 +46,18 @@ interface RunGenerationProps { setProgress: (progress: ProgressItem) => void; } +// If the result is classified as a generation error, produce an error message +// as defined in the i18n file. Otherwise, return undefined. +function generationErrorMessage(body: unknown | undefined): string | undefined { + if (!isGenerationErrorBody(body)) { + return; + } + + const errorCode = body.attributes.errorCode; + const translation = i18n.GENERATION_ERROR_TRANSLATION[errorCode]; + return typeof translation === 'function' ? translation(body.attributes) : translation; +} + interface GenerationResults { pipeline: Pipeline; docs: Docs; @@ -96,12 +108,7 @@ export const useGeneration = ({ error: originalErrorMessage, }); - let errorMessage = originalErrorMessage; - const errorCode = e.body?.attributes?.errorCode as ErrorCode | undefined; - if (errorCode != null) { - errorMessage = i18n.ERROR_TRANSLATION[errorCode]; - } - setError(errorMessage); + setError(generationErrorMessage(e.body) ?? originalErrorMessage); } finally { setIsRequesting(false); } @@ -145,6 +152,9 @@ async function runGeneration({ const analyzeLogsRequest: AnalyzeLogsRequestBody = { packageName: integrationSettings.name ?? '', dataStreamName: integrationSettings.dataStreamName ?? '', + packageTitle: integrationSettings.title ?? integrationSettings.name ?? '', + dataStreamTitle: + integrationSettings.dataStreamTitle ?? integrationSettings.dataStreamName ?? '', logSamples: integrationSettings.logSamples ?? [], connectorId: connector.id, langSmithOptions: getLangSmithOptions(), diff --git a/x-pack/plugins/integration_assistant/server/graphs/csv/columns.test.ts b/x-pack/plugins/integration_assistant/server/graphs/csv/columns.test.ts new file mode 100644 index 0000000000000..4e84fb9f00af0 --- /dev/null +++ b/x-pack/plugins/integration_assistant/server/graphs/csv/columns.test.ts @@ -0,0 +1,243 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { + upperBoundForColumnCount, + generateColumnNames, + columnsFromHeader, + totalColumnCount, + toSafeColumnName, + yieldUniqueColumnNames, +} from './columns'; + +describe('upperBoundForColumnCount', () => { + it('should return the correct number of columns for a simple CSV', () => { + const samples = ['name,age,location', 'john,30,new york', 'jane,25,los angeles']; + expect(upperBoundForColumnCount(samples)).toBe(3); + }); + + it('should handle samples with varying column counts', () => { + const samples = ['name,age,location', 'john,30', 'jane,25,los angeles,usa']; + expect(upperBoundForColumnCount(samples)).toBe(4); + }); + + it('should return 0 for empty samples', () => { + const samples: string[] = []; + expect(upperBoundForColumnCount(samples)).toBe(0); + }); + + it('should handle samples with empty strings', () => { + const samples = ['', 'john,30,new york', 'jane,25,los angeles']; + expect(upperBoundForColumnCount(samples)).toBe(3); + }); + + it('should handle samples with only one column', () => { + const samples = ['name', 'john', 'jane']; + expect(upperBoundForColumnCount(samples)).toBe(1); + }); + + it('should handle samples with extra commas', () => { + const samples = ['name,age,location', 'john,30', 'jane,25,"los angeles,usa"']; + expect(upperBoundForColumnCount(samples)).toBeGreaterThanOrEqual(3); + }); +}); + +describe('generateColumnNames', () => { + it('should generate the correct number of column names', () => { + const count = 5; + const expected = ['column1', 'column2', 'column3', 'column4', 'column5']; + expect(generateColumnNames(count)).toEqual(expected); + }); + + it('should return an empty array when count is 0', () => { + const count = 0; + const expected: string[] = []; + expect(generateColumnNames(count)).toEqual(expected); + }); + + it('should handle large counts correctly', () => { + const count = 100; + const result = generateColumnNames(count); + expect(result.length).toBe(count); + expect(result[0]).toBe('column1'); + expect(result[count - 1]).toBe('column100'); + }); +}); + +describe('columnsFromHeader', () => { + it('should return the correct columns from the header object', () => { + const tempColumnNames = ['column1', 'column2', 'column3']; + const headerObject = { column1: 'name', column2: 'age', column3: 'location' }; + expect(columnsFromHeader(tempColumnNames, headerObject)).toEqual(['name', 'age', 'location']); + }); + + it('should return an empty array if no columns match', () => { + const tempColumnNames = ['column1', 'column2', 'column3']; + const headerObject = { column4: 'name', column5: 'age', column6: 'location' }; + expect(columnsFromHeader(tempColumnNames, headerObject)).toEqual([]); + }); + + it('should handle missing columns in the header object', () => { + const tempColumnNames = ['column1', 'column2', 'column3', 'column4']; + const headerObject = { column1: 'name', column3: 'location' }; + expect(columnsFromHeader(tempColumnNames, headerObject)).toEqual([ + 'name', + undefined, + 'location', + ]); + }); + + it('should handle an empty header object', () => { + const tempColumnNames = ['column1', 'column2', 'column3']; + const headerObject = {}; + expect(columnsFromHeader(tempColumnNames, headerObject)).toEqual([]); + }); + + it('should handle an empty tempColumnNames array', () => { + const tempColumnNames: string[] = []; + const headerObject = { column1: 'name', column2: 'age', column3: 'location' }; + expect(columnsFromHeader(tempColumnNames, headerObject)).toEqual([]); + }); +}); + +describe('totalColumnCount', () => { + it('should return the correct total column count for a simple CSV', () => { + const tempColumnNames = ['column1', 'column2', 'column3']; + const csvRows = [ + { column1: 'john', column2: '30', column3: 'new york' }, + { column1: 'jane', column3: '25', column4: 'los angeles' }, + ]; + expect(totalColumnCount(tempColumnNames, csvRows)).toBe(3); + }); + + it('should handle rows with varying column counts', () => { + const tempColumnNames = ['column1', 'column2', 'column3', 'column4']; + const csvRows = [ + { column1: 'john', column2: '30' }, + { column1: 'jane', column3: 'los angeles', column4: 'usa' }, + ]; + expect(totalColumnCount(tempColumnNames, csvRows)).toBe(4); + }); + + it('should return 0 for empty rows', () => { + const tempColumnNames = ['column1', 'column2', 'column3']; + expect(totalColumnCount(tempColumnNames, [])).toBe(0); + }); + + it('should handle rows with empty objects', () => { + const tempColumnNames = ['column1', 'column2', 'column3']; + const csvRows = [ + {}, + { column1: 'john', column2: '30', column3: 'new york' }, + { column1: 'jane', column2: '25', column3: 'los angeles' }, + ]; + expect(totalColumnCount(tempColumnNames, csvRows)).toBe(3); + }); + + it('should handle rows with only one column', () => { + const tempColumnNames = ['column1']; + const csvRows = [{ column1: 'john' }, { column1: 'jane' }]; + expect(totalColumnCount(tempColumnNames, csvRows)).toBe(1); + }); + + it('should handle rows with extra columns', () => { + const tempColumnNames = ['column1', 'column2', 'column3']; + const csvRows = [ + { column1: 'john', column2: '30' }, + { column1: 'jane', column2: '25', column3: 'los angeles', column4: 'usa' }, + ]; + expect(totalColumnCount(tempColumnNames, csvRows)).toBe(3); + }); + + describe('toSafeColumnName', () => { + it('should return undefined for non-string and non-number inputs', () => { + expect(toSafeColumnName(null)).toBeUndefined(); + expect(toSafeColumnName(undefined)).toBeUndefined(); + expect(toSafeColumnName({})).toBeUndefined(); + expect(toSafeColumnName([1, 2])).toBeUndefined(); + }); + + it('should replace non-alphanumeric characters with underscores', () => { + expect(toSafeColumnName('name@age!location')).toBe('name_age_location'); + expect(toSafeColumnName('column#1')).toBe('column_1'); + }); + + it('should return the same string if it contains only alphanumeric characters and underscores', () => { + expect(toSafeColumnName('Column1')).toBe('Column1'); + expect(toSafeColumnName('Location')).toBe('Location'); + }); + + it('should handle empty strings', () => { + expect(toSafeColumnName('')).toBeUndefined(); + }); + + it('should handle strings starting from a digit or numbers', () => { + expect(toSafeColumnName('1ABC')).toBe('Column1ABC'); + expect(toSafeColumnName(123)).toBe('Column123'); + }); + }); +}); + +describe('yieldUniqueColumnNames', () => { + it('should yield unique column names based on preferred and fallback names', () => { + const count = 5; + const preferredNames = [ + ['name1', 'name2', undefined, 'name4', undefined], + [undefined, 'altName2', 'altName3', undefined, 'altName5'], + ]; + const fallbackNames = ['fallback1', 'fallback2', 'fallback3', 'fallback4', 'fallback5']; + + const result = Array.from(yieldUniqueColumnNames(count, preferredNames, fallbackNames)); + expect(result).toEqual(['name1', 'name2', 'altName3', 'name4', 'altName5']); + }); + + it('should use fallback names when preferred names are not provided', () => { + const count = 3; + const preferredNames = [['name1', undefined, 'name3']]; + const fallbackNames = ['fallback1', 'fallback2', 'fallback3']; + + const result = Array.from(yieldUniqueColumnNames(count, preferredNames, fallbackNames)); + expect(result).toEqual(['name1', 'fallback2', 'name3']); + }); + + it('should append postfix to duplicate names to ensure uniqueness', () => { + const count = 4; + const preferredNames = [['name', 'name', 'name', 'name']]; + const fallbackNames = ['fallback1', 'fallback2', 'fallback3', 'fallback4']; + + const result = Array.from(yieldUniqueColumnNames(count, preferredNames, fallbackNames)); + expect(result).toEqual(['name', 'name_2', 'name_3', 'name_4']); + }); + + it('should handle mixed preferred and fallback names with duplicates', () => { + const count = 6; + const preferredNames = [ + ['name', undefined, 'name', undefined, undefined, undefined], + [undefined, 'altName', undefined, 'altName', undefined, 'altName'], + ]; + const fallbackNames = [ + 'fallback1', + 'fallback2', + 'fallback3', + 'fallback4', + 'fallback5', + 'fallback6', + ]; + + const result = Array.from(yieldUniqueColumnNames(count, preferredNames, fallbackNames)); + expect(result).toEqual(['name', 'altName', 'name_2', 'altName_2', 'fallback5', 'altName_3']); + }); + + it('should handle empty preferred names', () => { + const count = 3; + const preferredNames: Array> = []; + const fallbackNames: string[] = ['fallback1', 'fallback2', 'fallback3']; + + const result = Array.from(yieldUniqueColumnNames(count, preferredNames, fallbackNames)); + expect(result).toEqual(['fallback1', 'fallback2', 'fallback3']); + }); +}); diff --git a/x-pack/plugins/integration_assistant/server/graphs/csv/columns.ts b/x-pack/plugins/integration_assistant/server/graphs/csv/columns.ts new file mode 100644 index 0000000000000..108c4cec75bf6 --- /dev/null +++ b/x-pack/plugins/integration_assistant/server/graphs/csv/columns.ts @@ -0,0 +1,115 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +// Estimates from above the number of columns in the CSV samples. +export function upperBoundForColumnCount(csvSamples: string[]): number { + return Math.max(0, ...csvSamples.map((sample) => sample.split(',').length)); +} + +// Generates a list of temporary column names. +export function generateColumnNames(count: number): string[] { + return Array.from({ length: count }).map((_, i) => `column${i + 1}`); +} + +// Converts a column name into a safe one to use in the `if ctx...` clause. +// Result must pass rules at https://www.elastic.co/guide/en/elasticsearch/painless/8.15/painless-identifiers.html +export function toSafeColumnName(columnName: unknown): string | undefined { + if (typeof columnName === 'number') { + return `Column${columnName}`; + } + if (typeof columnName !== 'string') { + return undefined; + } + if (columnName.length === 0) { + return undefined; + } + const safeName = columnName.replace(/[^a-zA-Z0-9_]/g, '_'); + return /^[0-9]/.test(safeName) ? `Column${safeName}` : safeName; +} +// Returns the column list from a header row. We skip values that are not strings. + +export function columnsFromHeader( + tempColumnNames: string[], + headerObject: { [key: string]: unknown } +): Array { + const maxIndex = tempColumnNames.findLastIndex( + (columnName) => headerObject[columnName] !== undefined + ); + return tempColumnNames + .slice(0, maxIndex + 1) + .map((columnName) => headerObject[columnName]) + .map(toSafeColumnName); +} +// Count the number of columns actually present in the rows. + +export function totalColumnCount( + tempColumnNames: string[], + csvRows: Array<{ [key: string]: unknown }> +): number { + return ( + Math.max( + -1, + ...csvRows.map((row) => + tempColumnNames.findLastIndex((columnName) => row[columnName] !== undefined) + ) + ) + 1 + ); +} +// Prefixes each column with the provided prefixes, separated by a period. +export function prefixColumns(columns: string[], prefixes: string[]): string[] { + return columns.map((column) => [...prefixes, column].join('.')); +} +/** + * Generates a list of unique column names based on preferred and fallback names. + * + * The preferred names are used first, followed by the fallback names. It is required that + * there are enough fallback names to cover the number of unique column names needed. + * + * The resulting column names are guaranteed to be unique. If a column name is already in use, + * a postfix like _2, _3 and so on is added to the name to make it unique. + * + * @generator + * @param {number} count - The number of unique column names to generate. + * @param {Array>} preferredNames - A 2D array where each sub-array contains a list of names. + * @param {string[]} fallbackNames - An array of fallback names to use if no preferred name is defined. + * @yields {string} - A sequence of column names, such that no two are the same. + */ + +export function* yieldUniqueColumnNames( + count: number, + preferredNames: Array>, + fallbackNames: string[] +): Generator { + const knownNames = new Set(); + + for (let i = 0; i < count; i++) { + let selectedName: string = fallbackNames[i]; + + for (const nameList of preferredNames) { + const name = nameList[i]; + if (name) { + selectedName = name; + break; + } + } + + let postfixString = ''; + + if (knownNames.has(selectedName)) { + for (let postfix = 2; ; postfix++) { + postfixString = `_${postfix}`; + if (!knownNames.has(selectedName + postfixString)) { + break; + } + } + } + + selectedName += postfixString; + knownNames.add(selectedName); + yield selectedName; + } +} diff --git a/x-pack/plugins/integration_assistant/server/graphs/csv/csv.ts b/x-pack/plugins/integration_assistant/server/graphs/csv/csv.ts new file mode 100644 index 0000000000000..d753fd7995688 --- /dev/null +++ b/x-pack/plugins/integration_assistant/server/graphs/csv/csv.ts @@ -0,0 +1,100 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +import type { LogFormatDetectionState } from '../../types'; +import type { LogDetectionNodeParams } from '../log_type_detection/types'; +import { createJSONInput } from '../../util'; +import { createCSVProcessor, createDropProcessor } from '../../util/processors'; +import { CSVParseError, UnparseableCSVFormatError } from '../../lib/errors/unparseable_csv_error'; +import { + generateColumnNames, + upperBoundForColumnCount, + columnsFromHeader, + toSafeColumnName, + totalColumnCount, + yieldUniqueColumnNames, + prefixColumns, +} from './columns'; + +// We will only create the processor for the first MAX_CSV_COLUMNS columns. +const MAX_CSV_COLUMNS = 100; + +// Converts CSV samples into JSON samples. +export async function handleCSV({ + state, + client, +}: LogDetectionNodeParams): Promise> { + const packageName = state.packageName; + const dataStreamName = state.dataStreamName; + + const samples = state.logSamples; + const temporaryColumns = generateColumnNames( + Math.min(upperBoundForColumnCount(samples), MAX_CSV_COLUMNS) + ); + const temporaryProcessor = createCSVProcessor('message', temporaryColumns); + + const { pipelineResults: tempResults, errors: tempErrors } = await createJSONInput( + [temporaryProcessor], + samples, + client + ); + + if (tempErrors.length > 0) { + throw new UnparseableCSVFormatError(tempErrors as CSVParseError[]); + } + + const headerColumns = state.samplesFormat.header + ? columnsFromHeader(temporaryColumns, tempResults[0]) + : []; + const llmProvidedColumns = (state.samplesFormat.columns || []).map(toSafeColumnName); + const needColumns = totalColumnCount(temporaryColumns, tempResults); + const columns: string[] = Array.from( + yieldUniqueColumnNames(needColumns, [llmProvidedColumns, headerColumns], temporaryColumns) + ); + + const prefix = [packageName, dataStreamName]; + const prefixedColumns = prefixColumns(columns, prefix); + const csvProcessor = createCSVProcessor('message', prefixedColumns); + const csvHandlingProcessors = [csvProcessor]; + + if (headerColumns.length > 0) { + const dropValues = columns.reduce((acc, column, index) => { + if (headerColumns[index] !== undefined) { + acc[column] = String(headerColumns[index]); + } + return acc; + }, {} as Record); + const dropProcessor = createDropProcessor( + dropValues, + prefix, + 'remove_csv_header', + 'Remove the CSV header line by comparing the values' + ); + csvHandlingProcessors.push(dropProcessor); + } + + const { pipelineResults: finalResults, errors: finalErrors } = await createJSONInput( + csvHandlingProcessors, + samples, + client + ); + + if (finalErrors.length > 0) { + throw new UnparseableCSVFormatError(finalErrors as CSVParseError[]); + } + + // Converts JSON Object into a string and parses it as a array of JSON strings + const jsonSamples = finalResults + .map((log) => log[packageName]) + .map((log) => (log as Record)[dataStreamName]) + .map((log) => JSON.stringify(log)); + + return { + jsonSamples, + additionalProcessors: [...state.additionalProcessors, ...csvHandlingProcessors], + lastExecutedChain: 'handleCSV', + }; +} diff --git a/x-pack/plugins/integration_assistant/server/graphs/kv/constants.ts b/x-pack/plugins/integration_assistant/server/graphs/kv/constants.ts index 183898ec31354..46ed44db01db4 100644 --- a/x-pack/plugins/integration_assistant/server/graphs/kv/constants.ts +++ b/x-pack/plugins/integration_assistant/server/graphs/kv/constants.ts @@ -26,16 +26,6 @@ export const KV_HEADER_ERROR_EXAMPLE_ANSWER = { '%{TIMESTAMP:cisco.audit.timestamp}:%{WORD:cisco.audit.value1};%{WORD:cisco.audit.key2}:%{WORD:cisco.audit.value2}:%{GREEDYDATA:message}', }; -export const onFailure = { - append: { - field: 'error.message', - value: - '{% raw %}Processor {{{_ingest.on_failure_processor_type}}} with tag {{{_ingest.on_failure_processor_tag}}} in pipeline {{{_ingest.on_failure_pipeline}}} failed with message: {{{_ingest.on_failure_message}}}{% endraw %}', - }, -}; - -export const removeProcessor = { remove: { field: 'message', ignore_missing: true } }; - export const COMMON_ERRORS = [ { error: 'field [message] does not contain value_split [=]', diff --git a/x-pack/plugins/integration_assistant/server/graphs/kv/graph.test.ts b/x-pack/plugins/integration_assistant/server/graphs/kv/graph.test.ts index 4b995c9b8f31f..9c16cf2fb7864 100644 --- a/x-pack/plugins/integration_assistant/server/graphs/kv/graph.test.ts +++ b/x-pack/plugins/integration_assistant/server/graphs/kv/graph.test.ts @@ -29,11 +29,7 @@ describe('KVGraph', () => { it('Ensures that the graph compiles', async () => { // When getKVGraph runs, langgraph compiles the graph it will error if the graph has any issues. // Common issues for example detecting a node has no next step, or there is a infinite loop between them. - try { - await getKVGraph({ model, client }); - } catch (error) { - fail(`getKVGraph threw an error: ${error}`); - } + await getKVGraph({ model, client }); }); }); }); diff --git a/x-pack/plugins/integration_assistant/server/graphs/kv/validate.ts b/x-pack/plugins/integration_assistant/server/graphs/kv/validate.ts index e130a69910076..6781f5cfa46d9 100644 --- a/x-pack/plugins/integration_assistant/server/graphs/kv/validate.ts +++ b/x-pack/plugins/integration_assistant/server/graphs/kv/validate.ts @@ -10,8 +10,11 @@ import { ESProcessorItem } from '../../../common'; import type { KVState } from '../../types'; import type { HandleKVNodeParams } from './types'; import { testPipeline } from '../../util'; -import { onFailure, removeProcessor } from './constants'; -import { createGrokProcessor } from '../../util/processors'; +import { + createGrokProcessor, + createPassthroughFailureProcessor, + createRemoveProcessor, +} from '../../util/processors'; interface StructuredLogResult { [packageName: string]: { [dataStreamName: string]: unknown }; @@ -65,7 +68,7 @@ export async function handleHeaderValidate({ }: HandleKVNodeParams): Promise> { const grokPattern = state.grokPattern; const grokProcessor = createGrokProcessor([grokPattern]); - const pipeline = { processors: grokProcessor, on_failure: [onFailure] }; + const pipeline = { processors: grokProcessor, on_failure: [createPassthroughFailureProcessor()] }; const { pipelineResults, errors } = (await testPipeline(state.logSamples, pipeline, client)) as { pipelineResults: GrokResult[]; @@ -94,7 +97,10 @@ async function verifyKVProcessor( client: IScopedClusterClient ): Promise<{ errors: object[] }> { // This processor removes the original message field in the output - const pipeline = { processors: [kvProcessor[0], removeProcessor], on_failure: [onFailure] }; + const pipeline = { + processors: [kvProcessor[0], createRemoveProcessor()], + on_failure: [createPassthroughFailureProcessor()], + }; const { errors } = await testPipeline(formattedSamples, pipeline, client); return { errors }; } @@ -104,7 +110,10 @@ async function buildJSONSamples( processors: object[], client: IScopedClusterClient ): Promise { - const pipeline = { processors: [...processors, removeProcessor], on_failure: [onFailure] }; + const pipeline = { + processors: [...processors, createRemoveProcessor()], + on_failure: [createPassthroughFailureProcessor()], + }; const { pipelineResults } = (await testPipeline(samples, pipeline, client)) as { pipelineResults: StructuredLogResult[]; }; diff --git a/x-pack/plugins/integration_assistant/server/graphs/log_type_detection/constants.ts b/x-pack/plugins/integration_assistant/server/graphs/log_type_detection/constants.ts index ca29ba284fc06..065daa5268cb6 100644 --- a/x-pack/plugins/integration_assistant/server/graphs/log_type_detection/constants.ts +++ b/x-pack/plugins/integration_assistant/server/graphs/log_type_detection/constants.ts @@ -5,7 +5,10 @@ * 2.0. */ -export const EX_ANSWER_LOG_TYPE = { - log_type: 'structured', - header: true, +import { SamplesFormat } from '../../../common'; + +export const EX_ANSWER_LOG_TYPE: SamplesFormat = { + name: 'csv', + header: false, + columns: ['ip', 'timestamp', 'request', 'status', '', 'bytes'], }; diff --git a/x-pack/plugins/integration_assistant/server/graphs/log_type_detection/detection.test.ts b/x-pack/plugins/integration_assistant/server/graphs/log_type_detection/detection.test.ts index df78f1b9a0489..a0230e0347af8 100644 --- a/x-pack/plugins/integration_assistant/server/graphs/log_type_detection/detection.test.ts +++ b/x-pack/plugins/integration_assistant/server/graphs/log_type_detection/detection.test.ts @@ -13,17 +13,26 @@ import { FakeLLM } from '@langchain/core/utils/testing'; import { logFormatDetectionTestState } from '../../../__jest__/fixtures/log_type_detection'; import type { LogFormatDetectionState } from '../../types'; import { handleLogFormatDetection } from './detection'; +import { IScopedClusterClient } from '@kbn/core-elasticsearch-server'; const model = new FakeLLM({ - response: '{ "log_type": "structured"}', + response: '{ "name": "structured"}', }) as unknown as ActionsClientChatOpenAI | ActionsClientSimpleChatModel; const state: LogFormatDetectionState = logFormatDetectionTestState; describe('Testing log type detection handler', () => { it('handleLogFormatDetection()', async () => { - const response = await handleLogFormatDetection({ state, model }); - expect(response.samplesFormat).toStrictEqual({ name: 'structured' }); + const client = { + asCurrentUser: { + ingest: { + simulate: jest.fn(), + }, + }, + } as unknown as IScopedClusterClient; + + const response = await handleLogFormatDetection({ state, model, client }); + expect(response.samplesFormat).toStrictEqual({ name: 'structured', header: false }); expect(response.lastExecutedChain).toBe('logFormatDetection'); }); }); diff --git a/x-pack/plugins/integration_assistant/server/graphs/log_type_detection/detection.ts b/x-pack/plugins/integration_assistant/server/graphs/log_type_detection/detection.ts index 4920adb609967..a8334432a0211 100644 --- a/x-pack/plugins/integration_assistant/server/graphs/log_type_detection/detection.ts +++ b/x-pack/plugins/integration_assistant/server/graphs/log_type_detection/detection.ts @@ -8,6 +8,7 @@ import { JsonOutputParser } from '@langchain/core/output_parsers'; import type { LogFormatDetectionState } from '../../types'; import { LOG_FORMAT_DETECTION_PROMPT } from './prompts'; import type { LogDetectionNodeParams } from './types'; +import { SamplesFormat } from '../../../common'; const MaxLogSamplesInPrompt = 5; @@ -23,13 +24,23 @@ export async function handleLogFormatDetection({ ? state.logSamples.slice(0, MaxLogSamplesInPrompt) : state.logSamples; - const detectedLogFormatAnswer = await logFormatDetectionNode.invoke({ + const logFormatDetectionResult = await logFormatDetectionNode.invoke({ ex_answer: state.exAnswer, log_samples: samples, + package_title: state.packageTitle, + datastream_title: state.dataStreamTitle, }); - const logFormat = detectedLogFormatAnswer.log_type; - const header = detectedLogFormatAnswer.header; + let samplesFormat: SamplesFormat = { name: 'unsupported' }; - return { samplesFormat: { name: logFormat }, header, lastExecutedChain: 'logFormatDetection' }; + try { + samplesFormat = SamplesFormat.parse(logFormatDetectionResult); + if (samplesFormat.header === undefined) { + samplesFormat.header = false; + } + } catch (error) { + // If the LLM fails to produce the output of specified format, we will default to unsupported. + } + + return { samplesFormat, header: samplesFormat.header, lastExecutedChain: 'logFormatDetection' }; } diff --git a/x-pack/plugins/integration_assistant/server/graphs/log_type_detection/graph.test.ts b/x-pack/plugins/integration_assistant/server/graphs/log_type_detection/graph.test.ts index 361852f051e50..a8a8494de7626 100644 --- a/x-pack/plugins/integration_assistant/server/graphs/log_type_detection/graph.test.ts +++ b/x-pack/plugins/integration_assistant/server/graphs/log_type_detection/graph.test.ts @@ -29,11 +29,7 @@ describe('LogFormatDetectionGraph', () => { it('Ensures that the graph compiles', async () => { // When getLogFormatDetectionGraph runs, langgraph compiles the graph it will error if the graph has any issues. // Common issues for example detecting a node has no next step, or there is a infinite loop between them. - try { - await getLogFormatDetectionGraph({ model, client }); - } catch (error) { - fail(`getLogFormatDetectionGraph threw an error: ${error}`); - } + await getLogFormatDetectionGraph({ model, client }); }); }); }); diff --git a/x-pack/plugins/integration_assistant/server/graphs/log_type_detection/graph.ts b/x-pack/plugins/integration_assistant/server/graphs/log_type_detection/graph.ts index 4a3f2e2536266..95d624a7436c7 100644 --- a/x-pack/plugins/integration_assistant/server/graphs/log_type_detection/graph.ts +++ b/x-pack/plugins/integration_assistant/server/graphs/log_type_detection/graph.ts @@ -10,6 +10,7 @@ import { END, START, StateGraph } from '@langchain/langgraph'; import type { LogFormatDetectionState } from '../../types'; import { EX_ANSWER_LOG_TYPE } from './constants'; import { handleLogFormatDetection } from './detection'; +import { handleCSV } from '../csv/csv'; import { ESProcessorItem, SamplesFormat } from '../../../common'; import { getKVGraph } from '../kv/graph'; import { LogDetectionGraphParams, LogDetectionBaseNodeParams } from './types'; @@ -29,6 +30,14 @@ const graphState: StateGraphArgs['channels'] = { value: (x: string, y?: string) => y ?? x, default: () => '', }, + packageTitle: { + value: (x: string, y?: string) => y ?? x, + default: () => '', + }, + dataStreamTitle: { + value: (x: string, y?: string) => y ?? x, + default: () => '', + }, logSamples: { value: (x: string[], y?: string[]) => y ?? x, default: () => [], @@ -94,9 +103,9 @@ function logFormatRouter({ state }: LogDetectionBaseNodeParams): string { if (state.samplesFormat.name === LogFormat.UNSTRUCTURED) { return 'unstructured'; } - // if (state.samplesFormat === LogFormat.CSV) { - // return 'csv'; - // } + if (state.samplesFormat.name === LogFormat.CSV) { + return 'csv'; + } return 'unsupported'; } @@ -107,15 +116,16 @@ export async function getLogFormatDetectionGraph({ model, client }: LogDetection .addNode('modelInput', (state: LogFormatDetectionState) => modelInput({ state })) .addNode('modelOutput', (state: LogFormatDetectionState) => modelOutput({ state })) .addNode('handleLogFormatDetection', (state: LogFormatDetectionState) => - handleLogFormatDetection({ state, model }) + handleLogFormatDetection({ state, model, client }) ) .addNode('handleKVGraph', await getKVGraph({ model, client })) .addNode('handleUnstructuredGraph', await getUnstructuredGraph({ model, client })) - // .addNode('handleCsvGraph', (state: LogFormatDetectionState) => getCompiledCsvGraph({state, model})) + .addNode('handleCSV', (state: LogFormatDetectionState) => handleCSV({ state, model, client })) .addEdge(START, 'modelInput') .addEdge('modelInput', 'handleLogFormatDetection') .addEdge('handleKVGraph', 'modelOutput') .addEdge('handleUnstructuredGraph', 'modelOutput') + .addEdge('handleCSV', 'modelOutput') .addEdge('modelOutput', END) .addConditionalEdges( 'handleLogFormatDetection', @@ -123,7 +133,7 @@ export async function getLogFormatDetectionGraph({ model, client }: LogDetection { structured: 'handleKVGraph', unstructured: 'handleUnstructuredGraph', - // csv: 'handleCsvGraph', + csv: 'handleCSV', unsupported: 'modelOutput', } ); diff --git a/x-pack/plugins/integration_assistant/server/graphs/log_type_detection/prompts.ts b/x-pack/plugins/integration_assistant/server/graphs/log_type_detection/prompts.ts index 74ba8f719f875..71246d46363cb 100644 --- a/x-pack/plugins/integration_assistant/server/graphs/log_type_detection/prompts.ts +++ b/x-pack/plugins/integration_assistant/server/graphs/log_type_detection/prompts.ts @@ -8,30 +8,27 @@ import { ChatPromptTemplate } from '@langchain/core/prompts'; export const LOG_FORMAT_DETECTION_PROMPT = ChatPromptTemplate.fromMessages([ [ 'system', - `You are a helpful, expert assistant in identifying different log types based on the format. - -Here is some context for you to reference for your task, read it carefully as you will get questions about it later: - - -{log_samples} - -`, + `You are a helpful, expert assistant specializing in all things logs. You're great at analyzing log samples.`, ], [ 'human', - `Looking at the log samples , our goal is to identify the syslog type based on the guidelines below. -Follow these steps to identify the log format type: -1. Go through each log sample and identify the log format type. + `The current task is to identify the log format from the provided samples based on the guidelines below. + +The samples apply to the data stream {datastream_title} inside the integration package {package_title}. + +Follow these steps to do this: +1. Go through each log sample and identify the log format. Output this as "name: ". 2. If the samples have any or all of priority, timestamp, loglevel, hostname, ipAddress, messageId in the beginning information then set "header: true". 3. If the samples have a syslog header then set "header: true" , else set "header: false". If you are unable to determine the syslog header presence then set "header: false". -4. If the log samples have structured message body with key-value pairs then classify it as "log_type: structured". Look for a flat list of key-value pairs, often separated by spaces, commas, or other delimiters. +4. If the log samples have structured message body with key-value pairs then classify it as "name: structured". Look for a flat list of key-value pairs, often separated by spaces, commas, or other delimiters. 5. Consider variations in formatting, such as quotes around values ("key=value", key="value"), special characters in keys or values, or escape sequences. -6. If the log samples have unstructured body like a free-form text then classify it as "log_type: unstructured". -7. If the log samples follow a csv format then classify it as "log_type: csv". -8. If the samples are identified as "csv" and there is a csv header then set "header: true" , else set "header: false". -9. If you do not find the log format in any of the above categories then classify it as "log_type: unsupported". +6. If the log samples have unstructured body like a free-form text then classify it as "name: unstructured". +7. If the log samples follow a csv format then classify it with "name: csv". There are two sub-cases for csv: + a. If there is a csv header then set "header: true". + b. If there is no csv header then set "header: false" and try to find good names for the columns in the "columns" array by looking into the values of data in those columns. For each column, if you are unable to find good name candidate for it then output an empty string, like in the example. +8. If you cannot put the format into any of the above categories then classify it with "name: unsupported". - You ALWAYS follow these guidelines when writing your response: +You ALWAYS follow these guidelines when writing your response: - Do not respond with anything except the updated current mapping JSON object enclosed with 3 backticks (\`). See example response below. @@ -42,7 +39,13 @@ A: Please find the JSON object below: \`\`\`json {ex_answer} \`\`\` -`, + + +Please process these log samples: + +{log_samples} + +`, ], ['ai', 'Please find the JSON object below:'], ]); diff --git a/x-pack/plugins/integration_assistant/server/graphs/log_type_detection/types.ts b/x-pack/plugins/integration_assistant/server/graphs/log_type_detection/types.ts index 8f73988ec30cc..a076b17a160f1 100644 --- a/x-pack/plugins/integration_assistant/server/graphs/log_type_detection/types.ts +++ b/x-pack/plugins/integration_assistant/server/graphs/log_type_detection/types.ts @@ -14,6 +14,7 @@ export interface LogDetectionBaseNodeParams { export interface LogDetectionNodeParams extends LogDetectionBaseNodeParams { model: ChatModels; + client: IScopedClusterClient; } export interface LogDetectionGraphParams { diff --git a/x-pack/plugins/integration_assistant/server/graphs/unstructured/constants.ts b/x-pack/plugins/integration_assistant/server/graphs/unstructured/constants.ts index b0e36de9be85d..beba111e39a18 100644 --- a/x-pack/plugins/integration_assistant/server/graphs/unstructured/constants.ts +++ b/x-pack/plugins/integration_assistant/server/graphs/unstructured/constants.ts @@ -17,11 +17,3 @@ export const GROK_ERROR_EXAMPLE_ANSWER = { '%{TIMESTAMP:timestamp}:%{WORD:value1};%{WORD:key2}:%{WORD:value2}:%{GREEDYDATA:message}', ], }; - -export const onFailure = { - append: { - field: 'error.message', - value: - '{% raw %}Processor {{{_ingest.on_failure_processor_type}}} with tag {{{_ingest.on_failure_processor_tag}}} in pipeline {{{_ingest.on_failure_pipeline}}} failed with message: {{{_ingest.on_failure_message}}}{% endraw %}', - }, -}; diff --git a/x-pack/plugins/integration_assistant/server/graphs/unstructured/graph.test.ts b/x-pack/plugins/integration_assistant/server/graphs/unstructured/graph.test.ts index 60a9bdc4329de..456adb8eebcc6 100644 --- a/x-pack/plugins/integration_assistant/server/graphs/unstructured/graph.test.ts +++ b/x-pack/plugins/integration_assistant/server/graphs/unstructured/graph.test.ts @@ -29,11 +29,7 @@ describe('UnstructuredGraph', () => { it('Ensures that the graph compiles', async () => { // When getUnstructuredGraph runs, langgraph compiles the graph it will error if the graph has any issues. // Common issues for example detecting a node has no next step, or there is a infinite loop between them. - try { - await getUnstructuredGraph({ model, client }); - } catch (error) { - fail(`getUnstructuredGraph threw an error: ${error}`); - } + await getUnstructuredGraph({ model, client }); }); }); }); diff --git a/x-pack/plugins/integration_assistant/server/graphs/unstructured/validate.ts b/x-pack/plugins/integration_assistant/server/graphs/unstructured/validate.ts index eea7602b641d6..b616f60080150 100644 --- a/x-pack/plugins/integration_assistant/server/graphs/unstructured/validate.ts +++ b/x-pack/plugins/integration_assistant/server/graphs/unstructured/validate.ts @@ -8,8 +8,7 @@ import type { UnstructuredLogState } from '../../types'; import type { HandleUnstructuredNodeParams, LogResult } from './types'; import { testPipeline } from '../../util'; -import { onFailure } from './constants'; -import { createGrokProcessor } from '../../util/processors'; +import { createGrokProcessor, createPassthroughFailureProcessor } from '../../util/processors'; export async function handleUnstructuredValidate({ state, @@ -17,10 +16,10 @@ export async function handleUnstructuredValidate({ }: HandleUnstructuredNodeParams): Promise> { const grokPatterns = state.grokPatterns; const grokProcessor = createGrokProcessor(grokPatterns); - const pipeline = { processors: grokProcessor, on_failure: [onFailure] }; + const pipeline = { processors: grokProcessor, on_failure: [createPassthroughFailureProcessor()] }; + const packageName = state.packageName; const dataStreamName = state.dataStreamName; - const { pipelineResults, errors } = (await testPipeline(state.logSamples, pipeline, client)) as { pipelineResults: LogResult[]; errors: object[]; diff --git a/x-pack/plugins/integration_assistant/server/lib/errors/unparseable_csv_error.ts b/x-pack/plugins/integration_assistant/server/lib/errors/unparseable_csv_error.ts new file mode 100644 index 0000000000000..ab4010707d664 --- /dev/null +++ b/x-pack/plugins/integration_assistant/server/lib/errors/unparseable_csv_error.ts @@ -0,0 +1,43 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { KibanaResponseFactory } from '@kbn/core/server'; +import { ErrorThatHandlesItsOwnResponse } from './types'; +import { GenerationErrorCode } from '../../../common/constants'; +import { + GenerationErrorAttributes, + GenerationErrorBody, +} from '../../../common/api/generation_error'; + +const errorCode = GenerationErrorCode.UNPARSEABLE_CSV_DATA; + +export interface CSVParseError { + message: string[]; +} + +export class UnparseableCSVFormatError extends Error implements ErrorThatHandlesItsOwnResponse { + attributes: GenerationErrorAttributes; + + constructor(csvParseErrors: CSVParseError[]) { + super(errorCode); + this.attributes = { + errorCode, + underlyingMessages: csvParseErrors.flatMap((error) => error.message), + }; + } + + public sendResponse(res: KibanaResponseFactory) { + const body: GenerationErrorBody = { + message: errorCode, + attributes: this.attributes, + }; + return res.customError({ + statusCode: 422, + body, + }); + } +} diff --git a/x-pack/plugins/integration_assistant/server/lib/errors/unsupported_error.ts b/x-pack/plugins/integration_assistant/server/lib/errors/unsupported_error.ts index 79c4f2ccf69a1..7ab4e0569ca83 100644 --- a/x-pack/plugins/integration_assistant/server/lib/errors/unsupported_error.ts +++ b/x-pack/plugins/integration_assistant/server/lib/errors/unsupported_error.ts @@ -7,10 +7,10 @@ import { KibanaResponseFactory } from '@kbn/core/server'; import { ErrorThatHandlesItsOwnResponse } from './types'; -import { ErrorCode } from '../../../common/constants'; +import { GenerationErrorCode } from '../../../common/constants'; export class UnsupportedLogFormatError extends Error implements ErrorThatHandlesItsOwnResponse { - private readonly errorCode: string = ErrorCode.UNSUPPORTED_LOG_SAMPLES_FORMAT; + private readonly errorCode: string = GenerationErrorCode.UNSUPPORTED_LOG_SAMPLES_FORMAT; // eslint-disable-next-line @typescript-eslint/no-useless-constructor constructor(message: string) { diff --git a/x-pack/plugins/integration_assistant/server/routes/analyze_logs_routes.ts b/x-pack/plugins/integration_assistant/server/routes/analyze_logs_routes.ts index 2f0f3db47a7a9..639cd62f275b1 100644 --- a/x-pack/plugins/integration_assistant/server/routes/analyze_logs_routes.ts +++ b/x-pack/plugins/integration_assistant/server/routes/analyze_logs_routes.ts @@ -18,7 +18,7 @@ import { buildRouteValidationWithZod } from '../util/route_validation'; import { withAvailability } from './with_availability'; import { isErrorThatHandlesItsOwnResponse, UnsupportedLogFormatError } from '../lib/errors'; import { handleCustomErrors } from './routes_util'; -import { ErrorCode } from '../../common/constants'; +import { GenerationErrorCode } from '../../common/constants'; export function registerAnalyzeLogsRoutes( router: IRouter @@ -43,7 +43,14 @@ export function registerAnalyzeLogsRoutes( }, }, withAvailability(async (context, req, res): Promise> => { - const { packageName, dataStreamName, logSamples, langSmithOptions } = req.body; + const { + packageName, + dataStreamName, + packageTitle, + dataStreamTitle, + logSamples, + langSmithOptions, + } = req.body; const services = await context.resolve(['core']); const { client } = services.core.elasticsearch; const { getStartServices, logger } = await context.integrationAssistant; @@ -79,18 +86,20 @@ export function registerAnalyzeLogsRoutes( const logFormatParameters = { packageName, dataStreamName, + packageTitle, + dataStreamTitle, logSamples, }; const graph = await getLogFormatDetectionGraph({ model, client }); const graphResults = await graph.invoke(logFormatParameters, options); const graphLogFormat = graphResults.results.samplesFormat.name; - if (graphLogFormat === 'unsupported' || graphLogFormat === 'csv') { - throw new UnsupportedLogFormatError(ErrorCode.UNSUPPORTED_LOG_SAMPLES_FORMAT); + if (graphLogFormat === 'unsupported') { + throw new UnsupportedLogFormatError(GenerationErrorCode.UNSUPPORTED_LOG_SAMPLES_FORMAT); } return res.ok({ body: AnalyzeLogsResponse.parse(graphResults) }); } catch (err) { try { - handleCustomErrors(err, ErrorCode.RECURSION_LIMIT_ANALYZE_LOGS); + handleCustomErrors(err, GenerationErrorCode.RECURSION_LIMIT_ANALYZE_LOGS); } catch (e) { if (isErrorThatHandlesItsOwnResponse(e)) { return e.sendResponse(res); diff --git a/x-pack/plugins/integration_assistant/server/routes/build_integration_routes.ts b/x-pack/plugins/integration_assistant/server/routes/build_integration_routes.ts index 1a7ecb58a2062..6d7e5155a3d23 100644 --- a/x-pack/plugins/integration_assistant/server/routes/build_integration_routes.ts +++ b/x-pack/plugins/integration_assistant/server/routes/build_integration_routes.ts @@ -13,7 +13,7 @@ import { buildRouteValidationWithZod } from '../util/route_validation'; import { withAvailability } from './with_availability'; import { isErrorThatHandlesItsOwnResponse } from '../lib/errors'; import { handleCustomErrors } from './routes_util'; -import { ErrorCode } from '../../common/constants'; +import { GenerationErrorCode } from '../../common/constants'; export function registerIntegrationBuilderRoutes( router: IRouter ) { @@ -42,7 +42,7 @@ export function registerIntegrationBuilderRoutes( }); } catch (err) { try { - handleCustomErrors(err, ErrorCode.RECURSION_LIMIT); + handleCustomErrors(err, GenerationErrorCode.RECURSION_LIMIT); } catch (e) { if (isErrorThatHandlesItsOwnResponse(e)) { return e.sendResponse(response); diff --git a/x-pack/plugins/integration_assistant/server/routes/categorization_routes.ts b/x-pack/plugins/integration_assistant/server/routes/categorization_routes.ts index 635ef08dcdf9c..c437f6fc35546 100644 --- a/x-pack/plugins/integration_assistant/server/routes/categorization_routes.ts +++ b/x-pack/plugins/integration_assistant/server/routes/categorization_routes.ts @@ -22,7 +22,7 @@ import { buildRouteValidationWithZod } from '../util/route_validation'; import { withAvailability } from './with_availability'; import { isErrorThatHandlesItsOwnResponse } from '../lib/errors'; import { handleCustomErrors } from './routes_util'; -import { ErrorCode } from '../../common/constants'; +import { GenerationErrorCode } from '../../common/constants'; export function registerCategorizationRoutes( router: IRouter @@ -103,7 +103,7 @@ export function registerCategorizationRoutes( return res.ok({ body: CategorizationResponse.parse(results) }); } catch (err) { try { - handleCustomErrors(err, ErrorCode.RECURSION_LIMIT); + handleCustomErrors(err, GenerationErrorCode.RECURSION_LIMIT); } catch (e) { if (isErrorThatHandlesItsOwnResponse(e)) { return e.sendResponse(res); diff --git a/x-pack/plugins/integration_assistant/server/routes/ecs_routes.ts b/x-pack/plugins/integration_assistant/server/routes/ecs_routes.ts index 12d77c66a1132..43ca0fe396cae 100644 --- a/x-pack/plugins/integration_assistant/server/routes/ecs_routes.ts +++ b/x-pack/plugins/integration_assistant/server/routes/ecs_routes.ts @@ -18,7 +18,7 @@ import { buildRouteValidationWithZod } from '../util/route_validation'; import { withAvailability } from './with_availability'; import { isErrorThatHandlesItsOwnResponse } from '../lib/errors'; import { handleCustomErrors } from './routes_util'; -import { ErrorCode } from '../../common/constants'; +import { GenerationErrorCode } from '../../common/constants'; export function registerEcsRoutes(router: IRouter) { router.versioned @@ -97,7 +97,7 @@ export function registerEcsRoutes(router: IRouter) { router.versioned @@ -51,7 +51,7 @@ export function registerPipelineRoutes(router: IRouter) { router.versioned @@ -94,7 +94,7 @@ export function registerRelatedRoutes(router: IRouter { it('should throw a RecursionLimitError when given a GraphRecursionError', () => { const errorMessage = 'Recursion limit exceeded'; - const errorCode = ErrorCode.RECURSION_LIMIT; + const errorCode = GenerationErrorCode.RECURSION_LIMIT; const recursionError = new GraphRecursionError(errorMessage); expect(() => { @@ -26,7 +26,7 @@ describe('handleError', () => { it('should rethrow the error when given an error that is not a GraphRecursionError', () => { const errorMessage = 'Some other error'; - const errorCode = ErrorCode.RECURSION_LIMIT; + const errorCode = GenerationErrorCode.RECURSION_LIMIT; const otherError = new Error(errorMessage); expect(() => { diff --git a/x-pack/plugins/integration_assistant/server/routes/routes_util.ts b/x-pack/plugins/integration_assistant/server/routes/routes_util.ts index 5622392cd06a9..9773bb42bba6c 100644 --- a/x-pack/plugins/integration_assistant/server/routes/routes_util.ts +++ b/x-pack/plugins/integration_assistant/server/routes/routes_util.ts @@ -6,7 +6,7 @@ */ import { GraphRecursionError } from '@langchain/langgraph'; -import { ErrorCode } from '../../common/constants'; +import { GenerationErrorCode } from '../../common/constants'; import { RecursionLimitError } from '../lib/errors'; /** @@ -21,7 +21,9 @@ import { RecursionLimitError } from '../lib/errors'; */ export function handleCustomErrors( err: Error, - recursionErrorCode: ErrorCode.RECURSION_LIMIT | ErrorCode.RECURSION_LIMIT_ANALYZE_LOGS + recursionErrorCode: + | GenerationErrorCode.RECURSION_LIMIT + | GenerationErrorCode.RECURSION_LIMIT_ANALYZE_LOGS ) { if (err instanceof GraphRecursionError) { throw new RecursionLimitError(err.message, recursionErrorCode); diff --git a/x-pack/plugins/integration_assistant/server/types.ts b/x-pack/plugins/integration_assistant/server/types.ts index 3c17761c495b0..a8f0d86a925ba 100644 --- a/x-pack/plugins/integration_assistant/server/types.ts +++ b/x-pack/plugins/integration_assistant/server/types.ts @@ -109,6 +109,8 @@ export interface LogFormatDetectionState { lastExecutedChain: string; packageName: string; dataStreamName: string; + packageTitle: string; + dataStreamTitle: string; logSamples: string[]; jsonSamples: string[]; exAnswer: string; @@ -117,7 +119,7 @@ export interface LogFormatDetectionState { header: boolean; ecsVersion: string; results: object; - additionalProcessors: ESProcessorItem[]; // # This will be generated in the sub-graphs + additionalProcessors: ESProcessorItem[]; // Generated in handleXXX nodes or subgraphs. } export interface KVState { diff --git a/x-pack/plugins/integration_assistant/server/util/index.ts b/x-pack/plugins/integration_assistant/server/util/index.ts index 9017f6e216ea8..019cea1888502 100644 --- a/x-pack/plugins/integration_assistant/server/util/index.ts +++ b/x-pack/plugins/integration_assistant/server/util/index.ts @@ -17,5 +17,5 @@ export { export { generateFields, mergeSamples } from './samples'; export { deepCopy, generateUniqueId } from './util'; -export { testPipeline } from './pipeline'; +export { testPipeline, createJSONInput } from './pipeline'; export { combineProcessors } from './processors'; diff --git a/x-pack/plugins/integration_assistant/server/util/pipeline.ts b/x-pack/plugins/integration_assistant/server/util/pipeline.ts index c9c58c78b6c9a..5df0ad0ea4917 100644 --- a/x-pack/plugins/integration_assistant/server/util/pipeline.ts +++ b/x-pack/plugins/integration_assistant/server/util/pipeline.ts @@ -5,6 +5,8 @@ * 2.0. */ import type { IScopedClusterClient } from '@kbn/core-elasticsearch-server'; +import { ESProcessorItem } from '../../common'; +import { createPassthroughFailureProcessor, createRemoveProcessor } from './processors'; interface DocTemplate { _index: string; @@ -29,15 +31,17 @@ export async function testPipeline( samples: string[], pipeline: object, client: IScopedClusterClient -): Promise<{ pipelineResults: object[]; errors: object[] }> { +): Promise<{ pipelineResults: Array<{ [key: string]: unknown }>; errors: object[] }> { const docs = samples.map((sample) => formatSample(sample)); - const pipelineResults: object[] = []; + const pipelineResults: Array<{ [key: string]: unknown }> = []; const errors: object[] = []; try { const output = await client.asCurrentUser.ingest.simulate({ docs, pipeline }); for (const doc of output.docs) { - if (doc.doc?._source?.error) { + if (!doc) { + // Nothing to do – the document was dropped. + } else if (doc.doc?._source?.error) { errors.push(doc.doc._source.error); } else if (doc.doc?._source) { pipelineResults.push(doc.doc._source); @@ -49,3 +53,16 @@ export async function testPipeline( return { pipelineResults, errors }; } + +export async function createJSONInput( + processors: ESProcessorItem[], + formattedSamples: string[], + client: IScopedClusterClient +): Promise<{ pipelineResults: Array<{ [key: string]: unknown }>; errors: object[] }> { + const pipeline = { + processors: [...processors, createRemoveProcessor()], + on_failure: [createPassthroughFailureProcessor()], + }; + const { pipelineResults, errors } = await testPipeline(formattedSamples, pipeline, client); + return { pipelineResults, errors }; +} diff --git a/x-pack/plugins/integration_assistant/server/util/processors.ts b/x-pack/plugins/integration_assistant/server/util/processors.ts index b2e6b1683482a..8e5f63db7f156 100644 --- a/x-pack/plugins/integration_assistant/server/util/processors.ts +++ b/x-pack/plugins/integration_assistant/server/util/processors.ts @@ -77,3 +77,64 @@ export function createKVProcessor(kvInput: KVProcessor, state: KVState): ESProce const kvProcessor = safeLoad(renderedTemplate) as ESProcessorItem; return kvProcessor; } + +// Processor for the csv input to convert it to JSON. +export function createCSVProcessor(source: string, targets: string[]): ESProcessorItem { + return { + csv: { + field: source, + target_fields: targets, + description: 'Parse CSV input', + tag: 'parse_csv', + }, + }; +} + +// Trivial processor for the on_failure part of the pipeline. +// Use only if the source of error is not necessary. +export function createPassthroughFailureProcessor(): ESProcessorItem { + return { + append: { + field: 'error.message', + description: 'Append the error message as-is', + tag: 'append_error_message', + value: '{{{_ingest.on_failure_message}}}', + }, + }; +} + +// Processor to remove the message field. +export function createRemoveProcessor(): ESProcessorItem { + return { + remove: { + field: 'message', + ignore_missing: true, + description: 'Remove the message field', + tag: 'remove_message_field', + }, + }; +} + +// Processor to drop the specific values. +// values is a record of key value pairs to match against the fields +// root is the root of the fields to match against +export function createDropProcessor( + values: Record, + prefix: string[], + tag: string, + description: string +): ESProcessorItem { + const prefixExpression = prefix.join('?.'); + const conditions = Object.entries(values) + .map(([key, value]) => `ctx.${prefixExpression}?.${key} == '${String(value)}'`) + .join(' && '); + + return { + drop: { + if: conditions, + ignore_failure: true, + description, + tag, + }, + }; +} diff --git a/x-pack/plugins/translations/translations/fr-FR.json b/x-pack/plugins/translations/translations/fr-FR.json index 6f32ddce9464a..ac217a445f29f 100644 --- a/x-pack/plugins/translations/translations/fr-FR.json +++ b/x-pack/plugins/translations/translations/fr-FR.json @@ -24777,7 +24777,6 @@ "xpack.integrationAssistant.step.dataStream.integrationNameDescription": "Le nom du package est utilisé pour faire référence à l'intégration dans le pipeline d'ingestion d'Elastic", "xpack.integrationAssistant.step.dataStream.integrationNameTitle": "Définir le nom du package", "xpack.integrationAssistant.step.dataStream.logsSample.description": "Glissez et déposez un fichier ou parcourez les fichiers.", - "xpack.integrationAssistant.step.dataStream.logsSample.description2": "Format JSON/NDJSON", "xpack.integrationAssistant.step.dataStream.logsSample.errorCanNotRead": "Impossible de lire le fichier de logs exemple", "xpack.integrationAssistant.step.dataStream.logsSample.errorEmpty": "Le fichier de logs exemple est vide", "xpack.integrationAssistant.step.dataStream.logsSample.errorNotArray": "Le fichier de logs exemple n'est pas un tableau", diff --git a/x-pack/plugins/translations/translations/ja-JP.json b/x-pack/plugins/translations/translations/ja-JP.json index 0e07e97ddebb7..9c1c339f3aff8 100644 --- a/x-pack/plugins/translations/translations/ja-JP.json +++ b/x-pack/plugins/translations/translations/ja-JP.json @@ -24524,7 +24524,6 @@ "xpack.integrationAssistant.step.dataStream.integrationNameDescription": "このパッケージ名は、Elasticのインジェストパイプラインの統合を参照するために使用されます", "xpack.integrationAssistant.step.dataStream.integrationNameTitle": "パッケージ名を定義", "xpack.integrationAssistant.step.dataStream.logsSample.description": "ファイルをドラッグアンドドロップするか、ファイルを参照します", - "xpack.integrationAssistant.step.dataStream.logsSample.description2": "JSON/NDJSON形式", "xpack.integrationAssistant.step.dataStream.logsSample.errorCanNotRead": "ログサンプルファイルを読み取れませんでした", "xpack.integrationAssistant.step.dataStream.logsSample.errorEmpty": "ログサンプルファイルが空です", "xpack.integrationAssistant.step.dataStream.logsSample.errorNotArray": "ログサンプルファイルは配列ではありません", diff --git a/x-pack/plugins/translations/translations/zh-CN.json b/x-pack/plugins/translations/translations/zh-CN.json index a6b6fd6524b59..eeaed8b7cdb4f 100644 --- a/x-pack/plugins/translations/translations/zh-CN.json +++ b/x-pack/plugins/translations/translations/zh-CN.json @@ -24558,7 +24558,6 @@ "xpack.integrationAssistant.step.dataStream.integrationNameDescription": "软件包名称用于在 Elastic 采集管道中引用集成", "xpack.integrationAssistant.step.dataStream.integrationNameTitle": "定义软件包名称", "xpack.integrationAssistant.step.dataStream.logsSample.description": "拖放文件或浏览文件。", - "xpack.integrationAssistant.step.dataStream.logsSample.description2": "JSON/NDJSON 格式", "xpack.integrationAssistant.step.dataStream.logsSample.errorCanNotRead": "无法读取日志样例文件", "xpack.integrationAssistant.step.dataStream.logsSample.errorEmpty": "日志样例文件为空", "xpack.integrationAssistant.step.dataStream.logsSample.errorNotArray": "日志样例文件不是数组",