From ee875a2c7caa62163f5270a77a25924dc50608a7 Mon Sep 17 00:00:00 2001 From: Ticheng Lin Date: Wed, 2 Aug 2023 23:21:30 -0700 Subject: [PATCH] Add support for CollectorManager wrapping with profiling with concurrent aggregation Signed-off-by: Ticheng Lin --- .../AggregationCollectorManager.java | 17 +++----- .../ConcurrentAggregationProcessor.java | 10 ++--- .../DefaultAggregationProcessor.java | 11 ++++- .../search/query/QueryCollectorContext.java | 40 ++++++++++++++----- 4 files changed, 51 insertions(+), 27 deletions(-) diff --git a/server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java b/server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java index 1f60ff6503ca8..b30cef40faa87 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java +++ b/server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java @@ -12,7 +12,6 @@ import org.apache.lucene.search.CollectorManager; import org.opensearch.common.CheckedFunction; import org.opensearch.search.internal.SearchContext; -import org.opensearch.search.profile.query.InternalProfileCollector; import org.opensearch.search.query.ReduceableSearchResult; import java.io.IOException; @@ -42,12 +41,16 @@ class AggregationCollectorManager implements CollectorManager collectors) throws IOException { final List aggregators = context.bucketCollectorProcessor().toAggregators(collectors); @@ -77,17 +80,9 @@ public ReduceableSearchResult reduce(Collection collectors) throws IO } } - static Collector createCollector(SearchContext context, List collectors, String reason) throws IOException { + static Collector createCollector(List collectors) throws IOException { Collector collector = MultiBucketCollector.wrap(collectors); ((BucketCollector) collector).preCollection(); - if (context.getProfilers() != null) { - collector = new InternalProfileCollector( - collector, - reason, - // TODO: report on child aggs as well - Collections.emptyList() - ); - } return collector; } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/ConcurrentAggregationProcessor.java b/server/src/main/java/org/opensearch/search/aggregations/ConcurrentAggregationProcessor.java index 336ad8739eb41..ed36b030a6785 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/ConcurrentAggregationProcessor.java +++ b/server/src/main/java/org/opensearch/search/aggregations/ConcurrentAggregationProcessor.java @@ -65,12 +65,12 @@ public void postProcess(SearchContext context) { try { if (globalCollectorManager != null) { Query query = context.buildFilteredQuery(Queries.newMatchAllQuery()); - globalCollectorManager = new InternalProfileCollectorManager( - globalCollectorManager, - CollectorResult.REASON_AGGREGATION_GLOBAL, - Collections.emptyList() - ); if (context.getProfilers() != null) { + globalCollectorManager = new InternalProfileCollectorManager( + globalCollectorManager, + CollectorResult.REASON_AGGREGATION_GLOBAL, + Collections.emptyList() + ); context.getProfilers().addQueryProfiler().setCollector((InternalProfileComponent) globalCollectorManager); } final ReduceableSearchResult result = context.searcher().search(query, globalCollectorManager); diff --git a/server/src/main/java/org/opensearch/search/aggregations/DefaultAggregationProcessor.java b/server/src/main/java/org/opensearch/search/aggregations/DefaultAggregationProcessor.java index 24b05ebcf3a61..c674f4b9b673a 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/DefaultAggregationProcessor.java +++ b/server/src/main/java/org/opensearch/search/aggregations/DefaultAggregationProcessor.java @@ -11,10 +11,11 @@ import org.apache.lucene.search.Query; import org.opensearch.common.lucene.search.Queries; import org.opensearch.search.internal.SearchContext; -import org.opensearch.search.profile.query.InternalProfileComponent; +import org.opensearch.search.profile.query.InternalProfileCollector; import org.opensearch.search.query.QueryPhaseExecutionException; import java.io.IOException; +import java.util.Collections; import java.util.List; /** @@ -74,7 +75,13 @@ public void postProcess(SearchContext context) { if (context.getProfilers() != null) { context.getProfilers() .addQueryProfiler() - .setCollector((InternalProfileComponent) globalCollectorManager.newCollector()); + .setCollector( + new InternalProfileCollector( + globalCollectorManager.newCollector(), + globalCollectorManager.getCollectorReason(), + Collections.emptyList() + ) + ); } context.searcher().search(query, globalCollectorManager.newCollector()); globalCollectorManager.reduce(List.of()).reduce(context.queryResult()); diff --git a/server/src/main/java/org/opensearch/search/query/QueryCollectorContext.java b/server/src/main/java/org/opensearch/search/query/QueryCollectorContext.java index c611587e879d6..cb01681a02e1a 100644 --- a/server/src/main/java/org/opensearch/search/query/QueryCollectorContext.java +++ b/server/src/main/java/org/opensearch/search/query/QueryCollectorContext.java @@ -42,6 +42,7 @@ import org.apache.lucene.search.Weight; import org.opensearch.common.lucene.MinimumScoreCollector; import org.opensearch.common.lucene.search.FilteredCollector; +import org.opensearch.search.profile.query.CollectorResult; import org.opensearch.search.profile.query.InternalProfileCollector; import org.opensearch.search.profile.query.InternalProfileCollectorManager; @@ -99,11 +100,15 @@ protected InternalProfileCollector createWithProfiler(InternalProfileCollector i */ protected InternalProfileCollectorManager createWithProfiler(InternalProfileCollectorManager in) throws IOException { final CollectorManager manager = createManager(in); - return new InternalProfileCollectorManager( - manager, - profilerName, - in != null ? Collections.singletonList(in) : Collections.emptyList() - ); + if (manager instanceof InternalProfileCollectorManager) { + return (InternalProfileCollectorManager) manager; + } else { + return new InternalProfileCollectorManager( + manager, + profilerName, + in != null ? Collections.singletonList(in) : Collections.emptyList() + ); + } } /** @@ -198,16 +203,33 @@ protected InternalProfileCollector createWithProfiler(InternalProfileCollector i for (CollectorManager manager : subs) { final Collector collector = manager.newCollector(); - if (!(collector instanceof InternalProfileCollector)) { - throw new IllegalArgumentException("non-profiling collector"); - } - subCollectors.add((InternalProfileCollector) collector); + subCollectors.add(new InternalProfileCollector(collector, CollectorResult.REASON_AGGREGATION, Collections.emptyList())); } final Collector collector = MultiCollector.wrap(subCollectors); return new InternalProfileCollector(collector, REASON_SEARCH_MULTI, subCollectors); } + @Override + protected InternalProfileCollectorManager createWithProfiler(InternalProfileCollectorManager in) { + final List> managers = new ArrayList<>(); + final List children = new ArrayList<>(); + managers.add(in); + children.add(in); + for (CollectorManager manager : subs) { + InternalProfileCollectorManager subCollectorManager = new InternalProfileCollectorManager( + manager, + CollectorResult.REASON_AGGREGATION, + Collections.emptyList() + ); + managers.add(subCollectorManager); + children.add(subCollectorManager); + } + CollectorManager multiCollectorManager = QueryCollectorManagerContext + .createMultiCollectorManager(managers); + return new InternalProfileCollectorManager(multiCollectorManager, REASON_SEARCH_MULTI, children); + } + @Override CollectorManager createManager( CollectorManager in