Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fleet] Bulk action refactor and small fixes #142299

Merged
merged 11 commits into from
Oct 3, 2022
1 change: 1 addition & 0 deletions x-pack/plugins/fleet/cypress/plugins/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ const plugin: Cypress.PluginConfig = (on, config) => {
query,
ignore_unavailable: ignoreUnavailable,
refresh: true,
conflicts: 'proceed',
});
},
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,22 @@ export const TagsAddRemove: React.FC<Props> = ({
// sending updated tags to add/remove, in case multiple actions are done quickly and the first one is not yet propagated
const updatedTagsToAdd = tagsToAdd.concat(
labels
.filter((tag) => tag.checked === 'on' && !selectedTags.includes(tag.label))
.filter(
(tag) =>
tag.checked === 'on' &&
!selectedTags.includes(tag.label) &&
!tagsToRemove.includes(tag.label)
)
.map((tag) => tag.label)
);
const updatedTagsToRemove = tagsToRemove.concat(
labels
.filter((tag) => tag.checked !== 'on' && selectedTags.includes(tag.label))
.filter(
(tag) =>
tag.checked !== 'on' &&
selectedTags.includes(tag.label) &&
!tagsToAdd.includes(tag.label)
)
.map((tag) => tag.label)
);

Expand Down
5 changes: 4 additions & 1 deletion x-pack/plugins/fleet/server/services/agents/action_status.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,10 @@ export async function getActionStatuses(
const matchingBucket = (acks?.aggregations?.ack_counts as any)?.buckets?.find(
(bucket: any) => bucket.key === action.actionId
);
const nbAgentsAck = (matchingBucket?.agent_count as any)?.value ?? 0;
const nbAgentsAck = Math.min(
matchingBucket?.doc_count ?? 0,
(matchingBucket?.agent_count as any)?.value ?? 0
);
const completionTime = (matchingBucket?.max_timestamp as any)?.value_as_string;
const nbAgentsActioned = action.nbAgentsActioned || action.nbAgentsActionCreated;
const complete = nbAgentsAck >= nbAgentsActioned;
Expand Down
12 changes: 8 additions & 4 deletions x-pack/plugins/fleet/server/services/agents/actions.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,14 @@ describe('Agent actions', () => {
await cancelAgentAction(esClient, 'action1');

expect(mockedBulkUpdateAgents).toBeCalled();
expect(mockedBulkUpdateAgents).toBeCalledWith(expect.anything(), [
expect.objectContaining({ agentId: 'agent1' }),
expect.objectContaining({ agentId: 'agent2' }),
]);
expect(mockedBulkUpdateAgents).toBeCalledWith(
expect.anything(),
[
expect.objectContaining({ agentId: 'agent1' }),
expect.objectContaining({ agentId: 'agent2' }),
],
{}
);
});
});
});
25 changes: 24 additions & 1 deletion x-pack/plugins/fleet/server/services/agents/actions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import uuid from 'uuid';
import type { ElasticsearchClient } from '@kbn/core/server';

import { appContextService } from '../app_context';
import type {
Agent,
AgentAction,
Expand Down Expand Up @@ -101,6 +102,27 @@ export async function bulkCreateAgentActions(
return actions;
}

export async function createErrorActionResults(
esClient: ElasticsearchClient,
actionId: string,
errors: Record<Agent['id'], Error>
) {
const errorCount = Object.keys(errors).length;
if (errorCount > 0) {
appContextService.getLogger().info(`Writing error action results of ${errorCount} agents`);
juliaElastic marked this conversation as resolved.
Show resolved Hide resolved

// writing out error result for those agents that have errors, so the action is not going to stay in progress forever
await bulkCreateAgentActionResults(
esClient,
Object.keys(errors).map((agentId) => ({
agentId,
actionId,
error: errors[agentId].message,
}))
);
}
}

export async function bulkCreateAgentActionResults(
esClient: ElasticsearchClient,
results: Array<{
Expand Down Expand Up @@ -246,7 +268,8 @@ export async function cancelAgentAction(esClient: ElasticsearchClient, actionId:
upgraded_at: null,
upgrade_started_at: null,
},
}))
})),
{}
);
}
}
Expand Down
21 changes: 10 additions & 11 deletions x-pack/plugins/fleet/server/services/agents/crud.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import type { SavedObjectsClientContract, ElasticsearchClient } from '@kbn/core/
import type { KueryNode } from '@kbn/es-query';
import { fromKueryExpression, toElasticsearchQuery } from '@kbn/es-query';

