diff --git a/x-pack/plugins/alerting/server/alerts_client/alerts_client.test.ts b/x-pack/plugins/alerting/server/alerts_client/alerts_client.test.ts index a5378245b7a02..6fc742e344758 100644 --- a/x-pack/plugins/alerting/server/alerts_client/alerts_client.test.ts +++ b/x-pack/plugins/alerting/server/alerts_client/alerts_client.test.ts @@ -1299,7 +1299,7 @@ describe('Alerts Client', () => { expect(clusterClient.bulk).toHaveBeenCalled(); expect(logger.error).toHaveBeenCalledWith( - `Error writing 1 out of 2 alerts - [{\"type\":\"action_request_validation_exception\",\"reason\":\"Validation Failed: 1: index is missing;2: type is missing;\"}]` + `Error writing alerts: 1 successful, 0 conflicts, 1 errors: Validation Failed: 1: index is missing;2: type is missing;` ); }); diff --git a/x-pack/plugins/alerting/server/alerts_client/alerts_client.ts b/x-pack/plugins/alerting/server/alerts_client/alerts_client.ts index 8164989761af7..eec5d3c5595bd 100644 --- a/x-pack/plugins/alerting/server/alerts_client/alerts_client.ts +++ b/x-pack/plugins/alerting/server/alerts_client/alerts_client.ts @@ -55,6 +55,7 @@ import { getContinualAlertsQuery, } from './lib'; import { isValidAlertIndexName } from '../alerts_service'; +import { resolveAlertConflicts } from './lib/alert_conflict_resolver'; // Term queries can take up to 10,000 terms const CHUNK_SIZE = 10000; @@ -467,15 +468,17 @@ export class AlertsClient< // If there were individual indexing errors, they will be returned in the success response if (response && response.errors) { - const errorsInResponse = (response.items ?? []) - .map((item) => item?.index?.error || item?.create?.error) - .filter((item) => item != null); - - this.options.logger.error( - `Error writing ${errorsInResponse.length} out of ${ - alertsToIndex.length - } alerts - ${JSON.stringify(errorsInResponse)}` - ); + await resolveAlertConflicts({ + logger: this.options.logger, + esClient, + bulkRequest: { + refresh: 'wait_for', + index: this.indexTemplateAndPattern.alias, + require_alias: !this.isUsingDataStreams(), + operations: bulkBody, + }, + bulkResponse: response, + }); } } catch (err) { this.options.logger.error( diff --git a/x-pack/plugins/alerting/server/alerts_client/lib/alert_conflict_resolver.test.ts b/x-pack/plugins/alerting/server/alerts_client/lib/alert_conflict_resolver.test.ts new file mode 100644 index 0000000000000..ffa2adc96f54f --- /dev/null +++ b/x-pack/plugins/alerting/server/alerts_client/lib/alert_conflict_resolver.test.ts @@ -0,0 +1,307 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */
+
+import { loggingSystemMock } from '@kbn/core/server/mocks';
+import { elasticsearchServiceMock } from '@kbn/core/server/mocks';
+import {
+  BulkRequest,
+  BulkResponse,
+  BulkResponseItem,
+  BulkOperationType,
+} from '@elastic/elasticsearch/lib/api/types';
+
+import { resolveAlertConflicts } from './alert_conflict_resolver';
+
+const logger = loggingSystemMock.create().get();
+const esClient = elasticsearchServiceMock.createElasticsearchClient();
+
+const alertDoc = {
+  event: { action: 'active' },
+  kibana: {
+    alert: {
+      status: 'untracked',
+      workflow_status: 'a-ok!',
+      workflow_tags: ['fee', 'fi', 'fo', 'fum'],
+      case_ids: ['123', '456', '789'],
+    },
+  },
+};
+
+describe('alert_conflict_resolver', () => {
+  beforeEach(() => {
+    jest.resetAllMocks();
+  });
+
+  describe('handles errors gracefully', () => {
+    test('when mget fails', async () => {
+      const { bulkRequest, bulkResponse } = getReqRes('ic');
+
+      esClient.mget.mockRejectedValueOnce(new Error('mget failed'));
+
+      await resolveAlertConflicts({ logger, esClient, bulkRequest, bulkResponse });
+
+      expect(logger.error).toHaveBeenNthCalledWith(
+        2,
+        'Error resolving alert conflicts: mget failed'
+      );
+    });
+
+    test('when bulk fails', async () => {
+      const { bulkRequest, bulkResponse } = getReqRes('ic');
+
+      esClient.mget.mockResolvedValueOnce({
+        docs: [getMGetResDoc(0, alertDoc)],
+      });
+      esClient.bulk.mockRejectedValueOnce(new Error('bulk failed'));
+
+      await resolveAlertConflicts({ logger, esClient, bulkRequest, bulkResponse });
+
+      expect(logger.error).toHaveBeenNthCalledWith(
+        2,
+        'Error resolving alert conflicts: bulk failed'
+      );
+    });
+  });
+
+  describe('is successful with', () => {
+    test('no bulk results', async () => {
+      const { bulkRequest, bulkResponse } = getReqRes('');
+      await resolveAlertConflicts({ logger, esClient, bulkRequest, bulkResponse });
+      expect(logger.error).not.toHaveBeenCalled();
+    });
+
+    test('no errors in bulk results', async () => {
+      const { bulkRequest, bulkResponse } = getReqRes('c is is c is');
+      await resolveAlertConflicts({ logger, esClient, bulkRequest, bulkResponse });
+      expect(logger.error).not.toHaveBeenCalled();
+    });
+
+    test('one conflicted doc', async () => {
+      const { bulkRequest, bulkResponse } = getReqRes('ic');
+
+      esClient.mget.mockResolvedValueOnce({
+        docs: [getMGetResDoc(0, alertDoc)],
+      });
+
+      esClient.bulk.mockResolvedValueOnce({
+        errors: false,
+        took: 0,
+        items: [getBulkResItem(0)],
+      });
+
+      await resolveAlertConflicts({ logger, esClient, bulkRequest, bulkResponse });
+
+      expect(logger.error).toHaveBeenNthCalledWith(
+        1,
+        `Error writing alerts: 0 successful, 1 conflicts, 0 errors: `
+      );
+      expect(logger.info).toHaveBeenNthCalledWith(1, `Retrying bulk update of 1 conflicted alerts`);
+      expect(logger.info).toHaveBeenNthCalledWith(
+        2,
+        `Retried bulk update of 1 conflicted alerts succeeded`
+      );
+    });
+
+    test('one conflicted doc amongst other successes and errors', async () => {
+      const { bulkRequest, bulkResponse } = getReqRes('is c ic ie');
+
+      esClient.mget.mockResolvedValueOnce({
+        docs: [getMGetResDoc(2, alertDoc)],
+      });
+
+      esClient.bulk.mockResolvedValueOnce({
+        errors: false,
+        took: 0,
+        items: [getBulkResItem(2)],
+      });
+
+      await resolveAlertConflicts({ logger, esClient, bulkRequest, bulkResponse });
+
+      expect(logger.error).toHaveBeenNthCalledWith(
+        1,
+        `Error writing alerts: 2 successful, 1 conflicts, 1 errors: hallo`
+      );
+      expect(logger.info).toHaveBeenNthCalledWith(1, `Retrying bulk update of 1 conflicted alerts`);
+      expect(logger.info).toHaveBeenNthCalledWith(
+        2,
+        `Retried bulk update of 1 conflicted alerts succeeded`
+      );
+    });
+
+    test('multiple conflicted docs amongst other successes and errors', async () => {
+      const { bulkRequest, bulkResponse } = getReqRes('is c ic ic ie ic');
+
+      esClient.mget.mockResolvedValueOnce({
+        docs: [getMGetResDoc(2, alertDoc), getMGetResDoc(3, alertDoc), getMGetResDoc(5, alertDoc)],
+      });
+
+      esClient.bulk.mockResolvedValueOnce({
+        errors: false,
+        took: 0,
+        items: [getBulkResItem(2), getBulkResItem(3), getBulkResItem(5)],
+      });
+
+      await resolveAlertConflicts({ logger, esClient, bulkRequest, bulkResponse });
+
+      expect(logger.error).toHaveBeenNthCalledWith(
+        1,
+        `Error writing alerts: 2 successful, 3 conflicts, 1 errors: hallo`
+      );
+      expect(logger.info).toHaveBeenNthCalledWith(1, `Retrying bulk update of 3 conflicted alerts`);
+      expect(logger.info).toHaveBeenNthCalledWith(
+        2,
+        `Retried bulk update of 3 conflicted alerts succeeded`
+      );
+    });
+  });
+});
+
+function getBulkResItem(id: number) {
+  return {
+    index: {
+      _index: `index-${id}`,
+      _id: `id-${id}`,
+      _seq_no: 18,
+      _primary_term: 1,
+      status: 200,
+    },
+  };
+}
+
+function getMGetResDoc(id: number, doc: unknown) {
+  return {
+    _index: `index-${id}`,
+    _id: `id-${id}`,
+    _seq_no: 18,
+    _primary_term: 1,
+    found: true,
+    _source: doc,
+  };
+}
+
+interface GetReqResResult {
+  bulkRequest: BulkRequest;
+  bulkResponse: BulkResponse;
+}
+
+/**
+ * takes as input a string of c, is, ic, ie tokens and builds appropriate
+ * bulk request and response objects to use in the tests:
+ * - c: create, ignored by the resolve logic
+ * - is: index with success
+ * - ic: index with conflict
+ * - ie: index with error but not conflict
+ */
+function getReqRes(bulkOps: string): GetReqResResult {
+  const ops = bulkOps.trim().split(/\s+/g);
+
+  const bulkRequest = getBulkRequest();
+  const bulkResponse = getBulkResponse();
+
+  bulkRequest.operations = [];
+  bulkResponse.items = [];
+  bulkResponse.errors = false;
+
+  if (ops[0] === '') return { bulkRequest, bulkResponse };
+
+  const createOp = { create: {} };
+
+  let id = 0;
+  for (const op of ops) {
+    id++;
+    switch (op) {
+      // create, ignored by the resolve logic
+      case 'c':
+        bulkRequest.operations.push(createOp, alertDoc);
+        bulkResponse.items.push(getResponseItem('create', id, false, 200));
+        break;
+
+      // index with success
+      case 'is':
+        bulkRequest.operations.push(getIndexOp(id), alertDoc);
+        bulkResponse.items.push(getResponseItem('index', id, false, 200));
+        break;
+
+      // index with conflict
+      case 'ic':
+        bulkResponse.errors = true;
+        bulkRequest.operations.push(getIndexOp(id), alertDoc);
+        bulkResponse.items.push(getResponseItem('index', id, true, 409));
+        break;
+
+      // index with error but not conflict
+      case 'ie':
+        bulkResponse.errors = true;
+        bulkRequest.operations.push(getIndexOp(id), alertDoc);
+        bulkResponse.items.push(getResponseItem('index', id, true, 418)); // I'm a teapot
+        break;
+
+      // developer error
+      default:
+        throw new Error('bad input');
+    }
+  }
+
+  return { bulkRequest, bulkResponse };
+}
+
+function getBulkRequest(): BulkRequest {
+  return {
+    refresh: 'wait_for',
+    index: 'some-index',
+    require_alias: true,
+    operations: [],
+  };
+}
+
+function getIndexOp(id: number) {
+  return {
+    index: {
+      _id: `id-${id}`,
+      _index: `index-${id}`,
+      if_seq_no: 17,
+      if_primary_term: 1,
+      require_alias: false,
+    },
+  };
+}
+
+function getBulkResponse(): BulkResponse {
+  return {
+    errors: false,
+    took: 0,
+    items: [],
+  };
+}
+
+function getResponseItem(
+  type: BulkOperationType,
+  id: number,
+  error: boolean,
+  status: number
+): Partial<Record<BulkOperationType, BulkResponseItem>> {
+  if (error) {
+    return {
+      [type]: {
+        _index: `index-${id}`,
+        _id: `id-${id}`,
+        error: { reason: 'hallo' },
+        status,
+      },
+    };
+  }
+
+  return {
+    [type]: {
+      _index: `index-${id}`,
+      _id: `id-${id}`,
+      _seq_no: 18,
+      _primary_term: 1,
+      status: 200,
+    },
+  };
+}
diff --git a/x-pack/plugins/alerting/server/alerts_client/lib/alert_conflict_resolver.ts b/x-pack/plugins/alerting/server/alerts_client/lib/alert_conflict_resolver.ts
new file mode 100644
index 0000000000000..223070c0e7245
--- /dev/null
+++ b/x-pack/plugins/alerting/server/alerts_client/lib/alert_conflict_resolver.ts
@@ -0,0 +1,288 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+import {
+  BulkRequest,
+  BulkResponse,
+  BulkOperationContainer,
+  MgetResponseItem,
+} from '@elastic/elasticsearch/lib/api/types';
+
+import { Logger, ElasticsearchClient } from '@kbn/core/server';
+import {
+  ALERT_STATUS,
+  ALERT_STATUS_ACTIVE,
+  ALERT_STATUS_RECOVERED,
+  ALERT_WORKFLOW_STATUS,
+  ALERT_WORKFLOW_TAGS,
+  ALERT_CASE_IDS,
+} from '@kbn/rule-data-utils';
+
+import { set } from '@kbn/safer-lodash-set';
+import { zip, get } from 'lodash';
+
+// these fields are the ones we'll refresh from the fresh mget'd docs
+const REFRESH_FIELDS_ALWAYS = [ALERT_WORKFLOW_STATUS, ALERT_WORKFLOW_TAGS, ALERT_CASE_IDS];
+const REFRESH_FIELDS_CONDITIONAL = [ALERT_STATUS];
+const REFRESH_FIELDS_ALL = [...REFRESH_FIELDS_ALWAYS, ...REFRESH_FIELDS_CONDITIONAL];
+
+export interface ResolveAlertConflictsParams {
+  esClient: ElasticsearchClient;
+  logger: Logger;
+  bulkRequest: BulkRequest;
+  bulkResponse: BulkResponse;
+}
+
+interface NormalizedBulkRequest {
+  op: BulkOperationContainer;
+  doc: unknown;
+}
+
+// wrapper to catch anything thrown; current usage of this function is
+// to replace just logging that the error occurred, so we don't want
+// to cause _more_ errors ...
+export async function resolveAlertConflicts(params: ResolveAlertConflictsParams): Promise<void> {
+  const { logger } = params;
+  try {
+    await resolveAlertConflicts_(params);
+  } catch (err) {
+    logger.error(`Error resolving alert conflicts: ${err.message}`);
+  }
+}
+
+async function resolveAlertConflicts_(params: ResolveAlertConflictsParams): Promise<void> {
+  const { logger, esClient, bulkRequest, bulkResponse } = params;
+  if (bulkRequest.operations && bulkRequest.operations?.length === 0) return;
+  if (bulkResponse.items && bulkResponse.items?.length === 0) return;
+
+  // get numbers for a summary log message
+  const { success, errors, conflicts, messages } = getResponseStats(bulkResponse);
+  if (conflicts === 0 && errors === 0) return;
+
+  const allMessages = messages.join('; ');
+  logger.error(
+    `Error writing alerts: ${success} successful, ${conflicts} conflicts, ${errors} errors: ${allMessages}`
+  );
+
+  // get a new bulk request for just conflicted docs
+  const conflictRequest = getConflictRequest(bulkRequest, bulkResponse);
+  if (conflictRequest.length === 0) return;
+
+  // get the fresh versions of those docs
+  const freshDocs = await getFreshDocs(esClient, conflictRequest);
+
+  // update the OCC and refresh-able fields
+  await updateOCC(conflictRequest, freshDocs);
+  await refreshFieldsInDocs(conflictRequest, freshDocs);
+
+  logger.info(`Retrying bulk update of ${conflictRequest.length} conflicted alerts`);
+  const mbrResponse = await makeBulkRequest(params.esClient, params.bulkRequest, conflictRequest);
+
+  if (mbrResponse.bulkResponse?.items.length !== conflictRequest.length) {
+    const actual = mbrResponse.bulkResponse?.items.length;
+    const expected = conflictRequest.length;
+    logger.error(
+      `Unexpected number of bulk response items retried; expecting ${expected}, retried ${actual}`
+    );
+    return;
+  }
+
+  if (mbrResponse.error) {
+    const index = bulkRequest.index || 'unknown index';
+    logger.error(
+      `Error writing ${conflictRequest.length} alerts to ${index} - ${mbrResponse.error.message}`
+    );
+    return;
+  }
+
+  if (mbrResponse.errors === 0) {
+    logger.info(`Retried bulk update of ${conflictRequest.length} conflicted alerts succeeded`);
+  } else {
+    logger.error(
+      `Retried bulk update of ${conflictRequest.length} conflicted alerts still had ${mbrResponse.errors} conflicts`
+    );
+  }
+}
+
+interface MakeBulkRequestResponse {
+  bulkRequest: BulkRequest;
+  bulkResponse?: BulkResponse;
+  errors: number;
+  error?: Error;
+}
+
+// make the bulk request to fix conflicts
+async function makeBulkRequest(
+  esClient: ElasticsearchClient,
+  bulkRequest: BulkRequest,
+  conflictRequest: NormalizedBulkRequest[]
+): Promise<MakeBulkRequestResponse> {
+  const operations = conflictRequest.map((req) => [req.op, req.doc]).flat();
+  // just replace the operations from the original request
+  const updatedBulkRequest = { ...bulkRequest, operations };
+
+  const bulkResponse = await esClient.bulk(updatedBulkRequest);
+
+  const errors = bulkResponse.items.filter((item) => item.index?.error).length;
+  return { bulkRequest, bulkResponse, errors };
+}
+
+/** Update refreshable fields in the conflict requests. */
+async function refreshFieldsInDocs(
+  conflictRequests: NormalizedBulkRequest[],
+  freshResponses: MgetResponseItem[]
+) {
+  for (const [conflictRequest, freshResponse] of zip(conflictRequests, freshResponses)) {
+    if (!conflictRequest?.op.index || !freshResponse) continue;
+
+    // @ts-expect-error @elastic/elasticsearch _source is not in the type!
+    const freshDoc = freshResponse._source;
+    // eslint-disable-next-line @typescript-eslint/no-explicit-any
+    const conflictDoc = conflictRequest.doc as Record<string, any>;
+    if (!freshDoc || !conflictDoc) continue;
+
+    for (const refreshField of REFRESH_FIELDS_ALWAYS) {
+      const val = get(freshDoc, refreshField);
+      set(conflictDoc, refreshField, val);
+    }
+
+    // structured this way to make sure all conditional refresh
+    // fields are listed in REFRESH_FIELDS_CONDITIONAL when we mget
+    for (const refreshField of REFRESH_FIELDS_CONDITIONAL) {
+      switch (refreshField) {
+        // handling for kibana.alert.status: overwrite conflict doc
+        // with fresh version if it's not active or recovered (ie, untracked)
+        case ALERT_STATUS:
+          const freshStatus = get(freshDoc, ALERT_STATUS);
+
+          if (freshStatus !== ALERT_STATUS_ACTIVE && freshStatus !== ALERT_STATUS_RECOVERED) {
+            set(conflictDoc, ALERT_STATUS, freshStatus);
+          }
+          break;
+      }
+    }
+  }
+}
+
+/** Update the OCC info in the conflict request with the fresh info. */
+async function updateOCC(conflictRequests: NormalizedBulkRequest[], freshDocs: MgetResponseItem[]) {
+  for (const [req, freshDoc] of zip(conflictRequests, freshDocs)) {
+    if (!req?.op.index || !freshDoc) continue;
+
+    // @ts-expect-error @elastic/elasticsearch _seq_no is not in the type!
+    const seqNo: number | undefined = freshDoc._seq_no;
+    // @ts-expect-error @elastic/elasticsearch _primary_term is not in the type!
+    const primaryTerm: number | undefined = freshDoc._primary_term;
+
+    if (seqNo === undefined) throw new Error('Unexpected undefined seqNo');
+    if (primaryTerm === undefined) throw new Error('Unexpected undefined primaryTerm');
+
+    req.op.index.if_seq_no = seqNo;
+    req.op.index.if_primary_term = primaryTerm;
+  }
+}
+
+/** Get the latest version of the conflicted docs, with fields to refresh. */
+async function getFreshDocs(
+  esClient: ElasticsearchClient,
+  conflictRequests: NormalizedBulkRequest[]
+): Promise<MgetResponseItem[]> {
+  const docs: Array<{ _id: string; _index: string }> = [];
+
+  conflictRequests.forEach((req) => {
+    const [id, index] = [req.op.index?._id, req.op.index?._index];
+    if (!id || !index) return;
+
+    docs.push({ _id: id, _index: index });
+  });
+
+  const mgetRes = await esClient.mget({ docs, _source_includes: REFRESH_FIELDS_ALL });
+
+  if (mgetRes.docs.length !== docs.length) {
+    throw new Error(
+      `Unexpected number of mget response docs; expected ${docs.length}, got ${mgetRes.docs.length}`
+    );
+  }
+
+  return mgetRes.docs;
+}
+
+/** Return the bulk request, filtered to those requests that had conflicts. */
+function getConflictRequest(
+  bulkRequest: BulkRequest,
+  bulkResponse: BulkResponse
+): NormalizedBulkRequest[] {
+  // first "normalize" the request from its non-linear form
+  const request = normalizeRequest(bulkRequest);
+
+  // maybe we didn't unwind it right ...
+  if (request.length !== bulkResponse.items.length) {
+    throw new Error('Unexpected number of bulk response items');
+  }
+
+  if (request.length === 0) return [];
+
+  // we only want op: index where the status was 409 / conflict
+  const conflictRequest = zip(request, bulkResponse.items)
+    .filter(([_, res]) => res?.index?.status === 409)
+    .map(([req, _]) => req!);
+
+  return conflictRequest;
+}
+
+/** Convert a bulk request (op | doc)[] to an array of { op, doc }[] */
+function normalizeRequest(bulkRequest: BulkRequest) {
+  if (!bulkRequest.operations) return [];
+  const result: NormalizedBulkRequest[] = [];
+
+  let index = 0;
+  while (index < bulkRequest.operations.length) {
+    // the "op" data
+    const op = bulkRequest.operations[index] as BulkOperationContainer;
+
+    // now the "doc" data, if there is any (none for delete)
+    if (op.create || op.index || op.update) {
+      index++;
+      const doc = bulkRequest.operations[index];
+      result.push({ op, doc });
+    } else if (op.delete) {
+      // no doc for delete op
+    } else {
+      throw new Error(`Unsupported bulk operation: ${JSON.stringify(op)}`);
+    }
+
+    index++;
+  }
+
+  return result;
+}
+
+interface ResponseStatsResult {
+  success: number;
+  conflicts: number;
+  errors: number;
+  messages: string[];
+}
+
+// generate a summary of the original bulk request attempt, for logging
+function getResponseStats(bulkResponse: BulkResponse): ResponseStatsResult {
+  const stats: ResponseStatsResult = { success: 0, conflicts: 0, errors: 0, messages: [] };
+  for (const item of bulkResponse.items) {
+    const op = item.create || item.index || item.update || item.delete;
+    if (op?.error) {
+      if (op?.status === 409 && op === item.index) {
+        stats.conflicts++;
+      } else {
+        stats.errors++;
+        stats.messages.push(op?.error?.reason || 'no bulk reason provided');
+      }
+    } else {
+      stats.success++;
+    }
+  }
+  return stats;
+}
diff --git a/x-pack/test/alerting_api_integration/common/plugins/alerts/server/alert_types.ts b/x-pack/test/alerting_api_integration/common/plugins/alerts/server/alert_types.ts
index 5003acd160f29..72b3b7b34476f 100644
--- a/x-pack/test/alerting_api_integration/common/plugins/alerts/server/alert_types.ts
+++ b/x-pack/test/alerting_api_integration/common/plugins/alerts/server/alert_types.ts
@@ -7,7 +7,7 @@
 import { v4 as uuidv4 } from 'uuid';
 import { Logger } from '@kbn/logging';
