Skip to content

Commit

Permalink
[ResponseOps] resolve conflicts when updating alert docs after rule execution
Browse files Browse the repository at this point in the history

resolves: elastic#158403

When conflicts are detected while updating alert docs after a rule
runs, we'll try to resolve the conflict by `mget()`'ing the alert
documents again, to get the updated OCC info `_seq_no` and
`_primary_term`.  We'll also get the current versions of "ad-hoc"
updated fields (which caused the conflict), like workflow status,
case assignments, etc.  And then attempt to update the alert doc again,
with that info, which should get it back up-to-date.

- hard-code the fields to refresh
- add skeletal version of a function test
- add some debugging for CI-only/not-local test failure
- change new rule type to wait for signal to finish
- a little clean up, no clue why this passes locally and fails in CI though
- dump rule type registry to see if rule type there
- remove diagnostic code from FT
- a real test that passes locally, for alerting framework
- add test for RR, but it's failing as it doesn't resolve conflicts yet
- fix some stuff, add support for upcoming untracked alert status
- change recover algorithm to retry subsequent times correctly
- remove RR support (not needed), so simplify other things
- remove more RR bits (TransportStuff) and add jest tests
- add multi-alert bulk update function test
- rebase main
  • Loading branch information
pmuellr committed Sep 22, 2023
1 parent e2a7157 commit c4bbddb
Show file tree
Hide file tree
Showing 8 changed files with 902 additions and 10 deletions.
24 changes: 15 additions & 9 deletions x-pack/plugins/alerting/server/alerts_client/alerts_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import { ElasticsearchClient } from '@kbn/core/server';
import { ALERT_RULE_UUID, ALERT_UUID } from '@kbn/rule-data-utils';
import { BulkRequest } from '@elastic/elasticsearch/lib/api/types';
import { chunk, flatMap, isEmpty, keys } from 'lodash';
import { SearchRequest } from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import type { Alert } from '@kbn/alerts-as-data-utils';
Expand Down Expand Up @@ -49,6 +50,7 @@ import {
getContinualAlertsQuery,
} from './lib';
import { isValidAlertIndexName } from '../alerts_service';
import { resolveAlertConflicts } from './lib/alert_conflict_resolver';

// Term queries can take up to 10,000 terms
const CHUNK_SIZE = 10000;
Expand Down Expand Up @@ -406,6 +408,13 @@ export class AlertsClient<
])
);

const bulkRequest: BulkRequest = {
refresh: 'wait_for',
index: this.indexTemplateAndPattern.alias,
require_alias: !this.isUsingDataStreams(),
operations: bulkBody,
};

