Skip to content

Commit

Permalink
[Fleet] Bulk action refactor and small fixes (elastic#142299)
Browse files Browse the repository at this point in the history
* refactor to report errors in agent update for all actions

* fixed tests

* fixed types

* refactor to reduce duplication

* fixed test

* fix for cypress test

* passing error reason

Co-authored-by: Kibana Machine <[email protected]>
  • Loading branch information
2 people authored and WafaaNasr committed Oct 14, 2022
1 parent e1adec3 commit fcf8200
Show file tree
Hide file tree
Showing 12 changed files with 144 additions and 145 deletions.
1 change: 1 addition & 0 deletions x-pack/plugins/fleet/cypress/plugins/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ const plugin: Cypress.PluginConfig = (on, config) => {
query,
ignore_unavailable: ignoreUnavailable,
refresh: true,
conflicts: 'proceed',
});
},
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,22 @@ export const TagsAddRemove: React.FC<Props> = ({
// sending updated tags to add/remove, in case multiple actions are done quickly and the first one is not yet propagated
const updatedTagsToAdd = tagsToAdd.concat(
labels
.filter((tag) => tag.checked === 'on' && !selectedTags.includes(tag.label))
.filter(
(tag) =>
tag.checked === 'on' &&
!selectedTags.includes(tag.label) &&
!tagsToRemove.includes(tag.label)
)
.map((tag) => tag.label)
);
const updatedTagsToRemove = tagsToRemove.concat(
labels
.filter((tag) => tag.checked !== 'on' && selectedTags.includes(tag.label))
.filter(
(tag) =>
tag.checked !== 'on' &&
selectedTags.includes(tag.label) &&
!tagsToAdd.includes(tag.label)
)
.map((tag) => tag.label)
);

Expand Down
5 changes: 4 additions & 1 deletion x-pack/plugins/fleet/server/services/agents/action_status.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,10 @@ export async function getActionStatuses(
const matchingBucket = (acks?.aggregations?.ack_counts as any)?.buckets?.find(
(bucket: any) => bucket.key === action.actionId
);
const nbAgentsAck = (matchingBucket?.agent_count as any)?.value ?? 0;
const nbAgentsAck = Math.min(
matchingBucket?.doc_count ?? 0,
(matchingBucket?.agent_count as any)?.value ?? 0
);
const completionTime = (matchingBucket?.max_timestamp as any)?.value_as_string;
const nbAgentsActioned = action.nbAgentsActioned || action.nbAgentsActionCreated;
const complete = nbAgentsAck >= nbAgentsActioned;
Expand Down
12 changes: 8 additions & 4 deletions x-pack/plugins/fleet/server/services/agents/actions.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,14 @@ describe('Agent actions', () => {
await cancelAgentAction(esClient, 'action1');

expect(mockedBulkUpdateAgents).toBeCalled();
expect(mockedBulkUpdateAgents).toBeCalledWith(expect.anything(), [
expect.objectContaining({ agentId: 'agent1' }),
expect.objectContaining({ agentId: 'agent2' }),
]);
expect(mockedBulkUpdateAgents).toBeCalledWith(
expect.anything(),
[
expect.objectContaining({ agentId: 'agent1' }),
expect.objectContaining({ agentId: 'agent2' }),
],
{}
);
});
});
});
50 changes: 39 additions & 11 deletions x-pack/plugins/fleet/server/services/agents/actions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import uuid from 'uuid';
import type { ElasticsearchClient } from '@kbn/core/server';

import { appContextService } from '../app_context';
import type {
Agent,
AgentAction,
Expand Down Expand Up @@ -101,6 +102,32 @@ export async function bulkCreateAgentActions(
return actions;
}

export async function createErrorActionResults(
esClient: ElasticsearchClient,
actionId: string,
errors: Record<Agent['id'], Error>,
errorMessage: string
) {
const errorCount = Object.keys(errors).length;
if (errorCount > 0) {
appContextService
.getLogger()
.info(
`Writing error action results of ${errorCount} agents. Possibly failed validation: ${errorMessage}.`
);

// writing out error result for those agents that have errors, so the action is not going to stay in progress forever
await bulkCreateAgentActionResults(
esClient,
Object.keys(errors).map((agentId) => ({
agentId,
actionId,
error: errors[agentId].message,
}))
);
}
}

export async function bulkCreateAgentActionResults(
esClient: ElasticsearchClient,
results: Array<{
Expand Down Expand Up @@ -227,16 +254,6 @@ export async function cancelAgentAction(esClient: ElasticsearchClient, actionId:
if (!hit._source || !hit._source.agents || !hit._source.action_id) {
continue;
}
await createAgentAction(esClient, {
id: cancelActionId,
type: 'CANCEL',
agents: hit._source.agents,
data: {
target_id: hit._source.action_id,
},
created_at: now,
expiration: hit._source.expiration,
});
if (hit._source.type === 'UPGRADE') {
await bulkUpdateAgents(
esClient,
Expand All @@ -246,9 +263,20 @@ export async function cancelAgentAction(esClient: ElasticsearchClient, actionId:
upgraded_at: null,
upgrade_started_at: null,
},
}))
})),
{}
);
}
await createAgentAction(esClient, {
id: cancelActionId,
type: 'CANCEL',
agents: hit._source.agents,
data: {
target_id: hit._source.action_id,
},
created_at: now,
expiration: hit._source.expiration,
});
}

