Skip to content

Commit

Permalink
Tests for runtime field queries with fbf aggs (backport of elastic#71503
Browse files Browse the repository at this point in the history
) (elastic#71585)

This adds a few tests for runtime field queries applied to
"filter-by-filter" style aggregations. We expect to still be able to
use filter-by-filter aggregations to speed up collection when the top
level query is a runtime field. You'd think that filter-by-filter would
be slow when the top level query is slow, like it is with runtime
fields, but we only run filter-by-filter when we can translate each
aggregation bucket into a quick query. So long as the results of those
queries don't "overlap" we shouldn't end up running the slower top level
query more times than we would during regular collection.

This also adds some javadoc to that effect to the two places where we
chose between filter-by-filter and a "native" aggregation
implementation.
  • Loading branch information
nik9000 authored Apr 12, 2021
1 parent 27892c3 commit ca8379b
Show file tree
Hide file tree
Showing 7 changed files with 183 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -389,8 +389,10 @@ protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucket
* Without sub.isNoop we always end up in the `collectXXX` modes even if
* the sub-aggregators opt out of traditional collection.
*/
segmentsCounted++;
collectCount(ctx, live);
} else {
segmentsCollected++;
collectSubs(ctx, live, sub);
}
return LeafBucketCollector.NO_OP_COLLECTOR;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -745,7 +745,7 @@ protected int collect(LeafBucketCollector sub, int doc, double value, long ownin
}
}

