Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor bulk update tags to fix issue with concurrently removing multiple tags #143543

Merged
merged 12 commits into from
Nov 8, 2022
14 changes: 12 additions & 2 deletions x-pack/plugins/fleet/server/services/agents/action_runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,18 @@ export abstract class ActionRunner {
} else {
appContextService.getLogger().error(`Action failed: ${error.message}`);
}
const taskId = await this.bulkActionsResolver!.run(
const taskId = this.bulkActionsResolver!.getTaskId(
this.actionParams.actionId!,
this.getTaskType()
);
await this.bulkActionsResolver!.run(
this.actionParams,
{
...this.retryParams,
retryCount: (this.retryParams.retryCount ?? 0) + 1,
},
this.getTaskType()
this.getTaskType(),
taskId
);

appContextService.getLogger().info(`Retrying in task: ${taskId}`);
Expand All @@ -135,13 +140,18 @@ export abstract class ActionRunner {
}

private async createCheckResultTask() {
const taskId = this.bulkActionsResolver!.getTaskId(
this.actionParams.actionId!,
this.getTaskType() + ':check'
);
return await this.bulkActionsResolver!.run(
this.actionParams,
{
...this.retryParams,
retryCount: 1,
},
this.getTaskType(),
taskId,
moment(new Date()).add(5, 'm').toDate()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,17 +94,17 @@ export class BulkActionsResolver {
this.taskManager = taskManager;
}

getTaskId(actionId: string, type: string) {
public getTaskId(actionId: string, type: string) {
return `${type}:${actionId}`;
}

public async run(
actionParams: ActionParams,
retryParams: RetryParams,
taskType: string,
taskId: string,
runAt?: Date
) {
const taskId = this.getTaskId(actionParams.actionId!, taskType);
await this.taskManager?.ensureScheduled({
id: taskId,
taskType,
Expand Down
24 changes: 24 additions & 0 deletions x-pack/plugins/fleet/server/services/agents/crud.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,30 @@ export async function getAgentTags(
}
}

export function getElasticsearchQuery(
kuery: string,
showInactive = false,
includeHosted = false,
hostedPolicies: string[] = []
): estypes.QueryDslQueryContainer | undefined {
const filters = [];

if (kuery && kuery !== '') {
filters.push(kuery);
}

if (showInactive === false) {
filters.push(ACTIVE_AGENT_CONDITION);
}

if (!includeHosted && hostedPolicies.length > 0) {
filters.push('NOT (policy_id:{policyIds})'.replace('{policyIds}', hostedPolicies.join(',')));
}

const kueryNode = _joinFilters(filters);
return kueryNode ? toElasticsearchQuery(kueryNode) : undefined;
}

export async function getAgentsByKuery(
esClient: ElasticsearchClient,
options: ListWithKuery & {
Expand Down
165 changes: 92 additions & 73 deletions x-pack/plugins/fleet/server/services/agents/update_agent_tags.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,26 @@ import { elasticsearchServiceMock, savedObjectsClientMock } from '@kbn/core/serv
import { createClientMock } from './action.mock';
import { updateAgentTags } from './update_agent_tags';

jest.mock('./filter_hosted_agents', () => ({
filterHostedPolicies: jest
.fn()
.mockImplementation((soClient, givenAgents) => Promise.resolve(givenAgents)),
}));
jest.mock('../app_context', () => {
return {
appContextService: {
getLogger: jest.fn().mockReturnValue({
debug: jest.fn(),
warn: jest.fn(),
info: jest.fn(),
error: jest.fn(),
} as any),
},
};
});

jest.mock('../agent_policy', () => {
return {
agentPolicyService: {
getByIDs: jest.fn().mockResolvedValue([{ id: 'hosted-agent-policy', is_managed: true }]),
},
};
});

const mockRunAsync = jest.fn().mockResolvedValue({});
jest.mock('./update_agent_tags_action_runner', () => ({
Expand All @@ -42,82 +57,60 @@ describe('update_agent_tags', () => {
} as any,
],
});
esClient.bulk.mockReset();
esClient.bulk.mockResolvedValue({
items: [],
} as any);

esClient.updateByQuery.mockReset();
esClient.updateByQuery.mockResolvedValue({ failures: [], updated: 1 } as any);

mockRunAsync.mockClear();
});

function expectTagsInEsBulk(tags: string[]) {
expect(esClient.bulk).toHaveBeenCalledWith(
it('should remove duplicate tags', async () => {
await updateAgentTags(soClient, esClient, { agentIds: ['agent1'] }, ['one', 'one'], ['two']);

expect(esClient.updateByQuery).toHaveBeenCalledWith(
expect.objectContaining({
body: [
expect.anything(),
{
doc: expect.objectContaining({
tags,
}),
},
],
conflicts: 'abort',
index: '.fleet-agents',
query: { terms: { _id: ['agent1'] } },
script: expect.objectContaining({
lang: 'painless',
params: expect.objectContaining({
tagsToAdd: ['one'],
tagsToRemove: ['two'],
updatedAt: expect.anything(),
}),
source: expect.anything(),
}),
})
);
}

it('should replace tag in middle place when one add and one remove tag', async () => {
await updateAgentTags(soClient, esClient, { agentIds: ['agent1'] }, ['newName'], ['two']);

expectTagsInEsBulk(['one', 'newName', 'three']);
});

it('should replace tag in first place when one add and one remove tag', async () => {
await updateAgentTags(soClient, esClient, { agentIds: ['agent1'] }, ['newName'], ['one']);

expectTagsInEsBulk(['newName', 'two', 'three']);
});

it('should replace tag in last place when one add and one remove tag', async () => {
await updateAgentTags(soClient, esClient, { agentIds: ['agent1'] }, ['newName'], ['three']);

expectTagsInEsBulk(['one', 'two', 'newName']);
});

it('should add tag when tagsToRemove does not exist', async () => {
esClient.mget.mockResolvedValue({
docs: [
{
_id: 'agent1',
_source: {},
} as any,
],
});
await updateAgentTags(soClient, esClient, { agentIds: ['agent1'] }, ['newName'], ['three']);

expectTagsInEsBulk(['newName']);
});

it('should remove duplicate tags', async () => {
await updateAgentTags(soClient, esClient, { agentIds: ['agent1'] }, ['one'], ['two']);

expectTagsInEsBulk(['one', 'three']);
});

it('should add tag at the end when no tagsToRemove', async () => {
await updateAgentTags(soClient, esClient, { agentIds: ['agent1'] }, ['newName'], []);
it('should update action results on success', async () => {
await updateAgentTags(soClient, esClient, { agentIds: ['agent1'] }, ['one'], []);

expectTagsInEsBulk(['one', 'two', 'three', 'newName']);
const agentAction = esClient.create.mock.calls[0][0] as any;
expect(agentAction?.body).toEqual(
expect.objectContaining({
action_id: expect.anything(),
agents: ['agent1'],
type: 'UPDATE_TAGS',
total: 1,
})
);

const actionResults = esClient.bulk.mock.calls[1][0] as any;
const resultIds = actionResults?.body
const actionResults = esClient.bulk.mock.calls[0][0] as any;
const agentIds = actionResults?.body
?.filter((i: any) => i.agent_id)
.map((i: any) => i.agent_id);
expect(resultIds).toEqual(['agent1']);

const action = esClient.create.mock.calls[0][0] as any;
expect(action.body.type).toEqual('UPDATE_TAGS');
expect(agentIds).toEqual(['agent1']);
expect(actionResults.body[1].error).not.toBeDefined();
});

it('should write error action results for hosted agent', async () => {
it('should write error action results for hosted agent when agentIds are passed', async () => {
const { esClient: esClientMock, agentInHostedDoc } = createClientMock();

await updateAgentTags(
Expand All @@ -128,27 +121,53 @@ describe('update_agent_tags', () => {
[]
);

const errorResults = esClientMock.bulk.mock.calls[1][0] as any;
const agentAction = esClientMock.create.mock.calls[0][0] as any;
expect(agentAction?.body).toEqual(
expect.objectContaining({
action_id: expect.anything(),
agents: [],
type: 'UPDATE_TAGS',
total: 1,
})
);

const errorResults = esClientMock.bulk.mock.calls[0][0] as any;
const errorIds = errorResults?.body?.filter((i: any) => i.agent_id).map((i: any) => i.agent_id);
expect(errorIds).toEqual([agentInHostedDoc._id]);
expect(errorResults.body[1].error).toEqual(
'Cannot modify tags on a hosted agent in Fleet because the agent policy is managed by an external orchestration solution, such as Elastic Cloud, Kubernetes, etc. Please make changes using your orchestration solution.'
);
});

it('should add tag at the end when tagsToRemove not in existing tags', async () => {
await updateAgentTags(soClient, esClient, { agentIds: ['agent1'] }, ['newName'], ['dummy']);
it('should write error action results when failures are returned', async () => {
esClient.updateByQuery.mockReset();
esClient.updateByQuery.mockResolvedValue({
failures: [{ cause: { reason: 'error reason' } }],
updated: 0,
} as any);

await updateAgentTags(soClient, esClient, { agentIds: ['agent1'] }, ['one'], []);

expectTagsInEsBulk(['one', 'two', 'three', 'newName']);
const errorResults = esClient.bulk.mock.calls[0][0] as any;
expect(errorResults.body[1].error).toEqual('error reason');
});

it('should add tag at the end when multiple tagsToRemove', async () => {
it('should write error action results when less agents updated than total', async () => {
const { esClient: esClientMock, agentInRegularDoc, agentInRegularDoc2 } = createClientMock();

esClientMock.updateByQuery.mockReset();
esClientMock.updateByQuery.mockResolvedValue({ failures: [], updated: 0, total: '1' } as any);

await updateAgentTags(
soClient,
esClient,
{ agentIds: ['agent1'] },
['newName'],
['one', 'two']
esClientMock,
{ agentIds: [agentInRegularDoc._id, agentInRegularDoc2._id] },
['one'],
[]
);

expectTagsInEsBulk(['three', 'newName']);
const errorResults = esClientMock.bulk.mock.calls[0][0] as any;
expect(errorResults.body[1].error).toEqual('Cannot modify tags on a hosted agent');
});

it('should run add tags async when actioning more agents than batch size', async () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import { AgentReassignmentError } from '../../errors';

import { SO_SEARCH_LIMIT } from '../../constants';

import { getAgentDocuments, getAgentsByKuery, openPointInTime } from './crud';
import { getAgentDocuments, getAgentsByKuery } from './crud';
import type { GetAgentsOptions } from '.';
import { searchHitToAgent } from './helpers';
import { UpdateAgentTagsActionRunner, updateTagsBatch } from './update_agent_tags_action_runner';
Expand Down Expand Up @@ -61,10 +61,11 @@ export async function updateAgentTags(
...options,
batchSize,
total: res.total,
kuery: options.kuery,
tagsToAdd,
tagsToRemove,
},
{ pitId: await openPointInTime(esClient) }
{ pitId: '' }
).runActionAsyncWithRetry();
}
}
Expand Down
Loading