diff --git a/x-pack/plugins/infra/common/alerting/logs/types.ts b/x-pack/plugins/infra/common/alerting/logs/types.ts index cbfffbfd8f940..884a813d74c86 100644 --- a/x-pack/plugins/infra/common/alerting/logs/types.ts +++ b/x-pack/plugins/infra/common/alerting/logs/types.ts @@ -4,6 +4,8 @@ * you may not use this file except in compliance with the Elastic License. */ import { i18n } from '@kbn/i18n'; +import * as rt from 'io-ts'; +import { commonSearchSuccessResponseFieldsRT } from '../../utils/elasticsearch_runtime_types'; export const LOG_DOCUMENT_COUNT_ALERT_TYPE_ID = 'logs.alert.document.count'; @@ -20,6 +22,19 @@ export enum Comparator { NOT_MATCH_PHRASE = 'does not match phrase', } +const ComparatorRT = rt.keyof({ + [Comparator.GT]: null, + [Comparator.GT_OR_EQ]: null, + [Comparator.LT]: null, + [Comparator.LT_OR_EQ]: null, + [Comparator.EQ]: null, + [Comparator.NOT_EQ]: null, + [Comparator.MATCH]: null, + [Comparator.NOT_MATCH]: null, + [Comparator.MATCH_PHRASE]: null, + [Comparator.NOT_MATCH_PHRASE]: null, +}); + // Maps our comparators to i18n strings, some comparators have more specific wording // depending on the field type the comparator is being used with. export const ComparatorToi18nMap = { @@ -74,22 +89,78 @@ export enum AlertStates { ERROR, } -export interface DocumentCount { - comparator: Comparator; - value: number; -} +const DocumentCountRT = rt.type({ + comparator: ComparatorRT, + value: rt.number, +}); -export interface Criterion { - field: string; - comparator: Comparator; - value: string | number; -} +export type DocumentCount = rt.TypeOf; -export interface LogDocumentCountAlertParams { - count: DocumentCount; - criteria: Criterion[]; - timeUnit: 's' | 'm' | 'h' | 'd'; - timeSize: number; -} +const CriterionRT = rt.type({ + field: rt.string, + comparator: ComparatorRT, + value: rt.union([rt.string, rt.number]), +}); + +export type Criterion = rt.TypeOf; + +const TimeUnitRT = rt.union([rt.literal('s'), rt.literal('m'), rt.literal('h'), rt.literal('d')]); +export type TimeUnit = rt.TypeOf; + +export const LogDocumentCountAlertParamsRT = rt.intersection([ + rt.type({ + count: DocumentCountRT, + criteria: rt.array(CriterionRT), + timeUnit: TimeUnitRT, + timeSize: rt.number, + }), + rt.partial({ + groupBy: rt.array(rt.string), + }), +]); + +export type LogDocumentCountAlertParams = rt.TypeOf; + +export const UngroupedSearchQueryResponseRT = rt.intersection([ + commonSearchSuccessResponseFieldsRT, + rt.type({ + hits: rt.type({ + total: rt.type({ + value: rt.number, + }), + }), + }), +]); + +export type UngroupedSearchQueryResponse = rt.TypeOf; + +export const GroupedSearchQueryResponseRT = rt.intersection([ + commonSearchSuccessResponseFieldsRT, + rt.type({ + aggregations: rt.type({ + groups: rt.intersection([ + rt.type({ + buckets: rt.array( + rt.type({ + key: rt.record(rt.string, rt.string), + doc_count: rt.number, + filtered_results: rt.type({ + doc_count: rt.number, + }), + }) + ), + }), + rt.partial({ + after_key: rt.record(rt.string, rt.string), + }), + ]), + }), + hits: rt.type({ + total: rt.type({ + value: rt.number, + }), + }), + }), +]); -export type TimeUnit = 's' | 'm' | 'h' | 'd'; +export type GroupedSearchQueryResponse = rt.TypeOf; diff --git a/x-pack/plugins/infra/common/utils/elasticsearch_runtime_types.ts b/x-pack/plugins/infra/common/utils/elasticsearch_runtime_types.ts new file mode 100644 index 0000000000000..a48c65d648b25 --- /dev/null +++ b/x-pack/plugins/infra/common/utils/elasticsearch_runtime_types.ts @@ -0,0 +1,18 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import * as rt from 'io-ts'; + +export const commonSearchSuccessResponseFieldsRT = rt.type({ + _shards: rt.type({ + total: rt.number, + successful: rt.number, + skipped: rt.number, + failed: rt.number, + }), + timed_out: rt.boolean, + took: rt.number, +}); diff --git a/x-pack/plugins/infra/public/components/alerting/logs/expression_editor/editor.tsx b/x-pack/plugins/infra/public/components/alerting/logs/expression_editor/editor.tsx index 9e4e78ca392fd..295e60552cce5 100644 --- a/x-pack/plugins/infra/public/components/alerting/logs/expression_editor/editor.tsx +++ b/x-pack/plugins/infra/public/components/alerting/logs/expression_editor/editor.tsx @@ -22,6 +22,7 @@ import { DocumentCount } from './document_count'; import { Criteria } from './criteria'; import { useSourceId } from '../../../../containers/source_id'; import { LogSourceProvider, useLogSourceContext } from '../../../../containers/logs/log_source'; +import { GroupByExpression } from '../../shared/group_by_expression/group_by_expression'; export interface ExpressionCriteria { field?: string; @@ -121,7 +122,6 @@ export const Editor: React.FC = (props) => { const { setAlertParams, alertParams, errors } = props; const [hasSetDefaults, setHasSetDefaults] = useState(false); const { sourceStatus } = useLogSourceContext(); - useMount(() => { for (const [key, value] of Object.entries({ ...DEFAULT_EXPRESSION, ...alertParams })) { setAlertParams(key, value); @@ -140,6 +140,17 @@ export const Editor: React.FC = (props) => { /* eslint-disable-next-line react-hooks/exhaustive-deps */ }, [sourceStatus]); + const groupByFields = useMemo(() => { + if (sourceStatus?.logIndexFields) { + return sourceStatus.logIndexFields.filter((field) => { + return field.type === 'string' && field.aggregatable; + }); + } else { + return []; + } + /* eslint-disable-next-line react-hooks/exhaustive-deps */ + }, [sourceStatus]); + const updateCount = useCallback( (countParams) => { const nextCountParams = { ...alertParams.count, ...countParams }; @@ -172,6 +183,13 @@ export const Editor: React.FC = (props) => { [setAlertParams] ); + const updateGroupBy = useCallback( + (groups: string[]) => { + setAlertParams('groupBy', groups); + }, + [setAlertParams] + ); + const addCriterion = useCallback(() => { const nextCriteria = alertParams?.criteria ? [...alertParams.criteria, DEFAULT_CRITERIA] @@ -219,6 +237,12 @@ export const Editor: React.FC = (props) => { errors={errors as { [key: string]: string[] }} /> + +
void; + label?: string; +} + +const DEFAULT_GROUP_BY_LABEL = i18n.translate('xpack.infra.alerting.alertFlyout.groupByLabel', { + defaultMessage: 'Group By', +}); + +const EVERYTHING_PLACEHOLDER = i18n.translate( + 'xpack.infra.alerting.alertFlyout.groupBy.placeholder', + { + defaultMessage: 'Nothing (ungrouped)', + } +); + +export const GroupByExpression: React.FC = ({ + selectedGroups = [], + fields, + label, + onChange, +}) => { + const [isPopoverOpen, setIsPopoverOpen] = useState(false); + + const expressionValue = useMemo(() => { + return selectedGroups.length > 0 ? selectedGroups.join(', ') : EVERYTHING_PLACEHOLDER; + }, [selectedGroups]); + + const labelProp = label ?? DEFAULT_GROUP_BY_LABEL; + + return ( + + + setIsPopoverOpen(true)} + /> + } + isOpen={isPopoverOpen} + closePopover={() => setIsPopoverOpen(false)} + ownFocus + panelPaddingSize="s" + anchorPosition="downLeft" + > +
+ {labelProp} + +
+
+
+
+ ); +}; diff --git a/x-pack/plugins/infra/public/components/alerting/shared/group_by_expression/selector.tsx b/x-pack/plugins/infra/public/components/alerting/shared/group_by_expression/selector.tsx new file mode 100644 index 0000000000000..7a6a7ff77335b --- /dev/null +++ b/x-pack/plugins/infra/public/components/alerting/shared/group_by_expression/selector.tsx @@ -0,0 +1,56 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { EuiComboBox } from '@elastic/eui'; +import React, { useCallback, useMemo } from 'react'; +import { IFieldType } from 'src/plugins/data/public'; + +interface Props { + selectedGroups?: string[]; + onChange: (groupBy: string[]) => void; + fields: IFieldType[]; + label: string; + placeholder: string; +} + +export const GroupBySelector = ({ + onChange, + fields, + selectedGroups = [], + label, + placeholder, +}: Props) => { + const handleChange = useCallback( + (selectedOptions: Array<{ label: string }>) => { + const groupBy = selectedOptions.map((option) => option.label); + onChange(groupBy); + }, + [onChange] + ); + + const formattedSelectedGroups = useMemo(() => { + return selectedGroups.map((group) => ({ label: group })); + }, [selectedGroups]); + + const options = useMemo(() => { + return fields.filter((field) => field.aggregatable).map((field) => ({ label: field.name })); + }, [fields]); + + return ( +
+ +
+ ); +}; diff --git a/x-pack/plugins/infra/server/lib/adapters/framework/adapter_types.ts b/x-pack/plugins/infra/server/lib/adapters/framework/adapter_types.ts index 905b7dfa314bd..018e5098a4291 100644 --- a/x-pack/plugins/infra/server/lib/adapters/framework/adapter_types.ts +++ b/x-pack/plugins/infra/server/lib/adapters/framework/adapter_types.ts @@ -60,6 +60,7 @@ export interface InfraDatabaseSearchResponse skipped: number; failed: number; }; + timed_out: boolean; aggregations?: Aggregations; hits: { total: { diff --git a/x-pack/plugins/infra/server/lib/alerting/log_threshold/log_threshold_executor.test.ts b/x-pack/plugins/infra/server/lib/alerting/log_threshold/log_threshold_executor.test.ts index a3b9e85458416..4f1e81e0b2c40 100644 --- a/x-pack/plugins/infra/server/lib/alerting/log_threshold/log_threshold_executor.test.ts +++ b/x-pack/plugins/infra/server/lib/alerting/log_threshold/log_threshold_executor.test.ts @@ -55,7 +55,7 @@ services.alertInstanceFactory.mockImplementation((instanceId: string) => { * Helper functions */ function getAlertState(instanceId: string): AlertStates { - const alert = alertInstances.get(instanceId); + const alert = alertInstances.get(`${instanceId}-*`); if (alert) { return alert.state.alertState; } else { @@ -73,11 +73,26 @@ const executor = (createLogThresholdExecutor('test', libsMock) as unknown) as (o // Wrapper to test type Comparison = [number, Comparator, number]; + async function callExecutor( [value, comparator, threshold]: Comparison, criteria: Criterion[] = [] ) { - services.callCluster.mockImplementationOnce(async (..._) => ({ count: value })); + services.callCluster.mockImplementationOnce(async (..._) => ({ + _shards: { + total: 1, + successful: 1, + skipped: 0, + failed: 0, + }, + timed_out: false, + took: 123456789, + hits: { + total: { + value, + }, + }, + })); return await executor({ services, @@ -90,222 +105,427 @@ async function callExecutor( }); } -describe('Comparators trigger alerts correctly', () => { - it('does not alert when counts do not reach the threshold', async () => { - await callExecutor([0, Comparator.GT, 1]); - expect(getAlertState('test')).toBe(AlertStates.OK); +describe('Ungrouped alerts', () => { + describe('Comparators trigger alerts correctly', () => { + it('does not alert when counts do not reach the threshold', async () => { + await callExecutor([0, Comparator.GT, 1]); + expect(getAlertState('test')).toBe(AlertStates.OK); - await callExecutor([0, Comparator.GT_OR_EQ, 1]); - expect(getAlertState('test')).toBe(AlertStates.OK); + await callExecutor([0, Comparator.GT_OR_EQ, 1]); + expect(getAlertState('test')).toBe(AlertStates.OK); - await callExecutor([1, Comparator.LT, 0]); - expect(getAlertState('test')).toBe(AlertStates.OK); + await callExecutor([1, Comparator.LT, 0]); + expect(getAlertState('test')).toBe(AlertStates.OK); - await callExecutor([1, Comparator.LT_OR_EQ, 0]); - expect(getAlertState('test')).toBe(AlertStates.OK); - }); + await callExecutor([1, Comparator.LT_OR_EQ, 0]); + expect(getAlertState('test')).toBe(AlertStates.OK); + }); - it('alerts when counts reach the threshold', async () => { - await callExecutor([2, Comparator.GT, 1]); - expect(getAlertState('test')).toBe(AlertStates.ALERT); + it('alerts when counts reach the threshold', async () => { + await callExecutor([2, Comparator.GT, 1]); + expect(getAlertState('test')).toBe(AlertStates.ALERT); - await callExecutor([1, Comparator.GT_OR_EQ, 1]); - expect(getAlertState('test')).toBe(AlertStates.ALERT); + await callExecutor([1, Comparator.GT_OR_EQ, 1]); + expect(getAlertState('test')).toBe(AlertStates.ALERT); - await callExecutor([1, Comparator.LT, 2]); - expect(getAlertState('test')).toBe(AlertStates.ALERT); + await callExecutor([1, Comparator.LT, 2]); + expect(getAlertState('test')).toBe(AlertStates.ALERT); - await callExecutor([2, Comparator.LT_OR_EQ, 2]); - expect(getAlertState('test')).toBe(AlertStates.ALERT); + await callExecutor([2, Comparator.LT_OR_EQ, 2]); + expect(getAlertState('test')).toBe(AlertStates.ALERT); + }); }); -}); -describe('Comparators create the correct ES queries', () => { - beforeEach(() => { - services.callCluster.mockReset(); - }); + describe('Comparators create the correct ES queries', () => { + beforeEach(() => { + services.callCluster.mockReset(); + }); - it('Works with `Comparator.EQ`', async () => { - await callExecutor( - [2, Comparator.GT, 1], // Not relevant - [{ field: 'foo', comparator: Comparator.EQ, value: 'bar' }] - ); - - const query = services.callCluster.mock.calls[0][1]!; - expect(query.body).toMatchObject({ - query: { - bool: { - must: [{ term: { foo: { value: 'bar' } } }], + it('Works with `Comparator.EQ`', async () => { + await callExecutor( + [2, Comparator.GT, 1], // Not relevant + [{ field: 'foo', comparator: Comparator.EQ, value: 'bar' }] + ); + + const query = services.callCluster.mock.calls[0][1]!; + + expect(query.body).toMatchObject({ + track_total_hits: true, + query: { + bool: { + filter: [ + { + range: { + '@timestamp': { + format: 'epoch_millis', + }, + }, + }, + { + term: { + foo: { + value: 'bar', + }, + }, + }, + ], + }, }, - }, + size: 0, + }); }); - }); - it('works with `Comparator.NOT_EQ`', async () => { - await callExecutor( - [2, Comparator.GT, 1], // Not relevant - [{ field: 'foo', comparator: Comparator.NOT_EQ, value: 'bar' }] - ); - - const query = services.callCluster.mock.calls[0][1]!; - expect(query.body).toMatchObject({ - query: { - bool: { - must_not: [{ term: { foo: { value: 'bar' } } }], + it('works with `Comparator.NOT_EQ`', async () => { + await callExecutor( + [2, Comparator.GT, 1], // Not relevant + [{ field: 'foo', comparator: Comparator.NOT_EQ, value: 'bar' }] + ); + + const query = services.callCluster.mock.calls[0][1]!; + + expect(query.body).toMatchObject({ + track_total_hits: true, + query: { + bool: { + filter: [ + { + range: { + '@timestamp': { + format: 'epoch_millis', + }, + }, + }, + ], + must_not: [ + { + term: { + foo: { + value: 'bar', + }, + }, + }, + ], + }, }, - }, + size: 0, + }); }); - }); - it('works with `Comparator.MATCH`', async () => { - await callExecutor( - [2, Comparator.GT, 1], // Not relevant - [{ field: 'foo', comparator: Comparator.MATCH, value: 'bar' }] - ); - - const query = services.callCluster.mock.calls[0][1]!; - expect(query.body).toMatchObject({ - query: { - bool: { - must: [{ match: { foo: 'bar' } }], + it('works with `Comparator.MATCH`', async () => { + await callExecutor( + [2, Comparator.GT, 1], // Not relevant + [{ field: 'foo', comparator: Comparator.MATCH, value: 'bar' }] + ); + + const query = services.callCluster.mock.calls[0][1]!; + + expect(query.body).toMatchObject({ + track_total_hits: true, + query: { + bool: { + filter: [ + { + range: { + '@timestamp': { + format: 'epoch_millis', + }, + }, + }, + { + match: { + foo: 'bar', + }, + }, + ], + }, }, - }, + size: 0, + }); }); - }); - it('works with `Comparator.NOT_MATCH`', async () => { - await callExecutor( - [2, Comparator.GT, 1], // Not relevant - [{ field: 'foo', comparator: Comparator.NOT_MATCH, value: 'bar' }] - ); - - const query = services.callCluster.mock.calls[0][1]!; - expect(query.body).toMatchObject({ - query: { - bool: { - must_not: [{ match: { foo: 'bar' } }], + it('works with `Comparator.NOT_MATCH`', async () => { + await callExecutor( + [2, Comparator.GT, 1], // Not relevant + [{ field: 'foo', comparator: Comparator.NOT_MATCH, value: 'bar' }] + ); + + const query = services.callCluster.mock.calls[0][1]!; + + expect(query.body).toMatchObject({ + track_total_hits: true, + query: { + bool: { + filter: [ + { + range: { + '@timestamp': { + format: 'epoch_millis', + }, + }, + }, + ], + must_not: [ + { + match: { + foo: 'bar', + }, + }, + ], + }, }, - }, + size: 0, + }); }); - }); - it('works with `Comparator.MATCH_PHRASE`', async () => { - await callExecutor( - [2, Comparator.GT, 1], // Not relevant - [{ field: 'foo', comparator: Comparator.MATCH_PHRASE, value: 'bar' }] - ); - - const query = services.callCluster.mock.calls[0][1]!; - expect(query.body).toMatchObject({ - query: { - bool: { - must: [{ match_phrase: { foo: 'bar' } }], + it('works with `Comparator.MATCH_PHRASE`', async () => { + await callExecutor( + [2, Comparator.GT, 1], // Not relevant + [{ field: 'foo', comparator: Comparator.MATCH_PHRASE, value: 'bar' }] + ); + + const query = services.callCluster.mock.calls[0][1]!; + + expect(query.body).toMatchObject({ + track_total_hits: true, + query: { + bool: { + filter: [ + { + range: { + '@timestamp': { + format: 'epoch_millis', + }, + }, + }, + { + match_phrase: { + foo: 'bar', + }, + }, + ], + }, }, - }, + size: 0, + }); }); - }); - it('works with `Comparator.NOT_MATCH_PHRASE`', async () => { - await callExecutor( - [2, Comparator.GT, 1], // Not relevant - [{ field: 'foo', comparator: Comparator.NOT_MATCH_PHRASE, value: 'bar' }] - ); - - const query = services.callCluster.mock.calls[0][1]!; - expect(query.body).toMatchObject({ - query: { - bool: { - must_not: [{ match_phrase: { foo: 'bar' } }], + it('works with `Comparator.NOT_MATCH_PHRASE`', async () => { + await callExecutor( + [2, Comparator.GT, 1], // Not relevant + [{ field: 'foo', comparator: Comparator.NOT_MATCH_PHRASE, value: 'bar' }] + ); + + const query = services.callCluster.mock.calls[0][1]!; + + expect(query.body).toMatchObject({ + track_total_hits: true, + query: { + bool: { + filter: [ + { + range: { + '@timestamp': { + format: 'epoch_millis', + }, + }, + }, + ], + must_not: [ + { + match_phrase: { + foo: 'bar', + }, + }, + ], + }, }, - }, + size: 0, + }); }); - }); - it('works with `Comparator.GT`', async () => { - await callExecutor( - [2, Comparator.GT, 1], // Not relevant - [{ field: 'foo', comparator: Comparator.GT, value: 1 }] - ); - - const query = services.callCluster.mock.calls[0][1]!; - expect(query.body).toMatchObject({ - query: { - bool: { - must: [{ range: { foo: { gt: 1 } } }], + it('works with `Comparator.GT`', async () => { + await callExecutor( + [2, Comparator.GT, 1], // Not relevant + [{ field: 'foo', comparator: Comparator.GT, value: 1 }] + ); + + const query = services.callCluster.mock.calls[0][1]!; + + expect(query.body).toMatchObject({ + track_total_hits: true, + query: { + bool: { + filter: [ + { + range: { + '@timestamp': { + format: 'epoch_millis', + }, + }, + }, + { + range: { + foo: { + gt: 1, + }, + }, + }, + ], + }, }, - }, + size: 0, + }); }); - }); - it('works with `Comparator.GT_OR_EQ`', async () => { - await callExecutor( - [2, Comparator.GT, 1], // Not relevant - [{ field: 'foo', comparator: Comparator.GT_OR_EQ, value: 1 }] - ); - - const query = services.callCluster.mock.calls[0][1]!; - expect(query.body).toMatchObject({ - query: { - bool: { - must: [{ range: { foo: { gte: 1 } } }], + it('works with `Comparator.GT_OR_EQ`', async () => { + await callExecutor( + [2, Comparator.GT, 1], // Not relevant + [{ field: 'foo', comparator: Comparator.GT_OR_EQ, value: 1 }] + ); + + const query = services.callCluster.mock.calls[0][1]!; + + expect(query.body).toMatchObject({ + track_total_hits: true, + query: { + bool: { + filter: [ + { + range: { + '@timestamp': { + format: 'epoch_millis', + }, + }, + }, + { + range: { + foo: { + gte: 1, + }, + }, + }, + ], + }, }, - }, + size: 0, + }); }); - }); - it('works with `Comparator.LT`', async () => { - await callExecutor( - [2, Comparator.GT, 1], // Not relevant - [{ field: 'foo', comparator: Comparator.LT, value: 1 }] - ); - - const query = services.callCluster.mock.calls[0][1]!; - expect(query.body).toMatchObject({ - query: { - bool: { - must: [{ range: { foo: { lt: 1 } } }], + it('works with `Comparator.LT`', async () => { + await callExecutor( + [2, Comparator.GT, 1], // Not relevant + [{ field: 'foo', comparator: Comparator.LT, value: 1 }] + ); + + const query = services.callCluster.mock.calls[0][1]!; + + expect(query.body).toMatchObject({ + track_total_hits: true, + query: { + bool: { + filter: [ + { + range: { + '@timestamp': { + format: 'epoch_millis', + }, + }, + }, + { + range: { + foo: { + lt: 1, + }, + }, + }, + ], + }, }, - }, + size: 0, + }); }); - }); - it('works with `Comparator.LT_OR_EQ`', async () => { - await callExecutor( - [2, Comparator.GT, 1], // Not relevant - [{ field: 'foo', comparator: Comparator.LT_OR_EQ, value: 1 }] - ); - - const query = services.callCluster.mock.calls[0][1]!; - expect(query.body).toMatchObject({ - query: { - bool: { - must: [{ range: { foo: { lte: 1 } } }], + it('works with `Comparator.LT_OR_EQ`', async () => { + await callExecutor( + [2, Comparator.GT, 1], // Not relevant + [{ field: 'foo', comparator: Comparator.LT_OR_EQ, value: 1 }] + ); + + const query = services.callCluster.mock.calls[0][1]!; + + expect(query.body).toMatchObject({ + track_total_hits: true, + query: { + bool: { + filter: [ + { + range: { + '@timestamp': { + format: 'epoch_millis', + }, + }, + }, + { + range: { + foo: { + lte: 1, + }, + }, + }, + ], + }, }, - }, + size: 0, + }); }); }); -}); -describe('Multiple criteria create the right ES query', () => { - beforeEach(() => { - services.callCluster.mockReset(); - }); - it('works', async () => { - await callExecutor( - [2, Comparator.GT, 1], // Not relevant - [ - { field: 'foo', comparator: Comparator.EQ, value: 'bar' }, - { field: 'http.status', comparator: Comparator.LT, value: 400 }, - ] - ); - - const query = services.callCluster.mock.calls[0][1]!; - expect(query.body).toMatchObject({ - query: { - bool: { - must: [{ term: { foo: { value: 'bar' } } }, { range: { 'http.status': { lt: 400 } } }], + describe('Multiple criteria create the right ES query', () => { + beforeEach(() => { + services.callCluster.mockReset(); + }); + it('works', async () => { + await callExecutor( + [2, Comparator.GT, 1], // Not relevant + [ + { field: 'foo', comparator: Comparator.EQ, value: 'bar' }, + { field: 'http.status', comparator: Comparator.LT, value: 400 }, + ] + ); + + const query = services.callCluster.mock.calls[0][1]!; + + expect(query.body).toMatchObject({ + track_total_hits: true, + query: { + bool: { + filter: [ + { + range: { + '@timestamp': { + format: 'epoch_millis', + }, + }, + }, + { + term: { + foo: { + value: 'bar', + }, + }, + }, + { + range: { + 'http.status': { + lt: 400, + }, + }, + }, + ], + }, }, - }, + size: 0, + }); }); }); }); diff --git a/x-pack/plugins/infra/server/lib/alerting/log_threshold/log_threshold_executor.ts b/x-pack/plugins/infra/server/lib/alerting/log_threshold/log_threshold_executor.ts index ee4e1fcb3f6e2..a2fd01f859385 100644 --- a/x-pack/plugins/infra/server/lib/alerting/log_threshold/log_threshold_executor.ts +++ b/x-pack/plugins/infra/server/lib/alerting/log_threshold/log_threshold_executor.ts @@ -11,10 +11,19 @@ import { Comparator, LogDocumentCountAlertParams, Criterion, + GroupedSearchQueryResponseRT, + UngroupedSearchQueryResponseRT, + UngroupedSearchQueryResponse, + GroupedSearchQueryResponse, + LogDocumentCountAlertParamsRT, } from '../../../../common/alerting/logs/types'; import { InfraBackendLibs } from '../../infra_types'; import { getIntervalInSeconds } from '../../../utils/get_interval_in_seconds'; import { InfraSource } from '../../../../common/http_api/source_api'; +import { decodeOrThrow } from '../../../../common/runtime_types'; + +const UNGROUPED_FACTORY_KEY = '*'; +const COMPOSITE_GROUP_SIZE = 40; const checkValueAgainstComparatorMap: { [key: string]: (a: number, b: number) => boolean; @@ -25,37 +34,42 @@ const checkValueAgainstComparatorMap: { [Comparator.LT_OR_EQ]: (a: number, b: number) => a <= b, }; -export const createLogThresholdExecutor = (alertUUID: string, libs: InfraBackendLibs) => +export const createLogThresholdExecutor = (alertId: string, libs: InfraBackendLibs) => async function ({ services, params }: AlertExecutorOptions) { - const { count, criteria } = params as LogDocumentCountAlertParams; const { alertInstanceFactory, savedObjectsClient, callCluster } = services; const { sources } = libs; + const { groupBy } = params; const sourceConfiguration = await sources.getSourceConfiguration(savedObjectsClient, 'default'); const indexPattern = sourceConfiguration.configuration.logAlias; - - const alertInstance = alertInstanceFactory(alertUUID); + const alertInstance = alertInstanceFactory(alertId); try { - const query = getESQuery( - params as LogDocumentCountAlertParams, - sourceConfiguration.configuration - ); - const result = await getResults(query, indexPattern, callCluster); - - if (checkValueAgainstComparatorMap[count.comparator](result.count, count.value)) { - alertInstance.scheduleActions(FIRED_ACTIONS.id, { - matchingDocuments: result.count, - conditions: createConditionsMessage(criteria), - }); - - alertInstance.replaceState({ - alertState: AlertStates.ALERT, - }); + const validatedParams = decodeOrThrow(LogDocumentCountAlertParamsRT)(params); + + const query = + groupBy && groupBy.length > 0 + ? getGroupedESQuery(validatedParams, sourceConfiguration.configuration, indexPattern) + : getUngroupedESQuery(validatedParams, sourceConfiguration.configuration, indexPattern); + + if (!query) { + throw new Error('ES query could not be built from the provided alert params'); + } + + if (groupBy && groupBy.length > 0) { + processGroupByResults( + await getGroupedResults(query, callCluster), + validatedParams, + alertInstanceFactory, + alertId + ); } else { - alertInstance.replaceState({ - alertState: AlertStates.OK, - }); + processUngroupedResults( + await getUngroupedResults(query, callCluster), + validatedParams, + alertInstanceFactory, + alertId + ); } } catch (e) { alertInstance.replaceState({ @@ -66,27 +80,82 @@ export const createLogThresholdExecutor = (alertUUID: string, libs: InfraBackend } }; -const getESQuery = ( +const processUngroupedResults = ( + results: UngroupedSearchQueryResponse, params: LogDocumentCountAlertParams, - sourceConfiguration: InfraSource['configuration'] -): object => { + alertInstanceFactory: AlertExecutorOptions['services']['alertInstanceFactory'], + alertId: string +) => { + const { count, criteria } = params; + + const alertInstance = alertInstanceFactory(`${alertId}-${UNGROUPED_FACTORY_KEY}`); + const documentCount = results.hits.total.value; + + if (checkValueAgainstComparatorMap[count.comparator](documentCount, count.value)) { + alertInstance.scheduleActions(FIRED_ACTIONS.id, { + matchingDocuments: documentCount, + conditions: createConditionsMessage(criteria), + group: null, + }); + + alertInstance.replaceState({ + alertState: AlertStates.ALERT, + }); + } else { + alertInstance.replaceState({ + alertState: AlertStates.OK, + }); + } +}; + +interface ReducedGroupByResults { + name: string; + documentCount: number; +} + +const processGroupByResults = ( + results: GroupedSearchQueryResponse['aggregations']['groups']['buckets'], + params: LogDocumentCountAlertParams, + alertInstanceFactory: AlertExecutorOptions['services']['alertInstanceFactory'], + alertId: string +) => { + const { count, criteria } = params; + + const groupResults = results.reduce((acc, groupBucket) => { + const groupName = Object.values(groupBucket.key).join(', '); + const groupResult = { name: groupName, documentCount: groupBucket.filtered_results.doc_count }; + return [...acc, groupResult]; + }, []); + + groupResults.forEach((group) => { + const alertInstance = alertInstanceFactory(`${alertId}-${group.name}`); + const documentCount = group.documentCount; + + if (checkValueAgainstComparatorMap[count.comparator](documentCount, count.value)) { + alertInstance.scheduleActions(FIRED_ACTIONS.id, { + matchingDocuments: documentCount, + conditions: createConditionsMessage(criteria), + group: group.name, + }); + + alertInstance.replaceState({ + alertState: AlertStates.ALERT, + }); + } else { + alertInstance.replaceState({ + alertState: AlertStates.OK, + }); + } + }); +}; + +const buildFiltersFromCriteria = (params: LogDocumentCountAlertParams, timestampField: string) => { const { timeSize, timeUnit, criteria } = params; const interval = `${timeSize}${timeUnit}`; const intervalAsSeconds = getIntervalInSeconds(interval); + const intervalAsMs = intervalAsSeconds * 1000; const to = Date.now(); - const from = to - intervalAsSeconds * 1000; - - const rangeFilters = [ - { - range: { - [sourceConfiguration.fields.timestamp]: { - gte: from, - lte: to, - format: 'epoch_millis', - }, - }, - }, - ]; + const from = to - intervalAsMs; const positiveComparators = getPositiveComparators(); const negativeComparators = getNegativeComparators(); @@ -101,17 +170,121 @@ const getESQuery = ( // Negative assertions (things that "must not" match) const mustNotFilters = buildFiltersForCriteria(negativeCriteria); - const query = { + const rangeFilter = { + range: { + [timestampField]: { + gte: from, + lte: to, + format: 'epoch_millis', + }, + }, + }; + + // For group by scenarios we'll pad the time range by 1 x the interval size on the left (lte) and right (gte), this is so + // a wider net is cast to "capture" the groups. This is to account for scenarios where we want ascertain if + // there were "no documents" (less than 1 for example). In these cases we may be missing documents to build the groups + // and match / not match the criteria. + const groupedRangeFilter = { + range: { + [timestampField]: { + gte: from - intervalAsMs, + lte: to + intervalAsMs, + format: 'epoch_millis', + }, + }, + }; + + return { rangeFilter, groupedRangeFilter, mustFilters, mustNotFilters }; +}; + +const getGroupedESQuery = ( + params: LogDocumentCountAlertParams, + sourceConfiguration: InfraSource['configuration'], + index: string +): object | undefined => { + const { groupBy } = params; + + if (!groupBy || !groupBy.length) { + return; + } + + const timestampField = sourceConfiguration.fields.timestamp; + + const { rangeFilter, groupedRangeFilter, mustFilters, mustNotFilters } = buildFiltersFromCriteria( + params, + timestampField + ); + + const aggregations = { + groups: { + composite: { + size: COMPOSITE_GROUP_SIZE, + sources: groupBy.map((field, groupIndex) => ({ + [`group-${groupIndex}-${field}`]: { + terms: { field }, + }, + })), + }, + aggregations: { + filtered_results: { + filter: { + bool: { + // Scope the inner filtering back to the unpadded range + filter: [rangeFilter, ...mustFilters], + }, + }, + }, + }, + }, + }; + + const body = { query: { bool: { - filter: [...rangeFilters], - ...(mustFilters.length > 0 && { must: mustFilters }), + filter: [groupedRangeFilter], ...(mustNotFilters.length > 0 && { must_not: mustNotFilters }), }, }, + aggregations, + size: 0, }; - return query; + return { + index, + allowNoIndices: true, + ignoreUnavailable: true, + body, + }; +}; + +const getUngroupedESQuery = ( + params: LogDocumentCountAlertParams, + sourceConfiguration: InfraSource['configuration'], + index: string +): object => { + const { rangeFilter, mustFilters, mustNotFilters } = buildFiltersFromCriteria( + params, + sourceConfiguration.fields.timestamp + ); + + const body = { + // Ensure we accurately track the hit count for the ungrouped case, otherwise we can only ensure accuracy up to 10,000. + track_total_hits: true, + query: { + bool: { + filter: [rangeFilter, ...mustFilters], + ...(mustNotFilters.length > 0 && { must_not: mustNotFilters }), + }, + }, + size: 0, + }; + + return { + index, + allowNoIndices: true, + ignoreUnavailable: true, + body, + }; }; type SupportedESQueryTypes = 'term' | 'match' | 'match_phrase' | 'range'; @@ -145,7 +318,6 @@ const buildCriterionQuery = (criterion: Criterion): Filter | undefined => { }, }, }; - break; case 'match': { return { match: { @@ -221,15 +393,31 @@ const getQueryMappingForComparator = (comparator: Comparator) => { return queryMappings[comparator]; }; -const getResults = async ( - query: object, - index: string, - callCluster: AlertServices['callCluster'] -) => { - return await callCluster('count', { - body: query, - index, - }); +const getUngroupedResults = async (query: object, callCluster: AlertServices['callCluster']) => { + return decodeOrThrow(UngroupedSearchQueryResponseRT)(await callCluster('search', query)); +}; + +const getGroupedResults = async (query: object, callCluster: AlertServices['callCluster']) => { + let compositeGroupBuckets: GroupedSearchQueryResponse['aggregations']['groups']['buckets'] = []; + let lastAfterKey: GroupedSearchQueryResponse['aggregations']['groups']['after_key'] | undefined; + + while (true) { + const queryWithAfterKey: any = { ...query }; + queryWithAfterKey.body.aggregations.groups.composite.after = lastAfterKey; + const groupResponse: GroupedSearchQueryResponse = decodeOrThrow(GroupedSearchQueryResponseRT)( + await callCluster('search', queryWithAfterKey) + ); + compositeGroupBuckets = [ + ...compositeGroupBuckets, + ...groupResponse.aggregations.groups.buckets, + ]; + lastAfterKey = groupResponse.aggregations.groups.after_key; + if (groupResponse.aggregations.groups.buckets.length < COMPOSITE_GROUP_SIZE) { + break; + } + } + + return compositeGroupBuckets; }; const createConditionsMessage = (criteria: LogDocumentCountAlertParams['criteria']) => { diff --git a/x-pack/plugins/infra/server/lib/alerting/log_threshold/register_log_threshold_alert_type.ts b/x-pack/plugins/infra/server/lib/alerting/log_threshold/register_log_threshold_alert_type.ts index ed7e82fe29e4c..43c298019b632 100644 --- a/x-pack/plugins/infra/server/lib/alerting/log_threshold/register_log_threshold_alert_type.ts +++ b/x-pack/plugins/infra/server/lib/alerting/log_threshold/register_log_threshold_alert_type.ts @@ -28,6 +28,13 @@ const conditionsActionVariableDescription = i18n.translate( } ); +const groupByActionVariableDescription = i18n.translate( + 'xpack.infra.logs.alerting.threshold.groupByActionVariableDescription', + { + defaultMessage: 'The name of the group responsible for triggering the alert', + } +); + const countSchema = schema.object({ value: schema.number(), comparator: schema.oneOf([ @@ -75,6 +82,7 @@ export async function registerLogThresholdAlertType( criteria: schema.arrayOf(criteriaSchema), timeUnit: schema.string(), timeSize: schema.number(), + groupBy: schema.maybe(schema.arrayOf(schema.string())), }), }, defaultActionGroupId: FIRED_ACTIONS.id, @@ -84,6 +92,7 @@ export async function registerLogThresholdAlertType( context: [ { name: 'matchingDocuments', description: documentCountActionVariableDescription }, { name: 'conditions', description: conditionsActionVariableDescription }, + { name: 'group', description: groupByActionVariableDescription }, ], }, producer: 'logs',