Skip to content

Commit

Permalink
[APM] Circuit breaker and perf improvements for service map (elastic#…
Browse files Browse the repository at this point in the history
…159883)

Closes elastic#101920

This PR does three things:

- add a `terminate_after` parameter to the search request for the
scripted metric agg. This is a configurable setting
(`xpack.apm.serviceMapTerminateAfter`) and defaults to 100k. This is a
shard-level parameter, so there's still the possibility of lots of
shards individually returning 100k documents and the coordinating node
running out of memory because it is collecting all these docs from
individual shards. However, I suspect that there is already some
protection in the reduce phase that will terminate the request with a
stack_overflow_error without OOMing, I've reached out to the ES team to
confirm whether this is the case.
- add `xpack.apm.serviceMapMaxTraces`: this tells the max traces to
inspect in total, not just per search request. IE, if
`xpack.apm.serviceMapMaxTracesPerRequest` is 1, we simply chunk the
traces in n chunks, so it doesn't really help with memory management.
`serviceMapMaxTraces` refers to the total amount of traces to inspect.
- rewrite `getConnections` to use local mutation instead of
immutability. I saw huge CPU usage (with admittedly a pathological
scenario where there are 100s of services) in the `getConnections`
function, because it uses a deduplication mechanism that is O(n²), so I
rewrote it to O(n). Here's a before :


![image](https://github.com/elastic/kibana/assets/352732/6c24a7a2-3b48-4c95-9db2-563160a57aef)

and after:

![image](https://github.com/elastic/kibana/assets/352732/c00b8428-3026-4610-aa8b-c0046e8f0e08)

To reproduce an OOM, start ES with a much smaller amount of memory:
`$ ES_JAVA_OPTS='-Xms236m -Xmx236m' yarn es snapshot`

Then run the synthtrace Service Map OOM scenario:
`$ node scripts/synthtrace.js service_map_oom --from=now-15m --to=now
--clean`

Finally, navigate to `service-100` in the UI, and click on Service Map.
This should trigger an OOM.
  • Loading branch information
dgieselaar authored Jun 20, 2023
1 parent 85ba9e9 commit 1a9b241
Show file tree
Hide file tree
Showing 9 changed files with 204 additions and 162 deletions.
64 changes: 64 additions & 0 deletions packages/kbn-apm-synthtrace/src/scenarios/service_map_oom.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

import { ApmFields, httpExitSpan } from '@kbn/apm-synthtrace-client';
import { service } from '@kbn/apm-synthtrace-client/src/lib/apm/service';
import { Transaction } from '@kbn/apm-synthtrace-client/src/lib/apm/transaction';
import { Scenario } from '../cli/scenario';
import { RunOptions } from '../cli/utils/parse_run_cli_flags';
import { getSynthtraceEnvironment } from '../lib/utils/get_synthtrace_environment';

const environment = getSynthtraceEnvironment(__filename);

const scenario: Scenario<ApmFields> = async (runOptions: RunOptions) => {
const numServices = 500;

const tracesPerMinute = 10;

return {
generate: ({ range }) => {
const services = new Array(numServices)
.fill(undefined)
.map((_, idx) => {
return service(`service-${idx}`, 'prod', environment).instance('service-instance');
})
.reverse();

return range.ratePerMinute(tracesPerMinute).generator((timestamp) => {
const rootTransaction = services.reduce((prev, currentService) => {
const tx = currentService
.transaction(`GET /my/function`, 'request')
.timestamp(timestamp)
.duration(1000)
.children(
...(prev
? [
currentService
.span(
httpExitSpan({
spanName: `exit-span-${currentService.fields['service.name']}`,
destinationUrl: `http://address-to-exit-span-${currentService.fields['service.name']}`,
})
)
.timestamp(timestamp)
.duration(1000)
.children(prev),
]
: [])
);

return tx;
}, undefined as Transaction | undefined);

return rootTransaction!;
});
},
};
};

export default scenario;
2 changes: 2 additions & 0 deletions x-pack/plugins/apm/server/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ const configSchema = schema.object({
serviceMapTraceIdBucketSize: schema.number({ defaultValue: 65 }),
serviceMapTraceIdGlobalBucketSize: schema.number({ defaultValue: 6 }),
serviceMapMaxTracesPerRequest: schema.number({ defaultValue: 50 }),
serviceMapTerminateAfter: schema.number({ defaultValue: 100_000 }),
serviceMapMaxTraces: schema.number({ defaultValue: 1000 }),
ui: schema.object({
enabled: schema.boolean({ defaultValue: true }),
maxTraceItems: schema.number({ defaultValue: 5000 }),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,19 @@ import {
} from '../../../common/service_map';
import { APMEventClient } from '../../lib/helpers/create_es_client/create_apm_event_client';

export async function fetchServicePathsFromTraceIds(
apmEventClient: APMEventClient,
traceIds: string[],
start: number,
end: number
) {
export async function fetchServicePathsFromTraceIds({
apmEventClient,
traceIds,
start,
end,
terminateAfter,
}: {
apmEventClient: APMEventClient;
traceIds: string[];
start: number;
end: number;
terminateAfter: number;
}) {
// make sure there's a range so ES can skip shards
const dayInMs = 24 * 60 * 60 * 1000;
const startRange = start - dayInMs;
Expand All @@ -30,6 +37,7 @@ export async function fetchServicePathsFromTraceIds(
apm: {
events: [ProcessorEvent.span, ProcessorEvent.transaction],
},
terminate_after: terminateAfter,
body: {
track_total_hits: false,
size: 0,
Expand Down
25 changes: 23 additions & 2 deletions x-pack/plugins/apm/server/routes/service_map/get_service_map.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,10 @@ async function getConnectionData({
end,
serviceGroupKuery,
kuery,
logger,
}: IEnvOptions) {
return withApmSpan('get_service_map_connections', async () => {
logger.debug('Getting trace sample IDs');
const { traceIds } = await getTraceSampleIds({
config,
apmEventClient,
Expand All @@ -59,6 +61,8 @@ async function getConnectionData({
kuery,
});

logger.debug(`Found ${traceIds.length} traces to inspect`);

const chunks = chunk(traceIds, config.serviceMapMaxTracesPerRequest);

const init = {
Expand All @@ -70,6 +74,8 @@ async function getConnectionData({
return init;
}

logger.debug(`Executing scripted metric agg (${chunks.length} chunks)`);

const chunkedResponses = await withApmSpan(
'get_service_paths_from_all_trace_ids',
() =>
Expand All @@ -80,19 +86,27 @@ async function getConnectionData({
traceIds: traceIdsChunk,
start,
end,
terminateAfter: config.serviceMapTerminateAfter,
logger,
})
)
)
);

return chunkedResponses.reduce((prev, current) => {
logger.debug('Received chunk responses');

const mergedResponses = chunkedResponses.reduce((prev, current) => {
return {
connections: prev.connections.concat(current.connections),
discoveredServices: prev.discoveredServices.concat(
current.discoveredServices
),
};
});

logger.debug('Merged responses');

return mergedResponses;
});
}

Expand All @@ -119,12 +133,19 @@ export function getServiceMap(
getServiceStats(options),
anomaliesPromise,
]);
return transformServiceMapResponses({

logger.debug('Received and parsed all responses');

const transformedResponse = transformServiceMapResponses({
response: {
...connectionData,
services: servicesData,
anomalies,
},
});

logger.debug('Transformed service map response');

return transformedResponse;
});
}
Loading

0 comments on commit 1a9b241

Please sign in to comment.