Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fleet] refactored bulk update tags retry #147594

Merged
merged 13 commits into from
Dec 20, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,16 @@ describe('TagsAddRemove', () => {

expect(mockBulkUpdateTags).toHaveBeenCalledWith(
'query',
['newTag2', 'newTag'],
['newTag'],
[],
expect.anything(),
'Tag created',
'Tag creation failed'
);

expect(mockBulkUpdateTags).toHaveBeenCalledWith(
'query',
['newTag2'],
[],
expect.anything(),
'Tag created',
Expand All @@ -316,7 +325,16 @@ describe('TagsAddRemove', () => {
expect(mockBulkUpdateTags).toHaveBeenCalledWith(
'',
[],
['tag2', 'tag1'],
['tag1'],
expect.anything(),
undefined,
undefined
);

expect(mockBulkUpdateTags).toHaveBeenCalledWith(
'',
[],
['tag2'],
expect.anything(),
undefined,
undefined
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,32 +120,10 @@ export const TagsAddRemove: React.FC<Props> = ({
errorMessage
);
} else {
// sending updated tags to add/remove, in case multiple actions are done quickly and the first one is not yet propagated
const updatedTagsToAdd = tagsToAdd.concat(
labels
.filter(
(tag) =>
tag.checked === 'on' &&
!selectedTags.includes(tag.label) &&
!tagsToRemove.includes(tag.label)
)
.map((tag) => tag.label)
);
const updatedTagsToRemove = tagsToRemove.concat(
labels
.filter(
(tag) =>
tag.checked !== 'on' &&
selectedTags.includes(tag.label) &&
!tagsToAdd.includes(tag.label)
)
.map((tag) => tag.label)
);

updateTagsHook.bulkUpdateTags(
agents!,
updatedTagsToAdd,
updatedTagsToRemove,
tagsToAdd,
tagsToRemove,
(hasCompleted) => handleTagsUpdated(tagsToAdd, tagsToRemove, hasCompleted),
successMessage,
errorMessage
Expand Down
15 changes: 9 additions & 6 deletions x-pack/plugins/fleet/server/services/agents/action_status.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,15 @@ export async function getActionStatuses(
const nbAgentsActioned = action.nbAgentsActioned || action.nbAgentsActionCreated;
const cardinalityCount = (matchingBucket?.agent_count as any)?.value ?? 0;
const docCount = matchingBucket?.doc_count ?? 0;
const nbAgentsAck = Math.min(
docCount,
// only using cardinality count when count lower than precision threshold
docCount > PRECISION_THRESHOLD ? docCount : cardinalityCount,
nbAgentsActioned
);
const nbAgentsAck =
action.type === 'UPDATE_TAGS'
? Math.min(docCount, nbAgentsActioned)
: Math.min(
docCount,
// only using cardinality count when count lower than precision threshold
docCount > PRECISION_THRESHOLD ? docCount : cardinalityCount,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aside: why is cardinalityCount used? Can't we always use the docCount here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cardinality was introduced for actions that agents can potentially ack multiple times (e.g. upgrade), so that acks from the same agent are counted only once.

nbAgentsActioned
);
const completionTime = (matchingBucket?.max_timestamp as any)?.value_as_string;
const complete = nbAgentsAck >= nbAgentsActioned;
const cancelledAction = cancelledActions.find((a) => a.actionId === action.actionId);
Expand Down
5 changes: 4 additions & 1 deletion x-pack/plugins/fleet/server/services/agents/crud.ts
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ export function getElasticsearchQuery(
kuery: string,
showInactive = false,
includeHosted = false,
hostedPolicies: string[] = []
hostedPolicies: string[] = [],
extraFilters: string[] = []
): estypes.QueryDslQueryContainer | undefined {
const filters = [];

Expand All @@ -171,6 +172,8 @@ export function getElasticsearchQuery(
filters.push('NOT (policy_id:{policyIds})'.replace('{policyIds}', hostedPolicies.join(',')));
}

filters.push(...extraFilters);

const kueryNode = _joinFilters(filters);
return kueryNode ? toElasticsearchQuery(kueryNode) : undefined;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { elasticsearchServiceMock, savedObjectsClientMock } from '@kbn/core/serv

import { createClientMock } from './action.mock';
import { updateAgentTags } from './update_agent_tags';
import { updateTagsBatch } from './update_agent_tags_action_runner';

jest.mock('../app_context', () => {
return {
Expand All @@ -28,6 +29,7 @@ jest.mock('../agent_policy', () => {
return {
agentPolicyService: {
getByIDs: jest.fn().mockResolvedValue([{ id: 'hosted-agent-policy', is_managed: true }]),
list: jest.fn().mockResolvedValue({ items: [] }),
},
};
});
Expand Down Expand Up @@ -73,7 +75,7 @@ describe('update_agent_tags', () => {

expect(esClient.updateByQuery).toHaveBeenCalledWith(
expect.objectContaining({
conflicts: 'abort',
conflicts: 'proceed',
index: '.fleet-agents',
query: { terms: { _id: ['agent1'] } },
script: expect.objectContaining({
Expand All @@ -90,6 +92,9 @@ describe('update_agent_tags', () => {
});

it('should update action results on success', async () => {
esClient.updateByQuery.mockReset();
esClient.updateByQuery.mockResolvedValue({ failures: [], updated: 1, total: 1 } as any);

await updateAgentTags(soClient, esClient, { agentIds: ['agent1'] }, ['one'], []);

const agentAction = esClient.create.mock.calls[0][0] as any;
Expand All @@ -110,11 +115,32 @@ describe('update_agent_tags', () => {
expect(actionResults.body[1].error).not.toBeDefined();
});

it('should write error action results for hosted agent when agentIds are passed', async () => {
it('should update action results on success - kuery', async () => {
await updateTagsBatch(
soClient,
esClient,
[],
{},
{
tagsToAdd: ['new'],
tagsToRemove: [],
kuery: '',
}
);

const actionResults = esClient.bulk.mock.calls[0][0] as any;
const agentIds = actionResults?.body
?.filter((i: any) => i.agent_id)
.map((i: any) => i.agent_id);
expect(agentIds[0]).toHaveLength(36); // uuid
expect(actionResults.body[1].error).not.toBeDefined();
});

it('should skip hosted agent from total when agentIds are passed', async () => {
const { esClient: esClientMock, agentInHostedDoc } = createClientMock();

esClientMock.updateByQuery.mockReset();
esClientMock.updateByQuery.mockResolvedValue({ failures: [], updated: 0, total: '0' } as any);
esClientMock.updateByQuery.mockResolvedValue({ failures: [], updated: 0, total: 0 } as any);

await updateAgentTags(
soClient,
Expand All @@ -130,13 +156,9 @@ describe('update_agent_tags', () => {
action_id: expect.anything(),
agents: [],
type: 'UPDATE_TAGS',
total: 1,
total: 0,
})
);

const errorResults = esClientMock.bulk.mock.calls[0][0] as any;
expect(errorResults.body[1].agent_id).toEqual(agentInHostedDoc._id);
expect(errorResults.body[1].error).toEqual('Cannot modify tags on a hosted agent');
});

it('should write error action results when failures are returned', async () => {
Expand All @@ -152,6 +174,46 @@ describe('update_agent_tags', () => {
expect(errorResults.body[1].error).toEqual('error reason');
});

it('should throw error on version conflicts', async () => {
esClient.updateByQuery.mockReset();
esClient.updateByQuery.mockResolvedValue({
failures: [],
updated: 0,
version_conflicts: 100,
} as any);

await expect(
updateAgentTags(soClient, esClient, { agentIds: ['agent1'] }, ['one'], [])
).rejects.toThrowError('version conflict of 100 agents');
});

it('should write out error results on last retry with version conflicts', async () => {
esClient.updateByQuery.mockReset();
esClient.updateByQuery.mockResolvedValue({
failures: [],
updated: 0,
version_conflicts: 100,
} as any);

await expect(
updateTagsBatch(
soClient,
esClient,
[],
{},
{
tagsToAdd: ['new'],
tagsToRemove: [],
kuery: '',
total: 100,
retryCount: 3,
}
)
).rejects.toThrowError('version conflict of 100 agents');
const errorResults = esClient.bulk.mock.calls[0][0] as any;
expect(errorResults.body[1].error).toEqual('version conflict on 3rd retry');
});

it('should run add tags async when actioning more agents than batch size', async () => {
esClient.search.mockResolvedValue({
hits: {
Expand Down Expand Up @@ -180,4 +242,79 @@ describe('update_agent_tags', () => {

expect(mockRunAsync).toHaveBeenCalled();
});

it('should add tags filter if only one tag to add', async () => {
await updateTagsBatch(
soClient,
esClient,
[],
{},
{
tagsToAdd: ['new'],
tagsToRemove: [],
kuery: '',
}
);

const updateByQuery = esClient.updateByQuery.mock.calls[0][0] as any;
expect(updateByQuery.query).toEqual({
bool: {
filter: [
{ bool: { minimum_should_match: 1, should: [{ match: { active: true } }] } },
{
bool: {
must_not: { bool: { minimum_should_match: 1, should: [{ match: { tags: 'new' } }] } },
},
},
],
},
});
});

it('should add tags filter if only one tag to remove', async () => {
await updateTagsBatch(
soClient,
esClient,
[],
{},
{
tagsToAdd: [],
tagsToRemove: ['remove'],
kuery: '',
}
);

const updateByQuery = esClient.updateByQuery.mock.calls[0][0] as any;
expect(JSON.stringify(updateByQuery.query)).toContain(
'{"bool":{"should":[{"match":{"tags":"remove"}}],"minimum_should_match":1}}'
);
});

it('should write total from updateByQuery result if query returns less results', async () => {
esClient.updateByQuery.mockReset();
esClient.updateByQuery.mockResolvedValue({ failures: [], updated: 0, total: 50 } as any);

await updateTagsBatch(
soClient,
esClient,
[],
{},
{
tagsToAdd: ['new'],
tagsToRemove: [],
kuery: '',
total: 100,
}
);

const agentAction = esClient.create.mock.calls[0][0] as any;
expect(agentAction?.body).toEqual(
expect.objectContaining({
action_id: expect.anything(),
agents: [],
type: 'UPDATE_TAGS',
total: 50,
})
);
});
});
41 changes: 13 additions & 28 deletions x-pack/plugins/fleet/server/services/agents/update_agent_tags.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@ import type { ElasticsearchClient, SavedObjectsClientContract } from '@kbn/core/
import type { Agent } from '../../types';
import { AgentReassignmentError } from '../../errors';

import { SO_SEARCH_LIMIT } from '../../constants';

import { getAgentDocuments, getAgentsByKuery } from './crud';
import { getAgentDocuments } from './crud';
import type { GetAgentsOptions } from '.';
import { searchHitToAgent } from './helpers';
import { UpdateAgentTagsActionRunner, updateTagsBatch } from './update_agent_tags_action_runner';
Expand All @@ -30,7 +28,7 @@ export async function updateAgentTags(
tagsToRemove: string[]
): Promise<{ actionId: string }> {
const outgoingErrors: Record<Agent['id'], Error> = {};
let givenAgents: Agent[] = [];
const givenAgents: Agent[] = [];

if ('agentIds' in options) {
const givenAgentsResults = await getAgentDocuments(esClient, options.agentIds);
Expand All @@ -44,30 +42,17 @@ export async function updateAgentTags(
}
}
} else if ('kuery' in options) {
const batchSize = options.batchSize ?? SO_SEARCH_LIMIT;
const res = await getAgentsByKuery(esClient, {
kuery: options.kuery,
showInactive: options.showInactive ?? false,
page: 1,
perPage: batchSize,
});
if (res.total <= batchSize) {
givenAgents = res.agents;
} else {
return await new UpdateAgentTagsActionRunner(
esClient,
soClient,
{
...options,
batchSize,
total: res.total,
kuery: options.kuery,
tagsToAdd,
tagsToRemove,
},
{ pitId: '' }
).runActionAsyncWithRetry();
}
return await new UpdateAgentTagsActionRunner(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Simplified the logic to use retry for all update-tags kuery actions.
As reported here, the version conflict happened even with fewer than 10k agents, a case that was not retried before.

I could reproduce this in an ECE instance by adding a tag to 5k horde agents, which returned this response from the bulk API:

{"statusCode":500,"error":"Internal Server Error","message":"version conflict of 1865 agents"}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tested on the PR cloud deployment: updating tags on 5000 agents completes successfully (async).
image

esClient,
soClient,
{
...options,
kuery: options.kuery,
tagsToAdd,
tagsToRemove,
},
{ pitId: '' }
).runActionAsyncWithRetry();
}

return await updateTagsBatch(soClient, esClient, givenAgents, outgoingErrors, {
Expand Down
Loading