Skip to content

Commit

Permalink
[Fleet] refactored bulk update tags retry (#147594)
Browse files Browse the repository at this point in the history
## Summary

Fixes #144161

As discussed
[here](#144161 (comment)),
the existing implementation of update tags doesn't work well with real
agents, as there are many conflicts with checkin, even when trying to
add/remove one tag.
Refactored the logic to make retries more efficient:
- Instead of aborting the whole bulk action on conflicts, changed the
conflict strategy to 'proceed'. This means, if an action of 50k agents
has 1k conflicts, not all 50k is retried, but only the 1k conflicts,
this makes it less likely to conflict on retry.
- Because of this, on retry we have to know which agents don't yet have
the tag added/removed. For this, added an additional filter to the
`updateByQuery` request. Only adding the filter if there is exactly one
`tagsToAdd` or one `tagsToRemove`. This is the main use case from the
UI, and handling other cases would complicate the logic more (each
additional tag to add/remove would result in another OR query, which
would match more agents, making conflicts more likely).
- Added this additional query on the initial request as well (not only
retries) to save on unnecessary work e.g. if the user tries to add a tag
on 50k agents, but 48k already have it, it is enough to update the
remaining 2k agents.
- This improvement has the effect that 'Agent activity' shows the real
updated agent count, not the total selected. I think this is not really
a problem for update tags.
- Cleaned up some of the UI logic, because the conflicts are fully
handled now on the backend.
- Locally I couldn't reproduce the conflict with agent checkins, even
with 1k horde agents. I'll try to test in cloud with more real agents.

To verify:
- Enroll 50k agents (I used 50k with create_agents script, and 1k with
horde). Enroll 50k with horde if possible.
- Select all on UI and try to add/remove one or more tags
- Expect the changes to propagate quickly (up to 1m). It might take a
few refreshes to see the result on agent list and tags list, because the
UI polls the agents every 30s. It is expected that the tags list
temporarily shows incorrect data because the action is async.

E.g. removed `test3` tag and added `add` tag quickly:
<img width="1776" alt="image"
src="https://user-images.githubusercontent.com/90178898/207824481-411f0f70-d7e8-42a6-b73f-ed80e77b7700.png">
<img width="422" alt="image"
src="https://user-images.githubusercontent.com/90178898/207824550-582d43fc-87db-45e1-ba58-15915447fefd.png">

The logs show the details of how many `version_conflicts` were there,
and it decreased with retries.

```
[2022-12-15T10:32:12.937+01:00][INFO ][plugins.fleet] Running action asynchronously, actionId: 90acd541-19ac-4738-b3d3-db32789233de, total agents: 52000
[2022-12-15T10:32:12.981+01:00][INFO ][plugins.fleet] Scheduling task fleet:update_agent_tags:retry:check:90acd541-19ac-4738-b3d3-db32789233de
[2022-12-15T10:32:16.477+01:00][INFO ][plugins.fleet] Running action asynchronously, actionId: 29e9da70-7194-4e52-8004-2c1b19f6dfd5, total agents: 52000
[2022-12-15T10:32:16.537+01:00][INFO ][plugins.fleet] Scheduling task fleet:update_agent_tags:retry:check:29e9da70-7194-4e52-8004-2c1b19f6dfd5
[2022-12-15T10:32:22.893+01:00][DEBUG][plugins.fleet] {"took":9886,"timed_out":false,"total":52000,"updated":41143,"deleted":0,"batches":52,"version_conflicts":10857,"noops":0,"retries":{"bulk":0,"search":0},"throttled_millis":0,"requests_per_second":-1,"throttled_until_millis":0,"failures":[]}
[2022-12-15T10:32:26.066+01:00][DEBUG][plugins.fleet] {"took":9518,"timed_out":false,"total":52000,"updated":25755,"deleted":0,"batches":52,"version_conflicts":26245,"noops":0,"retries":{"bulk":0,"search":0},"throttled_millis":0,"requests_per_second":-1,"throttled_until_millis":0,"failures":[]}
[2022-12-15T10:32:27.401+01:00][ERROR][plugins.fleet] Action failed: version conflict of 10857 agents
[2022-12-15T10:32:27.461+01:00][INFO ][plugins.fleet] Scheduling task fleet:update_agent_tags:retry:90acd541-19ac-4738-b3d3-db32789233de
[2022-12-15T10:32:27.462+01:00][INFO ][plugins.fleet] Retrying in task: fleet:update_agent_tags:retry:90acd541-19ac-4738-b3d3-db32789233de
[2022-12-15T10:32:29.274+01:00][ERROR][plugins.fleet] Action failed: version conflict of 26245 agents
[2022-12-15T10:32:29.353+01:00][INFO ][plugins.fleet] Scheduling task fleet:update_agent_tags:retry:29e9da70-7194-4e52-8004-2c1b19f6dfd5
[2022-12-15T10:32:29.353+01:00][INFO ][plugins.fleet] Retrying in task: fleet:update_agent_tags:retry:29e9da70-7194-4e52-8004-2c1b19f6dfd5
[2022-12-15T10:32:31.480+01:00][INFO ][plugins.fleet] Running bulk action retry task
[2022-12-15T10:32:31.481+01:00][DEBUG][plugins.fleet] Retry #1 of task fleet:update_agent_tags:retry:90acd541-19ac-4738-b3d3-db32789233de
[2022-12-15T10:32:31.481+01:00][INFO ][plugins.fleet] Running action asynchronously, actionId: 90acd541-19ac-4738-b3d3-db32789233de, total agents: 52000
[2022-12-15T10:32:31.481+01:00][INFO ][plugins.fleet] Completed bulk action retry task
[2022-12-15T10:32:31.485+01:00][INFO ][plugins.fleet] Scheduling task fleet:update_agent_tags:retry:check:90acd541-19ac-4738-b3d3-db32789233de
[2022-12-15T10:32:33.841+01:00][DEBUG][plugins.fleet] {"took":2347,"timed_out":false,"total":10857,"updated":9857,"deleted":0,"batches":11,"version_conflicts":1000,"noops":0,"retries":{"bulk":0,"search":0},"throttled_millis":0,"requests_per_second":-1,"throttled_until_millis":0,"failures":[]}
[2022-12-15T10:32:34.556+01:00][INFO ][plugins.fleet] Running bulk action retry task
[2022-12-15T10:32:34.557+01:00][DEBUG][plugins.fleet] Retry #1 of task fleet:update_agent_tags:retry:29e9da70-7194-4e52-8004-2c1b19f6dfd5
[2022-12-15T10:32:34.557+01:00][INFO ][plugins.fleet] Running action asynchronously, actionId: 29e9da70-7194-4e52-8004-2c1b19f6dfd5, total agents: 52000
[2022-12-15T10:32:34.557+01:00][INFO ][plugins.fleet] Completed bulk action retry task
[2022-12-15T10:32:34.560+01:00][INFO ][plugins.fleet] Scheduling task fleet:update_agent_tags:retry:check:29e9da70-7194-4e52-8004-2c1b19f6dfd5
[2022-12-15T10:32:35.388+01:00][ERROR][plugins.fleet] Retry #1 of task fleet:update_agent_tags:retry:90acd541-19ac-4738-b3d3-db32789233de failed: version conflict of 1000 agents
[2022-12-15T10:32:35.468+01:00][INFO ][plugins.fleet] Scheduling task fleet:update_agent_tags:retry:90acd541-19ac-4738-b3d3-db32789233de
[2022-12-15T10:32:35.468+01:00][INFO ][plugins.fleet] Retrying in task: fleet:update_agent_tags:retry:90acd541-19ac-4738-b3d3-db32789233de
{"took":5509,"timed_out":false,"total":26245,"updated":26245,"deleted":0,"batches":27,"version_conflicts":0,"noops":0,"retries":{"bulk":0,"search":0},"throttled_millis":0,"requests_per_second":-1,"throttled_until_millis":0,"failures":[]}
[2022-12-15T10:32:42.722+01:00][INFO ][plugins.fleet] processed 26245 agents, took 5509ms
[2022-12-15T10:32:42.723+01:00][INFO ][plugins.fleet] Removing task fleet:update_agent_tags:retry:check:29e9da70-7194-4e52-8004-2c1b19f6dfd5
[2022-12-15T10:32:46.705+01:00][INFO ][plugins.fleet] Running bulk action retry task
[2022-12-15T10:32:46.706+01:00][DEBUG][plugins.fleet] Retry #2 of task fleet:update_agent_tags:retry:90acd541-19ac-4738-b3d3-db32789233de
[2022-12-15T10:32:46.707+01:00][INFO ][plugins.fleet] Running action asynchronously, actionId: 90acd541-19ac-4738-b3d3-db32789233de, total agents: 52000
[2022-12-15T10:32:46.707+01:00][INFO ][plugins.fleet] Completed bulk action retry task
[2022-12-15T10:32:46.711+01:00][INFO ][plugins.fleet] Scheduling task fleet:update_agent_tags:retry:check:90acd541-19ac-4738-b3d3-db32789233de
[2022-12-15T10:32:47.099+01:00][DEBUG][plugins.fleet] {"took":379,"timed_out":false,"total":1000,"updated":1000,"deleted":0,"batches":1,"version_conflicts":0,"noops":0,"retries":{"bulk":0,"search":0},"throttled_millis":0,"requests_per_second":-1,"throttled_until_millis":0,"failures":[]}
[2022-12-15T10:32:47.623+01:00][INFO ][plugins.fleet] processed 1000 agents, took 379ms
[2022-12-15T10:32:47.623+01:00][INFO ][plugins.fleet] Removing task fleet:update_agent_tags:retry:check:90acd541-19ac-4738-b3d3-db32789233de
```

### Checklist

- [x] [Unit or functional
tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)
were updated or added to match the most common scenarios

Co-authored-by: Kibana Machine <[email protected]>
  • Loading branch information
juliaElastic and kibanamachine authored Dec 20, 2022
1 parent 03bc7ae commit 687987a
Show file tree
Hide file tree
Showing 9 changed files with 284 additions and 145 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,16 @@ describe('TagsAddRemove', () => {

expect(mockBulkUpdateTags).toHaveBeenCalledWith(
'query',
['newTag2', 'newTag'],
['newTag'],
[],
expect.anything(),
'Tag created',
'Tag creation failed'
);

expect(mockBulkUpdateTags).toHaveBeenCalledWith(
'query',
['newTag2'],
[],
expect.anything(),
'Tag created',
Expand All @@ -316,7 +325,16 @@ describe('TagsAddRemove', () => {
expect(mockBulkUpdateTags).toHaveBeenCalledWith(
'',
[],
['tag2', 'tag1'],
['tag1'],
expect.anything(),
undefined,
undefined
);

expect(mockBulkUpdateTags).toHaveBeenCalledWith(
'',
[],
['tag2'],
expect.anything(),
undefined,
undefined
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,32 +120,10 @@ export const TagsAddRemove: React.FC<Props> = ({
errorMessage
);
} else {
// sending updated tags to add/remove, in case multiple actions are done quickly and the first one is not yet propagated
const updatedTagsToAdd = tagsToAdd.concat(
labels
.filter(
(tag) =>
tag.checked === 'on' &&
!selectedTags.includes(tag.label) &&
!tagsToRemove.includes(tag.label)
)
.map((tag) => tag.label)
);
const updatedTagsToRemove = tagsToRemove.concat(
labels
.filter(
(tag) =>
tag.checked !== 'on' &&
selectedTags.includes(tag.label) &&
!tagsToAdd.includes(tag.label)
)
.map((tag) => tag.label)
);

updateTagsHook.bulkUpdateTags(
agents!,
updatedTagsToAdd,
updatedTagsToRemove,
tagsToAdd,
tagsToRemove,
(hasCompleted) => handleTagsUpdated(tagsToAdd, tagsToRemove, hasCompleted),
successMessage,
errorMessage
Expand Down
6 changes: 4 additions & 2 deletions x-pack/plugins/fleet/server/services/agents/action_runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import { getAgentActions } from './actions';
import { closePointInTime, getAgentsByKuery } from './crud';
import type { BulkActionsResolver } from './bulk_actions_resolver';

export const MAX_RETRY_COUNT = 3;

export interface ActionParams {
kuery: string;
showInactive?: boolean;
Expand Down Expand Up @@ -110,8 +112,8 @@ export abstract class ActionRunner {
`Retry #${this.retryParams.retryCount} of task ${this.retryParams.taskId} failed: ${error.message}`
);

if (this.retryParams.retryCount === 3) {
const errorMessage = 'Stopping after 3rd retry. Error: ' + error.message;
if (this.retryParams.retryCount === MAX_RETRY_COUNT) {
const errorMessage = `Stopping after ${MAX_RETRY_COUNT}rd retry. Error: ${error.message}`;
appContextService.getLogger().warn(errorMessage);

// clean up tasks after 3rd retry reached
Expand Down
15 changes: 9 additions & 6 deletions x-pack/plugins/fleet/server/services/agents/action_status.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,15 @@ export async function getActionStatuses(
const nbAgentsActioned = action.nbAgentsActioned || action.nbAgentsActionCreated;
const cardinalityCount = (matchingBucket?.agent_count as any)?.value ?? 0;
const docCount = matchingBucket?.doc_count ?? 0;
const nbAgentsAck = Math.min(
docCount,
// only using cardinality count when count lower than precision threshold
docCount > PRECISION_THRESHOLD ? docCount : cardinalityCount,
nbAgentsActioned
);
const nbAgentsAck =
action.type === 'UPDATE_TAGS'
? Math.min(docCount, nbAgentsActioned)
: Math.min(
docCount,
// only using cardinality count when count lower than precision threshold
docCount > PRECISION_THRESHOLD ? docCount : cardinalityCount,
nbAgentsActioned
);
const completionTime = (matchingBucket?.max_timestamp as any)?.value_as_string;
const complete = nbAgentsAck >= nbAgentsActioned;
const cancelledAction = cancelledActions.find((a) => a.actionId === action.actionId);
Expand Down
5 changes: 4 additions & 1 deletion x-pack/plugins/fleet/server/services/agents/crud.ts
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ export function getElasticsearchQuery(
kuery: string,
showInactive = false,
includeHosted = false,
hostedPolicies: string[] = []
hostedPolicies: string[] = [],
extraFilters: string[] = []
): estypes.QueryDslQueryContainer | undefined {
const filters = [];

Expand All @@ -171,6 +172,8 @@ export function getElasticsearchQuery(
filters.push('NOT (policy_id:{policyIds})'.replace('{policyIds}', hostedPolicies.join(',')));
}

filters.push(...extraFilters);

const kueryNode = _joinFilters(filters);
return kueryNode ? toElasticsearchQuery(kueryNode) : undefined;
}
Expand Down
153 changes: 145 additions & 8 deletions x-pack/plugins/fleet/server/services/agents/update_agent_tags.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { elasticsearchServiceMock, savedObjectsClientMock } from '@kbn/core/serv

import { createClientMock } from './action.mock';
import { updateAgentTags } from './update_agent_tags';
import { updateTagsBatch } from './update_agent_tags_action_runner';

jest.mock('../app_context', () => {
return {
Expand All @@ -28,6 +29,7 @@ jest.mock('../agent_policy', () => {
return {
agentPolicyService: {
getByIDs: jest.fn().mockResolvedValue([{ id: 'hosted-agent-policy', is_managed: true }]),
list: jest.fn().mockResolvedValue({ items: [] }),
},
};
});
Expand Down Expand Up @@ -73,7 +75,7 @@ describe('update_agent_tags', () => {

expect(esClient.updateByQuery).toHaveBeenCalledWith(
expect.objectContaining({
conflicts: 'abort',
conflicts: 'proceed',
index: '.fleet-agents',
query: { terms: { _id: ['agent1'] } },
script: expect.objectContaining({
Expand All @@ -90,6 +92,9 @@ describe('update_agent_tags', () => {
});

it('should update action results on success', async () => {
esClient.updateByQuery.mockReset();
esClient.updateByQuery.mockResolvedValue({ failures: [], updated: 1, total: 1 } as any);

await updateAgentTags(soClient, esClient, { agentIds: ['agent1'] }, ['one'], []);

const agentAction = esClient.create.mock.calls[0][0] as any;
Expand All @@ -110,11 +115,32 @@ describe('update_agent_tags', () => {
expect(actionResults.body[1].error).not.toBeDefined();
});

it('should write error action results for hosted agent when agentIds are passed', async () => {
it('should update action results on success - kuery', async () => {
await updateTagsBatch(
soClient,
esClient,
[],
{},
{
tagsToAdd: ['new'],
tagsToRemove: [],
kuery: '',
}
);

const actionResults = esClient.bulk.mock.calls[0][0] as any;
const agentIds = actionResults?.body
?.filter((i: any) => i.agent_id)
.map((i: any) => i.agent_id);
expect(agentIds[0]).toHaveLength(36); // uuid
expect(actionResults.body[1].error).not.toBeDefined();
});

it('should skip hosted agent from total when agentIds are passed', async () => {
const { esClient: esClientMock, agentInHostedDoc } = createClientMock();

esClientMock.updateByQuery.mockReset();
esClientMock.updateByQuery.mockResolvedValue({ failures: [], updated: 0, total: '0' } as any);
esClientMock.updateByQuery.mockResolvedValue({ failures: [], updated: 0, total: 0 } as any);

await updateAgentTags(
soClient,
Expand All @@ -130,13 +156,9 @@ describe('update_agent_tags', () => {
action_id: expect.anything(),
agents: [],
type: 'UPDATE_TAGS',
total: 1,
total: 0,
})
);

const errorResults = esClientMock.bulk.mock.calls[0][0] as any;
expect(errorResults.body[1].agent_id).toEqual(agentInHostedDoc._id);
expect(errorResults.body[1].error).toEqual('Cannot modify tags on a hosted agent');
});

it('should write error action results when failures are returned', async () => {
Expand All @@ -152,6 +174,46 @@ describe('update_agent_tags', () => {
expect(errorResults.body[1].error).toEqual('error reason');
});

it('should throw error on version conflicts', async () => {
esClient.updateByQuery.mockReset();
esClient.updateByQuery.mockResolvedValue({
failures: [],
updated: 0,
version_conflicts: 100,
} as any);

await expect(
updateAgentTags(soClient, esClient, { agentIds: ['agent1'] }, ['one'], [])
).rejects.toThrowError('version conflict of 100 agents');
});

it('should write out error results on last retry with version conflicts', async () => {
esClient.updateByQuery.mockReset();
esClient.updateByQuery.mockResolvedValue({
failures: [],
updated: 0,
version_conflicts: 100,
} as any);

await expect(
updateTagsBatch(
soClient,
esClient,
[],
{},
{
tagsToAdd: ['new'],
tagsToRemove: [],
kuery: '',
total: 100,
retryCount: 3,
}
)
).rejects.toThrowError('version conflict of 100 agents');
const errorResults = esClient.bulk.mock.calls[0][0] as any;
expect(errorResults.body[1].error).toEqual('version conflict on 3rd retry');
});

it('should run add tags async when actioning more agents than batch size', async () => {
esClient.search.mockResolvedValue({
hits: {
Expand Down Expand Up @@ -180,4 +242,79 @@ describe('update_agent_tags', () => {

expect(mockRunAsync).toHaveBeenCalled();
});

it('should add tags filter if only one tag to add', async () => {
await updateTagsBatch(
soClient,
esClient,
[],
{},
{
tagsToAdd: ['new'],
tagsToRemove: [],
kuery: '',
}
);

const updateByQuery = esClient.updateByQuery.mock.calls[0][0] as any;
expect(updateByQuery.query).toEqual({
bool: {
filter: [
{ bool: { minimum_should_match: 1, should: [{ match: { active: true } }] } },
{
bool: {
must_not: { bool: { minimum_should_match: 1, should: [{ match: { tags: 'new' } }] } },
},
},
],
},
});
});

it('should add tags filter if only one tag to remove', async () => {
await updateTagsBatch(
soClient,
esClient,
[],
{},
{
tagsToAdd: [],
tagsToRemove: ['remove'],
kuery: '',
}
);

const updateByQuery = esClient.updateByQuery.mock.calls[0][0] as any;
expect(JSON.stringify(updateByQuery.query)).toContain(
'{"bool":{"should":[{"match":{"tags":"remove"}}],"minimum_should_match":1}}'
);
});

it('should write total from updateByQuery result if query returns less results', async () => {
esClient.updateByQuery.mockReset();
esClient.updateByQuery.mockResolvedValue({ failures: [], updated: 0, total: 50 } as any);

await updateTagsBatch(
soClient,
esClient,
[],
{},
{
tagsToAdd: ['new'],
tagsToRemove: [],
kuery: '',
total: 100,
}
);

const agentAction = esClient.create.mock.calls[0][0] as any;
expect(agentAction?.body).toEqual(
expect.objectContaining({
action_id: expect.anything(),
agents: [],
type: 'UPDATE_TAGS',
total: 50,
})
);
});
});
Loading

0 comments on commit 687987a

Please sign in to comment.