diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/FiltersAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/FiltersAggregator.java index 53070495695df..58f341b7b867f 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/FiltersAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/FiltersAggregator.java @@ -389,8 +389,10 @@ protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucket * Without sub.isNoop we always end up in the `collectXXX` modes even if * the sub-aggregators opt out of traditional collection. */ + segmentsCounted++; collectCount(ctx, live); } else { + segmentsCollected++; collectSubs(ctx, live, sub); } return LeafBucketCollector.NO_OP_COLLECTOR; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/RangeAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/RangeAggregator.java index c1b74e9ce3e86..7e78c12d97b39 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/RangeAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/RangeAggregator.java @@ -745,7 +745,7 @@ protected int collect(LeafBucketCollector sub, int doc, double value, long ownin } } - private static class FromFilters extends AdaptingAggregator { + static class FromFilters extends AdaptingAggregator { private final DocValueFormat format; private final Range[] ranges; private final boolean keyed; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTermsAggregatorFromFilters.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTermsAggregatorFromFilters.java index 56145dbe2ad45..4a4f5f0fbbe03 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTermsAggregatorFromFilters.java +++ 
b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTermsAggregatorFromFilters.java @@ -23,9 +23,9 @@ import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalOrder; import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket; -import org.elasticsearch.search.aggregations.bucket.filter.QueryToFilterAdapter; import org.elasticsearch.search.aggregations.bucket.filter.FiltersAggregator; import org.elasticsearch.search.aggregations.bucket.filter.InternalFilters; +import org.elasticsearch.search.aggregations.bucket.filter.QueryToFilterAdapter; import org.elasticsearch.search.aggregations.bucket.terms.GlobalOrdinalsStringTermsAggregator.OrdBucket; import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregator.BucketCountThresholds; import org.elasticsearch.search.aggregations.support.AggregationContext; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java index d214bfd0580d4..e74a26255fda8 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java @@ -384,6 +384,12 @@ Aggregator create(String name, * which *have* queries but they are slow and have high * cost. But runtime fields don't have global ords * so we won't have got here anyway. + * + * It's totally possible that there might be a top level + * query that was generated by a runtime field. That + * query might indeed be slow, but we won't execute it + * any more times doing filter-by-filter than we would + * doing regular collection. 
*/ return adapted; } diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/range/RangeAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/range/RangeAggregatorTests.java index 6309fff8714d9..7ad78775722f3 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/range/RangeAggregatorTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/range/RangeAggregatorTests.java @@ -28,20 +28,28 @@ import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.NumberFieldMapper; import org.elasticsearch.index.mapper.NumberFieldMapper.NumberType; +import org.elasticsearch.script.Script; +import org.elasticsearch.script.StringFieldScript; +import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.AggregatorTestCase; import org.elasticsearch.search.aggregations.CardinalityUpperBound; import org.elasticsearch.search.aggregations.support.AggregationInspectionHelper; +import org.elasticsearch.search.lookup.SearchLookup; +import org.elasticsearch.search.runtime.StringScriptFieldTermQuery; import java.io.IOException; import java.time.ZoneOffset; import java.time.ZonedDateTime; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import static java.util.Collections.singleton; +import static java.util.stream.Collectors.toList; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.hasSize; public class RangeAggregatorTests extends AggregatorTestCase { @@ -467,6 +475,54 @@ public void testOverlappingRanges() throws IOException { }, new NumberFieldMapper.NumberFieldType(NUMBER_FIELD_NAME, NumberFieldMapper.NumberType.INTEGER)); } + /** + * If the top level query is a runtime field we should still use + * {@link RangeAggregator.FromFilters} 
because we expect it'll still be faster + * than the normal aggregator, even though running the script for the runtime + * field is quite a bit more expensive than a regular query. The thing is, we + * won't be executing the script more times than we would if it were just at + * the top level. + */ + public void testRuntimeFieldTopLevelQueryStillOptimized() throws IOException { + long totalDocs = (long) RangeAggregator.DOCS_PER_RANGE_TO_USE_FILTERS * 4; + SearchLookup lookup = new SearchLookup(s -> null, (ft, l) -> null); + StringFieldScript.LeafFactory scriptFactory = ctx -> new StringFieldScript( + "dummy", + org.elasticsearch.common.collect.Map.of(), + lookup, + ctx + ) { + @Override + public void execute() { + emit("cat"); + } + }; + Query query = new StringScriptFieldTermQuery(new Script("dummy"), scriptFactory, "dummy", "cat", false); + debugTestCase(new RangeAggregationBuilder("r").field(NUMBER_FIELD_NAME).addRange(0, 1).addRange(1, 2).addRange(2, 3), query, iw -> { + for (int d = 0; d < totalDocs; d++) { + iw.addDocument( + org.elasticsearch.common.collect.List.of( + new IntPoint(NUMBER_FIELD_NAME, 0), + new SortedNumericDocValuesField(NUMBER_FIELD_NAME, 0) + ) + ); + } + }, (InternalRange r, Class impl, Map debug) -> { + assertThat( + r.getBuckets().stream().map(InternalRange.Bucket::getKey).collect(toList()), + equalTo(org.elasticsearch.common.collect.List.of("0.0-1.0", "1.0-2.0", "2.0-3.0")) + ); + assertThat( + r.getBuckets().stream().map(InternalRange.Bucket::getDocCount).collect(toList()), + equalTo(org.elasticsearch.common.collect.List.of(totalDocs, 0L, 0L)) + ); + assertThat(impl, equalTo(RangeAggregator.FromFilters.class)); + Map delegateDebug = (Map) debug.get("delegate_debug"); + assertThat(delegateDebug, hasEntry("estimated_cost", totalDocs)); + assertThat(delegateDebug, hasEntry("max_cost", totalDocs)); + }, new NumberFieldMapper.NumberFieldType(NUMBER_FIELD_NAME, NumberFieldMapper.NumberType.INTEGER)); + } + private void testCase(Query query, 
CheckedConsumer buildIndex, Consumer> verify) throws IOException { diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorTests.java index 86c42fbcbce0b..b53d171b879bd 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorTests.java @@ -65,6 +65,7 @@ import org.elasticsearch.script.ScriptModule; import org.elasticsearch.script.ScriptService; import org.elasticsearch.script.ScriptType; +import org.elasticsearch.script.StringFieldScript; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.aggregations.AggregationBuilder; import org.elasticsearch.search.aggregations.AggregationBuilders; @@ -99,6 +100,8 @@ import org.elasticsearch.search.aggregations.support.CoreValuesSourceType; import org.elasticsearch.search.aggregations.support.ValueType; import org.elasticsearch.search.aggregations.support.ValuesSourceType; +import org.elasticsearch.search.lookup.SearchLookup; +import org.elasticsearch.search.runtime.StringScriptFieldTermQuery; import org.elasticsearch.search.sort.FieldSortBuilder; import org.elasticsearch.search.sort.ScoreSortBuilder; import org.elasticsearch.test.geo.RandomGeoGenerator; @@ -128,7 +131,9 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.hasEntry; +import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.not; public class TermsAggregatorTests extends AggregatorTestCase { @@ -1746,6 +1751,60 @@ public void testWithFilterAndPreciseSize() throws IOException { }, kft); } + /** + * If the top level query is a runtime field we should still use + * {@link StringTermsAggregatorFromFilters} 
because we expect it'll still + * be faster than the normal aggregator, even though running the script + * for the runtime field is quite a bit more expensive than a regular + * query. The thing is, we won't be executing the script more times than + * we would if it were just at the top level. + */ + public void testRuntimeFieldTopLevelQueryStillOptimized() throws IOException { + long totalDocs = 500; + SearchLookup lookup = new SearchLookup(s -> null, (ft, l) -> null); + StringFieldScript.LeafFactory scriptFactory = ctx -> new StringFieldScript( + "dummy", + org.elasticsearch.common.collect.Map.of(), + lookup, + ctx + ) { + @Override + public void execute() { + emit("cat"); + } + }; + BytesRef[] values = new BytesRef[] { + new BytesRef("stuff"), new BytesRef("more_stuff"), new BytesRef("other_stuff"), + }; + Query query = new StringScriptFieldTermQuery(new Script("dummy"), scriptFactory, "dummy", "cat", false); + debugTestCase(new TermsAggregationBuilder("t").field("k"), query, iw -> { + for (int d = 0; d < totalDocs; d++) { + BytesRef value = values[d % values.length]; + iw.addDocument( + org.elasticsearch.common.collect.List.of( + new Field("k", value, KeywordFieldMapper.Defaults.FIELD_TYPE), + new SortedSetDocValuesField("k", value) + ) + ); + } + }, (StringTerms r, Class impl, Map debug) -> { + assertThat( + r.getBuckets().stream().map(StringTerms.Bucket::getKey).collect(toList()), + equalTo(org.elasticsearch.common.collect.List.of("more_stuff", "stuff", "other_stuff")) + ); + assertThat( + r.getBuckets().stream().map(StringTerms.Bucket::getDocCount).collect(toList()), + equalTo(org.elasticsearch.common.collect.List.of(167L, 167L, 166L)) + ); + assertThat(impl, equalTo(StringTermsAggregatorFromFilters.class)); + Map delegateDebug = (Map) debug.get("delegate_debug"); + // We don't estimate the cost here so these shouldn't show up + assertThat(delegateDebug, not(hasKey("estimated_cost"))); + assertThat(delegateDebug, not(hasKey("max_cost"))); + assertThat((int) 
delegateDebug.get("segments_counted"), greaterThan(0)); + }, new KeywordFieldType("k", true, true, Collections.emptyMap())); + } + private final SeqNoFieldMapper.SequenceIDFields sequenceIDFields = SeqNoFieldMapper.SequenceIDFields.emptySeqID(); private List generateDocsWithNested(String id, int value, int[] nestedValues) { List documents = new ArrayList<>(); diff --git a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java index 249cc3a0c715c..507871902387a 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java @@ -40,6 +40,7 @@ import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.CheckedBiConsumer; import org.elasticsearch.common.CheckedConsumer; +import org.elasticsearch.common.TriConsumer; import org.elasticsearch.common.TriFunction; import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; @@ -97,6 +98,7 @@ import org.elasticsearch.search.NestedDocuments; import org.elasticsearch.search.SearchModule; import org.elasticsearch.search.aggregations.AggregatorFactories.Builder; +import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext; import org.elasticsearch.search.aggregations.MultiBucketConsumerService.MultiBucketConsumer; import org.elasticsearch.search.aggregations.bucket.nested.NestedAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.MetricsAggregator; @@ -550,6 +552,62 @@ protected void tes } } + /** + * Execute and aggregation and collect its {@link Aggregator#collectDebugInfo debug} + * information. Unlike {@link #testCase} this doesn't randomly create an + * {@link Aggregator} per leaf and perform partial reductions. 
It always + * creates a single {@link Aggregator} so we can get consistent debug info. + */ + protected void debugTestCase( + AggregationBuilder builder, + Query query, + CheckedConsumer buildIndex, + TriConsumer, Map> verify, + MappedFieldType... fieldTypes + ) throws IOException { + try (Directory directory = newDirectory()) { + RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory); + buildIndex.accept(indexWriter); + indexWriter.close(); + + try (DirectoryReader unwrapped = DirectoryReader.open(directory); IndexReader indexReader = wrapDirectoryReader(unwrapped)) { + IndexSearcher searcher = newIndexSearcher(indexReader); + // Don't use searchAndReduce because we only want a single aggregator. + IndexReaderContext ctx = searcher.getTopReaderContext(); + CircuitBreakerService breakerService = new NoneCircuitBreakerService(); + AggregationContext context = createAggregationContext( + searcher, + createIndexSettings(), + searcher.rewrite(query), + breakerService, + builder.bytesToPreallocate(), + DEFAULT_MAX_BUCKETS, + fieldTypes + ); + Aggregator aggregator = createAggregator(builder, context); + aggregator.preCollection(); + searcher.search(context.query(), aggregator); + aggregator.postCollection(); + InternalAggregation r = aggregator.buildTopLevel(); + r = r.reduce( + org.elasticsearch.common.collect.List.of(r), + ReduceContext.forFinalReduction( + context.bigArrays(), + getMockScriptService(), + context.multiBucketConsumer(), + builder.buildPipelineTree() + ) + ); + @SuppressWarnings("unchecked") // We'll get a cast error in the test if we're wrong here and that is ok + R result = (R) r; + Map debug = new HashMap<>(); + aggregator.collectDebugInfo(debug::put); + verify.apply(result, aggregator.getClass(), debug); + verifyOutputFieldNames(builder, result); + } + } + } + protected void withAggregator( AggregationBuilder aggregationBuilder, Query query,