From c4bbddb7cfd683ce2145c0f1492f6838bacffae7 Mon Sep 17 00:00:00 2001 From: Patrick Mueller Date: Fri, 8 Sep 2023 00:14:34 -0400 Subject: [PATCH] [ResponseOps] resolve conflicts when updating alert docs after rule execution resolves: #158403 When conflicts are detected while updating alert docs after a rule runs, we'll try to resolve the conflict by `mget()`'ing the alert documents again, to get the updated OCC info `_seq_no` and `_primary_term`. We'll also get the current versions of "ad-hoc" updated fields (which caused the conflict), like workflow status, case assignments, etc. And then attempt to update the alert doc again, with that info, which should get it back up-to-date. - hard-code the fields to refresh - add skeletal version of a function test - add some debugging for CI-only/not-local test failure - change new rule type to wait for signal to finish - a little clean up, no clue why this passes locally and fails in CI though - dump rule type registry to see if rule type there - remove diagnostic code from FT - a real test that passes locally, for alerting framework - add test for RR, but it's failing as it doesn't resolve conflicts yet - fix some stuff, add support for upcoming untracked alert status - change recover algorithm to retry subsequent times corectly - remove RR support (not needed), so simplify other things - remove more RR bits (TransportStuff) and add jest tests - add multi-alert bulk update function test - rebase main --- .../server/alerts_client/alerts_client.ts | 24 +- .../lib/alert_conflict_resolver.test.ts | 219 ++++++++++++++ .../lib/alert_conflict_resolver.ts | 243 ++++++++++++++++ .../plugins/alerts/server/alert_types.ts | 133 ++++++++- .../common/plugins/alerts/server/plugin.ts | 6 + .../packages/helpers/es_test_index_tool.ts | 11 + .../alerts_as_data_conflicts.ts | 275 ++++++++++++++++++ .../alerting/group4/alerts_as_data/index.ts | 1 + 8 files changed, 902 insertions(+), 10 deletions(-) create mode 100644 x-pack/plugins/alerting/server/alerts_client/lib/alert_conflict_resolver.test.ts create mode 100644 x-pack/plugins/alerting/server/alerts_client/lib/alert_conflict_resolver.ts create mode 100644 x-pack/test/alerting_api_integration/spaces_only/tests/alerting/group4/alerts_as_data/alerts_as_data_conflicts.ts diff --git a/x-pack/plugins/alerting/server/alerts_client/alerts_client.ts b/x-pack/plugins/alerting/server/alerts_client/alerts_client.ts index 90fbba5969de8..ea6632f74ee9b 100644 --- a/x-pack/plugins/alerting/server/alerts_client/alerts_client.ts +++ b/x-pack/plugins/alerting/server/alerts_client/alerts_client.ts @@ -7,6 +7,7 @@ import { ElasticsearchClient } from '@kbn/core/server'; import { ALERT_RULE_UUID, ALERT_UUID } from '@kbn/rule-data-utils'; +import { BulkRequest } from '@elastic/elasticsearch/lib/api/types'; import { chunk, flatMap, isEmpty, keys } from 'lodash'; import { SearchRequest } from '@elastic/elasticsearch/lib/api/typesWithBodyKey'; import type { Alert } from '@kbn/alerts-as-data-utils'; @@ -49,6 +50,7 @@ import { getContinualAlertsQuery, } from './lib'; import { isValidAlertIndexName } from '../alerts_service'; +import { resolveAlertConflicts } from './lib/alert_conflict_resolver'; // Term queries can take up to 10,000 terms const CHUNK_SIZE = 10000; @@ -406,6 +408,13 @@ export class AlertsClient< ]) ); + const bulkRequest: BulkRequest = { + refresh: 'wait_for', + index: this.indexTemplateAndPattern.alias, + require_alias: !this.isUsingDataStreams(), + operations: bulkBody, + }; + try { const response = await esClient.bulk({ refresh: 'wait_for', @@ -416,15 +425,12 @@ export class AlertsClient< // If there were individual indexing errors, they will be returned in the success response if (response && response.errors) { - const errorsInResponse = (response.items ?? []) - .map((item) => item?.index?.error || item?.create?.error) - .filter((item) => item != null); - - this.options.logger.error( - `Error writing ${errorsInResponse.length} out of ${ - alertsToIndex.length - } alerts - ${JSON.stringify(errorsInResponse)}` - ); + await resolveAlertConflicts({ + logger: this.options.logger, + esClient, + bulkRequest, + bulkResponse: response, + }); } } catch (err) { this.options.logger.error( diff --git a/x-pack/plugins/alerting/server/alerts_client/lib/alert_conflict_resolver.test.ts b/x-pack/plugins/alerting/server/alerts_client/lib/alert_conflict_resolver.test.ts new file mode 100644 index 0000000000000..7625158ab11e4 --- /dev/null +++ b/x-pack/plugins/alerting/server/alerts_client/lib/alert_conflict_resolver.test.ts @@ -0,0 +1,219 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { loggingSystemMock } from '@kbn/core/server/mocks'; +import { elasticsearchServiceMock } from '@kbn/core/server/mocks'; +import { + BulkRequest, + BulkResponse, + BulkResponseItem, + BulkOperationType, +} from '@elastic/elasticsearch/lib/api/types'; + +import { resolveAlertConflicts } from './alert_conflict_resolver'; + +const logger = loggingSystemMock.create().get(); +const esClient = elasticsearchServiceMock.createElasticsearchClient(); + +const alertDoc = { + event: { action: 'active' }, + kibana: { + alert: { + status: 'untracked', + workflow_status: 'a-ok!', + workflow_tags: ['fee', 'fi', 'fo', 'fum'], + case_ids: ['123', '456', '789'], + }, + }, +}; + +describe('alert_conflict_resolver', () => { + beforeEach(() => { + jest.resetAllMocks(); + }); + + describe('is successful with', () => { + test('no bulk results', async () => { + const { bulkRequest, bulkResponse } = getReqRes(''); + await resolveAlertConflicts({ logger, esClient, bulkRequest, bulkResponse }); + expect(logger.error).not.toHaveBeenCalled(); + }); + + test('no errors in bulk results', async () => { + const { bulkRequest, bulkResponse } = getReqRes('c is is c is'); + await resolveAlertConflicts({ logger, esClient, bulkRequest, bulkResponse }); + expect(logger.error).not.toHaveBeenCalled(); + }); + + test('one conflicted doc', async () => { + const { bulkRequest, bulkResponse } = getReqRes('ic'); + + esClient.mget.mockResolvedValueOnce({ + docs: [getMGetResDoc(0, alertDoc)], + }); + + esClient.bulk.mockResolvedValueOnce({ + errors: false, + took: 0, + items: [getBulkResItem(0)], + }); + + await resolveAlertConflicts({ logger, esClient, bulkRequest, bulkResponse }); + + expect(logger.error).toHaveBeenNthCalledWith( + 1, + `Error writing 1 out of 1 alerts - [{"message":"hallo"}]` + ); + expect(logger.info).toHaveBeenNthCalledWith(1, `Retrying bulk update of 1 conflicted alerts`); + expect(logger.info).toHaveBeenNthCalledWith( + 2, + `Retried bulk update of 1 conflicted alerts succeeded` + ); + }); + }); +}); + +function getBulkResItem(id: number) { + return { + index: { + _index: `index-${id}`, + _id: `id-${id}`, + _seq_no: 18, + _primary_term: 1, + status: 200, + }, + }; +} + +function getMGetResDoc(id: number, doc: unknown) { + return { + _index: `index-${id}}`, + _id: `id-${id}`, + _seq_no: 18, + _primary_term: 1, + found: true, + _source: doc, + }; +} + +interface GetReqResResult { + bulkRequest: BulkRequest; + bulkResponse: BulkResponse; +} + +/** + * takes as input a string of c, is, ic, ie tokens and builds appropriate + * bulk request and response objects + */ +function getReqRes(bulkOps: string): GetReqResResult { + const ops = bulkOps.trim().split(/\s+/g); + + const bulkRequest = getBulkRequest(); + const bulkResponse = getBulkResponse(); + + bulkRequest.operations = []; + bulkResponse.items = []; + bulkResponse.errors = false; + + if (ops[0] === '') return { bulkRequest, bulkResponse }; + + const createOp = { create: {} }; + + let id = 0; + for (const op of ops) { + id++; + switch (op) { + // create, ignored + case 'c': + bulkRequest.operations.push(createOp, alertDoc); + bulkResponse.items.push(getResponseItem('create', id, false, 200)); + break; + + // index with success + case 'is': + bulkRequest.operations.push(getIndexOp(id), alertDoc); + bulkResponse.items.push(getResponseItem('index', id, false, 200)); + break; + + // index with conflict + case 'ic': + bulkResponse.errors = true; + bulkRequest.operations.push(getIndexOp(id), alertDoc); + bulkResponse.items.push(getResponseItem('index', id, true, 409)); + break; + + // index with error but not conflict + case 'ie': + bulkResponse.errors = true; + bulkRequest.operations.push(getIndexOp(id), alertDoc); + bulkResponse.items.push(getResponseItem('index', id, true, 418)); // I'm a teapot + break; + + default: + throw new Error('bad input'); + } + } + + return { bulkRequest, bulkResponse }; +} + +function getBulkRequest(): BulkRequest { + return { + refresh: 'wait_for', + index: 'some-index', + require_alias: true, + operations: [], + }; +} + +function getIndexOp(id: number) { + return { + index: { + _id: `id-${id}`, + _index: `index-${id}`, + if_seq_no: 17, + if_primary_term: 1, + require_alias: false, + }, + }; +} + +function getBulkResponse(): BulkResponse { + return { + errors: false, + took: 0, + items: [], + }; +} + +function getResponseItem( + type: BulkOperationType, + id: number, + error: boolean, + status: number +): Partial> { + if (error) { + return { + [type]: { + _index: `index-${id}`, + _id: `id-${id}`, + error: { message: 'hallo' }, + status, + }, + }; + } + + return { + [type]: { + _index: `index-${id}`, + _id: `id-${id}`, + _seq_no: 18, + _primary_term: 1, + status: 200, + }, + }; +} diff --git a/x-pack/plugins/alerting/server/alerts_client/lib/alert_conflict_resolver.ts b/x-pack/plugins/alerting/server/alerts_client/lib/alert_conflict_resolver.ts new file mode 100644 index 0000000000000..d3a178c91379c --- /dev/null +++ b/x-pack/plugins/alerting/server/alerts_client/lib/alert_conflict_resolver.ts @@ -0,0 +1,243 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { + BulkRequest, + BulkResponse, + BulkOperationContainer, + MgetResponseItem, +} from '@elastic/elasticsearch/lib/api/types'; + +import { Logger, ElasticsearchClient } from '@kbn/core/server'; +import { ALERT_STATUS, ALERT_STATUS_ACTIVE, ALERT_STATUS_RECOVERED } from '@kbn/rule-data-utils'; + +import { set } from '@kbn/safer-lodash-set'; +import { zip, get } from 'lodash'; + +const REFRESH_FIELDS_ALWAYS = [ + 'kibana.alert.workflow_status', + 'kibana.alert.workflow_tags', + 'kibana.alert.case_ids', +]; +const REFRESH_FIELDS_CONDITIONAL = ['kibana.alert.status']; + +const REFRESH_FIELDS = [...REFRESH_FIELDS_ALWAYS, ...REFRESH_FIELDS_CONDITIONAL]; + +export interface ResolveAlertConflictsParams { + esClient: ElasticsearchClient; + logger: Logger; + bulkRequest: BulkRequest; + bulkResponse: BulkResponse; +} + +interface NormalizedBulkRequest { + op: BulkOperationContainer; + doc: unknown; +} + +export async function resolveAlertConflicts(params: ResolveAlertConflictsParams): Promise { + const { logger, esClient, bulkRequest, bulkResponse } = params; + + const errorsInResponse = (bulkResponse.items ?? []) + .map((item) => item?.index?.error || item?.create?.error) + .filter((item) => item != null); + + if (errorsInResponse.length === 0) return; + + const normalizedRequest = getConflictRequest(bulkRequest, bulkResponse); + + logger.error( + `Error writing ${errorsInResponse.length} out of ${ + bulkResponse.items.length + } alerts - ${JSON.stringify(errorsInResponse)}` + ); + + const freshDocs = await getFreshDocs(esClient, normalizedRequest); + await updateOCC(normalizedRequest, freshDocs); + await refreshFieldsInDocs(normalizedRequest, freshDocs); + + logger.info(`Retrying bulk update of ${normalizedRequest.length} conflicted alerts`); + + const mbrResponse = await makeBulkRequest(params.esClient, params.bulkRequest, normalizedRequest); + + if (mbrResponse.bulkResponse?.items.length !== normalizedRequest.length) { + const actual = mbrResponse.bulkResponse?.items.length; + const expected = normalizedRequest.length; + logger.error( + `Unexpected number of bulk response items retried; expecting ${expected}, retried ${actual}` + ); + return; + } + + if (mbrResponse.error) { + const index = bulkRequest.index || 'unknown index'; + logger.error( + `Error writing ${normalizedRequest.length} alerts to ${index} - ${mbrResponse.error.message}` + ); + return; + } + + if (mbrResponse.errors === 0) { + logger.info(`Retried bulk update of ${normalizedRequest.length} conflicted alerts succeeded`); + } else { + logger.error( + `Retried bulk update of ${normalizedRequest.length} conflicted alerts still had ${mbrResponse.errors} conflicts` + ); + } +} + +interface MakeBulkRequestResponse { + bulkRequest: BulkRequest; + bulkResponse?: BulkResponse; + errors: number; + error?: Error; +} + +async function makeBulkRequest( + esClient: ElasticsearchClient, + bulkRequest: BulkRequest, + conflictRequest: NormalizedBulkRequest[] +): Promise { + const operations = conflictRequest.map((req) => [req.op, req.doc]).flat(); + const updatedBulkRequest = { ...bulkRequest, operations }; + + let bulkResponse: Awaited>; + try { + bulkResponse = await esClient.bulk(updatedBulkRequest); + } catch (error) { + return { bulkRequest, errors: 0, error }; + } + + const errors = bulkResponse.items.filter((item) => item.index?.error).length; + return { bulkRequest, bulkResponse, errors }; +} + +/** Update the certain fields in the conflict requests with fresh data. */ +async function refreshFieldsInDocs( + conflictRequests: NormalizedBulkRequest[], + freshResponses: MgetResponseItem[] +) { + for (const [conflictRequest, freshResponse] of zip(conflictRequests, freshResponses)) { + if (!conflictRequest?.op.index || !freshResponse) continue; + + // @ts-expect-error @elastic/elasticsearch _seq_no is not in the type! + const freshDoc = freshResponse._source; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const conflictDoc = conflictRequest.doc as Record; + if (!freshDoc || !conflictDoc) continue; + + for (const refreshField of REFRESH_FIELDS_ALWAYS) { + const val = get(freshDoc, refreshField); + set(conflictDoc, refreshField, val); + } + + // structured this way to make sure all conditional refresh + // fields are listed in REFRESH_FIELDS_CONDITIONAL when we mget + for (const refreshField of REFRESH_FIELDS_CONDITIONAL) { + switch (refreshField) { + // hamdling for kibana.alert.status: overwrite conflict doc + // with fresh version if it's not active or recovered (ie, untracked) + case ALERT_STATUS: + const freshStatus = get(freshDoc, ALERT_STATUS); + + if (freshStatus !== ALERT_STATUS_ACTIVE && freshStatus !== ALERT_STATUS_RECOVERED) { + set(conflictDoc, ALERT_STATUS, freshStatus); + } + break; + } + } + } +} + +/** Update the OCC info in the conflict request with the fresh info. */ +async function updateOCC(conflictRequests: NormalizedBulkRequest[], freshDocs: MgetResponseItem[]) { + for (const [req, freshDoc] of zip(conflictRequests, freshDocs)) { + if (!req?.op.index || !freshDoc) continue; + + // @ts-expect-error @elastic/elasticsearch _seq_no is not in the type! + const seqNo: number | undefined = freshDoc._seq_no; + // @ts-expect-error @elastic/elasticsearch _primary_term is not in the type! + const primaryTerm: number | undefined = freshDoc._primary_term; + + if (seqNo === undefined) throw new Error('Unexpected undefined seqNo'); + if (primaryTerm === undefined) throw new Error('Unexpected undefined primaryTerm'); + + req.op.index.if_seq_no = seqNo; + req.op.index.if_primary_term = primaryTerm; + } +} + +/** Get the latest version of the conflicted docs, with fields to refresh. */ +async function getFreshDocs( + esClient: ElasticsearchClient, + conflictRequests: NormalizedBulkRequest[] +): Promise { + const docs: Array<{ _id: string; _index: string }> = []; + + conflictRequests.forEach((req) => { + const [id, index] = [req.op.index?._id, req.op.index?._index]; + if (!id || !index) return; + + docs.push({ _id: id, _index: index }); + }); + + const mgetRes = await esClient.mget({ docs, _source_includes: REFRESH_FIELDS }); + + if (mgetRes.docs.length !== docs.length) { + throw new Error( + `Unexpected number of mget response docs; expected ${docs.length}, got ${mgetRes.docs.length}` + ); + } + + return mgetRes.docs; +} + +/** Return the bulk request, filtered to those requests that had conflicts. */ +function getConflictRequest( + bulkRequest: BulkRequest, + bulkResponse: BulkResponse +): NormalizedBulkRequest[] { + const request = normalizeRequest(bulkRequest); + + if (request.length !== bulkResponse.items.length) { + throw new Error('Unexpected number of bulk response items'); + } + if (request.length === 0) return []; + + const conflictRequest = zip(request, bulkResponse.items) + .filter(([_, res]) => res?.index?.status === 409) + .map(([req, _]) => req!); + + return conflictRequest; +} + +/** Convert a bulk request (op | doc)[] to an array of { op, doc }[] for index op */ +function normalizeRequest(bulkRequest: BulkRequest) { + if (!bulkRequest.operations) return []; + const result: NormalizedBulkRequest[] = []; + + let index = 0; + while (index < bulkRequest.operations.length) { + const op = bulkRequest.operations[index] as BulkOperationContainer; + + if (op.create || op.index || op.update) { + index++; + if (op.index) { + const doc = bulkRequest.operations[index]; + result.push({ op, doc }); + } + } else if (op.delete) { + // no doc for delete op + } else { + throw new Error(`Unsupported bulk operation: ${JSON.stringify(op)}`); + } + + index++; + } + + return result; +} diff --git a/x-pack/test/alerting_api_integration/common/plugins/alerts/server/alert_types.ts b/x-pack/test/alerting_api_integration/common/plugins/alerts/server/alert_types.ts index 5003acd160f29..72b3b7b34476f 100644 --- a/x-pack/test/alerting_api_integration/common/plugins/alerts/server/alert_types.ts +++ b/x-pack/test/alerting_api_integration/common/plugins/alerts/server/alert_types.ts @@ -7,7 +7,7 @@ import { v4 as uuidv4 } from 'uuid'; import { Logger } from '@kbn/logging'; -import { CoreSetup } from '@kbn/core/server'; +import { CoreSetup, ElasticsearchClient } from '@kbn/core/server'; import { schema, TypeOf } from '@kbn/config-schema'; import { curry, range, times } from 'lodash'; import { @@ -941,6 +941,136 @@ function getAlwaysFiringAlertAsDataRuleType( }); } +function getWaitingRuleType(logger: Logger) { + const ParamsType = schema.object({ + source: schema.string(), + alerts: schema.number(), + }); + type ParamsType = TypeOf; + interface State extends RuleTypeState { + runCount?: number; + } + const id = 'test.waitingRule'; + + const result: RuleType< + ParamsType, + never, + State, + {}, + {}, + 'default', + 'recovered', + { runCount: number } + > = { + id, + name: 'Test: Rule that waits for a signal before finishing', + actionGroups: [{ id: 'default', name: 'Default' }], + producer: 'alertsFixture', + defaultActionGroupId: 'default', + minimumLicenseRequired: 'basic', + isExportable: true, + doesSetRecoveryContext: true, + validate: { params: ParamsType }, + alerts: { + context: id.toLowerCase(), + shouldWrite: true, + mappings: { + fieldMap: { + runCount: { required: false, type: 'long' }, + }, + }, + }, + async executor(alertExecutorOptions) { + const { services, state, params } = alertExecutorOptions; + const { source, alerts } = params; + + const alertsClient = services.alertsClient; + if (!alertsClient) throw new Error(`Expected alertsClient!`); + + const runCount = (state.runCount || 0) + 1; + const es = services.scopedClusterClient.asInternalUser; + + await sendSignal(logger, es, id, source, `rule-starting-${runCount}`); + await waitForSignal(logger, es, id, source, `rule-complete-${runCount}`); + + for (let i = 0; i < alerts; i++) { + alertsClient.report({ + id: `alert-${i}`, + actionGroup: 'default', + payload: { runCount }, + }); + } + + return { state: { runCount } }; + }, + }; + + return result; +} + +async function sendSignal( + logger: Logger, + es: ElasticsearchClient, + id: string, + source: string, + reference: string +) { + logger.info(`rule type ${id} sending signal ${reference}`); + await es.index({ index: ES_TEST_INDEX_NAME, refresh: 'true', body: { source, reference } }); +} + +async function waitForSignal( + logger: Logger, + es: ElasticsearchClient, + id: string, + source: string, + reference: string +) { + let docs: unknown[] = []; + for (let attempt = 0; attempt < 20; attempt++) { + docs = await getSignalDocs(es, source, reference); + if (docs.length > 0) { + logger.info(`rule type ${id} received signal ${reference}`); + break; + } + + logger.info(`rule type ${id} waiting for signal ${reference}`); + await new Promise((resolve) => setTimeout(resolve, 1000)); + } + if (docs.length === 0) { + throw new Error(`Expected to find docs with source ${source}`); + } +} + +async function getSignalDocs(es: ElasticsearchClient, source: string, reference: string) { + const body = { + query: { + bool: { + must: [ + { + term: { + source, + }, + }, + { + term: { + reference, + }, + }, + ], + }, + }, + }; + const params = { + index: ES_TEST_INDEX_NAME, + size: 1000, + _source: false, + body, + }; + const result = await es.search(params, { meta: true }); + return result?.body?.hits?.hits || []; +} + export function defineAlertTypes( core: CoreSetup, { alerting, ruleRegistry }: Pick, @@ -1162,4 +1292,5 @@ export function defineAlertTypes( alerting.registerType(getAlwaysFiringAlertAsDataRuleType(logger, { ruleRegistry })); alerting.registerType(getPatternFiringAutoRecoverFalseAlertType()); alerting.registerType(getPatternFiringAlertsAsDataRuleType()); + alerting.registerType(getWaitingRuleType(logger)); } diff --git a/x-pack/test/alerting_api_integration/common/plugins/alerts/server/plugin.ts b/x-pack/test/alerting_api_integration/common/plugins/alerts/server/plugin.ts index 0809a4a5b71c7..ff304719bb5f4 100644 --- a/x-pack/test/alerting_api_integration/common/plugins/alerts/server/plugin.ts +++ b/x-pack/test/alerting_api_integration/common/plugins/alerts/server/plugin.ts @@ -88,6 +88,8 @@ export class FixturePlugin implements Plugin, b: SearchHit) { + return a._source!.kibana.alert.instance.id.localeCompare(b._source!.kibana.alert.instance.id); +} + +// eslint-disable-next-line import/no-default-export +export default function createAlertsAsDataInstallResourcesTest({ getService }: FtrProviderContext) { + const es = getService('es'); + const retry = getService('retry'); + const supertestWithoutAuth = getService('supertestWithoutAuth'); + const objectRemover = new ObjectRemover(supertestWithoutAuth); + const esTestIndexTool = new ESTestIndexTool(es, retry); + + describe('document conflicts during rule execution', () => { + before(async () => { + await esTestIndexTool.destroy(); + await esTestIndexTool.setup(); + }); + + after(async () => { + await objectRemover.removeAll(); + await esTestIndexTool.destroy(); + }); + + const ruleType = 'test.waitingRule'; + const aadIndex = `.alerts-${ruleType.toLowerCase()}.alerts-default`; + + describe(`should be handled for alerting framework based AaD`, () => { + it('for a single conflicted alert', async () => { + const source = uuidv4(); + const count = 1; + const params = { source, alerts: count }; + const createdRule = await supertestWithoutAuth + .post(`${getUrlPrefix(Spaces.space1.id)}/api/alerting/rule`) + .set('kbn-xsrf', 'foo') + .send( + getTestRuleData({ + name: `${basename(__filename)} ${ruleType} ${source}}`, + rule_type_id: ruleType, + schedule: { interval: '1s' }, + throttle: null, + params, + actions: [], + }) + ); + + if (createdRule.status !== 200) { + log(`error creating rule: ${JSON.stringify(createdRule, null, 4)}`); + } + expect(createdRule.status).to.eql(200); + + const ruleId = createdRule.body.id; + objectRemover.add(Spaces.space1.id, ruleId, 'rule', 'alerting'); + + log(`signal the rule to finish the first run`); + await esTestIndexTool.indexDoc(source, 'rule-complete-1'); + + log(`wait for the first alert doc to be created`); + const initialDocs = await waitForAlertDocs(aadIndex, ruleId, count); + expect(initialDocs.length).to.eql(count); + + log(`wait for the start of the next execution`); + await esTestIndexTool.waitForDocs(source, 'rule-starting-2'); + + log(`ad-hoc update the alert doc`); + await adHocUpdate(es, aadIndex, initialDocs[0]._id); + + log(`signal the rule to finish`); + await esTestIndexTool.indexDoc(source, 'rule-complete-2'); + + log(`wait for the start of the next execution`); + await esTestIndexTool.waitForDocs(source, 'rule-starting-3'); + + log(`get the updated alert doc`); + const updatedDocs = await waitForAlertDocs(aadIndex, ruleId, count); + expect(updatedDocs.length).to.eql(1); + + log(`signal the rule to finish, then delete it`); + await esTestIndexTool.indexDoc(source, 'rule-complete-3'); + await objectRemover.removeAll(); + + // compare the initial and updated alert docs + compareAlertDocs(initialDocs[0], updatedDocs[0], true); + }); + + it('for a mix of successful and conflicted alerts', async () => { + const source = uuidv4(); + const count = 5; + const params = { source, alerts: count }; + const createdRule = await supertestWithoutAuth + .post(`${getUrlPrefix(Spaces.space1.id)}/api/alerting/rule`) + .set('kbn-xsrf', 'foo') + .send( + getTestRuleData({ + name: `${basename(__filename)} ${ruleType} ${source}}`, + rule_type_id: ruleType, + schedule: { interval: '1s' }, + throttle: null, + params, + actions: [], + }) + ); + + if (createdRule.status !== 200) { + log(`error creating rule: ${JSON.stringify(createdRule, null, 4)}`); + } + expect(createdRule.status).to.eql(200); + + const ruleId = createdRule.body.id; + objectRemover.add(Spaces.space1.id, ruleId, 'rule', 'alerting'); + + log(`signal the rule to finish the first run`); + await esTestIndexTool.indexDoc(source, 'rule-complete-1'); + + log(`wait for the first alert doc to be created`); + const initialDocs = await waitForAlertDocs(aadIndex, ruleId, count); + initialDocs.sort(sortAlertDocsByInstanceId); + expect(initialDocs.length).to.eql(5); + + log(`wait for the start of the next execution`); + await esTestIndexTool.waitForDocs(source, 'rule-starting-2'); + + log(`ad-hoc update the 2nd and 4th alert docs`); + await adHocUpdate(es, aadIndex, initialDocs[1]._id); + await adHocUpdate(es, aadIndex, initialDocs[3]._id); + + log(`signal the rule to finish`); + await esTestIndexTool.indexDoc(source, 'rule-complete-2'); + + log(`wait for the start of the next execution`); + await esTestIndexTool.waitForDocs(source, 'rule-starting-3'); + + log(`get the updated alert doc`); + const updatedDocs = await waitForAlertDocs(aadIndex, ruleId, count); + updatedDocs.sort(sortAlertDocsByInstanceId); + expect(updatedDocs.length).to.eql(5); + + log(`signal the rule to finish, then delete it`); + await esTestIndexTool.indexDoc(source, 'rule-complete-3'); + await objectRemover.removeAll(); + + // compare the initial and updated alert docs + compareAlertDocs(initialDocs[0], updatedDocs[0], false); + compareAlertDocs(initialDocs[1], updatedDocs[1], true); + compareAlertDocs(initialDocs[2], updatedDocs[2], false); + compareAlertDocs(initialDocs[3], updatedDocs[3], true); + compareAlertDocs(initialDocs[4], updatedDocs[4], false); + }); + }); + }); + + async function waitForAlertDocs( + index: string, + ruleId: string, + count: number = 1 + ): Promise>> { + return await retry.try(async () => { + const searchResult = await es.search({ + index, + size: count, + body: { + query: { + bool: { + must: [{ term: { 'kibana.alert.rule.uuid': ruleId } }], + }, + }, + }, + }); + + const docs = searchResult.hits.hits as Array>; + if (docs.length < count) throw new Error(`only ${docs.length} out of ${count} docs found`); + + return docs; + }); + } +} + +function compareAlertDocs( + initialDoc: SearchHit, + updatedDoc: SearchHit, + conflicted: boolean +) { + // ensure both rule run updates and other updates persisted + if (!initialDoc) throw new Error('not enough initial docs'); + if (!updatedDoc) throw new Error('not enough updated docs'); + + const initialAlert = initialDoc._source!; + const updatedAlert = updatedDoc._source!; + + expect(initialAlert.runCount).to.be.greaterThan(0); + expect(updatedAlert.runCount).not.to.eql(-1); + expect(updatedAlert.runCount).to.be.greaterThan(initialAlert.runCount); + + if (conflicted) { + expect(get(updatedAlert, 'kibana.alert.case_ids')).to.eql( + get(DocUpdate, 'kibana.alert.case_ids') + ); + expect(get(updatedAlert, 'kibana.alert.workflow_tags')).to.eql( + get(DocUpdate, 'kibana.alert.workflow_tags') + ); + expect(get(updatedAlert, 'kibana.alert.workflow_status')).to.eql( + get(DocUpdate, 'kibana.alert.workflow_status') + ); + + expect(get(initialAlert, 'kibana.alert.status')).to.be('active'); + expect(get(updatedAlert, 'kibana.alert.status')).to.be('untracked'); + } + + const initial = omit(initialAlert, SkipFields); + const updated = omit(updatedAlert, SkipFields); + + expect(initial).to.eql(updated); +} + +// perform an adhoc update to an alert doc +async function adHocUpdate(es: Client, index: string, id: string) { + const body = { doc: DocUpdate }; + await es.update({ index, id, body, refresh: true }); +} + +// we'll do the adhoc updates with this data +const DocUpdate = { + runCount: -1, // rule-specific field, will be overwritten by rule execution + kibana: { + alert: { + action_group: 'not-the-default', // will be overwritten by rule execution + // below are all fields that will NOT be overwritten by rule execution + workflow_status: 'a-ok!', + workflow_tags: ['fee', 'fi', 'fo', 'fum'], + case_ids: ['123', '456', '789'], + status: 'untracked', + }, + }, +}; + +const SkipFields = [ + // dynamically changing fields we have no control over + '@timestamp', + 'event.action', + 'kibana.alert.duration.us', + 'kibana.alert.flapping_history', + 'kibana.alert.rule.execution.uuid', + + // fields under our control we test separately + 'runCount', + 'kibana.alert.status', + 'kibana.alert.case_ids', + 'kibana.alert.workflow_tags', + 'kibana.alert.workflow_status', +]; + +function log(message: string) { + // eslint-disable-next-line no-console + console.log(`${new Date().toISOString()} ${message}`); +} diff --git a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/group4/alerts_as_data/index.ts b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/group4/alerts_as_data/index.ts index 9156fb9e8ec37..20342e053016d 100644 --- a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/group4/alerts_as_data/index.ts +++ b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/group4/alerts_as_data/index.ts @@ -13,5 +13,6 @@ export default function alertsAsDataTests({ loadTestFile }: FtrProviderContext) loadTestFile(require.resolve('./install_resources')); loadTestFile(require.resolve('./alerts_as_data')); loadTestFile(require.resolve('./alerts_as_data_flapping')); + loadTestFile(require.resolve('./alerts_as_data_conflicts')); }); }