return {
Expand Down
21 changes: 10 additions & 11 deletions x-pack/plugins/fleet/server/services/agents/crud.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import type { SavedObjectsClientContract, ElasticsearchClient } from '@kbn/core/
import type { KueryNode } from '@kbn/es-query';
import { fromKueryExpression, toElasticsearchQuery } from '@kbn/es-query';

import type { AgentSOAttributes, Agent, BulkActionResult, ListWithKuery } from '../../types';
import type { AgentSOAttributes, Agent, ListWithKuery } from '../../types';
import { appContextService, agentPolicyService } from '..';
import type { FleetServerAgent } from '../../../common/types';
import { SO_SEARCH_LIMIT } from '../../../common/constants';
Expand Down Expand Up @@ -395,10 +395,11 @@ export async function bulkUpdateAgents(
updateData: Array<{
agentId: string;
data: Partial<AgentSOAttributes>;
}>
): Promise<{ items: BulkActionResult[] }> {
}>,
errors: { [key: string]: Error }
): Promise<void> {
if (updateData.length === 0) {
return { items: [] };
return;
}

const body = updateData.flatMap(({ agentId, data }) => [
Expand All @@ -419,14 +420,12 @@ export async function bulkUpdateAgents(
refresh: 'wait_for',
});

return {
items: res.items.map((item) => ({
id: item.update!._id as string,
success: !item.update!.error,
res.items
.filter((item) => item.update!.error)
.forEach((item) => {
// @ts-expect-error it not assignable to ErrorCause
error: item.update!.error as Error,
})),
};
errors[item.update!._id as string] = item.update!.error as Error;
});
}

export async function deleteAgent(esClient: ElasticsearchClient, agentId: string) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import { appContextService } from '../app_context';
import { ActionRunner } from './action_runner';

import { bulkUpdateAgents } from './crud';
import { bulkCreateAgentActionResults, createAgentAction } from './actions';
import { createErrorActionResults, createAgentAction } from './actions';
import { getHostedPolicies, isHostedAgent } from './hosted_agent';
import { BulkActionTaskType } from './bulk_actions_resolver';

Expand Down Expand Up @@ -72,26 +72,20 @@ export async function reassignBatch(
throw new AgentReassignmentError('No agents to reassign, already assigned or hosted agents');
}

const res = await bulkUpdateAgents(
await bulkUpdateAgents(
esClient,
agentsToUpdate.map((agent) => ({
agentId: agent.id,
data: {
policy_id: options.newAgentPolicyId,
policy_revision: null,
},
}))
})),
errors
);

res.items
.filter((item) => !item.success)
.forEach((item) => {
errors[item.id] = item.error!;
});

const actionId = options.actionId ?? uuid();
const errorCount = Object.keys(errors).length;
const total = options.total ?? agentsToUpdate.length + errorCount;
const total = options.total ?? givenAgents.length;

const now = new Date().toISOString();
await createAgentAction(esClient, {
Expand All @@ -105,23 +99,12 @@ export async function reassignBatch(
},
});

if (errorCount > 0) {
appContextService
.getLogger()
.info(
`Skipping ${errorCount} agents, as failed validation (already assigned or assigned to hosted policy)`
);

// writing out error result for those agents that failed validation, so the action is not going to stay in progress forever
await bulkCreateAgentActionResults(
esClient,
Object.keys(errors).map((agentId) => ({
agentId,
actionId,
error: errors[agentId].message,
}))
);
}
await createErrorActionResults(
esClient,
actionId,
errors,
'already assigned or assigned to hosted policy'
);