import type { AgentSOAttributes, Agent, BulkActionResult, ListWithKuery } from '../../types';
import type { AgentSOAttributes, Agent, ListWithKuery } from '../../types';
import { appContextService, agentPolicyService } from '..';
import type { FleetServerAgent } from '../../../common/types';
import { SO_SEARCH_LIMIT } from '../../../common/constants';
Expand Down Expand Up @@ -395,10 +395,11 @@ export async function bulkUpdateAgents(
updateData: Array<{
agentId: string;
data: Partial<AgentSOAttributes>;
}>
): Promise<{ items: BulkActionResult[] }> {
}>,
errors: { [key: string]: Error }
): Promise<void> {
if (updateData.length === 0) {
return { items: [] };
return;
}

const body = updateData.flatMap(({ agentId, data }) => [
Expand All @@ -419,14 +420,12 @@ export async function bulkUpdateAgents(
refresh: 'wait_for',
});

return {
items: res.items.map((item) => ({
id: item.update!._id as string,
success: !item.update!.error,
res.items
.filter((item) => item.update!.error)
.forEach((item) => {
// @ts-expect-error it not assignable to ErrorCause
error: item.update!.error as Error,
})),
};
errors[item.update!._id as string] = item.update!.error as Error;
});
}

export async function deleteAgent(esClient: ElasticsearchClient, agentId: string) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import { appContextService } from '../app_context';
import { ActionRunner } from './action_runner';

import { bulkUpdateAgents } from './crud';
import { bulkCreateAgentActionResults, createAgentAction } from './actions';
import { createErrorActionResults, createAgentAction } from './actions';
import { getHostedPolicies, isHostedAgent } from './hosted_agent';
import { BulkActionTaskType } from './bulk_actions_resolver';

Expand Down Expand Up @@ -72,26 +72,20 @@ export async function reassignBatch(
throw new AgentReassignmentError('No agents to reassign, already assigned or hosted agents');
}

const res = await bulkUpdateAgents(
await bulkUpdateAgents(
esClient,
agentsToUpdate.map((agent) => ({
agentId: agent.id,
data: {
policy_id: options.newAgentPolicyId,
policy_revision: null,
},
}))
})),
errors
);

res.items
.filter((item) => !item.success)
.forEach((item) => {
errors[item.id] = item.error!;
});

const actionId = options.actionId ?? uuid();
const errorCount = Object.keys(errors).length;
const total = options.total ?? agentsToUpdate.length + errorCount;
const total = options.total ?? givenAgents.length;

const now = new Date().toISOString();
await createAgentAction(esClient, {
Expand All @@ -105,23 +99,7 @@ export async function reassignBatch(
},
});

if (errorCount > 0) {
appContextService
.getLogger()
.info(
`Skipping ${errorCount} agents, as failed validation (already assigned or assigned to hosted policy)`
);

// writing out error result for those agents that failed validation, so the action is not going to stay in progress forever
await bulkCreateAgentActionResults(
esClient,
Object.keys(errors).map((agentId) => ({
agentId,
actionId,
error: errors[agentId].message,
}))
);
}
await createErrorActionResults(esClient, actionId, errors);

