Skip to content

Commit

Permalink
Refactor bulk update tags to fix issue with concurrently removing mul…
Browse files Browse the repository at this point in the history
…tiple tags (#143543)

* update tags refactor

* reverted yarn file

* update tags with abort on conflict and retry

* fixed hosted policy condition

* fixed unit tests

* fixed integration tests, fixed a bug

* fixed unit test

* fixed unit test

* fix cypress test

* small cleanup
  • Loading branch information
juliaElastic authored Nov 8, 2022
1 parent 544c1ff commit 0017a08
Show file tree
Hide file tree
Showing 8 changed files with 262 additions and 137 deletions.
2 changes: 1 addition & 1 deletion x-pack/plugins/fleet/cypress/tasks/fleet.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ export function navigateToAgentPolicy(name: string) {
export function navigateToEnrollmentTokens() {
cy.getBySel(ENROLLMENT_TOKENS_TAB).click();
cy.get('.euiBasicTable-loading').should('not.exist');
cy.get('.euiButtonIcon--danger'); // wait for trash icon
cy.getBySel('enrollmentTokenTable.revokeBtn'); // wait for revoke btn
}

export function verifyPolicy(name: string, integrations: string[]) {
Expand Down
14 changes: 12 additions & 2 deletions x-pack/plugins/fleet/server/services/agents/action_runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,18 @@ export abstract class ActionRunner {
} else {
appContextService.getLogger().error(`Action failed: ${error.message}`);
}
const taskId = await this.bulkActionsResolver!.run(
const taskId = this.bulkActionsResolver!.getTaskId(
this.actionParams.actionId!,
this.getTaskType()
);
await this.bulkActionsResolver!.run(
this.actionParams,
{
...this.retryParams,
retryCount: (this.retryParams.retryCount ?? 0) + 1,
},
this.getTaskType()
this.getTaskType(),
taskId
);

appContextService.getLogger().info(`Retrying in task: ${taskId}`);
Expand All @@ -135,13 +140,18 @@ export abstract class ActionRunner {
}

private async createCheckResultTask() {
const taskId = this.bulkActionsResolver!.getTaskId(
this.actionParams.actionId!,
this.getTaskType() + ':check'
);
return await this.bulkActionsResolver!.run(
this.actionParams,
{
...this.retryParams,
retryCount: 1,
},
this.getTaskType(),
taskId,
moment(new Date()).add(5, 'm').toDate()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,17 +94,17 @@ export class BulkActionsResolver {
this.taskManager = taskManager;
}

getTaskId(actionId: string, type: string) {
public getTaskId(actionId: string, type: string) {
return `${type}:${actionId}`;
}

public async run(
actionParams: ActionParams,
retryParams: RetryParams,
taskType: string,
taskId: string,
runAt?: Date
) {
const taskId = this.getTaskId(actionParams.actionId!, taskType);
await this.taskManager?.ensureScheduled({
id: taskId,
taskType,
Expand Down
24 changes: 24 additions & 0 deletions x-pack/plugins/fleet/server/services/agents/crud.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,30 @@ export async function getAgentTags(
}
}

export function getElasticsearchQuery(
kuery: string,
showInactive = false,
includeHosted = false,
hostedPolicies: string[] = []
): estypes.QueryDslQueryContainer | undefined {
const filters = [];

if (kuery && kuery !== '') {
filters.push(kuery);
}

if (showInactive === false) {
filters.push(ACTIVE_AGENT_CONDITION);
}

if (!includeHosted && hostedPolicies.length > 0) {
filters.push('NOT (policy_id:{policyIds})'.replace('{policyIds}', hostedPolicies.join(',')));
}

const kueryNode = _joinFilters(filters);
return kueryNode ? toElasticsearchQuery(kueryNode) : undefined;
}

export async function getAgentsByKuery(
esClient: ElasticsearchClient,
options: ListWithKuery & {
Expand Down
161 changes: 81 additions & 80 deletions x-pack/plugins/fleet/server/services/agents/update_agent_tags.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,26 @@ import { elasticsearchServiceMock, savedObjectsClientMock } from '@kbn/core/serv
import { createClientMock } from './action.mock';
import { updateAgentTags } from './update_agent_tags';

jest.mock('./filter_hosted_agents', () => ({
filterHostedPolicies: jest
.fn()
.mockImplementation((soClient, givenAgents) => Promise.resolve(givenAgents)),
}));
jest.mock('../app_context', () => {
return {
appContextService: {
getLogger: jest.fn().mockReturnValue({
debug: jest.fn(),
warn: jest.fn(),
info: jest.fn(),
error: jest.fn(),
} as any),
},
};
});

jest.mock('../agent_policy', () => {
return {
agentPolicyService: {
getByIDs: jest.fn().mockResolvedValue([{ id: 'hosted-agent-policy', is_managed: true }]),
},
};
});

const mockRunAsync = jest.fn().mockResolvedValue({});
jest.mock('./update_agent_tags_action_runner', () => ({
Expand All @@ -42,84 +57,65 @@ describe('update_agent_tags', () => {
} as any,
],
});
esClient.bulk.mockReset();
esClient.bulk.mockResolvedValue({
items: [],
} as any);

esClient.updateByQuery.mockReset();
esClient.updateByQuery.mockResolvedValue({ failures: [], updated: 1 } as any);

mockRunAsync.mockClear();
});

function expectTagsInEsBulk(tags: string[]) {
expect(esClient.bulk).toHaveBeenCalledWith(
it('should remove duplicate tags', async () => {
await updateAgentTags(soClient, esClient, { agentIds: ['agent1'] }, ['one', 'one'], ['two']);

expect(esClient.updateByQuery).toHaveBeenCalledWith(
expect.objectContaining({
body: [
expect.anything(),
{
doc: expect.objectContaining({
tags,
}),
},
],
conflicts: 'abort',
index: '.fleet-agents',
query: { terms: { _id: ['agent1'] } },
script: expect.objectContaining({
lang: 'painless',
params: expect.objectContaining({
tagsToAdd: ['one'],
tagsToRemove: ['two'],
updatedAt: expect.anything(),
}),
source: expect.anything(),
}),
})
);
}

it('should replace tag in middle place when one add and one remove tag', async () => {
await updateAgentTags(soClient, esClient, { agentIds: ['agent1'] }, ['newName'], ['two']);

expectTagsInEsBulk(['one', 'newName', 'three']);
});

it('should replace tag in first place when one add and one remove tag', async () => {
await updateAgentTags(soClient, esClient, { agentIds: ['agent1'] }, ['newName'], ['one']);

expectTagsInEsBulk(['newName', 'two', 'three']);
});

it('should replace tag in last place when one add and one remove tag', async () => {
await updateAgentTags(soClient, esClient, { agentIds: ['agent1'] }, ['newName'], ['three']);

expectTagsInEsBulk(['one', 'two', 'newName']);
});

it('should add tag when tagsToRemove does not exist', async () => {
esClient.mget.mockResolvedValue({
docs: [
{
_id: 'agent1',
_source: {},
} as any,
],
});
await updateAgentTags(soClient, esClient, { agentIds: ['agent1'] }, ['newName'], ['three']);

expectTagsInEsBulk(['newName']);
});

it('should remove duplicate tags', async () => {
await updateAgentTags(soClient, esClient, { agentIds: ['agent1'] }, ['one'], ['two']);

expectTagsInEsBulk(['one', 'three']);
});

it('should add tag at the end when no tagsToRemove', async () => {
await updateAgentTags(soClient, esClient, { agentIds: ['agent1'] }, ['newName'], []);
it('should update action results on success', async () => {
await updateAgentTags(soClient, esClient, { agentIds: ['agent1'] }, ['one'], []);

expectTagsInEsBulk(['one', 'two', 'three', 'newName']);
const agentAction = esClient.create.mock.calls[0][0] as any;
expect(agentAction?.body).toEqual(
expect.objectContaining({
action_id: expect.anything(),
agents: ['agent1'],
type: 'UPDATE_TAGS',
total: 1,
})
);

const actionResults = esClient.bulk.mock.calls[1][0] as any;
const resultIds = actionResults?.body
const actionResults = esClient.bulk.mock.calls[0][0] as any;
const agentIds = actionResults?.body
?.filter((i: any) => i.agent_id)
.map((i: any) => i.agent_id);
expect(resultIds).toEqual(['agent1']);

const action = esClient.create.mock.calls[0][0] as any;
expect(action.body.type).toEqual('UPDATE_TAGS');
expect(agentIds).toEqual(['agent1']);
expect(actionResults.body[1].error).not.toBeDefined();
});

it('should write error action results for hosted agent', async () => {
it('should write error action results for hosted agent when agentIds are passed', async () => {
const { esClient: esClientMock, agentInHostedDoc } = createClientMock();

esClientMock.updateByQuery.mockReset();
esClientMock.updateByQuery.mockResolvedValue({ failures: [], updated: 0, total: '0' } as any);

await updateAgentTags(
soClient,
esClientMock,
Expand All @@ -128,27 +124,32 @@ describe('update_agent_tags', () => {
[]
);

const errorResults = esClientMock.bulk.mock.calls[1][0] as any;
const errorIds = errorResults?.body?.filter((i: any) => i.agent_id).map((i: any) => i.agent_id);
expect(errorIds).toEqual([agentInHostedDoc._id]);
});

it('should add tag at the end when tagsToRemove not in existing tags', async () => {
await updateAgentTags(soClient, esClient, { agentIds: ['agent1'] }, ['newName'], ['dummy']);
const agentAction = esClientMock.create.mock.calls[0][0] as any;
expect(agentAction?.body).toEqual(
expect.objectContaining({
action_id: expect.anything(),
agents: [],
type: 'UPDATE_TAGS',
total: 1,
})
);

expectTagsInEsBulk(['one', 'two', 'three', 'newName']);
const errorResults = esClientMock.bulk.mock.calls[0][0] as any;
expect(errorResults.body[1].agent_id).toEqual(agentInHostedDoc._id);
expect(errorResults.body[1].error).toEqual('Cannot modify tags on a hosted agent');
});

it('should add tag at the end when multiple tagsToRemove', async () => {
await updateAgentTags(
soClient,
esClient,
{ agentIds: ['agent1'] },
['newName'],
['one', 'two']
);
it('should write error action results when failures are returned', async () => {
esClient.updateByQuery.mockReset();
esClient.updateByQuery.mockResolvedValue({
failures: [{ cause: { reason: 'error reason' } }],
updated: 0,
} as any);

await updateAgentTags(soClient, esClient, { agentIds: ['agent1'] }, ['one'], []);

expectTagsInEsBulk(['three', 'newName']);
const errorResults = esClient.bulk.mock.calls[0][0] as any;
expect(errorResults.body[1].error).toEqual('error reason');
});

it('should run add tags async when actioning more agents than batch size', async () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import { AgentReassignmentError } from '../../errors';

import { SO_SEARCH_LIMIT } from '../../constants';

import { getAgentDocuments, getAgentsByKuery, openPointInTime } from './crud';
import { getAgentDocuments, getAgentsByKuery } from './crud';
import type { GetAgentsOptions } from '.';
import { searchHitToAgent } from './helpers';
import { UpdateAgentTagsActionRunner, updateTagsBatch } from './update_agent_tags_action_runner';
Expand Down Expand Up @@ -61,10 +61,11 @@ export async function updateAgentTags(
...options,
batchSize,
total: res.total,
kuery: options.kuery,
tagsToAdd,
tagsToRemove,
},
{ pitId: await openPointInTime(esClient) }
{ pitId: '' }
).runActionAsyncWithRetry();
}
}
Expand Down
Loading

0 comments on commit 0017a08

Please sign in to comment.