return { actionId };
}
16 changes: 8 additions & 8 deletions x-pack/plugins/fleet/server/services/agents/unenroll.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ describe('unenrollAgents (plural)', () => {

// calls ES update with correct values
const onlyRegular = [agentInRegularDoc._id, agentInRegularDoc2._id];
const calledWith = esClient.bulk.mock.calls[1][0];
const calledWith = esClient.bulk.mock.calls[0][0];
const ids = (calledWith as estypes.BulkRequest)?.body
?.filter((i: any) => i.update !== undefined)
.map((i: any) => i.update._id);
Expand All @@ -128,7 +128,7 @@ describe('unenrollAgents (plural)', () => {
}

// hosted policy is updated in action results with error
const calledWithActionResults = esClient.bulk.mock.calls[0][0] as estypes.BulkRequest;
const calledWithActionResults = esClient.bulk.mock.calls[1][0] as estypes.BulkRequest;
// bulk write two line per create
expect(calledWithActionResults.body?.length).toBe(2);
const expectedObject = expect.objectContaining({
Expand Down Expand Up @@ -170,7 +170,7 @@ describe('unenrollAgents (plural)', () => {
});

expect(esClient.bulk.mock.calls.length).toEqual(3);
const bulkBody = (esClient.bulk.mock.calls[1][0] as estypes.BulkRequest)?.body?.[1] as any;
const bulkBody = (esClient.bulk.mock.calls[2][0] as estypes.BulkRequest)?.body?.[1] as any;
expect(bulkBody.agent_id).toEqual(agentInRegularDoc._id);
expect(bulkBody.action_id).toEqual('other-action');
});
Expand Down Expand Up @@ -227,7 +227,7 @@ describe('unenrollAgents (plural)', () => {

// calls ES update with correct values
const onlyRegular = [agentInRegularDoc._id, agentInRegularDoc2._id];
const calledWith = esClient.bulk.mock.calls[2][0];
const calledWith = esClient.bulk.mock.calls[0][0];
const ids = (calledWith as estypes.BulkRequest)?.body
?.filter((i: any) => i.update !== undefined)
.map((i: any) => i.update._id);
Expand All @@ -239,13 +239,13 @@ describe('unenrollAgents (plural)', () => {
expect(doc).toHaveProperty('unenrolled_at');
}

const errorResults = esClient.bulk.mock.calls[1][0];
const errorResults = esClient.bulk.mock.calls[2][0];
const errorIds = (errorResults as estypes.BulkRequest)?.body
?.filter((i: any) => i.agent_id)
.map((i: any) => i.agent_id);
expect(errorIds).toEqual([agentInHostedDoc._id]);

const actionResults = esClient.bulk.mock.calls[0][0];
const actionResults = esClient.bulk.mock.calls[1][0];
const resultIds = (actionResults as estypes.BulkRequest)?.body
?.filter((i: any) => i.agent_id)
.map((i: any) => i.agent_id);
Expand Down Expand Up @@ -290,7 +290,7 @@ describe('unenrollAgents (plural)', () => {
expect(unenrolledResponse.actionId).toBeDefined();

// calls ES update with correct values
const calledWith = esClient.bulk.mock.calls[1][0];
const calledWith = esClient.bulk.mock.calls[0][0];
const ids = (calledWith as estypes.BulkRequest)?.body
?.filter((i: any) => i.update !== undefined)
.map((i: any) => i.update._id);
Expand All @@ -302,7 +302,7 @@ describe('unenrollAgents (plural)', () => {
expect(doc).toHaveProperty('unenrolled_at');
}

const actionResults = esClient.bulk.mock.calls[0][0];
const actionResults = esClient.bulk.mock.calls[1][0];
const resultIds = (actionResults as estypes.BulkRequest)?.body
?.filter((i: any) => i.agent_id)
.map((i: any) => i.agent_id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import { bulkUpdateAgents } from './crud';
import {
bulkCreateAgentActionResults,
createAgentAction,
createErrorActionResults,
getUnenrollAgentActions,
} from './actions';
import { getHostedPolicies, isHostedAgent } from './hosted_agent';
Expand Down Expand Up @@ -81,13 +82,24 @@ export async function unenrollBatch(
return agents;
}, []);

const now = new Date().toISOString();

// Update the necessary agents
const updateData = options.revoke
? { unenrolled_at: now, active: false }
: { unenrollment_started_at: now };

await bulkUpdateAgents(
esClient,
agentsToUpdate.map(({ id }) => ({ agentId: id, data: updateData })),
outgoingErrors
);

const actionId = options.actionId ?? uuid();
const errorCount = Object.keys(outgoingErrors).length;
const total = options.total ?? givenAgents.length;

const agentIds = agentsToUpdate.map((agent) => agent.id);

const now = new Date().toISOString();
if (options.revoke) {
// Get all API keys that need to be invalidated
await invalidateAPIKeysForAgents(agentsToUpdate);
Expand All @@ -104,32 +116,11 @@ export async function unenrollBatch(
});
}

if (errorCount > 0) {
appContextService
.getLogger()
.info(
`Skipping ${errorCount} agents, as failed validation (cannot unenroll from a hosted policy or already unenrolled)`
);

// writing out error result for those agents that failed validation, so the action is not going to stay in progress forever
await bulkCreateAgentActionResults(
esClient,
Object.keys(outgoingErrors).map((agentId) => ({
agentId,
actionId,
error: outgoingErrors[agentId].message,
}))
);
}

// Update the necessary agents
const updateData = options.revoke
? { unenrolled_at: now, active: false }
: { unenrollment_started_at: now };

await bulkUpdateAgents(
await createErrorActionResults(
esClient,
agentsToUpdate.map(({ id }) => ({ agentId: id, data: updateData }))
actionId,
outgoingErrors,
'cannot unenroll from a hosted policy or already unenrolled'
);

return {
Expand Down
Loading

0 comments on commit fcf8200

Please sign in to comment.