return { actionId };
}
16 changes: 8 additions & 8 deletions x-pack/plugins/fleet/server/services/agents/unenroll.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ describe('unenrollAgents (plural)', () => {

// calls ES update with correct values
const onlyRegular = [agentInRegularDoc._id, agentInRegularDoc2._id];
const calledWith = esClient.bulk.mock.calls[1][0];
const calledWith = esClient.bulk.mock.calls[0][0];
const ids = (calledWith as estypes.BulkRequest)?.body
?.filter((i: any) => i.update !== undefined)
.map((i: any) => i.update._id);
Expand All @@ -128,7 +128,7 @@ describe('unenrollAgents (plural)', () => {
}

// hosted policy is updated in action results with error
const calledWithActionResults = esClient.bulk.mock.calls[0][0] as estypes.BulkRequest;
const calledWithActionResults = esClient.bulk.mock.calls[1][0] as estypes.BulkRequest;
// bulk write two line per create
expect(calledWithActionResults.body?.length).toBe(2);
const expectedObject = expect.objectContaining({
Expand Down Expand Up @@ -170,7 +170,7 @@ describe('unenrollAgents (plural)', () => {
});

expect(esClient.bulk.mock.calls.length).toEqual(3);
const bulkBody = (esClient.bulk.mock.calls[1][0] as estypes.BulkRequest)?.body?.[1] as any;
const bulkBody = (esClient.bulk.mock.calls[2][0] as estypes.BulkRequest)?.body?.[1] as any;
expect(bulkBody.agent_id).toEqual(agentInRegularDoc._id);
expect(bulkBody.action_id).toEqual('other-action');
});
Expand Down Expand Up @@ -227,7 +227,7 @@ describe('unenrollAgents (plural)', () => {

// calls ES update with correct values
const onlyRegular = [agentInRegularDoc._id, agentInRegularDoc2._id];
const calledWith = esClient.bulk.mock.calls[2][0];
const calledWith = esClient.bulk.mock.calls[0][0];
const ids = (calledWith as estypes.BulkRequest)?.body
?.filter((i: any) => i.update !== undefined)
.map((i: any) => i.update._id);
Expand All @@ -239,13 +239,13 @@ describe('unenrollAgents (plural)', () => {
expect(doc).toHaveProperty('unenrolled_at');
}

const errorResults = esClient.bulk.mock.calls[1][0];
const errorResults = esClient.bulk.mock.calls[2][0];
const errorIds = (errorResults as estypes.BulkRequest)?.body
?.filter((i: any) => i.agent_id)
.map((i: any) => i.agent_id);
expect(errorIds).toEqual([agentInHostedDoc._id]);

const actionResults = esClient.bulk.mock.calls[0][0];
const actionResults = esClient.bulk.mock.calls[1][0];
const resultIds = (actionResults as estypes.BulkRequest)?.body
?.filter((i: any) => i.agent_id)
.map((i: any) => i.agent_id);
Expand Down Expand Up @@ -290,7 +290,7 @@ describe('unenrollAgents (plural)', () => {
expect(unenrolledResponse.actionId).toBeDefined();

// calls ES update with correct values
const calledWith = esClient.bulk.mock.calls[1][0];
const calledWith = esClient.bulk.mock.calls[0][0];
const ids = (calledWith as estypes.BulkRequest)?.body
?.filter((i: any) => i.update !== undefined)
.map((i: any) => i.update._id);
Expand All @@ -302,7 +302,7 @@ describe('unenrollAgents (plural)', () => {
expect(doc).toHaveProperty('unenrolled_at');
}

const actionResults = esClient.bulk.mock.calls[0][0];
const actionResults = esClient.bulk.mock.calls[1][0];
const resultIds = (actionResults as estypes.BulkRequest)?.body
?.filter((i: any) => i.agent_id)
.map((i: any) => i.agent_id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import { bulkUpdateAgents } from './crud';
import {
bulkCreateAgentActionResults,
createAgentAction,
createErrorActionResults,
getUnenrollAgentActions,
} from './actions';
import { getHostedPolicies, isHostedAgent } from './hosted_agent';
Expand Down Expand Up @@ -81,13 +82,24 @@ export async function unenrollBatch(
return agents;
}, []);

const now = new Date().toISOString();

// Update the necessary agents
const updateData = options.revoke
? { unenrolled_at: now, active: false }
: { unenrollment_started_at: now };

await bulkUpdateAgents(
esClient,
agentsToUpdate.map(({ id }) => ({ agentId: id, data: updateData })),
outgoingErrors
);

const actionId = options.actionId ?? uuid();
const errorCount = Object.keys(outgoingErrors).length;
const total = options.total ?? givenAgents.length;

const agentIds = agentsToUpdate.map((agent) => agent.id);

const now = new Date().toISOString();
if (options.revoke) {
// Get all API keys that need to be invalidated
await invalidateAPIKeysForAgents(agentsToUpdate);
Expand All @@ -104,33 +116,7 @@ export async function unenrollBatch(
});
}

if (errorCount > 0) {
appContextService
.getLogger()
.info(
`Skipping ${errorCount} agents, as failed validation (cannot unenroll from a hosted policy or already unenrolled)`
);

// writing out error result for those agents that failed validation, so the action is not going to stay in progress forever
await bulkCreateAgentActionResults(
esClient,
Object.keys(outgoingErrors).map((agentId) => ({
agentId,
actionId,
error: outgoingErrors[agentId].message,
}))
);
}

// Update the necessary agents
const updateData = options.revoke
? { unenrolled_at: now, active: false }
: { unenrollment_started_at: now };

await bulkUpdateAgents(
esClient,
agentsToUpdate.map(({ id }) => ({ agentId: id, data: updateData }))
);
await createErrorActionResults(esClient, actionId, outgoingErrors);

return {
actionId,
Expand Down
Loading