private static class FromFilters<B extends InternalRange.Bucket> extends AdaptingAggregator {
static class FromFilters<B extends InternalRange.Bucket> extends AdaptingAggregator {
private final DocValueFormat format;
private final Range[] ranges;
private final boolean keyed;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalOrder;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;
import org.elasticsearch.search.aggregations.bucket.filter.QueryToFilterAdapter;
import org.elasticsearch.search.aggregations.bucket.filter.FiltersAggregator;
import org.elasticsearch.search.aggregations.bucket.filter.InternalFilters;
import org.elasticsearch.search.aggregations.bucket.filter.QueryToFilterAdapter;
import org.elasticsearch.search.aggregations.bucket.terms.GlobalOrdinalsStringTermsAggregator.OrdBucket;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregator.BucketCountThresholds;
import org.elasticsearch.search.aggregations.support.AggregationContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,12 @@ Aggregator create(String name,
* which *have* queries but they are slow and have high
* cost. But runtime fields don't have global ords
* so we won't have got here anyway.
*
* It's totally possible that there might be a top level
* query that was generated by a runtime field. That
* query might indeed be slow, but we won't execute it
* any more times doing filter-by-filter than we would
* doing regular collection.
*/
return adapted;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,28 @@
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.NumberFieldMapper;
import org.elasticsearch.index.mapper.NumberFieldMapper.NumberType;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.StringFieldScript;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorTestCase;
import org.elasticsearch.search.aggregations.CardinalityUpperBound;
import org.elasticsearch.search.aggregations.support.AggregationInspectionHelper;
import org.elasticsearch.search.lookup.SearchLookup;
import org.elasticsearch.search.runtime.StringScriptFieldTermQuery;

import java.io.IOException;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import static java.util.Collections.singleton;
import static java.util.stream.Collectors.toList;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasSize;

public class RangeAggregatorTests extends AggregatorTestCase {
Expand Down Expand Up @@ -467,6 +475,54 @@ public void testOverlappingRanges() throws IOException {
}, new NumberFieldMapper.NumberFieldType(NUMBER_FIELD_NAME, NumberFieldMapper.NumberType.INTEGER));
}

/**
 * A top level query built from a runtime field must not prevent the range
 * aggregation from picking its {@link RangeAggregator.FromFilters}
 * implementation. The bucket filters don't overlap, so even though the
 * runtime script is quite a bit more expensive than a regular query we
 * never run it more times than regular collection would.
 */
public void testRuntimeFieldTopLevelQueryStillOptimized() throws IOException {
    long docCount = (long) RangeAggregator.DOCS_PER_RANGE_TO_USE_FILTERS * 4;
    SearchLookup searchLookup = new SearchLookup(s -> null, (ft, l) -> null);
    // Runtime-field script that emits the constant "cat" for every document.
    StringFieldScript.LeafFactory emitCat = ctx -> new StringFieldScript(
        "dummy",
        org.elasticsearch.common.collect.Map.of(),
        searchLookup,
        ctx
    ) {
        @Override
        public void execute() {
            emit("cat");
        }
    };
    // Matches every document, but only by running the (slow) script.
    Query topLevelQuery = new StringScriptFieldTermQuery(new Script("dummy"), emitCat, "dummy", "cat", false);
    RangeAggregationBuilder builder = new RangeAggregationBuilder("r").field(NUMBER_FIELD_NAME)
        .addRange(0, 1)
        .addRange(1, 2)
        .addRange(2, 3);
    debugTestCase(builder, topLevelQuery, iw -> {
        // Every document carries the value 0 so only the first range collects anything.
        for (long d = 0; d < docCount; d++) {
            iw.addDocument(
                org.elasticsearch.common.collect.List.of(
                    new IntPoint(NUMBER_FIELD_NAME, 0),
                    new SortedNumericDocValuesField(NUMBER_FIELD_NAME, 0)
                )
            );
        }
    }, (InternalRange<?, ?> r, Class<? extends Aggregator> impl, Map<String, Object> debug) -> {
        assertThat(
            r.getBuckets().stream().map(InternalRange.Bucket::getKey).collect(toList()),
            equalTo(org.elasticsearch.common.collect.List.of("0.0-1.0", "1.0-2.0", "2.0-3.0"))
        );
        assertThat(
            r.getBuckets().stream().map(InternalRange.Bucket::getDocCount).collect(toList()),
            equalTo(org.elasticsearch.common.collect.List.of(docCount, 0L, 0L))
        );
        // The filter-by-filter implementation must still have been chosen.
        assertThat(impl, equalTo(RangeAggregator.FromFilters.class));
        Map<?, ?> delegateDebug = (Map<?, ?>) debug.get("delegate_debug");
        assertThat(delegateDebug, hasEntry("estimated_cost", docCount));
        assertThat(delegateDebug, hasEntry("max_cost", docCount));
    }, new NumberFieldMapper.NumberFieldType(NUMBER_FIELD_NAME, NumberFieldMapper.NumberType.INTEGER));
}

private void testCase(Query query,
CheckedConsumer<RandomIndexWriter, IOException> buildIndex,
Consumer<InternalRange<? extends InternalRange.Bucket, ? extends InternalRange>> verify) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.elasticsearch.script.ScriptModule;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.script.StringFieldScript;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
Expand Down Expand Up @@ -99,6 +100,8 @@
import org.elasticsearch.search.aggregations.support.CoreValuesSourceType;
import org.elasticsearch.search.aggregations.support.ValueType;
import org.elasticsearch.search.aggregations.support.ValuesSourceType;
import org.elasticsearch.search.lookup.SearchLookup;
import org.elasticsearch.search.runtime.StringScriptFieldTermQuery;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.ScoreSortBuilder;
import org.elasticsearch.test.geo.RandomGeoGenerator;
Expand Down Expand Up @@ -128,7 +131,9 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;

public class TermsAggregatorTests extends AggregatorTestCase {

Expand Down Expand Up @@ -1746,6 +1751,60 @@ public void testWithFilterAndPreciseSize() throws IOException {
}, kft);
}

/**
 * If the top level query is a runtime field we should still use
 * {@link StringTermsAggregatorFromFilters} because we expect it'll still
 * be faster than the normal aggregator, even though running the script
 * for the runtime field is quite a bit more expensive than a regular
 * query. The thing is, we won't be executing the script more times than
 * we would if it were just at the top level.
 */
public void testRuntimeFieldTopLevelQueryStillOptimized() throws IOException {
long totalDocs = 500;
SearchLookup lookup = new SearchLookup(s -> null, (ft, l) -> null);
// Runtime-field script that emits the constant "cat" for every document,
// so the top level query below matches everything — but only by running
// the (slow) script.
StringFieldScript.LeafFactory scriptFactory = ctx -> new StringFieldScript(
"dummy",
org.elasticsearch.common.collect.Map.of(),
lookup,
ctx
) {
@Override
public void execute() {
emit("cat");
}
};
// Three keyword terms handed out round-robin across the 500 docs.
BytesRef[] values = new BytesRef[] {
new BytesRef("stuff"), new BytesRef("more_stuff"), new BytesRef("other_stuff"),
};
Query query = new StringScriptFieldTermQuery(new Script("dummy"), scriptFactory, "dummy", "cat", false);
debugTestCase(new TermsAggregationBuilder("t").field("k"), query, iw -> {
for (int d = 0; d < totalDocs; d++) {
BytesRef value = values[d % values.length];
iw.addDocument(
org.elasticsearch.common.collect.List.of(
new Field("k", value, KeywordFieldMapper.Defaults.FIELD_TYPE),
new SortedSetDocValuesField("k", value)
)
);
}
}, (StringTerms r, Class<? extends Aggregator> impl, Map<String, Object> debug) -> {
assertThat(
r.getBuckets().stream().map(StringTerms.Bucket::getKey).collect(toList()),
equalTo(org.elasticsearch.common.collect.List.of("more_stuff", "stuff", "other_stuff"))
);
// 500 docs split round-robin over three terms: two get 167, one gets 166.
assertThat(
r.getBuckets().stream().map(StringTerms.Bucket::getDocCount).collect(toList()),
equalTo(org.elasticsearch.common.collect.List.of(167L, 167L, 166L))
);
// The filter-by-filter implementation must still have been chosen.
assertThat(impl, equalTo(StringTermsAggregatorFromFilters.class));
Map<?, ?> delegateDebug = (Map<?, ?>) debug.get("delegate_debug");
// We don't estimate the cost here so these shouldn't show up
assertThat(delegateDebug, not(hasKey("estimated_cost")));
assertThat(delegateDebug, not(hasKey("max_cost")));
// At least one segment was counted via the fast per-filter path.
assertThat((int) delegateDebug.get("segments_counted"), greaterThan(0));
}, new KeywordFieldType("k", true, true, Collections.emptyMap()));
}

private final SeqNoFieldMapper.SequenceIDFields sequenceIDFields = SeqNoFieldMapper.SequenceIDFields.emptySeqID();
private List<Document> generateDocsWithNested(String id, int value, int[] nestedValues) {
List<Document> documents = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.TriConsumer;
import org.elasticsearch.common.TriFunction;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
Expand Down Expand Up @@ -97,6 +98,7 @@
import org.elasticsearch.search.NestedDocuments;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.aggregations.AggregatorFactories.Builder;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.MultiBucketConsumerService.MultiBucketConsumer;
import org.elasticsearch.search.aggregations.bucket.nested.NestedAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.MetricsAggregator;
Expand Down Expand Up @@ -550,6 +552,62 @@ protected <T extends AggregationBuilder, V extends InternalAggregation> void tes
}
}

