Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fleet] refactored bulk update tags retry #147594

Merged
merged 13 commits into from
Dec 20, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,16 @@ describe('TagsAddRemove', () => {

expect(mockBulkUpdateTags).toHaveBeenCalledWith(
'query',
['newTag2', 'newTag'],
['newTag'],
[],
expect.anything(),
'Tag created',
'Tag creation failed'
);

expect(mockBulkUpdateTags).toHaveBeenCalledWith(
'query',
['newTag2'],
[],
expect.anything(),
'Tag created',
Expand All @@ -316,7 +325,16 @@ describe('TagsAddRemove', () => {
expect(mockBulkUpdateTags).toHaveBeenCalledWith(
'',
[],
['tag2', 'tag1'],
['tag1'],
expect.anything(),
undefined,
undefined
);

expect(mockBulkUpdateTags).toHaveBeenCalledWith(
'',
[],
['tag2'],
expect.anything(),
undefined,
undefined
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,32 +120,10 @@ export const TagsAddRemove: React.FC<Props> = ({
errorMessage
);
} else {
// sending updated tags to add/remove, in case multiple actions are done quickly and the first one is not yet propagated
const updatedTagsToAdd = tagsToAdd.concat(
labels
.filter(
(tag) =>
tag.checked === 'on' &&
!selectedTags.includes(tag.label) &&
!tagsToRemove.includes(tag.label)
)
.map((tag) => tag.label)
);
const updatedTagsToRemove = tagsToRemove.concat(
labels
.filter(
(tag) =>
tag.checked !== 'on' &&
selectedTags.includes(tag.label) &&
!tagsToAdd.includes(tag.label)
)
.map((tag) => tag.label)
);

updateTagsHook.bulkUpdateTags(
agents!,
updatedTagsToAdd,
updatedTagsToRemove,
tagsToAdd,
tagsToRemove,
(hasCompleted) => handleTagsUpdated(tagsToAdd, tagsToRemove, hasCompleted),
successMessage,
errorMessage
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import { getAgentActions } from './actions';
import { closePointInTime, getAgentsByKuery } from './crud';
import type { BulkActionsResolver } from './bulk_actions_resolver';

export const MAX_RETRY_COUNT = 3;

export interface ActionParams {
kuery: string;
showInactive?: boolean;
Expand Down Expand Up @@ -110,8 +112,8 @@ export abstract class ActionRunner {
`Retry #${this.retryParams.retryCount} of task ${this.retryParams.taskId} failed: ${error.message}`
);

if (this.retryParams.retryCount === 3) {
const errorMessage = 'Stopping after 3rd retry. Error: ' + error.message;
if (this.retryParams.retryCount === MAX_RETRY_COUNT) {
const errorMessage = `Stopping after ${MAX_RETRY_COUNT}rd retry. Error: ${error.message}`;
appContextService.getLogger().warn(errorMessage);

// clean up tasks after 3rd retry reached
Expand Down
15 changes: 9 additions & 6 deletions x-pack/plugins/fleet/server/services/agents/action_status.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,15 @@ export async function getActionStatuses(
const nbAgentsActioned = action.nbAgentsActioned || action.nbAgentsActionCreated;
const cardinalityCount = (matchingBucket?.agent_count as any)?.value ?? 0;
const docCount = matchingBucket?.doc_count ?? 0;
const nbAgentsAck = Math.min(
docCount,
// only using cardinality count when count lower than precision threshold
docCount > PRECISION_THRESHOLD ? docCount : cardinalityCount,
nbAgentsActioned
);
const nbAgentsAck =
action.type === 'UPDATE_TAGS'
? Math.min(docCount, nbAgentsActioned)
: Math.min(
docCount,
// only using cardinality count when count lower than precision threshold
docCount > PRECISION_THRESHOLD ? docCount : cardinalityCount,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aside: Why is cardinalityCount used, can't we always use the docCount here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cardinality was introduced for actions that can potentially be acked multiple times by agents e.g. upgrade. So we count acks by one agent once.

nbAgentsActioned
);
const completionTime = (matchingBucket?.max_timestamp as any)?.value_as_string;
const complete = nbAgentsAck >= nbAgentsActioned;
const cancelledAction = cancelledActions.find((a) => a.actionId === action.actionId);
Expand Down
5 changes: 4 additions & 1 deletion x-pack/plugins/fleet/server/services/agents/crud.ts
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ export function getElasticsearchQuery(
kuery: string,
showInactive = false,
includeHosted = false,
hostedPolicies: string[] = []
hostedPolicies: string[] = [],
extraFilters: string[] = []
): estypes.QueryDslQueryContainer | undefined {
const filters = [];

Expand All @@ -171,6 +172,8 @@ export function getElasticsearchQuery(
filters.push('NOT (policy_id:{policyIds})'.replace('{policyIds}', hostedPolicies.join(',')));
}

filters.push(...extraFilters);

const kueryNode = _joinFilters(filters);
return kueryNode ? toElasticsearchQuery(kueryNode) : undefined;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { elasticsearchServiceMock, savedObjectsClientMock } from '@kbn/core/serv

import { createClientMock } from './action.mock';
import { updateAgentTags } from './update_agent_tags';
import { updateTagsBatch } from './update_agent_tags_action_runner';

jest.mock('../app_context', () => {
return {
Expand All @@ -28,6 +29,7 @@ jest.mock('../agent_policy', () => {
return {
agentPolicyService: {
getByIDs: jest.fn().mockResolvedValue([{ id: 'hosted-agent-policy', is_managed: true }]),
list: jest.fn().mockResolvedValue({ items: [] }),
},
};
});
Expand Down Expand Up @@ -73,7 +75,7 @@ describe('update_agent_tags', () => {

expect(esClient.updateByQuery).toHaveBeenCalledWith(
expect.objectContaining({
conflicts: 'abort',
conflicts: 'proceed',
index: '.fleet-agents',
query: { terms: { _id: ['agent1'] } },
script: expect.objectContaining({
Expand All @@ -90,6 +92,9 @@ describe('update_agent_tags', () => {
});

it('should update action results on success', async () => {
esClient.updateByQuery.mockReset();
esClient.updateByQuery.mockResolvedValue({ failures: [], updated: 1, total: 1 } as any);

await updateAgentTags(soClient, esClient, { agentIds: ['agent1'] }, ['one'], []);

const agentAction = esClient.create.mock.calls[0][0] as any;
Expand All @@ -110,11 +115,32 @@ describe('update_agent_tags', () => {
expect(actionResults.body[1].error).not.toBeDefined();
});

it('should write error action results for hosted agent when agentIds are passed', async () => {
it('should update action results on success - kuery', async () => {
await updateTagsBatch(
soClient,
esClient,
[],
{},
{
tagsToAdd: ['new'],
tagsToRemove: [],
kuery: '',
}
);

const actionResults = esClient.bulk.mock.calls[0][0] as any;
const agentIds = actionResults?.body
?.filter((i: any) => i.agent_id)
.map((i: any) => i.agent_id);
expect(agentIds[0]).toHaveLength(36); // uuid
expect(actionResults.body[1].error).not.toBeDefined();
});

it('should skip hosted agent from total when agentIds are passed', async () => {
const { esClient: esClientMock, agentInHostedDoc } = createClientMock();

esClientMock.updateByQuery.mockReset();
esClientMock.updateByQuery.mockResolvedValue({ failures: [], updated: 0, total: '0' } as any);
esClientMock.updateByQuery.mockResolvedValue({ failures: [], updated: 0, total: 0 } as any);

await updateAgentTags(
soClient,
Expand All @@ -130,13 +156,9 @@ describe('update_agent_tags', () => {
action_id: expect.anything(),
agents: [],
type: 'UPDATE_TAGS',
total: 1,
total: 0,
})
);

const errorResults = esClientMock.bulk.mock.calls[0][0] as any;
expect(errorResults.body[1].agent_id).toEqual(agentInHostedDoc._id);
expect(errorResults.body[1].error).toEqual('Cannot modify tags on a hosted agent');
});

it('should write error action results when failures are returned', async () => {
Expand All @@ -152,6 +174,46 @@ describe('update_agent_tags', () => {
expect(errorResults.body[1].error).toEqual('error reason');
});

it('should throw error on version conflicts', async () => {
esClient.updateByQuery.mockReset();
esClient.updateByQuery.mockResolvedValue({
failures: [],
updated: 0,
version_conflicts: 100,
} as any);

await expect(
updateAgentTags(soClient, esClient, { agentIds: ['agent1'] }, ['one'], [])
).rejects.toThrowError('version conflict of 100 agents');
});

it('should write out error results on last retry with version conflicts', async () => {
esClient.updateByQuery.mockReset();
esClient.updateByQuery.mockResolvedValue({
failures: [],
updated: 0,
version_conflicts: 100,
} as any);

await expect(
updateTagsBatch(
soClient,
esClient,
[],
{},
{
tagsToAdd: ['new'],
tagsToRemove: [],
kuery: '',
total: 100,
retryCount: 3,
}
)
).rejects.toThrowError('version conflict of 100 agents');
const errorResults = esClient.bulk.mock.calls[0][0] as any;
expect(errorResults.body[1].error).toEqual('version conflict on 3rd retry');
});

it('should run add tags async when actioning more agents than batch size', async () => {
esClient.search.mockResolvedValue({
hits: {
Expand Down Expand Up @@ -180,4 +242,79 @@ describe('update_agent_tags', () => {

expect(mockRunAsync).toHaveBeenCalled();
});

it('should add tags filter if only one tag to add', async () => {
await updateTagsBatch(
soClient,
esClient,
[],
{},
{
tagsToAdd: ['new'],
tagsToRemove: [],
kuery: '',
}
);

const updateByQuery = esClient.updateByQuery.mock.calls[0][0] as any;
expect(updateByQuery.query).toEqual({
bool: {
filter: [
{ bool: { minimum_should_match: 1, should: [{ match: { active: true } }] } },
{
bool: {
must_not: { bool: { minimum_should_match: 1, should: [{ match: { tags: 'new' } }] } },
},
},
],
},
});
});

it('should add tags filter if only one tag to remove', async () => {
await updateTagsBatch(
soClient,
esClient,
[],
{},
{
tagsToAdd: [],
tagsToRemove: ['remove'],
kuery: '',
}
);

const updateByQuery = esClient.updateByQuery.mock.calls[0][0] as any;
expect(JSON.stringify(updateByQuery.query)).toContain(
'{"bool":{"should":[{"match":{"tags":"remove"}}],"minimum_should_match":1}}'
);
});

it('should write total from updateByQuery result if query returns less results', async () => {
esClient.updateByQuery.mockReset();
esClient.updateByQuery.mockResolvedValue({ failures: [], updated: 0, total: 50 } as any);

await updateTagsBatch(
soClient,
esClient,
[],
{},
{
tagsToAdd: ['new'],
tagsToRemove: [],
kuery: '',
total: 100,
}
);

const agentAction = esClient.create.mock.calls[0][0] as any;
expect(agentAction?.body).toEqual(
expect.objectContaining({
action_id: expect.anything(),
agents: [],
type: 'UPDATE_TAGS',
total: 50,
})
);
});
});
Loading