diff --git a/x-pack/plugins/fleet/server/services/agent_policy.ts b/x-pack/plugins/fleet/server/services/agent_policy.ts index 2a664a2e81cfe..57c876fe1441a 100644 --- a/x-pack/plugins/fleet/server/services/agent_policy.ts +++ b/x-pack/plugins/fleet/server/services/agent_policy.ts @@ -928,13 +928,15 @@ class AgentPolicyService { const policies = await agentPolicyService.getByIDs(soClient, agentPolicyIds); const policiesMap = keyBy(policies, 'id'); - const fullPolicies = await Promise.all( - agentPolicyIds.map((agentPolicyId) => - // There are some potential performance concerns around using `getFullAgentPolicy` in this context, e.g. - // re-fetching outputs, settings, and upgrade download source URI data for each policy. This could potentially - // be a bottleneck in environments with several thousand agent policies being deployed here. - agentPolicyService.getFullAgentPolicy(soClient, agentPolicyId) - ) + const fullPolicies = await pMap( + agentPolicyIds, + // There are some potential performance concerns around using `getFullAgentPolicy` in this context, e.g. + // re-fetching outputs, settings, and upgrade download source URI data for each policy. This could potentially + // be a bottleneck in environments with several thousand agent policies being deployed here. + (agentPolicyId) => agentPolicyService.getFullAgentPolicy(soClient, agentPolicyId), + { + concurrency: 50, + } ); const fleetServerPolicies = fullPolicies.reduce((acc, fullPolicy) => { @@ -1045,7 +1047,7 @@ class AgentPolicyService { } public async getLatestFleetPolicy(esClient: ElasticsearchClient, agentPolicyId: string) { - const res = await esClient.search({ + const res = await esClient.search({ index: AGENT_POLICY_INDEX, ignore_unavailable: true, rest_total_hits_as_int: true, diff --git a/x-pack/plugins/fleet/server/services/agents/index.ts b/x-pack/plugins/fleet/server/services/agents/index.ts index 273e87fc436d4..1852d891232a6 100644 --- a/x-pack/plugins/fleet/server/services/agents/index.ts +++ b/x-pack/plugins/fleet/server/services/agents/index.ts @@ -12,7 +12,6 @@ export * from './crud'; export * from './update'; export * from './actions'; export * from './reassign'; -export * from './setup'; export * from './update_agent_tags'; export * from './action_status'; export * from './request_diagnostics'; diff --git a/x-pack/plugins/fleet/server/services/agents/setup.ts b/x-pack/plugins/fleet/server/services/agents/setup.ts deleted file mode 100644 index a6536812a5e58..0000000000000 --- a/x-pack/plugins/fleet/server/services/agents/setup.ts +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -import type { ElasticsearchClient, SavedObjectsClientContract } from '@kbn/core/server'; - -import { SO_SEARCH_LIMIT } from '../../constants'; -import { agentPolicyService } from '../agent_policy'; - -/** - * Ensure a .fleet-policy document exist for each agent policy so Fleet server can retrieve it - */ -export async function ensureFleetServerAgentPoliciesExists( - soClient: SavedObjectsClientContract, - esClient: ElasticsearchClient -) { - const { items: agentPolicies } = await agentPolicyService.list(soClient, { - perPage: SO_SEARCH_LIMIT, - }); - - const outdatedAgentPolicyIds = agentPolicies - .filter( - async (agentPolicy) => - !!(await agentPolicyService.getLatestFleetPolicy(esClient, agentPolicy.id)) - ) - .map((agentPolicy) => agentPolicy.id); - - await agentPolicyService.deployPolicies(soClient, outdatedAgentPolicyIds); -} diff --git a/x-pack/plugins/fleet/server/services/setup.ts b/x-pack/plugins/fleet/server/services/setup.ts index a9af93e70f7ab..8d343e56b1035 100644 --- a/x-pack/plugins/fleet/server/services/setup.ts +++ b/x-pack/plugins/fleet/server/services/setup.ts @@ -20,10 +20,7 @@ import { AUTO_UPDATE_PACKAGES } from '../../common/constants'; import type { PreconfigurationError } from '../../common/constants'; import type { DefaultPackagesInstallationError } from '../../common/types'; -import { SO_SEARCH_LIMIT } from '../constants'; - import { appContextService } from './app_context'; -import { agentPolicyService } from './agent_policy'; import { ensurePreconfiguredPackagesAndPolicies } from './preconfiguration'; import { ensurePreconfiguredOutputs, @@ -36,7 +33,6 @@ import { import { outputService } from './output'; import { downloadSourceService } from './download_source'; -import { ensureDefaultEnrollmentAPIKeyForAgentPolicy } from './api_keys'; import { getRegistryUrl, settingsService } from '.'; import { awaitIfPending } from './setup_utils'; import { ensureFleetFinalPipelineIsInstalled } from './epm/elasticsearch/ingest_pipeline/install'; @@ -54,6 +50,7 @@ import { } from './preconfiguration/fleet_server_host'; import { cleanUpOldFileIndices } from './setup/clean_old_fleet_indices'; import type { UninstallTokenInvalidError } from './security/uninstall_token_service'; +import { ensureAgentPoliciesFleetServerKeysAndPolicies } from './setup/fleet_server_policies_enrollment_keys'; export interface SetupStatus { isInitialized: boolean; @@ -227,8 +224,10 @@ async function createSetupSideEffects( stepSpan?.end(); stepSpan = apm.startSpan('Set up enrollment keys for preconfigured policies', 'preconfiguration'); - logger.debug('Setting up Fleet enrollment keys for preconfigured policies'); - await ensureDefaultEnrollmentAPIKeysExists(soClient, esClient); + logger.debug( + 'Setting up Fleet enrollment keys and verifying fleet server policies are not out-of-sync' + ); + await ensureAgentPoliciesFleetServerKeysAndPolicies({ soClient, esClient, logger }); stepSpan?.end(); const nonFatalErrors = [ @@ -293,34 +292,6 @@ export async function ensureFleetGlobalEsAssets( } } -async function ensureDefaultEnrollmentAPIKeysExists( - soClient: SavedObjectsClientContract, - esClient: ElasticsearchClient, - options?: { forceRecreate?: boolean } -) { - const security = appContextService.getSecurity(); - if (!security) { - return; - } - - if (!(await security.authc.apiKeys.areAPIKeysEnabled())) { - return; - } - - const { items: agentPolicies } = await agentPolicyService.list(soClient, { - perPage: SO_SEARCH_LIMIT, - }); - - await pMap( - agentPolicies, - (agentPolicy) => - ensureDefaultEnrollmentAPIKeyForAgentPolicy(soClient, esClient, agentPolicy.id), - { - concurrency: 20, - } - ); -} - /** * Maps the `nonFatalErrors` object returned by the setup process to a more readable * and predictable format suitable for logging output or UI presentation. diff --git a/x-pack/plugins/fleet/server/services/setup/fleet_server_policies_enrollment_keys.test.ts b/x-pack/plugins/fleet/server/services/setup/fleet_server_policies_enrollment_keys.test.ts new file mode 100644 index 0000000000000..3a90fb5a90fa3 --- /dev/null +++ b/x-pack/plugins/fleet/server/services/setup/fleet_server_policies_enrollment_keys.test.ts @@ -0,0 +1,163 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { loggingSystemMock } from '@kbn/core/server/mocks'; +import { elasticsearchServiceMock, savedObjectsClientMock } from '@kbn/core/server/mocks'; + +import { appContextService } from '../app_context'; +import { agentPolicyService } from '../agent_policy'; +import { ensureDefaultEnrollmentAPIKeyForAgentPolicy } from '../api_keys'; + +import { ensureAgentPoliciesFleetServerKeysAndPolicies } from './fleet_server_policies_enrollment_keys'; + +jest.mock('../app_context'); +jest.mock('../agent_policy'); +jest.mock('../api_keys'); + +const mockedEnsureDefaultEnrollmentAPIKeyForAgentPolicy = jest.mocked( + ensureDefaultEnrollmentAPIKeyForAgentPolicy +); + +const mockedAgentPolicyService = jest.mocked(agentPolicyService); + +describe('ensureAgentPoliciesFleetServerKeysAndPolicies', () => { + beforeEach(() => { + jest.mocked(appContextService).getSecurity.mockReturnValue({ + authc: { apiKeys: { areAPIKeysEnabled: async () => true } }, + } as any); + + mockedEnsureDefaultEnrollmentAPIKeyForAgentPolicy.mockReset(); + mockedAgentPolicyService.getLatestFleetPolicy.mockReset(); + mockedAgentPolicyService.deployPolicies.mockImplementation(async () => {}); + mockedAgentPolicyService.list.mockResolvedValue({ + items: [ + { + id: 'policy1', + revision: 1, + }, + { + id: 'policy2', + revision: 2, + }, + ], + } as any); + }); + + it('should do nothing with policies already deployed', async () => { + const logger = loggingSystemMock.createLogger(); + const esClient = elasticsearchServiceMock.createInternalClient(); + const soClient = savedObjectsClientMock.create(); + mockedAgentPolicyService.getLatestFleetPolicy.mockImplementation(async (_, agentPolicyId) => { + if (agentPolicyId === 'policy1') { + return { + revision_idx: 1, + } as any; + } + + if (agentPolicyId === 'policy2') { + return { + revision_idx: 2, + } as any; + } + + return null; + }); + + await ensureAgentPoliciesFleetServerKeysAndPolicies({ + logger, + esClient, + soClient, + }); + + expect(mockedEnsureDefaultEnrollmentAPIKeyForAgentPolicy).toBeCalledTimes(2); + expect(mockedAgentPolicyService.deployPolicies).not.toBeCalled(); + }); + + it('should do deploy policies out of sync', async () => { + const logger = loggingSystemMock.createLogger(); + const esClient = elasticsearchServiceMock.createInternalClient(); + const soClient = savedObjectsClientMock.create(); + mockedAgentPolicyService.getLatestFleetPolicy.mockImplementation(async (_, agentPolicyId) => { + if (agentPolicyId === 'policy1') { + return { + revision_idx: 1, + } as any; + } + + if (agentPolicyId === 'policy2') { + return { + revision_idx: 1, + } as any; + } + + return null; + }); + + await ensureAgentPoliciesFleetServerKeysAndPolicies({ + logger, + esClient, + soClient, + }); + + expect(mockedEnsureDefaultEnrollmentAPIKeyForAgentPolicy).toBeCalledTimes(2); + expect(mockedAgentPolicyService.deployPolicies).toBeCalledWith(expect.anything(), ['policy2']); + }); + + it('should do deploy policies never deployed', async () => { + const logger = loggingSystemMock.createLogger(); + const esClient = elasticsearchServiceMock.createInternalClient(); + const soClient = savedObjectsClientMock.create(); + mockedAgentPolicyService.getLatestFleetPolicy.mockImplementation(async (_, agentPolicyId) => { + if (agentPolicyId === 'policy1') { + return { + revision_idx: 1, + } as any; + } + + return null; + }); + + await ensureAgentPoliciesFleetServerKeysAndPolicies({ + logger, + esClient, + soClient, + }); + + expect(mockedEnsureDefaultEnrollmentAPIKeyForAgentPolicy).toBeCalledTimes(2); + expect(mockedAgentPolicyService.deployPolicies).toBeCalledWith(expect.anything(), ['policy2']); + }); + + it('handle errors when deploying policies', async () => { + const logger = loggingSystemMock.createLogger(); + const esClient = elasticsearchServiceMock.createInternalClient(); + const soClient = savedObjectsClientMock.create(); + mockedAgentPolicyService.getLatestFleetPolicy.mockImplementation(async (_, agentPolicyId) => { + if (agentPolicyId === 'policy1') { + return { + revision_idx: 1, + } as any; + } + + return null; + }); + mockedAgentPolicyService.deployPolicies.mockRejectedValue(new Error('test rejection')); + + await ensureAgentPoliciesFleetServerKeysAndPolicies({ + logger, + esClient, + soClient, + }); + + expect(mockedEnsureDefaultEnrollmentAPIKeyForAgentPolicy).toBeCalledTimes(2); + expect(mockedAgentPolicyService.deployPolicies).toBeCalledWith(expect.anything(), ['policy2']); + + expect(logger.warn).toBeCalledWith( + 'Error deploying policies: test rejection', + expect.anything() + ); + }); +}); diff --git a/x-pack/plugins/fleet/server/services/setup/fleet_server_policies_enrollment_keys.ts b/x-pack/plugins/fleet/server/services/setup/fleet_server_policies_enrollment_keys.ts new file mode 100644 index 0000000000000..07f368c3e7400 --- /dev/null +++ b/x-pack/plugins/fleet/server/services/setup/fleet_server_policies_enrollment_keys.ts @@ -0,0 +1,62 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { ElasticsearchClient, SavedObjectsClientContract, Logger } from '@kbn/core/server'; +import pMap from 'p-map'; + +import { agentPolicyService } from '../agent_policy'; +import { ensureDefaultEnrollmentAPIKeyForAgentPolicy } from '../api_keys'; +import { SO_SEARCH_LIMIT } from '../../constants'; +import { appContextService } from '../app_context'; + +export async function ensureAgentPoliciesFleetServerKeysAndPolicies({ + logger, + soClient, + esClient, +}: { + logger: Logger; + soClient: SavedObjectsClientContract; + esClient: ElasticsearchClient; +}) { + const security = appContextService.getSecurity(); + if (!security) { + return; + } + + if (!(await security.authc.apiKeys.areAPIKeysEnabled())) { + return; + } + + const { items: agentPolicies } = await agentPolicyService.list(soClient, { + perPage: SO_SEARCH_LIMIT, + }); + + const outdatedAgentPolicyIds: string[] = []; + + await pMap( + agentPolicies, + async (agentPolicy) => { + const [latestFleetPolicy] = await Promise.all([ + agentPolicyService.getLatestFleetPolicy(esClient, agentPolicy.id), + ensureDefaultEnrollmentAPIKeyForAgentPolicy(soClient, esClient, agentPolicy.id), + ]); + + if ((latestFleetPolicy?.revision_idx ?? -1) < agentPolicy.revision) { + outdatedAgentPolicyIds.push(agentPolicy.id); + } + }, + { + concurrency: 20, + } + ); + + if (outdatedAgentPolicyIds.length) { + await agentPolicyService.deployPolicies(soClient, outdatedAgentPolicyIds).catch((error) => { + logger.warn(`Error deploying policies: ${error.message}`, { error }); + }); + } +} diff --git a/x-pack/plugins/fleet/server/services/setup/index.ts b/x-pack/plugins/fleet/server/services/setup/index.ts index a360a59a13930..5f0bc99c13e43 100644 --- a/x-pack/plugins/fleet/server/services/setup/index.ts +++ b/x-pack/plugins/fleet/server/services/setup/index.ts @@ -7,3 +7,4 @@ export { upgradePackageInstallVersion } from './upgrade_package_install_version'; export { upgradeAgentPolicySchemaVersion } from './upgrade_agent_policy_schema_version'; +export { ensureAgentPoliciesFleetServerKeysAndPolicies } from './fleet_server_policies_enrollment_keys';