From 7a6826be3f3352944f887591bb439d9dd5ed9ed2 Mon Sep 17 00:00:00 2001 From: Shahzad Date: Wed, 18 Oct 2023 10:42:44 +0200 Subject: [PATCH] [Synthetics] Service sync refactoring (#168587) Co-authored-by: Justin Kambic --- .../synthetics_service/service_api_client.ts | 34 ++- .../synthetics_service.test.ts | 168 ++++++++--- .../synthetics_service/synthetics_service.ts | 276 ++++++++---------- .../server/synthetics_service/utils/mocks.ts | 67 +++-- 4 files changed, 331 insertions(+), 214 deletions(-) diff --git a/x-pack/plugins/synthetics/server/synthetics_service/service_api_client.ts b/x-pack/plugins/synthetics/server/synthetics_service/service_api_client.ts index 02779526c8421..e30b9a18872a9 100644 --- a/x-pack/plugins/synthetics/server/synthetics_service/service_api_client.ts +++ b/x-pack/plugins/synthetics/server/synthetics_service/service_api_client.ts @@ -18,7 +18,12 @@ import { DataStreamConfig, } from './formatters/public_formatters/convert_to_data_stream'; import { sendErrorTelemetryEvents } from '../routes/telemetry/monitor_upgrade_sender'; -import { MonitorFields, PublicLocations, ServiceLocationErrors } from '../../common/runtime_types'; +import { + MonitorFields, + PublicLocations, + ServiceLocation, + ServiceLocationErrors, +} from '../../common/runtime_types'; import { ServiceConfig } from '../../common/config'; const TEST_SERVICE_USERNAME = 'localKibanaIntegrationTestsUser'; @@ -32,6 +37,7 @@ export interface ServiceData { endpoint?: 'monitors' | 'runOnce' | 'sync'; isEdit?: boolean; license: LicenseGetLicenseInformation; + location?: ServiceLocation; } export interface ServicePayload { @@ -161,10 +167,14 @@ export class ServiceAPIClient { } async syncMonitors(data: ServiceData) { - return (await this.callAPI('PUT', { ...data, endpoint: 'sync' })).pushErrors; + try { + return (await this.callAPI('PUT', { ...data, endpoint: 'sync' })).pushErrors; + } catch (e) { + this.logger.error(e); + } } - processServiceData({ monitors, ...restOfData }: ServiceData) { + processServiceData({ monitors, location, ...restOfData }: ServiceData) { // group monitors by location const monitorsByLocation: Array<{ location: { id: string; url: string }; @@ -172,12 +182,14 @@ export class ServiceAPIClient { data: ServicePayload; }> = []; this.locations.forEach(({ id, url }) => { - const locMonitors = monitors.filter(({ locations }) => - locations?.find((loc) => loc.id === id && loc.isServiceManaged) - ); - if (locMonitors.length > 0) { - const data = this.getRequestData({ ...restOfData, monitors: locMonitors }); - monitorsByLocation.push({ location: { id, url }, monitors: locMonitors, data }); + if (!location || location.id === id) { + const locMonitors = monitors.filter(({ locations }) => + locations?.find((loc) => loc.id === id && loc.isServiceManaged) + ); + if (locMonitors.length > 0) { + const data = this.getRequestData({ ...restOfData, monitors: locMonitors }); + monitorsByLocation.push({ location: { id, url }, monitors: locMonitors, data }); + } } }); return monitorsByLocation; @@ -275,7 +287,9 @@ export class ServiceAPIClient { result: AxiosResponse | ServicePayload ) { if ('status' in result || 'request' in result) { - this.logger.debug(result.data); + if (result.data) { + this.logger.debug(result.data); + } this.logger.debug( `Successfully called service location ${url}${result.request?.path} with method ${method} with ${numMonitors} monitors` ); diff --git a/x-pack/plugins/synthetics/server/synthetics_service/synthetics_service.test.ts b/x-pack/plugins/synthetics/server/synthetics_service/synthetics_service.test.ts index e98508a5e4116..66c25a8bb5ca1 100644 --- a/x-pack/plugins/synthetics/server/synthetics_service/synthetics_service.test.ts +++ b/x-pack/plugins/synthetics/server/synthetics_service/synthetics_service.test.ts @@ -23,22 +23,24 @@ const taskManagerSetup = taskManagerMock.createSetup(); const mockCoreStart = coreMock.createStart() as CoreStart; -mockCoreStart.elasticsearch.client.asInternalUser.license.get = jest.fn().mockResolvedValue({ - license: { - status: 'active', - uid: 'c5788419-1c6f-424a-9217-da7a0a9151a0', - type: 'platinum', - issue_date: '2022-11-29T00:00:00.000Z', - issue_date_in_millis: 1669680000000, - expiry_date: '2024-12-31T23:59:59.999Z', - expiry_date_in_millis: 1735689599999, - max_nodes: 100, - max_resource_units: null, - issued_to: 'Elastic - INTERNAL (development environments)', - issuer: 'API', - start_date_in_millis: 1669680000000, - }, -}); +const mockLicense = () => { + mockCoreStart.elasticsearch.client.asInternalUser.license.get = jest.fn().mockResolvedValue({ + license: { + status: 'active', + uid: 'c5788419-1c6f-424a-9217-da7a0a9151a0', + type: 'platinum', + issue_date: '2022-11-29T00:00:00.000Z', + issue_date_in_millis: 1669680000000, + expiry_date: '2024-12-31T23:59:59.999Z', + expiry_date_in_millis: 1735689599999, + max_nodes: 100, + max_resource_units: null, + issued_to: 'Elastic - INTERNAL (development environments)', + issuer: 'API', + start_date_in_millis: 1669680000000, + }, + }); +}; const getFakePayload = (locations: HeartbeatConfig['locations']) => { return { @@ -87,6 +89,16 @@ describe('SyntheticsService', () => { savedObjectsClient: savedObjectsClientMock.create()!, } as unknown as SyntheticsServerSetup; + const mockConfig = { + service: { + devUrl: 'http://localhost', + manifestUrl: 'https://test-manifest.com', + }, + enabled: true, + }; + + mockLicense(); + const getMockedService = (locationsNum: number = 1) => { const locations = times(locationsNum).map((n) => { return { @@ -101,13 +113,7 @@ describe('SyntheticsService', () => { status: LocationStatus.GA, }; }); - serverMock.config = { - service: { - devUrl: 'http://localhost', - manifestUrl: 'https://test-manifest.com', - }, - enabled: true, - }; + serverMock.config = mockConfig; if (serverMock.savedObjectsClient) { serverMock.savedObjectsClient.find = jest.fn().mockResolvedValue({ saved_objects: [ @@ -133,8 +139,10 @@ describe('SyntheticsService', () => { const service = new SyntheticsService(serverMock); service.apiClient.locations = locations; + service.locations = locations; jest.spyOn(service, 'getOutput').mockResolvedValue({ hosts: ['es'], api_key: 'i:k' }); + jest.spyOn(service, 'getSyntheticsParams').mockResolvedValue({}); return { service, locations }; }; @@ -222,7 +230,7 @@ describe('SyntheticsService', () => { const { service } = getMockedService(); jest.spyOn(service, 'getOutput').mockRestore(); - serverMock.encryptedSavedObjects = mockEncryptedSO(null) as any; + serverMock.encryptedSavedObjects = mockEncryptedSO(); (axios as jest.MockedFunction).mockResolvedValue({} as AxiosResponse); @@ -240,8 +248,12 @@ describe('SyntheticsService', () => { jest.spyOn(service, 'getOutput').mockRestore(); serverMock.encryptedSavedObjects = mockEncryptedSO({ - attributes: getFakePayload([locations[0]]), - }) as any; + monitors: [ + { + attributes: getFakePayload([locations[0]]), + }, + ], + }); (axios as jest.MockedFunction).mockResolvedValue({} as AxiosResponse); @@ -309,8 +321,12 @@ describe('SyntheticsService', () => { const { service, locations } = getMockedService(); serverMock.encryptedSavedObjects = mockEncryptedSO({ - attributes: getFakePayload([locations[0]]), - }) as any; + monitors: [ + { + attributes: getFakePayload([locations[0]]), + }, + ], + }); (axios as jest.MockedFunction).mockResolvedValue({} as AxiosResponse); @@ -357,8 +373,10 @@ describe('SyntheticsService', () => { }); serverMock.encryptedSavedObjects = mockEncryptedSO({ - attributes: getFakePayload([locations[0]]), - }) as any; + monitors: { + attributes: getFakePayload([locations[0]]), + }, + }); (axios as jest.MockedFunction).mockResolvedValue({} as AxiosResponse); @@ -370,13 +388,18 @@ describe('SyntheticsService', () => { describe('getSyntheticsParams', () => { it('returns the params for all spaces', async () => { const { service } = getMockedService(); + jest.spyOn(service, 'getSyntheticsParams').mockReset(); (axios as jest.MockedFunction).mockResolvedValue({} as AxiosResponse); serverMock.encryptedSavedObjects = mockEncryptedSO({ - attributes: { key: 'username', value: 'elastic' }, - namespaces: ['*'], - }) as any; + params: [ + { + attributes: { key: 'username', value: 'elastic' }, + namespaces: ['*'], + }, + ], + }); const params = await service.getSyntheticsParams(); @@ -389,6 +412,16 @@ describe('SyntheticsService', () => { it('returns the params for specific space', async () => { const { service } = getMockedService(); + jest.spyOn(service, 'getSyntheticsParams').mockReset(); + + serverMock.encryptedSavedObjects = mockEncryptedSO({ + params: [ + { + attributes: { key: 'username', value: 'elastic' }, + namespaces: ['*'], + }, + ], + }); const params = await service.getSyntheticsParams({ spaceId: 'default' }); @@ -403,11 +436,16 @@ describe('SyntheticsService', () => { }); it('returns the space limited params', async () => { const { service } = getMockedService(); + jest.spyOn(service, 'getSyntheticsParams').mockReset(); serverMock.encryptedSavedObjects = mockEncryptedSO({ - attributes: { key: 'username', value: 'elastic' }, - namespaces: ['default'], - }) as any; + params: [ + { + attributes: { key: 'username', value: 'elastic' }, + namespaces: ['default'], + }, + ], + }); const params = await service.getSyntheticsParams({ spaceId: 'default' }); @@ -418,4 +456,62 @@ describe('SyntheticsService', () => { }); }); }); + + describe('pagination', () => { + const service = new SyntheticsService(serverMock); + + const locations = times(5).map((n) => { + return { + id: `loc-${n}`, + label: `Location ${n}`, + url: `https://example.com/${n}`, + geo: { + lat: 0, + lon: 0, + }, + isServiceManaged: true, + status: LocationStatus.GA, + }; + }); + service.apiClient.locations = locations; + service.locations = locations; + jest.spyOn(service, 'getOutput').mockResolvedValue({ hosts: ['es'], api_key: 'i:k' }); + jest.spyOn(service, 'getSyntheticsParams').mockResolvedValue({}); + + it('paginates the results', async () => { + serverMock.config = mockConfig; + + mockLicense(); + + const syncSpy = jest.spyOn(service.apiClient, 'syncMonitors'); + + let num = -1; + const data = times(10000).map((n) => { + if (num === 4) { + num = -1; + } + num++; + if (locations?.[num + 1]) { + return { + attributes: getFakePayload([locations[num], locations[num + 1]]), + }; + } + return { + attributes: getFakePayload([locations[num]]), + }; + }); + + serverMock.encryptedSavedObjects = mockEncryptedSO({ monitors: data }); + + (axios as jest.MockedFunction).mockResolvedValue({} as AxiosResponse); + + await service.pushConfigs(); + + expect(syncSpy).toHaveBeenCalledTimes(72); + expect(axios).toHaveBeenCalledTimes(72); + expect(logger.debug).toHaveBeenCalledTimes(112); + expect(logger.info).toHaveBeenCalledTimes(0); + expect(logger.error).toHaveBeenCalledTimes(0); + }); + }); }); diff --git a/x-pack/plugins/synthetics/server/synthetics_service/synthetics_service.ts b/x-pack/plugins/synthetics/server/synthetics_service/synthetics_service.ts index da0434201eaa2..4f5739e933e0a 100644 --- a/x-pack/plugins/synthetics/server/synthetics_service/synthetics_service.ts +++ b/x-pack/plugins/synthetics/server/synthetics_service/synthetics_service.ts @@ -7,18 +7,16 @@ /* eslint-disable max-classes-per-file */ -import { Logger, SavedObject, ElasticsearchClient } from '@kbn/core/server'; +import { ElasticsearchClient, Logger, SavedObject } from '@kbn/core/server'; import { ConcreteTaskInstance, TaskInstance, TaskManagerSetupContract, TaskManagerStartContract, } from '@kbn/task-manager-plugin/server'; -import { concatMap, Subject } from 'rxjs'; -import { EncryptedSavedObjectsClient } from '@kbn/encrypted-saved-objects-plugin/server'; -import pMap from 'p-map'; import { DEFAULT_SPACE_ID } from '@kbn/spaces-plugin/common'; import { ALL_SPACES_ID } from '@kbn/spaces-plugin/common/constants'; +import pMap from 'p-map'; import { registerCleanUpTask } from './private_location/clean_up_task'; import { SyntheticsServerSetup } from '../types'; import { syntheticsMonitorType, syntheticsParamType } from '../../common/types/saved_objects'; @@ -31,11 +29,9 @@ import { ServiceAPIClient, ServiceData } from './service_api_client'; import { ConfigKey, - EncryptedSyntheticsMonitorAttributes, MonitorFields, ServiceLocationErrors, ServiceLocations, - SyntheticsMonitorWithId, SyntheticsMonitorWithSecretsAttributes, SyntheticsParams, ThrottlingOptions, @@ -279,6 +275,18 @@ export class SyntheticsService { return license; } + private async getSOClientFinder({ pageSize }: { pageSize: number }) { + const encryptedClient = this.server.encryptedSavedObjects.getClient(); + + return await encryptedClient.createPointInTimeFinderDecryptedAsInternalUser( + { + type: syntheticsMonitorType, + perPage: pageSize, + namespaces: [ALL_SPACES_ID], + } + ); + } + private getESClient() { if (!this.server.coreStart) { return; @@ -373,55 +381,95 @@ export class SyntheticsService { async pushConfigs() { const license = await this.getLicense(); const service = this; - const subject = new Subject(); + + const PER_PAGE = 250; + service.syncErrors = []; let output: ServiceData['output'] | null = null; - subject - .pipe( - concatMap(async (monitors) => { - try { - if (monitors.length === 0 || !this.config.manifestUrl) { - return; - } + const paramsBySpace = await this.getSyntheticsParams(); + const finder = await this.getSOClientFinder({ pageSize: PER_PAGE }); - if (!output) { - output = await this.getOutput(); + const bucketsByLocation: Record = {}; + this.locations.forEach((location) => { + bucketsByLocation[location.id] = []; + }); - if (!output) { - sendErrorTelemetryEvents(service.logger, service.server.telemetry, { - reason: 'API key is not valid.', - message: 'Failed to push configs. API key is not valid.', - type: 'invalidApiKey', - stackVersion: service.server.stackVersion, - }); - return; - } - } + const syncAllLocations = async (perBucket = 0) => { + await pMap( + this.locations, + async (location) => { + if (bucketsByLocation[location.id].length > perBucket && output) { + const locMonitors = bucketsByLocation[location.id].splice(0, PER_PAGE); - this.logger.debug(`${monitors.length} monitors will be pushed to synthetics service.`); + this.logger.debug( + `${locMonitors.length} monitors will be pushed to synthetics service for location ${location.id}.` + ); - service.syncErrors = await this.apiClient.syncMonitors({ - monitors, + const syncErrors = await this.apiClient.syncMonitors({ + monitors: locMonitors, output, license, + location, }); - } catch (e) { + + this.syncErrors = [...(this.syncErrors ?? []), ...(syncErrors ?? [])]; + } + }, + { + stopOnError: false, + } + ); + }; + + for await (const result of finder.find()) { + try { + if (!output) { + output = await this.getOutput(); + if (!output) { sendErrorTelemetryEvents(service.logger, service.server.telemetry, { - reason: 'Failed to push configs to service', - message: e?.message, - type: 'pushConfigsError', - code: e?.code, - status: e.status, + reason: 'API key is not valid.', + message: 'Failed to push configs. API key is not valid.', + type: 'invalidApiKey', stackVersion: service.server.stackVersion, }); - this.logger.error(e); + return; } - }) - ) - .subscribe(); + } + + const monitors = result.saved_objects.filter(({ error }) => !error); + const formattedConfigs = this.normalizeConfigs(monitors, paramsBySpace); + + this.logger.debug( + `${formattedConfigs.length} monitors will be pushed to synthetics service.` + ); + + formattedConfigs.forEach((monitor) => { + monitor.locations.forEach((location) => { + if (location.isServiceManaged) { + bucketsByLocation[location.id]?.push(monitor); + } + }); + }); + + await syncAllLocations(PER_PAGE); + } catch (e) { + sendErrorTelemetryEvents(service.logger, service.server.telemetry, { + reason: 'Failed to push configs to service', + message: e?.message, + type: 'pushConfigsError', + code: e?.code, + status: e.status, + stackVersion: service.server.stackVersion, + }); + this.logger.error(e); + } + } - await this.getMonitorConfigs(subject); + // execute the remaining monitors + await syncAllLocations(); + + await finder.close(); } async runOnceConfigs(configs?: ConfigData) { @@ -481,123 +529,28 @@ export class SyntheticsService { async deleteAllConfigs() { const license = await this.getLicense(); - const subject = new Subject(); - - subject - .pipe( - concatMap(async (monitors) => { - const hasPublicLocations = monitors.some((config) => - config.locations.some(({ isServiceManaged }) => isServiceManaged) - ); - - if (hasPublicLocations) { - const output = await this.getOutput(); - if (!output) { - return; - } - - const data = { - output, - monitors, - license, - }; - return await this.apiClient.delete(data); - } - }) - ) - .subscribe(); - - await this.getMonitorConfigs(subject); - } - - async getMonitorConfigs(subject: Subject) { - const soClient = this.server.savedObjectsClient; - const encryptedClient = this.server.encryptedSavedObjects.getClient(); - - if (!soClient?.find) { - return [] as SyntheticsMonitorWithId[]; - } - const paramsBySpace = await this.getSyntheticsParams(); - - const finder = soClient.createPointInTimeFinder({ - type: syntheticsMonitorType, - perPage: 100, - namespaces: [ALL_SPACES_ID], - }); + const finder = await this.getSOClientFinder({ pageSize: 100 }); + const output = await this.getOutput(); + if (!output) { + return; + } for await (const result of finder.find()) { - const monitors = await this.decryptMonitors(result.saved_objects, encryptedClient); - - const configDataList: ConfigData[] = (monitors ?? []).map((monitor) => { - const attributes = monitor.attributes as unknown as MonitorFields; - const monitorSpace = monitor.namespaces?.[0] ?? DEFAULT_SPACE_ID; - - const params = paramsBySpace[monitorSpace]; + const monitors = this.normalizeConfigs(result.saved_objects, paramsBySpace); + const hasPublicLocations = monitors.some((config) => + config.locations.some(({ isServiceManaged }) => isServiceManaged) + ); - return { - params: { ...params, ...(paramsBySpace?.[ALL_SPACES_ID] ?? {}) }, - monitor: normalizeSecrets(monitor).attributes, - configId: monitor.id, - heartbeatId: attributes[ConfigKey.MONITOR_QUERY_ID], + if (hasPublicLocations) { + const data = { + output, + monitors, + license, }; - }); - - const formattedConfigs = this.formatConfigs(configDataList); - - subject.next(formattedConfigs as MonitorFields[]); + return await this.apiClient.delete(data); + } } - - await finder.close(); - } - - async decryptMonitors( - monitors: Array>, - encryptedClient: EncryptedSavedObjectsClient - ) { - const start = performance.now(); - - const decryptedMonitors = await pMap( - monitors, - (monitor) => - new Promise((resolve) => { - encryptedClient - .getDecryptedAsInternalUser( - syntheticsMonitorType, - monitor.id, - { - namespace: monitor.namespaces?.[0], - } - ) - .then((decryptedMonitor) => resolve(decryptedMonitor)) - .catch((e) => { - this.logger.error(e); - sendErrorTelemetryEvents(this.logger, this.server.telemetry, { - reason: 'Failed to decrypt monitor', - message: e?.message, - type: 'runTaskError', - code: e?.code, - status: e.status, - stackVersion: this.server.stackVersion, - }); - resolve(null); - }); - }) - ); - - const end = performance.now(); - const duration = end - start; - - this.logger.debug(`Decrypted ${monitors.length} monitors. Took ${duration} milliseconds`, { - event: { - duration, - }, - monitors: monitors.length, - }); - - return decryptedMonitors.filter((monitor) => monitor !== null) as Array< - SavedObject - >; } async getSyntheticsParams({ @@ -671,6 +624,27 @@ export class SyntheticsService { ); }); } + + normalizeConfigs( + monitors: Array>, + paramsBySpace: Record> + ) { + const configDataList = (monitors ?? []).map((monitor) => { + const attributes = monitor.attributes as unknown as MonitorFields; + const monitorSpace = monitor.namespaces?.[0] ?? DEFAULT_SPACE_ID; + + const params = paramsBySpace[monitorSpace]; + + return { + params: { ...params, ...(paramsBySpace?.[ALL_SPACES_ID] ?? {}) }, + monitor: normalizeSecrets(monitor).attributes, + configId: monitor.id, + heartbeatId: attributes[ConfigKey.MONITOR_QUERY_ID], + }; + }); + + return this.formatConfigs(configDataList) as MonitorFields[]; + } } class IndexTemplateInstallationError extends Error { diff --git a/x-pack/plugins/synthetics/server/synthetics_service/utils/mocks.ts b/x-pack/plugins/synthetics/server/synthetics_service/utils/mocks.ts index f881d242aeb52..b9d6b54961f59 100644 --- a/x-pack/plugins/synthetics/server/synthetics_service/utils/mocks.ts +++ b/x-pack/plugins/synthetics/server/synthetics_service/utils/mocks.ts @@ -6,21 +6,54 @@ */ import { EncryptedSavedObjectsClient } from '@kbn/encrypted-saved-objects-plugin/server'; +import { cloneDeep } from 'lodash'; +import { syntheticsParamType } from '../../../common/types/saved_objects'; -export const mockEncryptedSO = ( - data: any = { attributes: { key: 'username', value: 'elastic' }, namespaces: ['*'] } -) => ({ - getClient: jest.fn().mockReturnValue({ - getDecryptedAsInternalUser: jest.fn().mockResolvedValue(data), - createPointInTimeFinderDecryptedAsInternalUser: jest.fn().mockImplementation(() => ({ - close: jest.fn(), - find: jest.fn().mockReturnValue({ - async *[Symbol.asyncIterator]() { - yield { - saved_objects: data === null ? [] : [data], - }; - }, - }), - })), - } as jest.Mocked), -}); +export const mockEncryptedSO = ({ + monitors = null, + params, +}: { monitors?: any; params?: any } = {}) => { + const result = cloneDeep(monitors); + const mockParams = params ?? [ + { attributes: { key: 'username', value: 'elastic' }, namespaces: ['*'] }, + ]; + return { + isEncryptionError: jest.fn(), + getClient: jest.fn().mockReturnValue({ + getDecryptedAsInternalUser: jest.fn().mockResolvedValue(monitors), + createPointInTimeFinderDecryptedAsInternalUser: jest + .fn() + .mockImplementation(({ perPage, type: soType }) => ({ + close: jest.fn(), + find: jest.fn().mockReturnValue({ + async *[Symbol.asyncIterator]() { + if (soType === syntheticsParamType) { + yield { + saved_objects: mockParams, + }; + return; + } + if (!perPage) { + yield { + saved_objects: result, + }; + return; + } + if (monitors === null) { + return; + } + do { + const currentPage = result.splice(0, perPage); + if (currentPage.length === 0) { + return; + } + yield { + saved_objects: currentPage, + }; + } while (result.length > 0); + }, + }), + })), + } as jest.Mocked), + }; +};