/**
 * Execute an aggregation and collect its {@link Aggregator#collectDebugInfo debug}
 * information. Unlike {@link #testCase} this doesn't randomly create an
 * {@link Aggregator} per leaf and perform partial reductions. It always
 * creates a single {@link Aggregator} so we can get consistent debug info.
 *
 * @param builder    builds the aggregation under test
 * @param query      top level query to collect against
 * @param buildIndex callback that writes the test documents
 * @param verify     receives the reduced result, the concrete
 *                   {@link Aggregator} implementation class that was
 *                   chosen, and the collected debug map
 * @param fieldTypes field mappings visible to the aggregation
 */
protected <R extends InternalAggregation> void debugTestCase(
AggregationBuilder builder,
Query query,
CheckedConsumer<RandomIndexWriter, IOException> buildIndex,
TriConsumer<R, Class<? extends Aggregator>, Map<String, Object>> verify,
MappedFieldType... fieldTypes
) throws IOException {
try (Directory directory = newDirectory()) {
RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory);
buildIndex.accept(indexWriter);
// Close before opening a reader so all documents are visible.
indexWriter.close();

try (DirectoryReader unwrapped = DirectoryReader.open(directory); IndexReader indexReader = wrapDirectoryReader(unwrapped)) {
IndexSearcher searcher = newIndexSearcher(indexReader);
// Don't use searchAndReduce because we only want a single aggregator.
IndexReaderContext ctx = searcher.getTopReaderContext();
CircuitBreakerService breakerService = new NoneCircuitBreakerService();
AggregationContext context = createAggregationContext(
searcher,
createIndexSettings(),
searcher.rewrite(query),
breakerService,
builder.bytesToPreallocate(),
DEFAULT_MAX_BUCKETS,
fieldTypes
);
Aggregator aggregator = createAggregator(builder, context);
// Standard collection lifecycle: preCollection -> search -> postCollection.
aggregator.preCollection();
searcher.search(context.query(), aggregator);
aggregator.postCollection();
InternalAggregation r = aggregator.buildTopLevel();
// Single final reduction — no random partial reductions here, so the
// debug info stays deterministic.
r = r.reduce(
org.elasticsearch.common.collect.List.of(r),
ReduceContext.forFinalReduction(
context.bigArrays(),
getMockScriptService(),
context.multiBucketConsumer(),
builder.buildPipelineTree()
)
);
@SuppressWarnings("unchecked") // We'll get a cast error in the test if we're wrong here and that is ok
R result = (R) r;
Map<String, Object> debug = new HashMap<>();
aggregator.collectDebugInfo(debug::put);
verify.apply(result, aggregator.getClass(), debug);
verifyOutputFieldNames(builder, result);
}
}
}

protected void withAggregator(
AggregationBuilder aggregationBuilder,
Query query,
Expand Down

0 comments on commit ca8379b

Please sign in to comment.