From c978573e0628acbbf84ff675cfe27ab74bc0ffe6 Mon Sep 17 00:00:00 2001 From: Shahzad Date: Tue, 3 Dec 2024 13:04:42 +0100 Subject: [PATCH] [Synthetics] Monitors sync request, retry on huge payload !! (#202467) ## Summary Monitors sync request, retry on huge payload by splitting the payload !! Requests will be tried recursively by splitting payload in half !! --- .../monitor_cruds/add_monitor_project.ts | 2 +- .../service_api_client.test.ts | 55 +++++++++++++++++++ .../synthetics_service/service_api_client.ts | 46 ++++++++++++---- .../synthetics_service/synthetics_service.ts | 15 +++-- 4 files changed, 99 insertions(+), 19 deletions(-) diff --git a/x-pack/plugins/observability_solution/synthetics/server/routes/monitor_cruds/add_monitor_project.ts b/x-pack/plugins/observability_solution/synthetics/server/routes/monitor_cruds/add_monitor_project.ts index 8311a6407bf6a..bc55c950a2e81 100644 --- a/x-pack/plugins/observability_solution/synthetics/server/routes/monitor_cruds/add_monitor_project.ts +++ b/x-pack/plugins/observability_solution/synthetics/server/routes/monitor_cruds/add_monitor_project.ts @@ -14,7 +14,7 @@ import { ProjectMonitor } from '../../../common/runtime_types'; import { SYNTHETICS_API_URLS } from '../../../common/constants'; import { ProjectMonitorFormatter } from '../../synthetics_service/project_monitor/project_monitor_formatter'; -const MAX_PAYLOAD_SIZE = 1048576 * 50; // 20MiB +const MAX_PAYLOAD_SIZE = 1048576 * 100; // 20MiB export const addSyntheticsProjectMonitorRoute: SyntheticsRestApiRouteFactory = () => ({ method: 'PUT', diff --git a/x-pack/plugins/observability_solution/synthetics/server/synthetics_service/service_api_client.test.ts b/x-pack/plugins/observability_solution/synthetics/server/synthetics_service/service_api_client.test.ts index 93b3ca2fb3742..cab8157ca3567 100644 --- a/x-pack/plugins/observability_solution/synthetics/server/synthetics_service/service_api_client.test.ts +++ b/x-pack/plugins/observability_solution/synthetics/server/synthetics_service/service_api_client.test.ts @@ -470,6 +470,61 @@ describe('callAPI', () => { url: 'https://service.dev/monitors/sync', }); }); + + it('splits the payload into multiple requests if the payload is too large', async () => { + const requests: number[] = []; + const axiosSpy = (axios as jest.MockedFunction).mockImplementation((req: any) => { + requests.push(req.data.monitors.length); + if (req.data.monitors.length > 100) { + // throw 413 error + return Promise.reject({ response: { status: 413 } }); + } + + return Promise.resolve({} as any); + }); + + const apiClient = new ServiceAPIClient( + logger, + { + manifestUrl: 'http://localhost:8080/api/manifest', + tls: { certificate: 'test-certificate', key: 'test-key' } as any, + }, + { + isDev: true, + stackVersion: '8.7.0', + cloud: { cloudId: 'test-id', deploymentId: 'deployment-id' }, + } as SyntheticsServerSetup + ); + + apiClient.locations = testLocations; + + const output = { hosts: ['https://localhost:9200'], api_key: '12345' }; + + const monitors = new Array(250).fill({ + ...request1[0], + locations: [ + { + id: 'us_central', + isServiceManaged: true, + }, + ], + }); + + await apiClient.syncMonitors({ + monitors, + output, + license: licenseMock.license, + location: { + id: 'us_central', + url: 'https://service.dev', + label: 'Test location', + isServiceManaged: true, + }, + }); + + expect(axiosSpy).toHaveBeenCalledTimes(7); + expect(requests).toEqual([250, 125, 125, 63, 62, 63, 62]); + }); }); const testLocations: PublicLocations = [ diff --git a/x-pack/plugins/observability_solution/synthetics/server/synthetics_service/service_api_client.ts b/x-pack/plugins/observability_solution/synthetics/server/synthetics_service/service_api_client.ts index 73f286e40d310..3f9959440b3e9 100644 --- a/x-pack/plugins/observability_solution/synthetics/server/synthetics_service/service_api_client.ts +++ b/x-pack/plugins/observability_solution/synthetics/server/synthetics_service/service_api_client.ts @@ -6,7 +6,7 @@ */ import axios, { AxiosError, AxiosRequestConfig, AxiosResponse } from 'axios'; -import { forkJoin, from as rxjsFrom, Observable, of } from 'rxjs'; +import { concat, forkJoin, from as rxjsFrom, Observable, of } from 'rxjs'; import { catchError, tap } from 'rxjs'; import * as https from 'https'; import { SslConfig } from '@kbn/server-http-tools'; @@ -215,21 +215,47 @@ export class ServiceAPIClient { const monitorsByLocation = this.processServiceData(serviceData); - monitorsByLocation.forEach(({ location: { url, id }, monitors, data }) => { - const promise = this.callServiceEndpoint(data, method, url, endpoint); - promises.push( - rxjsFrom(promise).pipe( + monitorsByLocation.forEach(({ location: { url, id }, data }) => { + const sendRequest = (payload: ServicePayload): Observable => { + const promise = this.callServiceEndpoint(payload, method, url, endpoint); + return rxjsFrom(promise).pipe( tap((result) => { - this.logSuccessMessage(url, method, monitors.length, result); + this.logSuccessMessage(url, method, payload.monitors.length, result); }), catchError((err: AxiosError<{ reason: string; status: number }>) => { + if (err.response?.status === 413 && payload.monitors.length > 1) { + // If payload is too large, split it and retry + const mid = Math.ceil(payload.monitors.length / 2); + const firstHalfMonitors = payload.monitors.slice(0, mid); + const secondHalfMonitors = payload.monitors.slice(mid); + + this.logger.debug( + `Payload of ${payload.monitors.length} monitors is too large for location ${id}, splitting in half, in chunks of ${mid}` + ); + + return concat( + sendRequest({ + ...payload, + monitors: firstHalfMonitors, + }), // Retry with the first half + sendRequest({ + ...payload, + monitors: secondHalfMonitors, + }) // Retry with the second half + ); + } + pushErrors.push({ locationId: id, error: err.response?.data! }); - this.logServiceError(err, url, method, monitors.length); - // we don't want to throw an unhandled exception here + this.logServiceError(err, url, method, payload.monitors.length); + + // Return an empty observable to prevent unhandled exceptions return of(true); }) - ) - ); + ); + }; + + // Start with the initial data payload + promises.push(sendRequest(data)); }); const result = await forkJoin(promises).toPromise(); diff --git a/x-pack/plugins/observability_solution/synthetics/server/synthetics_service/synthetics_service.ts b/x-pack/plugins/observability_solution/synthetics/server/synthetics_service/synthetics_service.ts index be72ca4d9a496..c223de84c42dc 100644 --- a/x-pack/plugins/observability_solution/synthetics/server/synthetics_service/synthetics_service.ts +++ b/x-pack/plugins/observability_solution/synthetics/server/synthetics_service/synthetics_service.ts @@ -152,7 +152,9 @@ export class SyntheticsService { service.locations = result.locations; service.apiClient.locations = result.locations; this.logger.debug( - `Fetched ${service.locations} Synthetics service locations from manifest: ${this.config.manifestUrl}` + `Fetched ${service.locations + .map((loc) => loc.id) + .join(',')} Synthetics service locations from manifest: ${this.config.manifestUrl}` ); } catch (e) { this.logger.error(e); @@ -167,7 +169,7 @@ export class SyntheticsService { [SYNTHETICS_SERVICE_SYNC_MONITORS_TASK_TYPE]: { title: 'Synthetics Service - Sync Saved Monitors', description: 'This task periodically pushes saved monitors to Synthetics Service.', - timeout: '1m', + timeout: '2m', maxAttempts: 3, createTaskRunner: ({ taskInstance }: { taskInstance: ConcreteTaskInstance }) => { @@ -670,10 +672,10 @@ export class SyntheticsService { if (lastRunAt) { // log if it has missed last schedule - const diff = moment(lastRunAt).diff(current, 'minutes'); + const diff = moment(current).diff(lastRunAt, 'minutes'); const syncInterval = Number((this.config.syncInterval ?? '5m').split('m')[0]); if (diff > syncInterval) { - const message = `Synthetics monitor sync task has missed its schedule, it last ran ${diff} ago.`; + const message = `Synthetics monitor sync task has missed its schedule, it last ran ${diff} minutes ago.`; this.logger.warn(message); sendErrorTelemetryEvents(this.logger, this.server.telemetry, { message, @@ -681,11 +683,8 @@ export class SyntheticsService { type: 'syncTaskMissedSchedule', stackVersion: this.server.stackVersion, }); - } else { - this.logger.debug( - `Synthetics monitor sync task is running as expected, it last ran ${diff} minutes ago.` - ); } + this.logger.debug(`Synthetics monitor sync task last ran ${diff} minutes ago.`); } state.lastRunAt = current.toISOString(); } catch (e) {