From 4e2857232be2371790d20c4bfe91db828c982972 Mon Sep 17 00:00:00 2001 From: Ticheng Lin <51488860+ticheng-aws@users.noreply.github.com> Date: Tue, 15 Aug 2023 12:58:21 -0700 Subject: [PATCH] Add support for wrapping CollectorManager with profiling during concurrent execution (#9129) * Add support for wrapping CollectorManager with profiling during concurrent execution (#9129) Signed-off-by: Ticheng Lin * Add more collectorResult test and work on the PR comments (#9129) Signed-off-by: Ticheng Lin --------- Signed-off-by: Ticheng Lin Signed-off-by: Kiran Reddy --- CHANGELOG.md | 3 +- .../aggregation/AggregationProfilerIT.java | 130 +++++++++++++++++- .../AggregationCollectorManager.java | 24 ++-- .../ConcurrentAggregationProcessor.java | 11 +- .../DefaultAggregationProcessor.java | 11 +- .../GlobalAggCollectorManager.java | 7 + ...ggCollectorManagerWithSingleCollector.java | 7 + .../NonGlobalAggCollectorManager.java | 7 + ...ggCollectorManagerWithSingleCollector.java | 7 + .../InternalProfileCollectorManager.java | 24 +++- .../search/query/QueryCollectorContext.java | 39 +++++- .../AggregationCollectorManagerTests.java | 13 +- .../AggregationProcessorTests.java | 31 ++++- 13 files changed, 280 insertions(+), 34 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f231d1db8036b..38ade6bf8ee0d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -136,6 +136,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Change shard_size and shard_min_doc_count evaluation to happen in shard level reduce phase ([#9085](https://github.com/opensearch-project/OpenSearch/pull/9085)) - Add attributes to startSpan methods ([#9199](https://github.com/opensearch-project/OpenSearch/pull/9199)) - Add base class for parameterizing the search based tests #9083 ([#9083](https://github.com/opensearch-project/OpenSearch/pull/9083)) +- Add support for wrapping CollectorManager with profiling during concurrent execution ([#9129](https://github.com/opensearch-project/OpenSearch/pull/9129)) ### Deprecated @@ -147,4 +148,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Security [Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.x...HEAD -[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.10...2.x \ No newline at end of file +[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.10...2.x diff --git a/server/src/internalClusterTest/java/org/opensearch/search/profile/aggregation/AggregationProfilerIT.java b/server/src/internalClusterTest/java/org/opensearch/search/profile/aggregation/AggregationProfilerIT.java index 9d0c30c5a488f..456aa7a2aa673 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/profile/aggregation/AggregationProfilerIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/profile/aggregation/AggregationProfilerIT.java @@ -44,6 +44,7 @@ import org.opensearch.search.aggregations.metrics.Stats; import org.opensearch.search.profile.ProfileResult; import org.opensearch.search.profile.ProfileShardResult; +import org.opensearch.search.profile.query.CollectorResult; import org.opensearch.search.profile.query.QueryProfileShardResult; import org.opensearch.test.OpenSearchIntegTestCase; @@ -67,8 +68,11 @@ import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.containsString; @OpenSearchIntegTestCase.SuiteScopeTestCase public class AggregationProfilerIT extends OpenSearchIntegTestCase { @@ -150,6 +154,8 @@ public class AggregationProfilerIT extends OpenSearchIntegTestCase { private static final String TAG_FIELD = "tag"; private static final String STRING_FIELD = "string_field"; private final int numDocs = 5; + private static final String REASON_SEARCH_TOP_HITS = "search_top_hits"; + private static final String REASON_AGGREGATION = "aggregation"; @Override protected int numberOfShards() { @@ -217,8 +223,14 @@ public void testSimpleProfile() { if (histoAggResult.getMaxSliceTime() != null) { // concurrent segment search enabled assertThat(breakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS)); + for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) { + assertCollectorResultWithConcurrentSearchEnabled(collectorResult, 2); + } } else { assertThat(breakdown.keySet(), equalTo(BREAKDOWN_KEYS)); + for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) { + assertCollectorResult(collectorResult, 2); + } } assertThat(breakdown.get(INITIALIZE), greaterThan(0L)); assertThat(breakdown.get(COLLECT), greaterThan(0L)); @@ -265,8 +277,14 @@ public void testMultiLevelProfile() { if (histoAggResult.getMaxSliceTime() != null) { // concurrent segment search enabled assertThat(histoBreakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS)); + for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) { + assertCollectorResultWithConcurrentSearchEnabled(collectorResult, 2); + } } else { assertThat(histoBreakdown.keySet(), equalTo(BREAKDOWN_KEYS)); + for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) { + assertCollectorResult(collectorResult, 2); + } } assertThat(histoBreakdown.get(INITIALIZE), greaterThan(0L)); assertThat(histoBreakdown.get(COLLECT), greaterThan(0L)); @@ -366,8 +384,14 @@ public void testMultiLevelProfileBreadthFirst() { if (histoAggResult.getMaxSliceTime() != null) { // concurrent segment search enabled assertThat(histoBreakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS)); + for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) { + assertCollectorResultWithConcurrentSearchEnabled(collectorResult, 2); + } } else { assertThat(histoBreakdown.keySet(), equalTo(BREAKDOWN_KEYS)); + for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) { + assertCollectorResult(collectorResult, 2); + } } assertThat(histoBreakdown.get(INITIALIZE), greaterThan(0L)); assertThat(histoBreakdown.get(COLLECT), greaterThan(0L)); @@ -452,8 +476,14 @@ public void testDiversifiedAggProfile() { if (diversifyAggResult.getMaxSliceTime() != null) { // concurrent segment search enabled assertThat(diversifyBreakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS)); + for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) { + assertCollectorResultWithConcurrentSearchEnabled(collectorResult, 2); + } } else { assertThat(diversifyBreakdown.keySet(), equalTo(BREAKDOWN_KEYS)); + for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) { + assertCollectorResult(collectorResult, 2); + } } assertThat(diversifyBreakdown.get(INITIALIZE), greaterThan(0L)); assertThat(diversifyBreakdown.get(BUILD_LEAF_COLLECTOR), greaterThan(0L)); @@ -532,8 +562,14 @@ public void testComplexProfile() { if (histoAggResult.getMaxSliceTime() != null) { // concurrent segment search enabled assertThat(histoBreakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS)); + for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) { + assertCollectorResultWithConcurrentSearchEnabled(collectorResult, 2); + } } else { assertThat(histoBreakdown.keySet(), equalTo(BREAKDOWN_KEYS)); + for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) { + assertCollectorResult(collectorResult, 2); + } } assertThat(histoBreakdown.get(INITIALIZE), greaterThan(0L)); assertThat(histoBreakdown.get(BUILD_LEAF_COLLECTOR), greaterThan(0L)); @@ -792,7 +828,6 @@ public void testGlobalAggWithStatsSubAggregatorProfile() { .get(); assertSearchResponse(response); - Global global = response.getAggregations().get("global"); assertThat(global, IsNull.notNullValue()); assertThat(global.getName(), equalTo("global")); @@ -843,8 +878,14 @@ public void testGlobalAggWithStatsSubAggregatorProfile() { if (globalAggResult.getMaxSliceTime() != null) { // concurrent segment search enabled assertEquals(CONCURRENT_SEARCH_BREAKDOWN_KEYS, breakdown.keySet()); + for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) { + assertCollectorResultWithConcurrentSearchEnabled(collectorResult, 0); + } } else { assertEquals(BREAKDOWN_KEYS, breakdown.keySet()); + for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) { + assertCollectorResult(collectorResult, 0); + } } assertThat(breakdown.get(INITIALIZE), greaterThan(0L)); assertThat(breakdown.get(COLLECT), greaterThan(0L)); @@ -852,4 +893,91 @@ public void testGlobalAggWithStatsSubAggregatorProfile() { assertEquals(0, breakdown.get(REDUCE).intValue()); } } + + public void testMultipleAggregationsProfile() { + SearchResponse response = client().prepareSearch("idx") + .setProfile(true) + .addAggregation(histogram("histo_1").field(NUMBER_FIELD).interval(1L)) + .addAggregation(histogram("histo_2").field(NUMBER_FIELD).interval(1L)) + .get(); + assertSearchResponse(response); + Map profileResults = response.getProfileResults(); + assertThat(profileResults, notNullValue()); + assertThat(profileResults.size(), equalTo(getNumShards("idx").numPrimaries)); + for (ProfileShardResult profileShardResult : profileResults.values()) { + assertThat(profileShardResult, notNullValue()); + List queryProfilerResults = profileShardResult.getQueryProfileResults(); + assertThat(queryProfilerResults, notNullValue()); + for (QueryProfileShardResult queryProfilerResult : queryProfilerResults) { + CollectorResult collectorResult = queryProfilerResult.getCollectorResult(); + String reason = collectorResult.getReason(); + assertThat(reason, equalTo("search_multi")); + List children = collectorResult.getProfiledChildren(); + assertThat(children.size(), equalTo(2)); + assertThat(children.get(1).getName(), containsString("[histo_1, histo_2]")); + } + AggregationProfileShardResult aggProfileResults = profileShardResult.getAggregationProfileResults(); + assertThat(aggProfileResults, notNullValue()); + List aggProfileResultsList = aggProfileResults.getProfileResults(); + assertThat(aggProfileResultsList, notNullValue()); + assertThat(aggProfileResultsList.size(), equalTo(2)); + for (ProfileResult histoAggResult : aggProfileResultsList) { + assertThat(histoAggResult, notNullValue()); + assertThat(histoAggResult.getQueryName(), equalTo("NumericHistogramAggregator")); + assertThat(histoAggResult.getLuceneDescription(), containsString("histo_")); + assertThat(histoAggResult.getProfiledChildren().size(), equalTo(0)); + assertThat(histoAggResult.getTime(), greaterThan(0L)); + Map breakdown = histoAggResult.getTimeBreakdown(); + assertThat(breakdown, notNullValue()); + if (histoAggResult.getMaxSliceTime() != null) { + // concurrent segment search enabled + assertThat(breakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS)); + for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) { + assertCollectorResultWithConcurrentSearchEnabled(collectorResult, 2); + } + } else { + assertThat(breakdown.keySet(), equalTo(BREAKDOWN_KEYS)); + for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) { + assertCollectorResult(collectorResult, 2); + } + } + assertThat(breakdown.get(INITIALIZE), greaterThan(0L)); + assertThat(breakdown.get(COLLECT), greaterThan(0L)); + assertThat(breakdown.get(BUILD_AGGREGATION).longValue(), greaterThan(0L)); + assertThat(breakdown.get(REDUCE), equalTo(0L)); + Map debug = histoAggResult.getDebugInfo(); + assertThat(debug, notNullValue()); + assertThat(debug.keySet(), equalTo(Set.of(TOTAL_BUCKETS))); + assertThat(((Number) debug.get(TOTAL_BUCKETS)).longValue(), greaterThan(0L)); + } + } + } + + private void assertCollectorResult(QueryProfileShardResult collectorResult, int expectedChildrenCount) { + long nodeTime = collectorResult.getCollectorResult().getTime(); + assertThat(collectorResult.getCollectorResult().getMaxSliceTime(), equalTo(nodeTime)); + assertThat(collectorResult.getCollectorResult().getMinSliceTime(), equalTo(nodeTime)); + assertThat(collectorResult.getCollectorResult().getAvgSliceTime(), equalTo(nodeTime)); + assertThat(collectorResult.getCollectorResult().getReduceTime(), equalTo(0L)); + assertThat(collectorResult.getCollectorResult().getSliceCount(), equalTo(1)); + assertThat(collectorResult.getCollectorResult().getProfiledChildren().size(), equalTo(expectedChildrenCount)); + if (expectedChildrenCount == 2) { + assertThat(collectorResult.getCollectorResult().getProfiledChildren().get(0).getReason(), equalTo(REASON_SEARCH_TOP_HITS)); + assertThat(collectorResult.getCollectorResult().getProfiledChildren().get(1).getReason(), equalTo(REASON_AGGREGATION)); + } + } + + private void assertCollectorResultWithConcurrentSearchEnabled(QueryProfileShardResult collectorResult, int expectedChildrenCount) { + long nodeTime = collectorResult.getCollectorResult().getTime(); + assertThat(collectorResult.getCollectorResult().getMaxSliceTime(), lessThanOrEqualTo(nodeTime)); + assertThat(collectorResult.getCollectorResult().getMinSliceTime(), lessThanOrEqualTo(nodeTime)); + assertThat(collectorResult.getCollectorResult().getAvgSliceTime(), lessThanOrEqualTo(nodeTime)); + assertThat(collectorResult.getCollectorResult().getReduceTime(), greaterThan(0L)); + assertThat(collectorResult.getCollectorResult().getSliceCount(), greaterThanOrEqualTo(1)); + assertThat(collectorResult.getCollectorResult().getProfiledChildren().size(), equalTo(expectedChildrenCount)); + if (expectedChildrenCount == 2) { + assertThat(collectorResult.getCollectorResult().getProfiledChildren().get(0).getReason(), equalTo(REASON_SEARCH_TOP_HITS)); + assertThat(collectorResult.getCollectorResult().getProfiledChildren().get(1).getReason(), equalTo(REASON_AGGREGATION)); + } + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java b/server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java index ae06f80516e3e..0bb2d1d7ca933 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java +++ b/server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java @@ -12,20 +12,20 @@ import org.apache.lucene.search.CollectorManager; import org.opensearch.common.CheckedFunction; import org.opensearch.search.internal.SearchContext; -import org.opensearch.search.profile.query.InternalProfileCollector; import org.opensearch.search.query.ReduceableSearchResult; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.List; /** * Common {@link CollectorManager} used by both concurrent and non-concurrent aggregation path and also for global and non-global * aggregation operators + * + * @opensearch.internal */ -class AggregationCollectorManager implements CollectorManager { +public abstract class AggregationCollectorManager implements CollectorManager { protected final SearchContext context; private final CheckedFunction, IOException> aggProvider; private final String collectorReason; @@ -42,12 +42,18 @@ class AggregationCollectorManager implements CollectorManager collectors) throws IOException { final List aggregators = context.bucketCollectorProcessor().toAggregators(collectors); @@ -70,17 +76,9 @@ protected AggregationReduceableSearchResult buildAggregationResult(InternalAggre return new AggregationReduceableSearchResult(internalAggregations); } - static Collector createCollector(SearchContext context, List collectors, String reason) throws IOException { + static Collector createCollector(List collectors) throws IOException { Collector collector = MultiBucketCollector.wrap(collectors); ((BucketCollector) collector).preCollection(); - if (context.getProfilers() != null) { - collector = new InternalProfileCollector( - collector, - reason, - // TODO: report on child aggs as well - Collections.emptyList() - ); - } return collector; } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/ConcurrentAggregationProcessor.java b/server/src/main/java/org/opensearch/search/aggregations/ConcurrentAggregationProcessor.java index 336ad8739eb41..fbeb583984ac5 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/ConcurrentAggregationProcessor.java +++ b/server/src/main/java/org/opensearch/search/aggregations/ConcurrentAggregationProcessor.java @@ -13,7 +13,6 @@ import org.apache.lucene.search.Query; import org.opensearch.common.lucene.search.Queries; import org.opensearch.search.internal.SearchContext; -import org.opensearch.search.profile.query.CollectorResult; import org.opensearch.search.profile.query.InternalProfileCollectorManager; import org.opensearch.search.profile.query.InternalProfileComponent; import org.opensearch.search.query.QueryPhaseExecutionException; @@ -65,12 +64,12 @@ public void postProcess(SearchContext context) { try { if (globalCollectorManager != null) { Query query = context.buildFilteredQuery(Queries.newMatchAllQuery()); - globalCollectorManager = new InternalProfileCollectorManager( - globalCollectorManager, - CollectorResult.REASON_AGGREGATION_GLOBAL, - Collections.emptyList() - ); if (context.getProfilers() != null) { + globalCollectorManager = new InternalProfileCollectorManager( + globalCollectorManager, + ((AggregationCollectorManager) globalCollectorManager).getCollectorReason(), + Collections.emptyList() + ); context.getProfilers().addQueryProfiler().setCollector((InternalProfileComponent) globalCollectorManager); } final ReduceableSearchResult result = context.searcher().search(query, globalCollectorManager); diff --git a/server/src/main/java/org/opensearch/search/aggregations/DefaultAggregationProcessor.java b/server/src/main/java/org/opensearch/search/aggregations/DefaultAggregationProcessor.java index 24b05ebcf3a61..c674f4b9b673a 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/DefaultAggregationProcessor.java +++ b/server/src/main/java/org/opensearch/search/aggregations/DefaultAggregationProcessor.java @@ -11,10 +11,11 @@ import org.apache.lucene.search.Query; import org.opensearch.common.lucene.search.Queries; import org.opensearch.search.internal.SearchContext; -import org.opensearch.search.profile.query.InternalProfileComponent; +import org.opensearch.search.profile.query.InternalProfileCollector; import org.opensearch.search.query.QueryPhaseExecutionException; import java.io.IOException; +import java.util.Collections; import java.util.List; /** @@ -74,7 +75,13 @@ public void postProcess(SearchContext context) { if (context.getProfilers() != null) { context.getProfilers() .addQueryProfiler() - .setCollector((InternalProfileComponent) globalCollectorManager.newCollector()); + .setCollector( + new InternalProfileCollector( + globalCollectorManager.newCollector(), + globalCollectorManager.getCollectorReason(), + Collections.emptyList() + ) + ); } context.searcher().search(query, globalCollectorManager.newCollector()); globalCollectorManager.reduce(List.of()).reduce(context.queryResult()); diff --git a/server/src/main/java/org/opensearch/search/aggregations/GlobalAggCollectorManager.java b/server/src/main/java/org/opensearch/search/aggregations/GlobalAggCollectorManager.java index 41e8aba895480..8814cc3c435e1 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/GlobalAggCollectorManager.java +++ b/server/src/main/java/org/opensearch/search/aggregations/GlobalAggCollectorManager.java @@ -23,10 +23,12 @@ public class GlobalAggCollectorManager extends AggregationCollectorManager { private Collector collector; + private final String collectorName; public GlobalAggCollectorManager(SearchContext context) throws IOException { super(context, context.aggregations().factories()::createTopLevelGlobalAggregators, CollectorResult.REASON_AGGREGATION_GLOBAL); collector = Objects.requireNonNull(super.newCollector(), "collector instance is null"); + collectorName = collector.toString(); } @Override @@ -48,4 +50,9 @@ protected AggregationReduceableSearchResult buildAggregationResult(InternalAggre InternalAggregations.reduce(Collections.singletonList(internalAggregations), context.partialOnShard()) ); } + + @Override + public String getCollectorName() { + return collectorName; + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/GlobalAggCollectorManagerWithSingleCollector.java b/server/src/main/java/org/opensearch/search/aggregations/GlobalAggCollectorManagerWithSingleCollector.java index f126f27c68855..973749c0d5189 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/GlobalAggCollectorManagerWithSingleCollector.java +++ b/server/src/main/java/org/opensearch/search/aggregations/GlobalAggCollectorManagerWithSingleCollector.java @@ -26,10 +26,12 @@ public class GlobalAggCollectorManagerWithSingleCollector extends AggregationCollectorManager { private final Collector collector; + private final String collectorName; public GlobalAggCollectorManagerWithSingleCollector(SearchContext context) throws IOException { super(context, context.aggregations().factories()::createTopLevelGlobalAggregators, CollectorResult.REASON_AGGREGATION_GLOBAL); collector = Objects.requireNonNull(super.newCollector(), "collector instance is null"); + collectorName = collector.toString(); } @Override @@ -42,4 +44,9 @@ public ReduceableSearchResult reduce(Collection collectors) throws IO assert collectors.isEmpty() : "Reduce on GlobalAggregationCollectorManagerWithCollector called with non-empty collectors"; return super.reduce(List.of(collector)); } + + @Override + public String getCollectorName() { + return collectorName; + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/NonGlobalAggCollectorManager.java b/server/src/main/java/org/opensearch/search/aggregations/NonGlobalAggCollectorManager.java index 984eefb9b52a4..8b0a1530b5505 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/NonGlobalAggCollectorManager.java +++ b/server/src/main/java/org/opensearch/search/aggregations/NonGlobalAggCollectorManager.java @@ -23,10 +23,12 @@ public class NonGlobalAggCollectorManager extends AggregationCollectorManager { private Collector collector; + private final String collectorName; public NonGlobalAggCollectorManager(SearchContext context) throws IOException { super(context, context.aggregations().factories()::createTopLevelNonGlobalAggregators, CollectorResult.REASON_AGGREGATION); collector = Objects.requireNonNull(super.newCollector(), "collector instance is null"); + collectorName = collector.toString(); } @Override @@ -48,4 +50,9 @@ protected AggregationReduceableSearchResult buildAggregationResult(InternalAggre InternalAggregations.reduce(Collections.singletonList(internalAggregations), context.partialOnShard()) ); } + + @Override + public String getCollectorName() { + return collectorName; + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/NonGlobalAggCollectorManagerWithSingleCollector.java b/server/src/main/java/org/opensearch/search/aggregations/NonGlobalAggCollectorManagerWithSingleCollector.java index 433f6b6a05b22..a6eb00f2d70f7 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/NonGlobalAggCollectorManagerWithSingleCollector.java +++ b/server/src/main/java/org/opensearch/search/aggregations/NonGlobalAggCollectorManagerWithSingleCollector.java @@ -26,10 +26,12 @@ public class NonGlobalAggCollectorManagerWithSingleCollector extends AggregationCollectorManager { private final Collector collector; + private final String collectorName; public NonGlobalAggCollectorManagerWithSingleCollector(SearchContext context) throws IOException { super(context, context.aggregations().factories()::createTopLevelNonGlobalAggregators, CollectorResult.REASON_AGGREGATION); collector = Objects.requireNonNull(super.newCollector(), "collector instance is null"); + collectorName = collector.toString(); } @Override @@ -42,4 +44,9 @@ public ReduceableSearchResult reduce(Collection collectors) throws IO assert collectors.isEmpty() : "Reduce on NonGlobalAggregationCollectorManagerWithCollector called with non-empty collectors"; return super.reduce(List.of(collector)); } + + @Override + public String getCollectorName() { + return collectorName; + } } diff --git a/server/src/main/java/org/opensearch/search/profile/query/InternalProfileCollectorManager.java b/server/src/main/java/org/opensearch/search/profile/query/InternalProfileCollectorManager.java index 074738d2491ec..4156e442c9254 100644 --- a/server/src/main/java/org/opensearch/search/profile/query/InternalProfileCollectorManager.java +++ b/server/src/main/java/org/opensearch/search/profile/query/InternalProfileCollectorManager.java @@ -10,6 +10,7 @@ import org.apache.lucene.search.Collector; import org.apache.lucene.search.CollectorManager; +import org.opensearch.search.aggregations.AggregationCollectorManager; import org.opensearch.search.query.EarlyTerminatingListener; import org.opensearch.search.query.ReduceableSearchResult; @@ -38,6 +39,7 @@ public class InternalProfileCollectorManager private long minSliceTime = Long.MAX_VALUE; private long avgSliceTime = 0; private int sliceCount = 0; + private String collectorManagerName; public InternalProfileCollectorManager( CollectorManager manager, @@ -46,9 +48,29 @@ public InternalProfileCollectorManager( ) { this.manager = manager; this.reason = reason; + this.collectorManagerName = deriveCollectorManagerName(manager); this.children = children; } + /** + * Creates a human-friendly representation of the CollectorManager name. + * + * @param manager The CollectorManager to derive a name from + * @return A (hopefully) prettier name + */ + private String deriveCollectorManagerName(CollectorManager manager) { + String name = manager.getClass().getSimpleName(); + if (name.equals("")) { + name = manager.getClass().getEnclosingClass().getSimpleName(); + } + + // Include the user-defined agg name + if (manager instanceof AggregationCollectorManager) { + name += ": [" + ((AggregationCollectorManager) manager).getCollectorName() + "]"; + } + return name; + } + @Override public InternalProfileCollector newCollector() throws IOException { return new InternalProfileCollector(manager.newCollector(), reason, children); @@ -117,7 +139,7 @@ public Collection children() { @Override public String getName() { - return manager.getClass().getSimpleName(); + return collectorManagerName; } @Override diff --git a/server/src/main/java/org/opensearch/search/query/QueryCollectorContext.java b/server/src/main/java/org/opensearch/search/query/QueryCollectorContext.java index c611587e879d6..10c070ea7dceb 100644 --- a/server/src/main/java/org/opensearch/search/query/QueryCollectorContext.java +++ b/server/src/main/java/org/opensearch/search/query/QueryCollectorContext.java @@ -42,6 +42,9 @@ import org.apache.lucene.search.Weight; import org.opensearch.common.lucene.MinimumScoreCollector; import org.opensearch.common.lucene.search.FilteredCollector; +import org.opensearch.search.aggregations.AggregationCollectorManager; +import org.opensearch.search.aggregations.BucketCollector; +import org.opensearch.search.profile.query.CollectorResult; import org.opensearch.search.profile.query.InternalProfileCollector; import org.opensearch.search.profile.query.InternalProfileCollectorManager; @@ -198,16 +201,46 @@ protected InternalProfileCollector createWithProfiler(InternalProfileCollector i for (CollectorManager manager : subs) { final Collector collector = manager.newCollector(); - if (!(collector instanceof InternalProfileCollector)) { - throw new IllegalArgumentException("non-profiling collector"); + if (collector instanceof BucketCollector) { + subCollectors.add( + new InternalProfileCollector(collector, CollectorResult.REASON_AGGREGATION, Collections.emptyList()) + ); + } else { + subCollectors.add( + new InternalProfileCollector(collector, CollectorResult.REASON_SEARCH_MULTI, Collections.emptyList()) + ); } - subCollectors.add((InternalProfileCollector) collector); } final Collector collector = MultiCollector.wrap(subCollectors); return new InternalProfileCollector(collector, REASON_SEARCH_MULTI, subCollectors); } + @Override + protected InternalProfileCollectorManager createWithProfiler(InternalProfileCollectorManager in) { + final List> managers = new ArrayList<>(); + final List children = new ArrayList<>(); + managers.add(in); + children.add(in); + for (CollectorManager manager : subs) { + final InternalProfileCollectorManager subCollectorManager; + if (manager instanceof AggregationCollectorManager) { + subCollectorManager = new InternalProfileCollectorManager( + manager, + ((AggregationCollectorManager) manager).getCollectorReason(), + Collections.emptyList() + ); + } else { + subCollectorManager = new InternalProfileCollectorManager(manager, REASON_SEARCH_MULTI, Collections.emptyList()); + } + managers.add(subCollectorManager); + children.add(subCollectorManager); + } + CollectorManager multiCollectorManager = QueryCollectorManagerContext + .createMultiCollectorManager(managers); + return new InternalProfileCollectorManager(multiCollectorManager, REASON_SEARCH_MULTI, children); + } + @Override CollectorManager createManager( CollectorManager in diff --git a/server/src/test/java/org/opensearch/search/aggregations/AggregationCollectorManagerTests.java b/server/src/test/java/org/opensearch/search/aggregations/AggregationCollectorManagerTests.java index 7fcf2216040c9..47ce0f120334b 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/AggregationCollectorManagerTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/AggregationCollectorManagerTests.java @@ -10,6 +10,7 @@ import org.apache.lucene.search.Collector; import org.opensearch.search.aggregations.bucket.global.GlobalAggregator; +import org.opensearch.search.profile.query.CollectorResult; import java.util.ArrayList; import java.util.List; @@ -31,9 +32,12 @@ public void testNonGlobalCollectorManagers() throws Exception { assertTrue(aggCollector instanceof MultiBucketCollector); assertEquals(expectedAggCount, ((MultiBucketCollector) aggCollector).getCollectors().length); testCollectorManagerCommon(testAggCollectorManager); + assertEquals(CollectorResult.REASON_AGGREGATION, testAggCollectorManager.getCollectorReason()); // test NonGlobalCollectorManager which will be used in concurrent segment search case - testCollectorManagerCommon(new NonGlobalAggCollectorManager(context)); + final NonGlobalAggCollectorManager testNonGlobalAggCollectorManager = new NonGlobalAggCollectorManager(context); + testCollectorManagerCommon(testNonGlobalAggCollectorManager); + assertEquals(CollectorResult.REASON_AGGREGATION, testAggCollectorManager.getCollectorReason()); } public void testGlobalCollectorManagers() throws Exception { @@ -45,11 +49,14 @@ public void testGlobalCollectorManagers() throws Exception { context.aggregations(contextAggregations); final AggregationCollectorManager testAggCollectorManager = new GlobalAggCollectorManagerWithSingleCollector(context); testCollectorManagerCommon(testAggCollectorManager); + assertEquals(CollectorResult.REASON_AGGREGATION_GLOBAL, testAggCollectorManager.getCollectorReason()); Collector aggCollector = testAggCollectorManager.newCollector(); assertTrue(aggCollector instanceof BucketCollector); // test GlobalAggCollectorManager which will be used in concurrent segment search case - testCollectorManagerCommon(new GlobalAggCollectorManager(context)); + final GlobalAggCollectorManager testGlobalAggCollectorManager = new GlobalAggCollectorManager(context); + testCollectorManagerCommon(testGlobalAggCollectorManager); + assertEquals(CollectorResult.REASON_AGGREGATION_GLOBAL, testAggCollectorManager.getCollectorReason()); } public void testAggCollectorManagersWithBothGlobalNonGlobalAggregators() throws Exception { @@ -70,7 +77,9 @@ public void testAggCollectorManagersWithBothGlobalNonGlobalAggregators() throws assertTrue(globalAggCollector instanceof GlobalAggregator); testCollectorManagerCommon(testAggCollectorManager); + assertEquals(CollectorResult.REASON_AGGREGATION, testAggCollectorManager.getCollectorReason()); testCollectorManagerCommon(testGlobalAggCollectorManager); + assertEquals(CollectorResult.REASON_AGGREGATION_GLOBAL, testGlobalAggCollectorManager.getCollectorReason()); } public void testAssertionWhenCollectorManagerCreatesNoOPCollector() throws Exception { diff --git a/server/src/test/java/org/opensearch/search/aggregations/AggregationProcessorTests.java b/server/src/test/java/org/opensearch/search/aggregations/AggregationProcessorTests.java index cff83b36ce884..522d69b22c38a 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/AggregationProcessorTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/AggregationProcessorTests.java @@ -15,6 +15,7 @@ import org.mockito.ArgumentMatchers; import org.opensearch.search.aggregations.bucket.global.GlobalAggregator; import org.opensearch.search.internal.ContextIndexSearcher; +import org.opensearch.search.profile.query.CollectorResult; import org.opensearch.search.query.ReduceableSearchResult; import org.opensearch.test.TestSearchContext; @@ -48,15 +49,19 @@ public void testPreProcessWithOnlyNonGlobalAggregators() throws Exception { } public void testPostProcessWithNonGlobalAggregatorsAndSingleSlice() throws Exception { - testPostProcessCommon(multipleNonGlobalAggs, 1, 0, 2); + testPostProcessCommon(multipleNonGlobalAggs, 1, 0, 2, false); } public void testPostProcessWithNonGlobalAggregatorsAndMultipleSlices() throws Exception { - testPostProcessCommon(multipleNonGlobalAggs, randomIntBetween(2, 5), 0, 2); + testPostProcessCommon(multipleNonGlobalAggs, randomIntBetween(2, 5), 0, 2, false); } public void testPostProcessGlobalAndNonGlobalAggregators() throws Exception { - testPostProcessCommon(globalNonGlobalAggs, randomIntBetween(2, 5), 1, 1); + testPostProcessCommon(globalNonGlobalAggs, randomIntBetween(2, 5), 1, 1, false); + } + + public void testPostProcessGlobalAndNonGlobalAggregatorsWithProfilers() throws Exception { + testPostProcessCommon(globalNonGlobalAggs, randomIntBetween(2, 5), 1, 1, true); } private void testPreProcessCommon(String agg, int expectedGlobalAggs, int expectedNonGlobalAggs) throws Exception { @@ -127,8 +132,13 @@ private void testPreProcessCommon( } } - private void testPostProcessCommon(String aggs, int numSlices, int expectedGlobalAggs, int expectedNonGlobalAggsPerSlice) - throws Exception { + private void testPostProcessCommon( + String aggs, + int numSlices, + int expectedGlobalAggs, + int expectedNonGlobalAggsPerSlice, + boolean withProfilers + ) throws Exception { final Collection nonGlobalCollectors = new ArrayList<>(); final Collection globalCollectors = new ArrayList<>(); testPreProcessCommon(aggs, expectedGlobalAggs, expectedNonGlobalAggsPerSlice, nonGlobalCollectors, globalCollectors); @@ -157,11 +167,22 @@ private void testPostProcessCommon(String aggs, int numSlices, int expectedGloba .thenReturn(result); } assertTrue(context.queryResult().hasAggs()); + if (withProfilers) { + ((TestSearchContext) context).withProfilers(); + } testAggregationProcessor.postProcess(context); assertTrue(context.queryResult().hasAggs()); // for global aggs verify that search.search is called with CollectionManager if (expectedGlobalAggs > 0) { verify(testSearcher, times(1)).search(nullable(Query.class), ArgumentMatchers.>any()); + if (withProfilers) { + // First profiler is from withProfilers() call, second one is from postProcess() call + assertEquals(2, context.getProfilers().getQueryProfilers().size()); + assertEquals( + CollectorResult.REASON_AGGREGATION_GLOBAL, + context.getProfilers().getQueryProfilers().get(1).getCollector().getReason() + ); + } } // after shard level reduce it should have only 1 InternalAggregation instance for each agg in request and internal aggregation // will be equal to sum of expected global and nonglobal aggs