From d46ea3cf5438c9cf26e8d2d798f5d0f28fc3fa93 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Tue, 13 Apr 2021 10:28:37 -0400 Subject: [PATCH] Fix profiled global agg (backport of #71575) (#71634) This fixes the `global` aggregator when `profile` is enabled. It does so by removing all of the special case handling for `global` aggs in `AggregationPhase` and having the global aggregator itself perform the scoped collection using the same trick that we use in filter-by-filter mode of the `filters` aggregation. Closes #71098 --- .../AggConstructionContentionBenchmark.java | 5 ++ .../test/search.aggregation/380_global.yml | 64 +++++++++++++++ .../elasticsearch/search/SearchService.java | 3 +- .../search/aggregations/AggregationPhase.java | 77 ++++--------------- .../bucket/global/GlobalAggregator.java | 32 ++++++-- .../support/AggregationContext.java | 17 +++- .../bucket/DocCountProviderTests.java | 12 +-- .../bucket/GlobalAggregatorTests.java | 55 ++++++------- .../index/mapper/MapperServiceTestCase.java | 5 ++ .../aggregations/AggregatorTestCase.java | 3 +- 10 files changed, 166 insertions(+), 107 deletions(-) create mode 100644 rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/search.aggregation/380_global.yml diff --git a/benchmarks/src/main/java/org/elasticsearch/benchmark/search/aggregations/AggConstructionContentionBenchmark.java b/benchmarks/src/main/java/org/elasticsearch/benchmark/search/aggregations/AggConstructionContentionBenchmark.java index 3621404784404..ae4c32da5d849 100644 --- a/benchmarks/src/main/java/org/elasticsearch/benchmark/search/aggregations/AggConstructionContentionBenchmark.java +++ b/benchmarks/src/main/java/org/elasticsearch/benchmark/search/aggregations/AggConstructionContentionBenchmark.java @@ -261,6 +261,11 @@ public Query buildQuery(QueryBuilder builder) throws IOException { throw new UnsupportedOperationException(); } + @Override + public Query filterQuery(Query query) { + throw new UnsupportedOperationException(); + } + @Override public IndexSettings getIndexSettings() { throw new UnsupportedOperationException(); diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/search.aggregation/380_global.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/search.aggregation/380_global.yml new file mode 100644 index 0000000000000..a7230f8f7e80f --- /dev/null +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/search.aggregation/380_global.yml @@ -0,0 +1,64 @@ +setup: + - do: + bulk: + refresh: true + index: test + body: + - '{"index": {}}' + - '{"name": "one"}' + - '{"index": {}}' + - '{"name": "two"}' + - '{"index": {}}' + - '{"name": "two"}' + +--- +simple: + - do: + search: + index: test + body: + size: 0 + query: + match: + name: two + aggs: + g: + global: {} + aggs: + t: + terms: + field: name.keyword + + - match: { aggregations.g.doc_count: 3 } + - length: { aggregations.g.t.buckets: 2 } + - match: { aggregations.g.t.buckets.0.key: two } + - match: { aggregations.g.t.buckets.1.key: one } + +--- +profile: + - skip: + version: " - 7.99.99" + reason: fixed in 8.0.0 (to be backported to 7.13.0) + + - do: + search: + index: test + body: + profile: true + size: 0 + query: + match: + name: two + aggs: + g: + global: {} + aggs: + t: + terms: + field: name.keyword + + - match: { aggregations.g.doc_count: 3 } + - length: { aggregations.g.t.buckets: 2 } + - match: { aggregations.g.t.buckets.0.key: two } + - match: { aggregations.g.t.buckets.1.key: one } + - match: { profile.shards.0.aggregations.0.description: g } diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index ba84b60fa0576..87e109ed1cf09 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -983,7 +983,8 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc context.bitsetFilterCache(), context.indexShard().shardId().hashCode(), context::getRelativeTimeInMillis, - context::isCancelled + context::isCancelled, + context::buildFilteredQuery ); context.addReleasable(aggContext); try { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java b/server/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java index 521b6a80a46d3..cf00262fbe4dd 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java @@ -8,18 +8,13 @@ package org.elasticsearch.search.aggregations; import org.apache.lucene.search.Collector; -import org.apache.lucene.search.Query; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.lucene.search.Queries; -import org.elasticsearch.search.aggregations.bucket.global.GlobalAggregator; import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.profile.query.CollectorResult; import org.elasticsearch.search.profile.query.InternalProfileCollector; -import org.elasticsearch.search.query.QueryPhaseExecutionException; import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.List; /** @@ -32,31 +27,24 @@ public AggregationPhase() { } public void preProcess(SearchContext context) { - if (context.aggregations() != null) { - List collectors = new ArrayList<>(); - Aggregator[] aggregators; - try { - aggregators = context.aggregations().factories().createTopLevelAggregators(); - for (int i = 0; i < aggregators.length; i++) { - if (aggregators[i] instanceof GlobalAggregator == false) { - collectors.add(aggregators[i]); - } - } - context.aggregations().aggregators(aggregators); - if (collectors.isEmpty() == false) { - Collector collector = MultiBucketCollector.wrap(true, collectors); - ((BucketCollector)collector).preCollection(); - if (context.getProfilers() != null) { - collector = new InternalProfileCollector(collector, CollectorResult.REASON_AGGREGATION, - // TODO: report on child aggs as well - Collections.emptyList()); - } - context.queryCollectors().put(AggregationPhase.class, collector); - } - } catch (IOException e) { - throw new AggregationInitializationException("Could not initialize aggregators", e); - } + if (context.aggregations() == null) { + return; + } + BucketCollector bucketCollector; + try { + context.aggregations().aggregators(context.aggregations().factories().createTopLevelAggregators()); + bucketCollector = MultiBucketCollector.wrap( + true, + org.elasticsearch.common.collect.List.of(context.aggregations().aggregators()) + ); + bucketCollector.preCollection(); + } catch (IOException e) { + throw new AggregationInitializationException("Could not initialize aggregators", e); } + Collector collector = context.getProfilers() == null + ? bucketCollector + : new InternalProfileCollector(bucketCollector, CollectorResult.REASON_AGGREGATION, org.elasticsearch.common.collect.List.of()); + context.queryCollectors().put(AggregationPhase.class, collector); } public void execute(SearchContext context) { @@ -71,37 +59,6 @@ public void execute(SearchContext context) { } Aggregator[] aggregators = context.aggregations().aggregators(); - List globals = new ArrayList<>(); - for (int i = 0; i < aggregators.length; i++) { - if (aggregators[i] instanceof GlobalAggregator) { - globals.add(aggregators[i]); - } - } - - // optimize the global collector based execution - if (globals.isEmpty() == false) { - BucketCollector globalsCollector = MultiBucketCollector.wrap(false, globals); - Query query = context.buildFilteredQuery(Queries.newMatchAllQuery()); - - try { - final Collector collector; - if (context.getProfilers() == null) { - collector = globalsCollector; - } else { - InternalProfileCollector profileCollector = new InternalProfileCollector( - globalsCollector, CollectorResult.REASON_AGGREGATION_GLOBAL, - // TODO: report on sub collectors - Collections.emptyList()); - collector = profileCollector; - // start a new profile with this collector - context.getProfilers().addQueryProfiler().setCollector(profileCollector); - } - globalsCollector.preCollection(); - context.searcher().search(query, collector); - } catch (Exception e) { - throw new QueryPhaseExecutionException(context.shardTarget(), "Failed to execute global aggregators", e); - } - } List aggregations = new ArrayList<>(aggregators.length); if (context.aggregations().factories().context() != null) { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/global/GlobalAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/global/GlobalAggregator.java index 185b405b273aa..3a171b10329b8 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/global/GlobalAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/global/GlobalAggregator.java @@ -8,11 +8,15 @@ package org.elasticsearch.search.aggregations.bucket.global; import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.search.BulkScorer; +import org.apache.lucene.search.LeafCollector; +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.Scorable; +import org.apache.lucene.search.Weight; import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.aggregations.CardinalityUpperBound; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.LeafBucketCollector; -import org.elasticsearch.search.aggregations.LeafBucketCollectorBase; import org.elasticsearch.search.aggregations.bucket.BucketsAggregator; import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator; import org.elasticsearch.search.aggregations.support.AggregationContext; @@ -21,22 +25,34 @@ import java.util.Map; public class GlobalAggregator extends BucketsAggregator implements SingleBucketAggregator { + private final Weight weight; public GlobalAggregator(String name, AggregatorFactories subFactories, AggregationContext context, Map metadata) throws IOException { + super(name, subFactories, context, null, CardinalityUpperBound.ONE, metadata); + weight = context.filterQuery(new MatchAllDocsQuery()).createWeight(context.searcher(), scoreMode(), 1.0f); } @Override - public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, - final LeafBucketCollector sub) throws IOException { - return new LeafBucketCollectorBase(sub, null) { + public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException { + // Run sub-aggregations on child documents + BulkScorer scorer = weight.bulkScorer(ctx); + if (scorer == null) { + return LeafBucketCollector.NO_OP_COLLECTOR; + } + scorer.score(new LeafCollector() { + @Override + public void collect(int doc) throws IOException { + collectBucket(sub, doc, 0); + } + @Override - public void collect(int doc, long bucket) throws IOException { - assert bucket == 0 : "global aggregator can only be a top level aggregator"; - collectBucket(sub, doc, bucket); + public void setScorer(Scorable scorer) throws IOException { + sub.setScorer(scorer); } - }; + }, ctx.reader().getLiveDocs()); + return LeafBucketCollector.NO_OP_COLLECTOR; } @Override diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/support/AggregationContext.java b/server/src/main/java/org/elasticsearch/search/aggregations/support/AggregationContext.java index b08c03ef682a3..cecfea93ed49f 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/support/AggregationContext.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/support/AggregationContext.java @@ -160,6 +160,13 @@ public final AggregationUsageService getUsageService() { */ public abstract Query buildQuery(QueryBuilder builder) throws IOException; + /** + * Add filters from slice or filtered aliases. If you make a new query + * and don't combine it with the {@link #query() top level query} then + * you must provide it to this method. + */ + public abstract Query filterQuery(Query query); + /** * The settings for the index against which this search is running. */ @@ -259,6 +266,7 @@ public static class ProductionAggregationContext extends AggregationContext { private final int randomSeed; private final LongSupplier relativeTimeInMillis; private final Supplier isCancelled; + private final Function filterQuery; private final List releaseMe = new ArrayList<>(); @@ -273,7 +281,8 @@ public ProductionAggregationContext( BitsetFilterCache bitsetFilterCache, int randomSeed, LongSupplier relativeTimeInMillis, - Supplier isCancelled + Supplier isCancelled, + Function filterQuery ) { this.context = context; if (bytesToPreallocate == 0) { @@ -303,6 +312,7 @@ public ProductionAggregationContext( this.randomSeed = randomSeed; this.relativeTimeInMillis = relativeTimeInMillis; this.isCancelled = isCancelled; + this.filterQuery = filterQuery; } @Override @@ -378,6 +388,11 @@ public Query buildQuery(QueryBuilder builder) throws IOException { return Rewriteable.rewrite(builder, context, true).toQuery(context); } + @Override + public Query filterQuery(Query query) { + return filterQuery.apply(query); + } + @Override public IndexSettings getIndexSettings() { return context.getIndexSettings(); diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/DocCountProviderTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/DocCountProviderTests.java index c3e641c9b757e..a7b6f8298026c 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/DocCountProviderTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/DocCountProviderTests.java @@ -18,9 +18,11 @@ import org.elasticsearch.index.mapper.DocCountFieldMapper; import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.NumberFieldMapper; +import org.elasticsearch.index.query.MatchAllQueryBuilder; +import org.elasticsearch.search.aggregations.AggregationBuilder; import org.elasticsearch.search.aggregations.AggregatorTestCase; -import org.elasticsearch.search.aggregations.bucket.global.GlobalAggregationBuilder; -import org.elasticsearch.search.aggregations.bucket.global.InternalGlobal; +import org.elasticsearch.search.aggregations.bucket.filter.FilterAggregationBuilder; +import org.elasticsearch.search.aggregations.bucket.filter.InternalFilter; import java.io.IOException; import org.elasticsearch.common.collect.List; @@ -88,10 +90,10 @@ public void testQueryFiltering() throws IOException { private void testAggregation(Query query, CheckedConsumer indexer, - Consumer verify) throws IOException { - GlobalAggregationBuilder aggregationBuilder = new GlobalAggregationBuilder("_name"); + Consumer verify) throws IOException { + AggregationBuilder builder = new FilterAggregationBuilder("f", new MatchAllQueryBuilder()); MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType(NUMBER_FIELD, NumberFieldMapper.NumberType.LONG); MappedFieldType docCountFieldType = new DocCountFieldMapper.DocCountFieldType(); - testCase(aggregationBuilder, query, indexer, verify, fieldType, docCountFieldType); + testCase(builder, query, indexer, verify, fieldType, docCountFieldType); } } diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/GlobalAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/GlobalAggregatorTests.java index b4eb8c4d66d1d..ef1e7b53deedf 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/GlobalAggregatorTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/GlobalAggregatorTests.java @@ -8,19 +8,16 @@ package org.elasticsearch.search.aggregations.bucket; +import org.apache.lucene.document.LongPoint; import org.apache.lucene.document.SortedNumericDocValuesField; -import org.apache.lucene.index.DirectoryReader; -import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.RandomIndexWriter; -import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.MatchAllDocsQuery; -import org.apache.lucene.store.Directory; +import org.apache.lucene.search.Query; import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.NumberFieldMapper; import org.elasticsearch.search.aggregations.AggregatorTestCase; import org.elasticsearch.search.aggregations.bucket.global.GlobalAggregationBuilder; -import org.elasticsearch.search.aggregations.bucket.global.GlobalAggregator; import org.elasticsearch.search.aggregations.bucket.global.InternalGlobal; import org.elasticsearch.search.aggregations.metrics.InternalMin; import org.elasticsearch.search.aggregations.metrics.MinAggregationBuilder; @@ -28,13 +25,11 @@ import java.io.IOException; import java.util.function.BiConsumer; -import static java.util.Collections.singleton; - public class GlobalAggregatorTests extends AggregatorTestCase { public void testNoDocs() throws IOException { testCase(iw -> { // Intentionally not writing any docs - }, (global, min) -> { + }, new MatchAllDocsQuery(), (global, min) -> { assertEquals(0, global.getDocCount()); assertEquals(Double.POSITIVE_INFINITY, min.getValue(), 0); }); @@ -42,38 +37,36 @@ public void testNoDocs() throws IOException { public void testSomeDocs() throws IOException { testCase(iw -> { - iw.addDocument(singleton(new SortedNumericDocValuesField("number", 7))); - iw.addDocument(singleton(new SortedNumericDocValuesField("number", 1))); - }, (global, min) -> { + iw.addDocument(org.elasticsearch.common.collect.List.of(new SortedNumericDocValuesField("number", 7))); + iw.addDocument(org.elasticsearch.common.collect.List.of(new SortedNumericDocValuesField("number", 1))); + }, new MatchAllDocsQuery(), (global, min) -> { assertEquals(2, global.getDocCount()); assertEquals(1, min.getValue(), 0); }); } - // Note that `global`'s fancy support for ignoring the query comes from special code in AggregationPhase. We don't test that here. - - private void testCase(CheckedConsumer buildIndex, BiConsumer verify) - throws IOException { - Directory directory = newDirectory(); - RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory); - buildIndex.accept(indexWriter); - indexWriter.close(); - - IndexReader indexReader = DirectoryReader.open(directory); - IndexSearcher indexSearcher = newSearcher(indexReader, true, true); + public void testIgnoresQuery() throws IOException { + testCase(iw -> { + iw.addDocument(org.elasticsearch.common.collect.List.of(new SortedNumericDocValuesField("number", 7))); + iw.addDocument(org.elasticsearch.common.collect.List.of(new SortedNumericDocValuesField("number", 1))); + }, LongPoint.newRangeQuery("number", 2, Long.MAX_VALUE), (global, min) -> { + assertEquals(2, global.getDocCount()); + assertEquals(1, min.getValue(), 0); + }); + } + private void testCase( + CheckedConsumer buildIndex, + Query topLevelQuery, + BiConsumer verify + ) throws IOException { GlobalAggregationBuilder aggregationBuilder = new GlobalAggregationBuilder("_name"); aggregationBuilder.subAggregation(new MinAggregationBuilder("in_global").field("number")); MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType("number", NumberFieldMapper.NumberType.LONG); - GlobalAggregator aggregator = createAggregator(aggregationBuilder, indexSearcher, fieldType); - aggregator.preCollection(); - indexSearcher.search(new MatchAllDocsQuery(), aggregator); - aggregator.postCollection(); - InternalGlobal result = (InternalGlobal) aggregator.buildTopLevel(); - verify.accept(result, (InternalMin) result.getAggregations().asMap().get("in_global")); - - indexReader.close(); - directory.close(); + testCase(aggregationBuilder, topLevelQuery, buildIndex, (InternalGlobal result) -> { + InternalMin min = result.getAggregations().get("in_global"); + verify.accept(result, min); + }, fieldType); } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/mapper/MapperServiceTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/mapper/MapperServiceTestCase.java index 36151bdd265d2..a8c61a91eed9c 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/mapper/MapperServiceTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/mapper/MapperServiceTestCase.java @@ -407,6 +407,11 @@ public Query buildQuery(QueryBuilder builder) throws IOException { throw new UnsupportedOperationException(); } + @Override + public Query filterQuery(Query query) { + throw new UnsupportedOperationException(); + } + @Override protected IndexFieldData buildFieldData(MappedFieldType ft) { return ft.fielddataBuilder("test", null).build(new IndexFieldDataCache.None(), new NoneCircuitBreakerService()); diff --git a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java index 507871902387a..5ccb923a07d6d 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java @@ -296,7 +296,8 @@ public void onCache(ShardId shardId, Accountable accountable) {} bitsetFilterCache, randomInt(), () -> 0L, - () -> false + () -> false, + q -> q ); releasables.add(context); return context;