try {
const response = await esClient.bulk({
refresh: 'wait_for',
Expand All @@ -416,15 +425,12 @@ export class AlertsClient<

// If there were individual indexing errors, they will be returned in the success response
if (response && response.errors) {
const errorsInResponse = (response.items ?? [])
.map((item) => item?.index?.error || item?.create?.error)
.filter((item) => item != null);

this.options.logger.error(
`Error writing ${errorsInResponse.length} out of ${
alertsToIndex.length
} alerts - ${JSON.stringify(errorsInResponse)}`
);
await resolveAlertConflicts({
logger: this.options.logger,
esClient,
bulkRequest,
bulkResponse: response,
});
}
} catch (err) {
this.options.logger.error(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { loggingSystemMock } from '@kbn/core/server/mocks';
import { elasticsearchServiceMock } from '@kbn/core/server/mocks';
import {
BulkRequest,
BulkResponse,
BulkResponseItem,
BulkOperationType,
} from '@elastic/elasticsearch/lib/api/types';

import { resolveAlertConflicts } from './alert_conflict_resolver';

// Shared jest mocks: a mock Kibana logger and a mock Elasticsearch client.
// Reset in beforeEach so call counts and queued mock results don't leak
// across tests.
const logger = loggingSystemMock.create().get();
const esClient = elasticsearchServiceMock.createElasticsearchClient();

// Representative alert document returned by the mocked mget() call; carries
// the "ad-hoc" updatable fields (workflow status/tags, case ids) that the
// conflict resolver merges back into the retried write.
const alertDoc = {
  event: { action: 'active' },
  kibana: {
    alert: {
      status: 'untracked',
      workflow_status: 'a-ok!',
      workflow_tags: ['fee', 'fi', 'fo', 'fum'],
      case_ids: ['123', '456', '789'],
    },
  },
};

describe('alert_conflict_resolver', () => {
  beforeEach(() => {
    jest.resetAllMocks();
  });

  describe('is successful with', () => {
    // Empty ops string -> no bulk items at all; resolver should be a no-op.
    test('no bulk results', async () => {
      const { bulkRequest, bulkResponse } = getReqRes('');
      await resolveAlertConflicts({ logger, esClient, bulkRequest, bulkResponse });
      expect(logger.error).not.toHaveBeenCalled();
    });

    // Mixed create/index ops, all successful -> nothing to resolve, no errors logged.
    test('no errors in bulk results', async () => {
      const { bulkRequest, bulkResponse } = getReqRes('c is is c is');
      await resolveAlertConflicts({ logger, esClient, bulkRequest, bulkResponse });
      expect(logger.error).not.toHaveBeenCalled();
    });

    // Single 409 conflict: the resolver should mget() the fresh doc (for
    // updated _seq_no/_primary_term), retry the bulk write, and log the
    // original error followed by the retry progress/success messages.
    test('one conflicted doc', async () => {
      const { bulkRequest, bulkResponse } = getReqRes('ic');

      // Fresh copy of the conflicted doc as returned by ES.
      // NOTE(review): this mget doc is built with id 0 while the conflicted
      // bulk op uses id 1 — the resolver apparently correlates by position,
      // not by _id; confirm against alert_conflict_resolver.ts.
      esClient.mget.mockResolvedValueOnce({
        docs: [getMGetResDoc(0, alertDoc)],
      });

      // The retried bulk write succeeds.
      esClient.bulk.mockResolvedValueOnce({
        errors: false,
        took: 0,
        items: [getBulkResItem(0)],
      });

      await resolveAlertConflicts({ logger, esClient, bulkRequest, bulkResponse });

      expect(logger.error).toHaveBeenNthCalledWith(
        1,
        `Error writing 1 out of 1 alerts - [{"message":"hallo"}]`
      );
      expect(logger.info).toHaveBeenNthCalledWith(1, `Retrying bulk update of 1 conflicted alerts`);
      expect(logger.info).toHaveBeenNthCalledWith(
        2,
        `Retried bulk update of 1 conflicted alerts succeeded`
      );
    });
  });
});

/**
 * Builds a successful bulk-response "index" item for alert `id`, carrying
 * the fixed OCC values (_seq_no 18, _primary_term 1) shared by these fixtures.
 */
function getBulkResItem(id: number) {
  const _index = `index-${id}`;
  const _id = `id-${id}`;
  return {
    index: { _index, _id, _seq_no: 18, _primary_term: 1, status: 200 },
  };
}

/**
 * Builds a simulated ES mget() response doc for alert `id`, carrying the
 * same OCC info (_seq_no 18, _primary_term 1) as the other fixture helpers
 * and the provided `doc` as its `_source`.
 */
function getMGetResDoc(id: number, doc: unknown) {
  return {
    // was `index-${id}}` — stray trailing brace produced e.g. "index-0}",
    // inconsistent with every other helper in this file
    _index: `index-${id}`,
    _id: `id-${id}`,
    _seq_no: 18,
    _primary_term: 1,
    found: true,
    _source: doc,
  };
}

interface GetReqResResult {
  bulkRequest: BulkRequest<unknown, unknown>;
  bulkResponse: BulkResponse;
}

/**
 * Builds matching bulk request / response fixtures from a space-separated
 * string of tokens:
 *   c  - create op, succeeded (ignored by conflict resolution)
 *   is - index op, succeeded
 *   ic - index op, failed with a 409 version conflict
 *   ie - index op, failed with a non-conflict error (418)
 */
function getReqRes(bulkOps: string): GetReqResResult {
  const bulkRequest = getBulkRequest();
  const bulkResponse = getBulkResponse();

  bulkRequest.operations = [];
  bulkResponse.items = [];
  bulkResponse.errors = false;

  const tokens = bulkOps.trim().split(/\s+/g);

  // ''.split(/\s+/g) yields [''] — treat as "no operations at all"
  if (tokens[0] === '') return { bulkRequest, bulkResponse };

  const createOp = { create: {} };

  let id = 0;
  for (const token of tokens) {
    id++;
    if (token === 'c') {
      // create, ignored by conflict resolution
      bulkRequest.operations.push(createOp, alertDoc);
      bulkResponse.items.push(getResponseItem('create', id, false, 200));
    } else if (token === 'is') {
      // index, succeeded
      bulkRequest.operations.push(getIndexOp(id), alertDoc);
      bulkResponse.items.push(getResponseItem('index', id, false, 200));
    } else if (token === 'ic') {
      // index, failed with a version conflict
      bulkResponse.errors = true;
      bulkRequest.operations.push(getIndexOp(id), alertDoc);
      bulkResponse.items.push(getResponseItem('index', id, true, 409));
    } else if (token === 'ie') {
      // index, failed with a non-conflict error ("I'm a teapot")
      bulkResponse.errors = true;
      bulkRequest.operations.push(getIndexOp(id), alertDoc);
      bulkResponse.items.push(getResponseItem('index', id, true, 418));
    } else {
      throw new Error('bad input');
    }
  }

  return { bulkRequest, bulkResponse };
}

/**
 * Base bulk request fixture: aliased index, wait_for refresh, no operations
 * yet (getReqRes fills them in).
 */
function getBulkRequest(): BulkRequest<unknown, unknown> {
  const request: BulkRequest<unknown, unknown> = {
    index: 'some-index',
    refresh: 'wait_for',
    require_alias: true,
    operations: [],
  };
  return request;
}

/**
 * Builds a bulk "index" operation header for alert `id`, carrying the OCC
 * preconditions (if_seq_no 17 / if_primary_term 1) that make a stale write
 * come back as a 409 conflict.
 */
function getIndexOp(id: number) {
  const _id = `id-${id}`;
  const _index = `index-${id}`;
  return {
    index: { _id, _index, if_seq_no: 17, if_primary_term: 1, require_alias: false },
  };
}

/** Empty, error-free bulk response fixture (getReqRes fills in the items). */
function getBulkResponse(): BulkResponse {
  const response: BulkResponse = { errors: false, took: 0, items: [] };
  return response;
}

/**
 * Builds a single bulk-response item keyed by operation type.
 * Error items carry an `{ message: 'hallo' }` error and the caller's status;
 * success items carry the standard OCC values and status 200.
 */
function getResponseItem(
  type: BulkOperationType,
  id: number,
  error: boolean,
  status: number
): Partial<Record<BulkOperationType, BulkResponseItem>> {
  const _index = `index-${id}`;
  const _id = `id-${id}`;

  const item = error
    ? { _index, _id, error: { message: 'hallo' }, status }
    : { _index, _id, _seq_no: 18, _primary_term: 1, status: 200 };

  return { [type]: item };
}
Loading

0 comments on commit c4bbddb

Please sign in to comment.