[ResponseOps] resolve conflicts when updating alert docs after rule execution (#166283)

resolves: #158403

When conflicts are detected while updating alert docs after a rule runs,
we'll try to resolve them by `mget()`'ing the alert documents again to get
the updated OCC info (`_seq_no` and `_primary_term`). We'll also get the
current values of the "ad-hoc" updated fields that caused the conflict,
like workflow status and case assignments. We then attempt to update the
alert doc again with that info, which should bring it back up to date.
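
Roughly, the flow looks like the sketch below (illustrative TypeScript only; the helper name, its signature, and the way ad-hoc fields get merged are assumptions here, not the actual `resolveAlertConflicts` code):

```ts
// Illustrative sketch only; retryConflictedAlerts is a made-up name.
import type { Client } from '@elastic/elasticsearch';
import type { BulkRequest, BulkResponse } from '@elastic/elasticsearch/lib/api/types';

async function retryConflictedAlerts(es: Client, req: BulkRequest, res: BulkResponse) {
  // collect the bulk items that failed with a version conflict (HTTP 409)
  const conflicted = (res.items ?? [])
    .map((item, i) => ({ op: item.index ?? item.create, i }))
    .filter(({ op }) => op?.status === 409);
  if (conflicted.length === 0) return;

  // mget the conflicted docs to pick up fresh OCC info (_seq_no/_primary_term)
  // and the current values of the ad-hoc updated fields that caused the conflict
  const { docs } = await es.mget<Record<string, unknown>>({
    docs: conflicted.map(({ op }) => ({ _index: op!._index, _id: op!._id! })),
  });

  // rebuild only the conflicted operations; bulk bodies are [action, doc] pairs,
  // so the doc for response item i lives at operations[i * 2 + 1]
  const operations: unknown[] = [];
  docs.forEach((doc, j) => {
    if (!('found' in doc) || !doc.found) return;
    const originalDoc = req.operations![conflicted[j].i * 2 + 1] as Record<string, unknown>;
    operations.push(
      {
        index: {
          _index: doc._index,
          _id: doc._id,
          if_seq_no: doc._seq_no,
          if_primary_term: doc._primary_term,
        },
      },
      // the real code also merges the ad-hoc fields from doc._source into the doc
      { ...originalDoc }
    );
  });

  if (operations.length > 0) {
    await es.bulk({ ...req, operations });
  }
}
```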

Note that the rule registry was not touched here. During this PR's
development I added the retry support to it, but my functional tests then
failed because no conflicts ever occurred. It turns out the rule registry
mget's the alerts right before it updates them, to get the latest values,
so it doesn't need this fix.
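
For contrast, here is a minimal sketch of that read-before-write pattern (hypothetical names, not rule registry code): by fetching the current docs immediately before the bulk update, the write starts from the latest versions and rarely hits version conflicts.

```ts
import type { Client } from '@elastic/elasticsearch';

// Hypothetical sketch of read-before-write: mget the current docs right before
// the bulk update so the write uses the latest _seq_no/_primary_term and _source.
async function updateAlertsFromLatest(
  es: Client,
  index: string,
  updates: Map<string, Record<string, unknown>>
) {
  const { docs } = await es.mget<Record<string, unknown>>({ index, ids: [...updates.keys()] });

  const operations = docs.flatMap((doc) =>
    'found' in doc && doc.found
      ? [
          {
            index: {
              _index: doc._index,
              _id: doc._id,
              if_seq_no: doc._seq_no,
              if_primary_term: doc._primary_term,
            },
          },
          // apply the update on top of the freshly fetched source
          { ...(doc._source as Record<string, unknown>), ...updates.get(doc._id)! },
        ]
      : []
  );

  if (operations.length > 0) await es.bulk({ operations });
}
```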

It's also not clear to me whether this can be exercised in serverless, since
it requires both an alerting-framework-based AAD (alerts-as-data)
implementation AND the ability to ad-hoc update alerts. I think this can
only be done with the Elasticsearch Query and Index Threshold rule types,
and only when used in the metrics scope, so the alerts show up in the
metrics UX, which is where you can add them to a case.

## manual testing

It's hard! I've seen the conflict messages before, but it's quite
difficult to make them happen on demand. The basic idea is to pick a rule
type that uses alerting framework AAD (not the rule registry, which isn't
affected the same way since it mget's alerts right before updating them),
set it to run on a `1s` interval, and probably also configure Task Manager
to poll on a `1s` interval, via the following configs:

```
xpack.alerting.rules.minimumScheduleInterval.value: "1s"
xpack.task_manager.poll_interval: 1000
```

You want the rule to execute often and generate a lot of alerts, and to run
for as long as possible. Then, while it's running, add the generated alerts
to cases. Here's the Elasticsearch Query (EQ) rule definition I used:


![image](https://github.com/elastic/kibana/assets/25117/56c69d50-a76c-48d4-9a45-665a0008b248)

I selected the alerts from the o11y alerts page, since you can't add
alerts to cases from the stack page. Hmm. :-). Sort the alert list by
low-high duration, so the newest alerts will be at the top. Refresh,
select all the alerts (set the page to show 100), then add them to a case from the
`...` menu. If you force a conflict, you should see something like this
in the Kibana logs:

```
[ERROR] [plugins.alerting] Error writing alerts: 168 successful, 100 conflicts, 0 errors:
[INFO ] [plugins.alerting] Retrying bulk update of 100 conflicted alerts
[INFO ] [plugins.alerting] Retried bulk update of 100 conflicted alerts succeeded
```

---------

Co-authored-by: kibanamachine <[email protected]>
pmuellr and kibanamachine authored Sep 28, 2023
1 parent 3ad5add commit e6e3e2d
Showing 9 changed files with 1,039 additions and 11 deletions.
@@ -1299,7 +1299,7 @@ describe('Alerts Client', () => {

expect(clusterClient.bulk).toHaveBeenCalled();
expect(logger.error).toHaveBeenCalledWith(
-        `Error writing 1 out of 2 alerts - [{\"type\":\"action_request_validation_exception\",\"reason\":\"Validation Failed: 1: index is missing;2: type is missing;\"}]`
+        `Error writing alerts: 1 successful, 0 conflicts, 1 errors: Validation Failed: 1: index is missing;2: type is missing;`
);
});

21 changes: 12 additions & 9 deletions x-pack/plugins/alerting/server/alerts_client/alerts_client.ts
@@ -55,6 +55,7 @@ import {
getContinualAlertsQuery,
} from './lib';
import { isValidAlertIndexName } from '../alerts_service';
import { resolveAlertConflicts } from './lib/alert_conflict_resolver';

// Term queries can take up to 10,000 terms
const CHUNK_SIZE = 10000;
@@ -467,15 +468,17 @@ export class AlertsClient<

// If there were individual indexing errors, they will be returned in the success response
if (response && response.errors) {
-        const errorsInResponse = (response.items ?? [])
-          .map((item) => item?.index?.error || item?.create?.error)
-          .filter((item) => item != null);
-
-        this.options.logger.error(
-          `Error writing ${errorsInResponse.length} out of ${
-            alertsToIndex.length
-          } alerts - ${JSON.stringify(errorsInResponse)}`
-        );
+        await resolveAlertConflicts({
+          logger: this.options.logger,
+          esClient,
+          bulkRequest: {
+            refresh: 'wait_for',
+            index: this.indexTemplateAndPattern.alias,
+            require_alias: !this.isUsingDataStreams(),
+            operations: bulkBody,
+          },
+          bulkResponse: response,
+        });
}
} catch (err) {
this.options.logger.error(
@@ -0,0 +1,307 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { loggingSystemMock } from '@kbn/core/server/mocks';
import { elasticsearchServiceMock } from '@kbn/core/server/mocks';
import {
BulkRequest,
BulkResponse,
BulkResponseItem,
BulkOperationType,
} from '@elastic/elasticsearch/lib/api/types';

import { resolveAlertConflicts } from './alert_conflict_resolver';

const logger = loggingSystemMock.create().get();
const esClient = elasticsearchServiceMock.createElasticsearchClient();

const alertDoc = {
event: { action: 'active' },
kibana: {
alert: {
status: 'untracked',
workflow_status: 'a-ok!',
workflow_tags: ['fee', 'fi', 'fo', 'fum'],
case_ids: ['123', '456', '789'],
},
},
};

describe('alert_conflict_resolver', () => {
beforeEach(() => {
jest.resetAllMocks();
});

describe('handles errors gracefully', () => {
test('when mget fails', async () => {
const { bulkRequest, bulkResponse } = getReqRes('ic');

esClient.mget.mockRejectedValueOnce(new Error('mget failed'));

await resolveAlertConflicts({ logger, esClient, bulkRequest, bulkResponse });

expect(logger.error).toHaveBeenNthCalledWith(
2,
'Error resolving alert conflicts: mget failed'
);
});

test('when bulk fails', async () => {
const { bulkRequest, bulkResponse } = getReqRes('ic');

esClient.mget.mockResolvedValueOnce({
docs: [getMGetResDoc(0, alertDoc)],
});
esClient.bulk.mockRejectedValueOnce(new Error('bulk failed'));

await resolveAlertConflicts({ logger, esClient, bulkRequest, bulkResponse });

expect(logger.error).toHaveBeenNthCalledWith(
2,
'Error resolving alert conflicts: bulk failed'
);
});
});

describe('is successful with', () => {
test('no bulk results', async () => {
const { bulkRequest, bulkResponse } = getReqRes('');
await resolveAlertConflicts({ logger, esClient, bulkRequest, bulkResponse });
expect(logger.error).not.toHaveBeenCalled();
});

test('no errors in bulk results', async () => {
const { bulkRequest, bulkResponse } = getReqRes('c is is c is');
await resolveAlertConflicts({ logger, esClient, bulkRequest, bulkResponse });
expect(logger.error).not.toHaveBeenCalled();
});

test('one conflicted doc', async () => {
const { bulkRequest, bulkResponse } = getReqRes('ic');

esClient.mget.mockResolvedValueOnce({
docs: [getMGetResDoc(0, alertDoc)],
});

esClient.bulk.mockResolvedValueOnce({
errors: false,
took: 0,
items: [getBulkResItem(0)],
});

await resolveAlertConflicts({ logger, esClient, bulkRequest, bulkResponse });

expect(logger.error).toHaveBeenNthCalledWith(
1,
`Error writing alerts: 0 successful, 1 conflicts, 0 errors: `
);
expect(logger.info).toHaveBeenNthCalledWith(1, `Retrying bulk update of 1 conflicted alerts`);
expect(logger.info).toHaveBeenNthCalledWith(
2,
`Retried bulk update of 1 conflicted alerts succeeded`
);
});

test('one conflicted doc amongst other successes and errors', async () => {
const { bulkRequest, bulkResponse } = getReqRes('is c ic ie');

esClient.mget.mockResolvedValueOnce({
docs: [getMGetResDoc(2, alertDoc)],
});

esClient.bulk.mockResolvedValueOnce({
errors: false,
took: 0,
items: [getBulkResItem(2)],
});

await resolveAlertConflicts({ logger, esClient, bulkRequest, bulkResponse });

expect(logger.error).toHaveBeenNthCalledWith(
1,
`Error writing alerts: 2 successful, 1 conflicts, 1 errors: hallo`
);
expect(logger.info).toHaveBeenNthCalledWith(1, `Retrying bulk update of 1 conflicted alerts`);
expect(logger.info).toHaveBeenNthCalledWith(
2,
`Retried bulk update of 1 conflicted alerts succeeded`
);
});

test('multiple conflicted docs amongst other successes and errors', async () => {
const { bulkRequest, bulkResponse } = getReqRes('is c ic ic ie ic');

esClient.mget.mockResolvedValueOnce({
docs: [getMGetResDoc(2, alertDoc), getMGetResDoc(3, alertDoc), getMGetResDoc(5, alertDoc)],
});

esClient.bulk.mockResolvedValueOnce({
errors: false,
took: 0,
items: [getBulkResItem(2), getBulkResItem(3), getBulkResItem(5)],
});

await resolveAlertConflicts({ logger, esClient, bulkRequest, bulkResponse });

expect(logger.error).toHaveBeenNthCalledWith(
1,
`Error writing alerts: 2 successful, 3 conflicts, 1 errors: hallo`
);
expect(logger.info).toHaveBeenNthCalledWith(1, `Retrying bulk update of 3 conflicted alerts`);
expect(logger.info).toHaveBeenNthCalledWith(
2,
`Retried bulk update of 3 conflicted alerts succeeded`
);
});
});
});

function getBulkResItem(id: number) {
return {
index: {
_index: `index-${id}`,
_id: `id-${id}`,
_seq_no: 18,
_primary_term: 1,
status: 200,
},
};
}

function getMGetResDoc(id: number, doc: unknown) {
return {
_index: `index-${id}`,
_id: `id-${id}`,
_seq_no: 18,
_primary_term: 1,
found: true,
_source: doc,
};
}

interface GetReqResResult {
bulkRequest: BulkRequest<unknown, unknown>;
bulkResponse: BulkResponse;
}

/**
* takes as input a string of c, is, ic, ie tokens and builds appropriate
* bulk request and response objects to use in the tests:
* - c: create, ignored by the resolve logic
* - is: index with success
* - ic: index with conflict
* - ie: index with error but not conflict
*/
function getReqRes(bulkOps: string): GetReqResResult {
const ops = bulkOps.trim().split(/\s+/g);

const bulkRequest = getBulkRequest();
const bulkResponse = getBulkResponse();

bulkRequest.operations = [];
bulkResponse.items = [];
bulkResponse.errors = false;

if (ops[0] === '') return { bulkRequest, bulkResponse };

const createOp = { create: {} };

let id = 0;
for (const op of ops) {
id++;
switch (op) {
// create, ignored by the resolve logic
case 'c':
bulkRequest.operations.push(createOp, alertDoc);
bulkResponse.items.push(getResponseItem('create', id, false, 200));
break;

// index with success
case 'is':
bulkRequest.operations.push(getIndexOp(id), alertDoc);
bulkResponse.items.push(getResponseItem('index', id, false, 200));
break;

// index with conflict
case 'ic':
bulkResponse.errors = true;
bulkRequest.operations.push(getIndexOp(id), alertDoc);
bulkResponse.items.push(getResponseItem('index', id, true, 409));
break;

// index with error but not conflict
case 'ie':
bulkResponse.errors = true;
bulkRequest.operations.push(getIndexOp(id), alertDoc);
bulkResponse.items.push(getResponseItem('index', id, true, 418)); // I'm a teapot
break;

// developer error
default:
throw new Error('bad input');
}
}

return { bulkRequest, bulkResponse };
}

function getBulkRequest(): BulkRequest<unknown, unknown> {
return {
refresh: 'wait_for',
index: 'some-index',
require_alias: true,
operations: [],
};
}

function getIndexOp(id: number) {
return {
index: {
_id: `id-${id}`,
_index: `index-${id}`,
if_seq_no: 17,
if_primary_term: 1,
require_alias: false,
},
};
}

function getBulkResponse(): BulkResponse {
return {
errors: false,
took: 0,
items: [],
};
}

function getResponseItem(
type: BulkOperationType,
id: number,
error: boolean,
status: number
): Partial<Record<BulkOperationType, BulkResponseItem>> {
if (error) {
return {
[type]: {
_index: `index-${id}`,
_id: `id-${id}`,
error: { reason: 'hallo' },
status,
},
};
}

return {
[type]: {
_index: `index-${id}`,
_id: `id-${id}`,
_seq_no: 18,
_primary_term: 1,
status: 200,
},
};
}