From 1c20caa34de91319be01e410f312e0161fa2c831 Mon Sep 17 00:00:00 2001 From: Daniel Mitterdorfer Date: Wed, 8 May 2024 14:01:36 +0200 Subject: [PATCH] Add sub groups recursively --- .../action/GetStackTracesActionIT.java | 23 --- .../xpack/profiling/action/SubGroup.java | 11 ++ .../profiling/action/SubGroupCollector.java | 158 ++++++++++++++++++ .../action/TransportGetStackTracesAction.java | 64 ++----- .../action/SubGroupCollectorTests.java | 149 +++++++++++++++++ 5 files changed, 330 insertions(+), 75 deletions(-) create mode 100644 x-pack/plugin/profiling/src/main/java/org/elasticsearch/xpack/profiling/action/SubGroupCollector.java create mode 100644 x-pack/plugin/profiling/src/test/java/org/elasticsearch/xpack/profiling/action/SubGroupCollectorTests.java diff --git a/x-pack/plugin/profiling/src/internalClusterTest/java/org/elasticsearch/xpack/profiling/action/GetStackTracesActionIT.java b/x-pack/plugin/profiling/src/internalClusterTest/java/org/elasticsearch/xpack/profiling/action/GetStackTracesActionIT.java index c6263a4a2ce9..6463cda554e5 100644 --- a/x-pack/plugin/profiling/src/internalClusterTest/java/org/elasticsearch/xpack/profiling/action/GetStackTracesActionIT.java +++ b/x-pack/plugin/profiling/src/internalClusterTest/java/org/elasticsearch/xpack/profiling/action/GetStackTracesActionIT.java @@ -103,29 +103,6 @@ public void testGetStackTracesGroupedByServiceName() throws Exception { assertEquals("vmlinux", response.getExecutables().get("lHp5_WAgpLy2alrUVab6HA")); } - public void testGetStackTracesGroupedByInvalidField() { - GetStackTracesRequest request = new GetStackTracesRequest( - 1000, - 600.0d, - 1.0d, - 1.0d, - null, - null, - null, - // only service.name is supported (note the trailing "s") - "service.names", - null, - null, - null, - null, - null, - null - ); - request.setAdjustSampleCount(true); - IllegalArgumentException e = expectThrows(IllegalArgumentException.class, client().execute(GetStackTracesAction.INSTANCE, request)); - assertEquals("Requested custom event aggregation fields [service.names] but only [service.name] is supported.", e.getMessage()); - } - public void testGetStackTracesFromAPMWithMatchNoDownsampling() throws Exception { BoolQueryBuilder query = QueryBuilders.boolQuery(); query.must().add(QueryBuilders.termQuery("transaction.name", "encodeSha1")); diff --git a/x-pack/plugin/profiling/src/main/java/org/elasticsearch/xpack/profiling/action/SubGroup.java b/x-pack/plugin/profiling/src/main/java/org/elasticsearch/xpack/profiling/action/SubGroup.java index 13b693123324..25ba70ee7185 100644 --- a/x-pack/plugin/profiling/src/main/java/org/elasticsearch/xpack/profiling/action/SubGroup.java +++ b/x-pack/plugin/profiling/src/main/java/org/elasticsearch/xpack/profiling/action/SubGroup.java @@ -44,11 +44,22 @@ public SubGroup addCount(String name, long count) { return this; } + public SubGroup getOrAddChild(String name) { + if (subgroups.containsKey(name) == false) { + this.subgroups.put(name, new SubGroup(name, null, renderLegacyXContent, new HashMap<>())); + } + return this.subgroups.get(name); + } + public Long getCount(String name) { SubGroup subGroup = this.subgroups.get(name); return subGroup != null ? subGroup.count : null; } + public SubGroup getSubGroup(String name) { + return this.subgroups.get(name); + } + public SubGroup copy() { Map copy = new HashMap<>(subgroups.size()); for (Map.Entry subGroup : subgroups.entrySet()) { diff --git a/x-pack/plugin/profiling/src/main/java/org/elasticsearch/xpack/profiling/action/SubGroupCollector.java b/x-pack/plugin/profiling/src/main/java/org/elasticsearch/xpack/profiling/action/SubGroupCollector.java new file mode 100644 index 000000000000..b1db999805b9 --- /dev/null +++ b/x-pack/plugin/profiling/src/main/java/org/elasticsearch/xpack/profiling/action/SubGroupCollector.java @@ -0,0 +1,158 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.profiling.action; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.search.aggregations.AbstractAggregationBuilder; +import org.elasticsearch.search.aggregations.InternalAggregations; +import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation; +import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; + +import java.util.Iterator; + +public final class SubGroupCollector { + /** + * Users may provide a custom field via the API that is used to sub-divide profiling events. This is useful in the context of TopN + * where we want to provide additional breakdown of where a certain function has been called (e.g. a certain service or transaction). + */ + static final String CUSTOM_EVENT_SUB_AGGREGATION_NAME = "custom_event_group_"; + + private static final Logger log = LogManager.getLogger(SubGroupCollector.class); + + private final String[] aggregationFields; + private final boolean legacyAggregationField; + private final AbstractAggregationBuilder parentAggregation; + + public static SubGroupCollector attach( + AbstractAggregationBuilder parentAggregation, + String[] aggregationFields, + boolean legacyAggregationField + ) { + SubGroupCollector c = new SubGroupCollector(parentAggregation, aggregationFields, legacyAggregationField); + c.addAggregations(); + return c; + } + + private SubGroupCollector(AbstractAggregationBuilder parentAggregation, String[] aggregationFields, boolean legacyAggregationField) { + this.parentAggregation = parentAggregation; + this.aggregationFields = aggregationFields; + this.legacyAggregationField = legacyAggregationField; + } + + private boolean hasAggregationFields() { + return aggregationFields != null && aggregationFields.length > 0; + } + + private void addAggregations() { + if (hasAggregationFields()) { + // cast to Object to disambiguate this from a varargs call + log.trace("Grouping stacktrace events by {}.", (Object) aggregationFields); + AbstractAggregationBuilder parentAgg = this.parentAggregation; + for (String aggregationField : aggregationFields) { + String aggName = CUSTOM_EVENT_SUB_AGGREGATION_NAME + aggregationField; + TermsAggregationBuilder agg = new TermsAggregationBuilder(aggName).field(aggregationField); + parentAgg.subAggregation(agg); + parentAgg = agg; + } + } + } + + void collectResults(MultiBucketsAggregation.Bucket bucket, TraceEvent event) { + collectResults(new BucketAdapter(bucket), event); + } + + void collectResults(Bucket bucket, TraceEvent event) { + if (hasAggregationFields()) { + if (event.subGroups == null) { + event.subGroups = SubGroup.root(aggregationFields[0], legacyAggregationField); + } + collectInternal(bucket.getAggregations(), event.subGroups, 0); + } + } + + private void collectInternal(Agg parentAgg, SubGroup parentGroup, int aggField) { + if (aggField == aggregationFields.length) { + return; + } + String aggName = CUSTOM_EVENT_SUB_AGGREGATION_NAME + aggregationFields[aggField]; + for (Bucket b : parentAgg.getBuckets(aggName)) { + String subGroupName = b.getKey(); + parentGroup.addCount(subGroupName, b.getCount()); + SubGroup currentGroup = parentGroup.getSubGroup(subGroupName); + int nextAggField = aggField + 1; + if (nextAggField < aggregationFields.length) { + collectInternal(b.getAggregations(), currentGroup.getOrAddChild(aggregationFields[nextAggField]), nextAggField); + } + } + } + + // The sole purpose of the code below is to abstract our code from the aggs framework to make it unit-testable + interface Agg { + Iterable getBuckets(String aggName); + + } + + interface Bucket { + String getKey(); + + long getCount(); + + Agg getAggregations(); + } + + static class InternalAggregationAdapter implements Agg { + private final InternalAggregations agg; + + InternalAggregationAdapter(InternalAggregations agg) { + this.agg = agg; + } + + @Override + public Iterable getBuckets(String aggName) { + MultiBucketsAggregation multiBucketsAggregation = agg.get(aggName); + return () -> { + Iterator it = multiBucketsAggregation.getBuckets().iterator(); + return new Iterator<>() { + @Override + public boolean hasNext() { + return it.hasNext(); + } + + @Override + public Bucket next() { + return new BucketAdapter(it.next()); + } + }; + }; + } + } + + static class BucketAdapter implements Bucket { + private final MultiBucketsAggregation.Bucket bucket; + + BucketAdapter(MultiBucketsAggregation.Bucket bucket) { + this.bucket = bucket; + } + + @Override + public String getKey() { + return bucket.getKeyAsString(); + } + + @Override + public long getCount() { + return bucket.getDocCount(); + } + + @Override + public Agg getAggregations() { + return new InternalAggregationAdapter(bucket.getAggregations()); + } + } +} diff --git a/x-pack/plugin/profiling/src/main/java/org/elasticsearch/xpack/profiling/action/TransportGetStackTracesAction.java b/x-pack/plugin/profiling/src/main/java/org/elasticsearch/xpack/profiling/action/TransportGetStackTracesAction.java index 0d7888290f36..6efab6e99da6 100644 --- a/x-pack/plugin/profiling/src/main/java/org/elasticsearch/xpack/profiling/action/TransportGetStackTracesAction.java +++ b/x-pack/plugin/profiling/src/main/java/org/elasticsearch/xpack/profiling/action/TransportGetStackTracesAction.java @@ -30,7 +30,6 @@ import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; -import org.elasticsearch.search.aggregations.AbstractAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregation; import org.elasticsearch.search.aggregations.bucket.countedterms.CountedTermsAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.sampler.random.RandomSamplerAggregationBuilder; @@ -55,7 +54,6 @@ import java.time.Duration; import java.time.Instant; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -256,18 +254,11 @@ private void searchGenericEventGroupedByStackTrace( CountedTermsAggregationBuilder groupByStackTraceId = new CountedTermsAggregationBuilder("group_by").size( MAX_TRACE_EVENTS_RESULT_SIZE ).field(request.getStackTraceIdsField()); - if (request.hasAggregationFields()) { - String[] aggregationFields = request.getAggregationFields(); - // cast to Object to disambiguate this from a varargs call - log.trace("Grouping stacktrace events by {}.", (Object) aggregationFields); - AbstractAggregationBuilder parentAgg = groupByStackTraceId; - for (String aggregationField : aggregationFields) { - String aggName = CUSTOM_EVENT_SUB_AGGREGATION_NAME + aggregationField; - TermsAggregationBuilder agg = new TermsAggregationBuilder(aggName).field(aggregationField); - parentAgg.subAggregation(agg); - parentAgg = agg; - } - } + SubGroupCollector subGroups = SubGroupCollector.attach( + groupByStackTraceId, + request.getAggregationFields(), + request.isLegacyAggregationField() + ); RandomSamplerAggregationBuilder randomSampler = new RandomSamplerAggregationBuilder("sample").setSeed(request.hashCode()) .setProbability(responseBuilder.getSamplingRate()) .subAggregation(groupByStackTraceId); @@ -308,19 +299,7 @@ private void searchGenericEventGroupedByStackTrace( stackTraceEvents.put(stackTraceID, event); } event.count += count; - if (request.hasAggregationFields()) { - String[] aggregationFields = request.getAggregationFields(); - if (event.subGroups == null) { - event.subGroups = SubGroup.root(aggregationFields[0], request.isLegacyAggregationField()); - } - // TODO: Recursively add sub groups! - Terms eventSubGroup = stacktraceBucket.getAggregations() - .get(CUSTOM_EVENT_SUB_AGGREGATION_NAME + aggregationFields[0]); - for (Terms.Bucket b : eventSubGroup.getBuckets()) { - String subGroupName = b.getKeyAsString(); - event.subGroups.addCount(subGroupName, b.getDocCount()); - } - } + subGroups.collectResults(stacktraceBucket, event); } responseBuilder.setTotalSamples(totalSamples); responseBuilder.setHostEventCounts(hostEventCounts); @@ -346,19 +325,11 @@ private void searchEventGroupedByStackTrace( // Especially with high cardinality fields, this makes aggregations really slow. .executionHint("map") .subAggregation(new SumAggregationBuilder("count").field("Stacktrace.count")); - if (request.hasAggregationFields()) { - String[] aggregationFields = request.getAggregationFields(); - log.trace("Grouping stacktrace events by [{}].", (Object) aggregationFields); - // be strict about the accepted field names to avoid downstream errors or leaking unintended information - if (aggregationFields.length > 1 || aggregationFields[0].equals("service.name") == false) { - throw new IllegalArgumentException( - "Requested custom event aggregation fields " - + Arrays.toString(aggregationFields) - + " but only [service.name] is supported." - ); - } - groupByStackTraceId.subAggregation(new TermsAggregationBuilder(CUSTOM_EVENT_SUB_AGGREGATION_NAME).field(aggregationFields[0])); - } + SubGroupCollector subGroups = SubGroupCollector.attach( + groupByStackTraceId, + request.getAggregationFields(), + request.isLegacyAggregationField() + ); client.prepareSearch(eventsIndex.getName()) .setTrackTotalHits(false) .setSize(0) @@ -420,18 +391,7 @@ The same stacktraces may come from different hosts (eventually from different da stackTraceEvents.put(stackTraceID, event); } event.count += finalCount; - if (request.hasAggregationFields()) { - String[] aggregationFields = request.getAggregationFields(); - if (event.subGroups == null) { - event.subGroups = SubGroup.root(aggregationFields[0], request.isLegacyAggregationField()); - } - // recursion is not needed; we only support a single field - Terms eventSubGroup = stacktraceBucket.getAggregations().get(CUSTOM_EVENT_SUB_AGGREGATION_NAME); - for (Terms.Bucket b : eventSubGroup.getBuckets()) { - String subGroupName = b.getKeyAsString(); - event.subGroups.addCount(subGroupName, b.getDocCount()); - } - } + subGroups.collectResults(stacktraceBucket, event); } } responseBuilder.setTotalSamples(totalFinalCount); diff --git a/x-pack/plugin/profiling/src/test/java/org/elasticsearch/xpack/profiling/action/SubGroupCollectorTests.java b/x-pack/plugin/profiling/src/test/java/org/elasticsearch/xpack/profiling/action/SubGroupCollectorTests.java new file mode 100644 index 000000000000..5d6022f32276 --- /dev/null +++ b/x-pack/plugin/profiling/src/test/java/org/elasticsearch/xpack/profiling/action/SubGroupCollectorTests.java @@ -0,0 +1,149 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.profiling.action; + +import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; +import org.elasticsearch.test.ESTestCase; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.elasticsearch.xpack.profiling.action.SubGroupCollector.CUSTOM_EVENT_SUB_AGGREGATION_NAME; + +public class SubGroupCollectorTests extends ESTestCase { + public void testNoAggs() { + TermsAggregationBuilder stackTraces = new TermsAggregationBuilder("stacktraces").field("stacktrace.id"); + TraceEvent traceEvent = new TraceEvent("1"); + + SubGroupCollector collector = SubGroupCollector.attach(stackTraces, new String[0], false); + assertTrue("Sub aggregations attached", stackTraces.getSubAggregations().isEmpty()); + + SubGroupCollector.Bucket currentStackTrace = bucket("1", 5); + collector.collectResults(currentStackTrace, traceEvent); + + assertNull(traceEvent.subGroups); + } + + public void testMultipleAggsInSingleStackTrace() { + TermsAggregationBuilder stackTraces = new TermsAggregationBuilder("stacktraces").field("stacktrace.id"); + TraceEvent traceEvent = new TraceEvent("1"); + + SubGroupCollector collector = SubGroupCollector.attach(stackTraces, new String[] { "service.name", "transaction.name" }, false); + assertFalse("No sub aggregations attached", stackTraces.getSubAggregations().isEmpty()); + + StaticAgg services = new StaticAgg(); + SubGroupCollector.Bucket currentStackTrace = bucket("1", 5, services); + // tag::noformat + services.addBuckets(CUSTOM_EVENT_SUB_AGGREGATION_NAME + "service.name", + bucket("basket", 7L, + agg(CUSTOM_EVENT_SUB_AGGREGATION_NAME + "transaction.name", + bucket("add-to-basket", 4L), + bucket("delete-from-basket", 3L) + ) + ), + bucket("checkout", 4L, + agg(CUSTOM_EVENT_SUB_AGGREGATION_NAME + "transaction.name", + bucket("enter-address", 4L), + bucket("submit-order", 3L) + ) + ) + ); + // end::noformat + + collector.collectResults(currentStackTrace, traceEvent); + + assertNotNull(traceEvent.subGroups); + assertEquals(Long.valueOf(7L), traceEvent.subGroups.getCount("basket")); + assertEquals(Long.valueOf(4L), traceEvent.subGroups.getCount("checkout")); + SubGroup basketTransactionNames = traceEvent.subGroups.getSubGroup("basket").getSubGroup("transaction.name"); + assertEquals(Long.valueOf(4L), basketTransactionNames.getCount("add-to-basket")); + assertEquals(Long.valueOf(3L), basketTransactionNames.getCount("delete-from-basket")); + SubGroup checkoutTransactionNames = traceEvent.subGroups.getSubGroup("checkout").getSubGroup("transaction.name"); + assertEquals(Long.valueOf(4L), checkoutTransactionNames.getCount("enter-address")); + assertEquals(Long.valueOf(3L), checkoutTransactionNames.getCount("submit-order")); + } + + public void testSingleAggInMultipleStackTraces() { + TermsAggregationBuilder stackTraces = new TermsAggregationBuilder("stacktraces").field("stacktrace.id"); + TraceEvent traceEvent = new TraceEvent("1"); + + SubGroupCollector collector = SubGroupCollector.attach(stackTraces, new String[] { "service.name" }, false); + assertFalse("No sub aggregations attached", stackTraces.getSubAggregations().isEmpty()); + + StaticAgg services1 = new StaticAgg(); + SubGroupCollector.Bucket currentStackTrace1 = bucket("1", 5, services1); + services1.addBuckets(CUSTOM_EVENT_SUB_AGGREGATION_NAME + "service.name", bucket("basket", 7L)); + + collector.collectResults(currentStackTrace1, traceEvent); + + StaticAgg services2 = new StaticAgg(); + SubGroupCollector.Bucket currentStackTrace2 = bucket("1", 3, services2); + services2.addBuckets(CUSTOM_EVENT_SUB_AGGREGATION_NAME + "service.name", bucket("basket", 1L), bucket("checkout", 5L)); + + collector.collectResults(currentStackTrace2, traceEvent); + + assertNotNull(traceEvent.subGroups); + assertEquals(Long.valueOf(8L), traceEvent.subGroups.getCount("basket")); + assertEquals(Long.valueOf(5L), traceEvent.subGroups.getCount("checkout")); + } + + private SubGroupCollector.Bucket bucket(String key, long count) { + return bucket(key, count, null); + } + + private SubGroupCollector.Bucket bucket(String key, long count, SubGroupCollector.Agg aggregations) { + return new StaticBucket(key, count, aggregations); + } + + private SubGroupCollector.Agg agg(String name, SubGroupCollector.Bucket... buckets) { + StaticAgg a = new StaticAgg(); + a.addBuckets(name, buckets); + return a; + } + + private static class StaticBucket implements SubGroupCollector.Bucket { + private final String key; + private final long count; + private SubGroupCollector.Agg aggregations; + + private StaticBucket(String key, long count, SubGroupCollector.Agg aggregations) { + this.key = key; + this.count = count; + this.aggregations = aggregations; + } + + @Override + public String getKey() { + return key; + } + + @Override + public long getCount() { + return count; + } + + @Override + public SubGroupCollector.Agg getAggregations() { + return aggregations; + } + } + + private static class StaticAgg implements SubGroupCollector.Agg { + private final Map> buckets = new HashMap<>(); + + public void addBuckets(String name, SubGroupCollector.Bucket... buckets) { + this.buckets.put(name, List.of(buckets)); + } + + @Override + public Iterable getBuckets(String aggName) { + return buckets.get(aggName); + } + } +}