Skip to content

Commit

Permalink
[Synthetics] Monitors sync request, retry on huge payload !! (#202467)
Browse files Browse the repository at this point in the history
## Summary

Monitors sync request, retry on huge payload by splitting the payload !!

Requests will be tried recursively by splitting payload in half !!
  • Loading branch information
shahzad31 authored Dec 3, 2024
1 parent ea4ebb4 commit 839a927
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import { ProjectMonitor } from '../../../common/runtime_types';
import { SYNTHETICS_API_URLS } from '../../../common/constants';
import { ProjectMonitorFormatter } from '../../synthetics_service/project_monitor/project_monitor_formatter';

const MAX_PAYLOAD_SIZE = 1048576 * 50; // 20MiB
const MAX_PAYLOAD_SIZE = 1048576 * 100; // 20MiB

export const addSyntheticsProjectMonitorRoute: SyntheticsRestApiRouteFactory = () => ({
method: 'PUT',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,61 @@ describe('callAPI', () => {
url: 'https://service.dev/monitors/sync',
});
});

it('splits the payload into multiple requests if the payload is too large', async () => {
const requests: number[] = [];
const axiosSpy = (axios as jest.MockedFunction<typeof axios>).mockImplementation((req: any) => {
requests.push(req.data.monitors.length);
if (req.data.monitors.length > 100) {
// throw 413 error
return Promise.reject({ response: { status: 413 } });
}

return Promise.resolve({} as any);
});

const apiClient = new ServiceAPIClient(
logger,
{
manifestUrl: 'http://localhost:8080/api/manifest',
tls: { certificate: 'test-certificate', key: 'test-key' } as any,
},
{
isDev: true,
stackVersion: '8.7.0',
cloud: { cloudId: 'test-id', deploymentId: 'deployment-id' },
} as SyntheticsServerSetup
);

apiClient.locations = testLocations;

const output = { hosts: ['https://localhost:9200'], api_key: '12345' };

const monitors = new Array(250).fill({
...request1[0],
locations: [
{
id: 'us_central',
isServiceManaged: true,
},
],
});

await apiClient.syncMonitors({
monitors,
output,
license: licenseMock.license,
location: {
id: 'us_central',
url: 'https://service.dev',
label: 'Test location',
isServiceManaged: true,
},
});

expect(axiosSpy).toHaveBeenCalledTimes(7);
expect(requests).toEqual([250, 125, 125, 63, 62, 63, 62]);
});
});

const testLocations: PublicLocations = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
*/

import axios, { AxiosError, AxiosRequestConfig, AxiosResponse } from 'axios';
import { forkJoin, from as rxjsFrom, Observable, of } from 'rxjs';
import { concat, forkJoin, from as rxjsFrom, Observable, of } from 'rxjs';
import { catchError, tap } from 'rxjs';
import * as https from 'https';
import { SslConfig } from '@kbn/server-http-tools';
Expand Down Expand Up @@ -215,21 +215,47 @@ export class ServiceAPIClient {

const monitorsByLocation = this.processServiceData(serviceData);

monitorsByLocation.forEach(({ location: { url, id }, monitors, data }) => {
const promise = this.callServiceEndpoint(data, method, url, endpoint);
promises.push(
rxjsFrom(promise).pipe(
monitorsByLocation.forEach(({ location: { url, id }, data }) => {
const sendRequest = (payload: ServicePayload): Observable<any> => {
const promise = this.callServiceEndpoint(payload, method, url, endpoint);
return rxjsFrom(promise).pipe(
tap((result) => {
this.logSuccessMessage(url, method, monitors.length, result);
this.logSuccessMessage(url, method, payload.monitors.length, result);
}),
catchError((err: AxiosError<{ reason: string; status: number }>) => {
if (err.response?.status === 413 && payload.monitors.length > 1) {
// If payload is too large, split it and retry
const mid = Math.ceil(payload.monitors.length / 2);
const firstHalfMonitors = payload.monitors.slice(0, mid);
const secondHalfMonitors = payload.monitors.slice(mid);

this.logger.debug(
`Payload of ${payload.monitors.length} monitors is too large for location ${id}, splitting in half, in chunks of ${mid}`
);

return concat(
sendRequest({
...payload,
monitors: firstHalfMonitors,
}), // Retry with the first half
sendRequest({
...payload,
monitors: secondHalfMonitors,
}) // Retry with the second half
);
}

pushErrors.push({ locationId: id, error: err.response?.data! });
this.logServiceError(err, url, method, monitors.length);
// we don't want to throw an unhandled exception here
this.logServiceError(err, url, method, payload.monitors.length);

// Return an empty observable to prevent unhandled exceptions
return of(true);
})
)
);
);
};

// Start with the initial data payload
promises.push(sendRequest(data));
});

const result = await forkJoin(promises).toPromise();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,9 @@ export class SyntheticsService {
service.locations = result.locations;
service.apiClient.locations = result.locations;
this.logger.debug(
`Fetched ${service.locations} Synthetics service locations from manifest: ${this.config.manifestUrl}`
`Fetched ${service.locations
.map((loc) => loc.id)
.join(',')} Synthetics service locations from manifest: ${this.config.manifestUrl}`
);
} catch (e) {
this.logger.error(e);
Expand All @@ -167,7 +169,7 @@ export class SyntheticsService {
[SYNTHETICS_SERVICE_SYNC_MONITORS_TASK_TYPE]: {
title: 'Synthetics Service - Sync Saved Monitors',
description: 'This task periodically pushes saved monitors to Synthetics Service.',
timeout: '1m',
timeout: '2m',
maxAttempts: 3,

createTaskRunner: ({ taskInstance }: { taskInstance: ConcreteTaskInstance }) => {
Expand Down Expand Up @@ -670,22 +672,19 @@ export class SyntheticsService {

if (lastRunAt) {
// log if it has missed last schedule
const diff = moment(lastRunAt).diff(current, 'minutes');
const diff = moment(current).diff(lastRunAt, 'minutes');
const syncInterval = Number((this.config.syncInterval ?? '5m').split('m')[0]);
if (diff > syncInterval) {
const message = `Synthetics monitor sync task has missed its schedule, it last ran ${diff} ago.`;
const message = `Synthetics monitor sync task has missed its schedule, it last ran ${diff} minutes ago.`;
this.logger.warn(message);
sendErrorTelemetryEvents(this.logger, this.server.telemetry, {
message,
reason: 'Failed to run synthetics sync task on schedule',
type: 'syncTaskMissedSchedule',
stackVersion: this.server.stackVersion,
});
} else {
this.logger.debug(
`Synthetics monitor sync task is running as expected, it last ran ${diff} minutes ago.`
);
}
this.logger.debug(`Synthetics monitor sync task last ran ${diff} minutes ago.`);
}
state.lastRunAt = current.toISOString();
} catch (e) {
Expand Down

0 comments on commit 839a927

Please sign in to comment.