Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fleet] Ensure policies are not out of sync #175065

Merged
merged 3 commits into from
Jan 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions x-pack/plugins/fleet/server/services/agent_policy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -928,13 +928,15 @@ class AgentPolicyService {

const policies = await agentPolicyService.getByIDs(soClient, agentPolicyIds);
const policiesMap = keyBy(policies, 'id');
const fullPolicies = await Promise.all(
agentPolicyIds.map((agentPolicyId) =>
// There are some potential performance concerns around using `getFullAgentPolicy` in this context, e.g.
// re-fetching outputs, settings, and upgrade download source URI data for each policy. This could potentially
// be a bottleneck in environments with several thousand agent policies being deployed here.
agentPolicyService.getFullAgentPolicy(soClient, agentPolicyId)
)
const fullPolicies = await pMap(
agentPolicyIds,
// There are some potential performance concerns around using `getFullAgentPolicy` in this context, e.g.
// re-fetching outputs, settings, and upgrade download source URI data for each policy. This could potentially
// be a bottleneck in environments with several thousand agent policies being deployed here.
(agentPolicyId) => agentPolicyService.getFullAgentPolicy(soClient, agentPolicyId),
{
concurrency: 50,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

}
);

const fleetServerPolicies = fullPolicies.reduce((acc, fullPolicy) => {
Expand Down Expand Up @@ -1045,7 +1047,7 @@ class AgentPolicyService {
}

public async getLatestFleetPolicy(esClient: ElasticsearchClient, agentPolicyId: string) {
const res = await esClient.search({
const res = await esClient.search<FleetServerPolicy>({
index: AGENT_POLICY_INDEX,
ignore_unavailable: true,
rest_total_hits_as_int: true,
Expand Down
1 change: 0 additions & 1 deletion x-pack/plugins/fleet/server/services/agents/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ export * from './crud';
export * from './update';
export * from './actions';
export * from './reassign';
export * from './setup';
export * from './update_agent_tags';
export * from './action_status';
export * from './request_diagnostics';
Expand Down
32 changes: 0 additions & 32 deletions x-pack/plugins/fleet/server/services/agents/setup.ts

This file was deleted.

39 changes: 5 additions & 34 deletions x-pack/plugins/fleet/server/services/setup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@ import { AUTO_UPDATE_PACKAGES } from '../../common/constants';
import type { PreconfigurationError } from '../../common/constants';
import type { DefaultPackagesInstallationError } from '../../common/types';

import { SO_SEARCH_LIMIT } from '../constants';

import { appContextService } from './app_context';
import { agentPolicyService } from './agent_policy';
import { ensurePreconfiguredPackagesAndPolicies } from './preconfiguration';
import {
ensurePreconfiguredOutputs,
Expand All @@ -36,7 +33,6 @@ import {
import { outputService } from './output';
import { downloadSourceService } from './download_source';

import { ensureDefaultEnrollmentAPIKeyForAgentPolicy } from './api_keys';
import { getRegistryUrl, settingsService } from '.';
import { awaitIfPending } from './setup_utils';
import { ensureFleetFinalPipelineIsInstalled } from './epm/elasticsearch/ingest_pipeline/install';
Expand All @@ -54,6 +50,7 @@ import {
} from './preconfiguration/fleet_server_host';
import { cleanUpOldFileIndices } from './setup/clean_old_fleet_indices';
import type { UninstallTokenInvalidError } from './security/uninstall_token_service';
import { ensureAgentPoliciesFleetServerKeysAndPolicies } from './setup/fleet_server_policies_enrollment_keys';

export interface SetupStatus {
isInitialized: boolean;
Expand Down Expand Up @@ -227,8 +224,10 @@ async function createSetupSideEffects(
stepSpan?.end();

stepSpan = apm.startSpan('Set up enrollment keys for preconfigured policies', 'preconfiguration');
logger.debug('Setting up Fleet enrollment keys for preconfigured policies');
await ensureDefaultEnrollmentAPIKeysExists(soClient, esClient);
logger.debug(
'Setting up Fleet enrollment keys and verifying fleet server policies are not out-of-sync'
);
await ensureAgentPoliciesFleetServerKeysAndPolicies({ soClient, esClient, logger });
stepSpan?.end();

const nonFatalErrors = [
Expand Down Expand Up @@ -293,34 +292,6 @@ export async function ensureFleetGlobalEsAssets(
}
}

async function ensureDefaultEnrollmentAPIKeysExists(
soClient: SavedObjectsClientContract,
esClient: ElasticsearchClient,
options?: { forceRecreate?: boolean }
) {
const security = appContextService.getSecurity();
if (!security) {
return;
}

if (!(await security.authc.apiKeys.areAPIKeysEnabled())) {
return;
}

const { items: agentPolicies } = await agentPolicyService.list(soClient, {
perPage: SO_SEARCH_LIMIT,
});

await pMap(
agentPolicies,
(agentPolicy) =>
ensureDefaultEnrollmentAPIKeyForAgentPolicy(soClient, esClient, agentPolicy.id),
{
concurrency: 20,
}
);
}

/**
* Maps the `nonFatalErrors` object returned by the setup process to a more readable
* and predictable format suitable for logging output or UI presentation.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { loggingSystemMock } from '@kbn/core/server/mocks';
import { elasticsearchServiceMock, savedObjectsClientMock } from '@kbn/core/server/mocks';

import { appContextService } from '../app_context';
import { agentPolicyService } from '../agent_policy';
import { ensureDefaultEnrollmentAPIKeyForAgentPolicy } from '../api_keys';

import { ensureAgentPoliciesFleetServerKeysAndPolicies } from './fleet_server_policies_enrollment_keys';

jest.mock('../app_context');
jest.mock('../agent_policy');
jest.mock('../api_keys');

const mockedEnsureDefaultEnrollmentAPIKeyForAgentPolicy = jest.mocked(
ensureDefaultEnrollmentAPIKeyForAgentPolicy
);

const mockedAgentPolicyService = jest.mocked(agentPolicyService);

describe('ensureAgentPoliciesFleetServerKeysAndPolicies', () => {
beforeEach(() => {
jest.mocked(appContextService).getSecurity.mockReturnValue({
authc: { apiKeys: { areAPIKeysEnabled: async () => true } },
} as any);

mockedEnsureDefaultEnrollmentAPIKeyForAgentPolicy.mockReset();
mockedAgentPolicyService.getLatestFleetPolicy.mockReset();
mockedAgentPolicyService.deployPolicies.mockImplementation(async () => {});
mockedAgentPolicyService.list.mockResolvedValue({
items: [
{
id: 'policy1',
revision: 1,
},
{
id: 'policy2',
revision: 2,
},
],
} as any);
});

it('should do nothing with policies already deployed', async () => {
const logger = loggingSystemMock.createLogger();
const esClient = elasticsearchServiceMock.createInternalClient();
const soClient = savedObjectsClientMock.create();
mockedAgentPolicyService.getLatestFleetPolicy.mockImplementation(async (_, agentPolicyId) => {
if (agentPolicyId === 'policy1') {
return {
revision_idx: 1,
} as any;
}

if (agentPolicyId === 'policy2') {
return {
revision_idx: 2,
} as any;
}

return null;
});

await ensureAgentPoliciesFleetServerKeysAndPolicies({
logger,
esClient,
soClient,
});

expect(mockedEnsureDefaultEnrollmentAPIKeyForAgentPolicy).toBeCalledTimes(2);
expect(mockedAgentPolicyService.deployPolicies).not.toBeCalled();
});

it('should do deploy policies out of sync', async () => {
const logger = loggingSystemMock.createLogger();
const esClient = elasticsearchServiceMock.createInternalClient();
const soClient = savedObjectsClientMock.create();
mockedAgentPolicyService.getLatestFleetPolicy.mockImplementation(async (_, agentPolicyId) => {
if (agentPolicyId === 'policy1') {
return {
revision_idx: 1,
} as any;
}

if (agentPolicyId === 'policy2') {
return {
revision_idx: 1,
} as any;
}

return null;
});

await ensureAgentPoliciesFleetServerKeysAndPolicies({
logger,
esClient,
soClient,
});

expect(mockedEnsureDefaultEnrollmentAPIKeyForAgentPolicy).toBeCalledTimes(2);
expect(mockedAgentPolicyService.deployPolicies).toBeCalledWith(expect.anything(), ['policy2']);
});

it('should do deploy policies never deployed', async () => {
const logger = loggingSystemMock.createLogger();
const esClient = elasticsearchServiceMock.createInternalClient();
const soClient = savedObjectsClientMock.create();
mockedAgentPolicyService.getLatestFleetPolicy.mockImplementation(async (_, agentPolicyId) => {
if (agentPolicyId === 'policy1') {
return {
revision_idx: 1,
} as any;
}

return null;
});

await ensureAgentPoliciesFleetServerKeysAndPolicies({
logger,
esClient,
soClient,
});

expect(mockedEnsureDefaultEnrollmentAPIKeyForAgentPolicy).toBeCalledTimes(2);
expect(mockedAgentPolicyService.deployPolicies).toBeCalledWith(expect.anything(), ['policy2']);
});

it('handle errors when deploying policies', async () => {
const logger = loggingSystemMock.createLogger();
const esClient = elasticsearchServiceMock.createInternalClient();
const soClient = savedObjectsClientMock.create();
mockedAgentPolicyService.getLatestFleetPolicy.mockImplementation(async (_, agentPolicyId) => {
if (agentPolicyId === 'policy1') {
return {
revision_idx: 1,
} as any;
}

return null;
});
mockedAgentPolicyService.deployPolicies.mockRejectedValue(new Error('test rejection'));

await ensureAgentPoliciesFleetServerKeysAndPolicies({
logger,
esClient,
soClient,
});

expect(mockedEnsureDefaultEnrollmentAPIKeyForAgentPolicy).toBeCalledTimes(2);
expect(mockedAgentPolicyService.deployPolicies).toBeCalledWith(expect.anything(), ['policy2']);

expect(logger.warn).toBeCalledWith(
'Error deploying policies: test rejection',
expect.anything()
);
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import type { ElasticsearchClient, SavedObjectsClientContract, Logger } from '@kbn/core/server';
import pMap from 'p-map';

import { agentPolicyService } from '../agent_policy';
import { ensureDefaultEnrollmentAPIKeyForAgentPolicy } from '../api_keys';
import { SO_SEARCH_LIMIT } from '../../constants';
import { appContextService } from '../app_context';

export async function ensureAgentPoliciesFleetServerKeysAndPolicies({
logger,
soClient,
esClient,
}: {
logger: Logger;
soClient: SavedObjectsClientContract;
esClient: ElasticsearchClient;
}) {
const security = appContextService.getSecurity();
if (!security) {
return;
}

if (!(await security.authc.apiKeys.areAPIKeysEnabled())) {
return;
}

const { items: agentPolicies } = await agentPolicyService.list(soClient, {
perPage: SO_SEARCH_LIMIT,
});

const outdatedAgentPolicyIds: string[] = [];

await pMap(
agentPolicies,
async (agentPolicy) => {
const [latestFleetPolicy] = await Promise.all([
agentPolicyService.getLatestFleetPolicy(esClient, agentPolicy.id),
ensureDefaultEnrollmentAPIKeyForAgentPolicy(soClient, esClient, agentPolicy.id),
]);

if ((latestFleetPolicy?.revision_idx ?? -1) < agentPolicy.revision) {
outdatedAgentPolicyIds.push(agentPolicy.id);
}
},
{
concurrency: 20,
}
);

if (outdatedAgentPolicyIds.length) {
await agentPolicyService.deployPolicies(soClient, outdatedAgentPolicyIds).catch((error) => {
logger.warn(`Error deploying policies: ${error.message}`, { error });
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there is no need to crash the setup if this fail, it's why I am swallowing the error here

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: could we add a unit test for the error scenario? I usually combine async-await with try-catch, I suppose it should work with catch too.

});
}
}
1 change: 1 addition & 0 deletions x-pack/plugins/fleet/server/services/setup/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@

export { upgradePackageInstallVersion } from './upgrade_package_install_version';
export { upgradeAgentPolicySchemaVersion } from './upgrade_agent_policy_schema_version';
export { ensureAgentPoliciesFleetServerKeysAndPolicies } from './fleet_server_policies_enrollment_keys';
Loading