-import { CoreSetup } from '@kbn/core/server';
+import { CoreSetup, ElasticsearchClient } from '@kbn/core/server';
 import { schema, TypeOf } from '@kbn/config-schema';
 import { curry, range, times } from 'lodash';
 import {
@@ -941,6 +941,136 @@ function getAlwaysFiringAlertAsDataRuleType(
   });
 }
 
+function getWaitingRuleType(logger: Logger) {
+  const ParamsType = schema.object({
+    source: schema.string(),
+    alerts: schema.number(),
+  });
+  type ParamsType = TypeOf<typeof ParamsType>;
+  interface State extends RuleTypeState {
+    runCount?: number;
+  }
+  const id = 'test.waitingRule';
+
+  const result: RuleType<
+    ParamsType,
+    never,
+    State,
+    {},
+    {},
+    'default',
+    'recovered',
+    { runCount: number }
+  > = {
+    id,
+    name: 'Test: Rule that waits for a signal before finishing',
+    actionGroups: [{ id: 'default', name: 'Default' }],
+    producer: 'alertsFixture',
+    defaultActionGroupId: 'default',
+    minimumLicenseRequired: 'basic',
+    isExportable: true,
+    doesSetRecoveryContext: true,
+    validate: { params: ParamsType },
+    alerts: {
+      context: id.toLowerCase(),
+      shouldWrite: true,
+      mappings: {
+        fieldMap: {
+          runCount: { required: false, type: 'long' },
+        },
+      },
+    },
+    async executor(alertExecutorOptions)
{ + const { services, state, params } = alertExecutorOptions; + const { source, alerts } = params; + + const alertsClient = services.alertsClient; + if (!alertsClient) throw new Error(`Expected alertsClient!`); + + const runCount = (state.runCount || 0) + 1; + const es = services.scopedClusterClient.asInternalUser; + + await sendSignal(logger, es, id, source, `rule-starting-${runCount}`); + await waitForSignal(logger, es, id, source, `rule-complete-${runCount}`); + + for (let i = 0; i < alerts; i++) { + alertsClient.report({ + id: `alert-${i}`, + actionGroup: 'default', + payload: { runCount }, + }); + } + + return { state: { runCount } }; + }, + }; + + return result; +} + +async function sendSignal( + logger: Logger, + es: ElasticsearchClient, + id: string, + source: string, + reference: string +) { + logger.info(`rule type ${id} sending signal ${reference}`); + await es.index({ index: ES_TEST_INDEX_NAME, refresh: 'true', body: { source, reference } }); +} + +async function waitForSignal( + logger: Logger, + es: ElasticsearchClient, + id: string, + source: string, + reference: string +) { + let docs: unknown[] = []; + for (let attempt = 0; attempt < 20; attempt++) { + docs = await getSignalDocs(es, source, reference); + if (docs.length > 0) { + logger.info(`rule type ${id} received signal ${reference}`); + break; + } + + logger.info(`rule type ${id} waiting for signal ${reference}`); + await new Promise((resolve) => setTimeout(resolve, 1000)); + } + if (docs.length === 0) { + throw new Error(`Expected to find docs with source ${source}`); + } +} + +async function getSignalDocs(es: ElasticsearchClient, source: string, reference: string) { + const body = { + query: { + bool: { + must: [ + { + term: { + source, + }, + }, + { + term: { + reference, + }, + }, + ], + }, + }, + }; + const params = { + index: ES_TEST_INDEX_NAME, + size: 1000, + _source: false, + body, + }; + const result = await es.search(params, { meta: true }); + return result?.body?.hits?.hits || []; +} + export function defineAlertTypes( core: CoreSetup, { alerting, ruleRegistry }: Pick, @@ -1162,4 +1292,5 @@ export function defineAlertTypes( alerting.registerType(getAlwaysFiringAlertAsDataRuleType(logger, { ruleRegistry })); alerting.registerType(getPatternFiringAutoRecoverFalseAlertType()); alerting.registerType(getPatternFiringAlertsAsDataRuleType()); + alerting.registerType(getWaitingRuleType(logger)); } diff --git a/x-pack/test/alerting_api_integration/common/plugins/alerts/server/plugin.ts b/x-pack/test/alerting_api_integration/common/plugins/alerts/server/plugin.ts index 0809a4a5b71c7..7a257d214f26a 100644 --- a/x-pack/test/alerting_api_integration/common/plugins/alerts/server/plugin.ts +++ b/x-pack/test/alerting_api_integration/common/plugins/alerts/server/plugin.ts @@ -88,6 +88,7 @@ export class FixturePlugin implements Plugin, b: SearchHit) { + return a._source!.kibana.alert.instance.id.localeCompare(b._source!.kibana.alert.instance.id); +} + +// eslint-disable-next-line import/no-default-export +export default function createAlertsAsDataInstallResourcesTest({ getService }: FtrProviderContext) { + const es = getService('es'); + const retry = getService('retry'); + const supertestWithoutAuth = getService('supertestWithoutAuth'); + const objectRemover = new ObjectRemover(supertestWithoutAuth); + const esTestIndexTool = new ESTestIndexTool(es, retry); + + describe('document conflicts during rule execution', () => { + before(async () => { + await esTestIndexTool.destroy(); + await esTestIndexTool.setup(); + }); + + 
after(async () => { + await objectRemover.removeAll(); + await esTestIndexTool.destroy(); + }); + + const ruleType = 'test.waitingRule'; + const aadIndex = `.alerts-${ruleType.toLowerCase()}.alerts-default`; + + describe(`should be handled for alerting framework based AaD`, () => { + it('for a single conflicted alert', async () => { + const source = uuidv4(); + const count = 1; + const params = { source, alerts: count }; + const createdRule = await supertestWithoutAuth + .post(`${getUrlPrefix(Spaces.space1.id)}/api/alerting/rule`) + .set('kbn-xsrf', 'foo') + .send( + getTestRuleData({ + name: `${basename(__filename)} ${ruleType} ${source}}`, + rule_type_id: ruleType, + schedule: { interval: '1s' }, + throttle: null, + params, + actions: [], + }) + ); + + if (createdRule.status !== 200) { + log(`error creating rule: ${JSON.stringify(createdRule, null, 4)}`); + } + expect(createdRule.status).to.eql(200); + + const ruleId = createdRule.body.id; + objectRemover.add(Spaces.space1.id, ruleId, 'rule', 'alerting'); + + // this rule type uses esTextIndexTool documents to communicate + // with the rule executor. Once the rule starts executing, it + // "sends" `rule-starting-`, which this code waits for. It + // then updates the alert doc, and "sends" `rule-complete-`. + // which the rule executor is waiting on, to complete the rule + // execution. + log(`signal the rule to finish the first run`); + await esTestIndexTool.indexDoc(source, 'rule-complete-1'); + + log(`wait for the first alert doc to be created`); + const initialDocs = await waitForAlertDocs(aadIndex, ruleId, count); + expect(initialDocs.length).to.eql(count); + + log(`wait for the start of the next execution`); + await esTestIndexTool.waitForDocs(source, 'rule-starting-2'); + + log(`ad-hoc update the alert doc`); + await adHocUpdate(es, aadIndex, initialDocs[0]._id); + + log(`signal the rule to finish`); + await esTestIndexTool.indexDoc(source, 'rule-complete-2'); + + log(`wait for the start of the next execution`); + await esTestIndexTool.waitForDocs(source, 'rule-starting-3'); + + log(`get the updated alert doc`); + const updatedDocs = await waitForAlertDocs(aadIndex, ruleId, count); + expect(updatedDocs.length).to.eql(1); + + log(`signal the rule to finish, then delete it`); + await esTestIndexTool.indexDoc(source, 'rule-complete-3'); + await objectRemover.removeAll(); + + // compare the initial and updated alert docs + compareAlertDocs(initialDocs[0], updatedDocs[0], true); + }); + + it('for a mix of successful and conflicted alerts', async () => { + const source = uuidv4(); + const count = 5; + const params = { source, alerts: count }; + const createdRule = await supertestWithoutAuth + .post(`${getUrlPrefix(Spaces.space1.id)}/api/alerting/rule`) + .set('kbn-xsrf', 'foo') + .send( + getTestRuleData({ + name: `${basename(__filename)} ${ruleType} ${source}}`, + rule_type_id: ruleType, + schedule: { interval: '1s' }, + throttle: null, + params, + actions: [], + }) + ); + + if (createdRule.status !== 200) { + log(`error creating rule: ${JSON.stringify(createdRule, null, 4)}`); + } + expect(createdRule.status).to.eql(200); + + const ruleId = createdRule.body.id; + objectRemover.add(Spaces.space1.id, ruleId, 'rule', 'alerting'); + + log(`signal the rule to finish the first run`); + await esTestIndexTool.indexDoc(source, 'rule-complete-1'); + + log(`wait for the first alert doc to be created`); + const initialDocs = await waitForAlertDocs(aadIndex, ruleId, count); + initialDocs.sort(sortAlertDocsByInstanceId); + 
expect(initialDocs.length).to.eql(5); + + log(`wait for the start of the next execution`); + await esTestIndexTool.waitForDocs(source, 'rule-starting-2'); + + log(`ad-hoc update the 2nd and 4th alert docs`); + await adHocUpdate(es, aadIndex, initialDocs[1]._id); + await adHocUpdate(es, aadIndex, initialDocs[3]._id); + + log(`signal the rule to finish`); + await esTestIndexTool.indexDoc(source, 'rule-complete-2'); + + log(`wait for the start of the next execution`); + await esTestIndexTool.waitForDocs(source, 'rule-starting-3'); + + log(`get the updated alert doc`); + const updatedDocs = await waitForAlertDocs(aadIndex, ruleId, count); + updatedDocs.sort(sortAlertDocsByInstanceId); + expect(updatedDocs.length).to.eql(5); + + log(`signal the rule to finish, then delete it`); + await esTestIndexTool.indexDoc(source, 'rule-complete-3'); + await objectRemover.removeAll(); + + // compare the initial and updated alert docs + compareAlertDocs(initialDocs[0], updatedDocs[0], false); + compareAlertDocs(initialDocs[1], updatedDocs[1], true); + compareAlertDocs(initialDocs[2], updatedDocs[2], false); + compareAlertDocs(initialDocs[3], updatedDocs[3], true); + compareAlertDocs(initialDocs[4], updatedDocs[4], false); + }); + }); + }); + + // waits for a specified number of alert documents + async function waitForAlertDocs( + index: string, + ruleId: string, + count: number = 1 + ): Promise>> { + return await retry.try(async () => { + const searchResult = await es.search({ + index, + size: count, + body: { + query: { + bool: { + must: [{ term: { 'kibana.alert.rule.uuid': ruleId } }], + }, + }, + }, + }); + + const docs = searchResult.hits.hits as Array>; + if (docs.length < count) throw new Error(`only ${docs.length} out of ${count} docs found`); + + return docs; + }); + } +} + +// general comparator for initial / updated alert documents +function compareAlertDocs( + initialDoc: SearchHit, + updatedDoc: SearchHit, + conflicted: boolean +) { + // ensure both rule run updates and other updates persisted + if (!initialDoc) throw new Error('not enough initial docs'); + if (!updatedDoc) throw new Error('not enough updated docs'); + + const initialAlert = initialDoc._source!; + const updatedAlert = updatedDoc._source!; + + expect(initialAlert.runCount).to.be.greaterThan(0); + expect(updatedAlert.runCount).not.to.eql(-1); + expect(updatedAlert.runCount).to.be.greaterThan(initialAlert.runCount); + + if (conflicted) { + expect(get(updatedAlert, 'kibana.alert.case_ids')).to.eql( + get(DocUpdate, 'kibana.alert.case_ids') + ); + expect(get(updatedAlert, 'kibana.alert.workflow_tags')).to.eql( + get(DocUpdate, 'kibana.alert.workflow_tags') + ); + expect(get(updatedAlert, 'kibana.alert.workflow_status')).to.eql( + get(DocUpdate, 'kibana.alert.workflow_status') + ); + + expect(get(initialAlert, 'kibana.alert.status')).to.be('active'); + expect(get(updatedAlert, 'kibana.alert.status')).to.be('untracked'); + } + + const initial = omit(initialAlert, SkipFields); + const updated = omit(updatedAlert, SkipFields); + + expect(initial).to.eql(updated); +} + +// perform an adhoc update to an alert doc +async function adHocUpdate(es: Client, index: string, id: string) { + const body = { doc: DocUpdate }; + await es.update({ index, id, body, refresh: true }); +} + +// we'll do the adhoc updates with this data +const DocUpdate = { + runCount: -1, // rule-specific field, will be overwritten by rule execution + kibana: { + alert: { + action_group: 'not-the-default', // will be overwritten by rule execution + // below are all fields 
that will NOT be overwritten by rule execution + workflow_status: 'a-ok!', + workflow_tags: ['fee', 'fi', 'fo', 'fum'], + case_ids: ['123', '456', '789'], + status: 'untracked', + }, + }, +}; + +const SkipFields = [ + // dynamically changing fields we have no control over + '@timestamp', + 'event.action', + 'kibana.alert.duration.us', + 'kibana.alert.flapping_history', + 'kibana.alert.rule.execution.uuid', + + // fields under our control we test separately + 'runCount', + 'kibana.alert.status', + 'kibana.alert.case_ids', + 'kibana.alert.workflow_tags', + 'kibana.alert.workflow_status', +]; + +function log(message: string) { + // eslint-disable-next-line no-console + console.log(`${new Date().toISOString()} ${message}`); +} diff --git a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/group4/alerts_as_data/index.ts b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/group4/alerts_as_data/index.ts index 9156fb9e8ec37..20342e053016d 100644 --- a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/group4/alerts_as_data/index.ts +++ b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/group4/alerts_as_data/index.ts @@ -13,5 +13,6 @@ export default function alertsAsDataTests({ loadTestFile }: FtrProviderContext) loadTestFile(require.resolve('./install_resources')); loadTestFile(require.resolve('./alerts_as_data')); loadTestFile(require.resolve('./alerts_as_data_flapping')); + loadTestFile(require.resolve('./alerts_as_data_conflicts')); }); }