Add setting to disable aggs optimization (backport of elastic#73620)
Sometimes our fancy "run this agg as a Query" optimizations end up
slower than running the aggregation in the old way. We know that and use
heuristics to disable the optimization in that case. But it turns out
that the process of running the heuristics itself can be slow, depending
on the query. Worse, changing the heuristics requires an upgrade, which
means waiting. If the heuristics make a terrible choice, folks need a
quick way out. This adds such a way: a cluster-level setting that
contains a list of queries that are considered "too expensive" to try
and optimize. If the top-level query contains any of those queries we'll
disable the "run as Query" optimization.

The default for this setting is wildcard and term-in-set queries, which
is fairly conservative. There are certainly wildcard and term-in-set
queries that the optimization works well with, but there are other queries
of that type that it works very badly with. So we're being careful.

Better, you can modify this setting in a running cluster to disable the
optimization if we find a new type of query that doesn't work well.

Closes elastic#73426
nik9000 committed Jun 2, 2021
1 parent 50d4a37 commit d1e68b8
Showing 13 changed files with 207 additions and 11 deletions.
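The new setting is dynamic, so it can be flipped on a running cluster without a restart or rolling upgrade. As a minimal sketch, mirroring the integration test further down in this diff ("client" is assumed to be any connected org.elasticsearch.client.Client, not something this commit introduces):

// Disable the filter-by-filter rewrite cluster-wide.
client.admin()
    .cluster()
    .prepareUpdateSettings()
    .setPersistentSettings(
        Settings.builder().put(SearchService.ENABLE_REWRITE_AGGS_TO_FILTER_BY_FILTER.getKey(), false)
    )
    .get();

// Remove the override later; putNull(...) restores the default (true).
client.admin()
    .cluster()
    .prepareUpdateSettings()
    .setPersistentSettings(
        Settings.builder().putNull(SearchService.ENABLE_REWRITE_AGGS_TO_FILTER_BY_FILTER.getKey())
    )
    .get();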
@@ -346,6 +346,11 @@ public Version indexVersionCreated() {
return Version.CURRENT;
}

@Override
public boolean enableRewriteToFilterByFilter() {
return true;
}

@Override
public void close() {
List<Releasable> releaseMe = new ArrayList<>(this.releaseMe);
@@ -0,0 +1,49 @@
setup:
- do:
cluster.put_settings:
body:
persistent:
search.aggs.rewrite_to_filter_by_filter: false

---
teardown:
- do:
cluster.put_settings:
body:
persistent:
search.aggs.rewrite_to_filter_by_filter: null

---
does not use optimization:
- skip:
version: " - 7.13.99"
reason: setting to disable optimization added in 7.14.0 to be backported to 7.13.2
- do:
bulk:
index: test
refresh: true
body: |
{ "index": {} }
{ "str": "sheep" }
{ "index": {} }
{ "str": "sheep" }
{ "index": {} }
{ "str": "cow" }
{ "index": {} }
{ "str": "pig" }
- do:
search:
index: test
body:
profile: true
size: 0
aggs:
str_terms:
terms:
field: str.keyword
- match: { aggregations.str_terms.buckets.0.key: sheep }
- match: { aggregations.str_terms.buckets.1.key: cow }
- match: { aggregations.str_terms.buckets.2.key: pig }
- match: { profile.shards.0.aggregations.0.type: GlobalOrdinalsStringTermsAggregator }
- match: { profile.shards.0.aggregations.0.description: str_terms }
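A note on the profile assertions above: they are how the test detects which code path actually ran. With the rewrite disabled, the terms aggregation profiles as the ordinary GlobalOrdinalsStringTermsAggregator; with the setting at its default of true, a three-term keyword field like this one would ordinarily qualify for the filter-by-filter rewrite (the StringTermsAggregatorFromFilters adapter guarded in the hunks below), so the reported aggregator type is a direct signal that the kill switch took effect.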
@@ -10,6 +10,8 @@

import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.aggregations.Aggregator.SubAggCollectionMode;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
@@ -30,6 +32,8 @@
import java.util.Set;
import java.util.stream.Collectors;

import static io.github.nik9000.mapmatcher.MapMatcher.assertMap;
import static io.github.nik9000.mapmatcher.MapMatcher.matchesMap;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.search.aggregations.AggregationBuilders.avg;
import static org.elasticsearch.search.aggregations.AggregationBuilders.diversifiedSampler;
@@ -599,7 +603,7 @@ public void testFilterByFilter() throws InterruptedException, IOException {
assertSearchResponse(response);
Map<String, ProfileShardResult> profileResults = response.getProfileResults();
assertThat(profileResults, notNullValue());
assertThat(profileResults.size(), equalTo(getNumShards("idx").numPrimaries));
assertThat(profileResults.size(), equalTo(getNumShards("dateidx").numPrimaries));
for (ProfileShardResult profileShardResult : profileResults.values()) {
assertThat(profileShardResult, notNullValue());
AggregationProfileShardResult aggProfileResults = profileShardResult.getAggregationProfileResults();
@@ -649,4 +653,88 @@ public void testFilterByFilter() throws InterruptedException, IOException {
assertThat(queryDebug, hasEntry("query", "DocValuesFieldExistsQuery [field=date]"));
}
}

public void testDateHistogramFilterByFilterDisabled() throws InterruptedException, IOException {
assertAcked(
client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(Settings.builder().put(SearchService.ENABLE_REWRITE_AGGS_TO_FILTER_BY_FILTER.getKey(), false))
);
try {
assertAcked(
client().admin()
.indices()
.prepareCreate("date_filter_by_filter_disabled")
.setSettings(org.elasticsearch.common.collect.Map.of("number_of_shards", 1, "number_of_replicas", 0))
.addMapping("_doc", "date", "type=date", "keyword", "type=keyword")
.get()
);
List<IndexRequestBuilder> builders = new ArrayList<>();
for (int i = 0; i < RangeAggregator.DOCS_PER_RANGE_TO_USE_FILTERS * 2; i++) {
String date = Instant.ofEpochSecond(i).toString();
builders.add(
client().prepareIndex("date_filter_by_filter_disabled", "_doc")
.setSource(
jsonBuilder().startObject()
.field("date", date)
.endObject()
)
);
}
indexRandom(true, false, builders);

SearchResponse response = client().prepareSearch("date_filter_by_filter_disabled")
.setProfile(true)
.addAggregation(new DateHistogramAggregationBuilder("histo").field("date").calendarInterval(DateHistogramInterval.MONTH))
.get();
assertSearchResponse(response);
Map<String, ProfileShardResult> profileResults = response.getProfileResults();
assertThat(profileResults, notNullValue());
assertThat(profileResults.size(), equalTo(getNumShards("date_filter_by_filter_disabled").numPrimaries));
for (ProfileShardResult profileShardResult : profileResults.values()) {
assertThat(profileShardResult, notNullValue());
AggregationProfileShardResult aggProfileResults = profileShardResult.getAggregationProfileResults();
assertThat(aggProfileResults, notNullValue());
List<ProfileResult> aggProfileResultsList = aggProfileResults.getProfileResults();
assertThat(aggProfileResultsList, notNullValue());
assertThat(aggProfileResultsList.size(), equalTo(1));
ProfileResult histoAggResult = aggProfileResultsList.get(0);
assertThat(histoAggResult, notNullValue());
assertThat(histoAggResult.getQueryName(), equalTo("DateHistogramAggregator.FromDateRange"));
assertThat(histoAggResult.getLuceneDescription(), equalTo("histo"));
assertThat(histoAggResult.getProfiledChildren().size(), equalTo(0));
assertThat(histoAggResult.getTime(), greaterThan(0L));
Map<String, Long> breakdown = histoAggResult.getTimeBreakdown();
assertMap(
breakdown,
matchesMap().entry(INITIALIZE, greaterThan(0L))
.entry(INITIALIZE + "_count", greaterThan(0L))
.entry(BUILD_LEAF_COLLECTOR, greaterThan(0L))
.entry(BUILD_LEAF_COLLECTOR + "_count", greaterThan(0L))
.entry(COLLECT, greaterThan(0L))
.entry(COLLECT + "_count", greaterThan(0L))
.entry(POST_COLLECTION, greaterThan(0L))
.entry(POST_COLLECTION + "_count", 1L)
.entry(BUILD_AGGREGATION, greaterThan(0L))
.entry(BUILD_AGGREGATION + "_count", greaterThan(0L))
.entry(REDUCE, 0L)
.entry(REDUCE + "_count", 0L)
);
Map<String, Object> debug = histoAggResult.getDebugInfo();
assertMap(
debug,
matchesMap().entry("delegate", "RangeAggregator.NoOverlap")
.entry("delegate_debug", matchesMap().entry("ranges", 1).entry("average_docs_per_range", 10000.0))
);
}
} finally {
assertAcked(
client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(Settings.builder().putNull(SearchService.ENABLE_REWRITE_AGGS_TO_FILTER_BY_FILTER.getKey()))
);
}
}
}
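Note the try/finally shape of this test: persistent settings outlive a single test run, so the finally block drops the override with putNull(...) to restore the default even if an assertion fails, matching the teardown section of the YAML test above.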
@@ -469,6 +469,7 @@ public void apply(Settings value, Settings current, Settings previous) {
MultiBucketConsumerService.MAX_BUCKET_SETTING,
SearchService.LOW_LEVEL_CANCELLATION_SETTING,
SearchService.MAX_OPEN_SCROLL_CONTEXT,
SearchService.ENABLE_REWRITE_AGGS_TO_FILTER_BY_FILTER,
Node.WRITE_PORTS_FILE_SETTING,
Node.NODE_NAME_SETTING,
Node.NODE_ATTRIBUTES,
20 changes: 19 additions & 1 deletion server/src/main/java/org/elasticsearch/search/SearchService.java
@@ -164,6 +164,13 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
public static final Setting<Integer> MAX_OPEN_SCROLL_CONTEXT =
Setting.intSetting("search.max_open_scroll_context", 500, 0, Property.Dynamic, Property.NodeScope);

public static final Setting<Boolean> ENABLE_REWRITE_AGGS_TO_FILTER_BY_FILTER = Setting.boolSetting(
"search.aggs.rewrite_to_filter_by_filter",
true,
Property.Dynamic,
Property.NodeScope
);

public static final int DEFAULT_SIZE = 10;
public static final int DEFAULT_FROM = 0;

@@ -197,6 +204,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv

private volatile int maxOpenScrollContext;

private volatile boolean enableRewriteAggsToFilterByFilter;

private final Cancellable keepAliveReaper;

private final AtomicLong idGenerator = new AtomicLong();
@@ -243,6 +252,10 @@ public SearchService(ClusterService clusterService, IndicesService indicesServic

lowLevelCancellation = LOW_LEVEL_CANCELLATION_SETTING.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(LOW_LEVEL_CANCELLATION_SETTING, this::setLowLevelCancellation);

enableRewriteAggsToFilterByFilter = ENABLE_REWRITE_AGGS_TO_FILTER_BY_FILTER.get(settings);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(ENABLE_REWRITE_AGGS_TO_FILTER_BY_FILTER, this::setEnableRewriteAggsToFilterByFilter);
}

private void validateKeepAlives(TimeValue defaultKeepAlive, TimeValue maxKeepAlive) {
@@ -279,6 +292,10 @@ private void setLowLevelCancellation(Boolean lowLevelCancellation) {
this.lowLevelCancellation = lowLevelCancellation;
}

private void setEnableRewriteAggsToFilterByFilter(boolean enableRewriteAggsToFilterByFilter) {
this.enableRewriteAggsToFilterByFilter = enableRewriteAggsToFilterByFilter;
}

@Override
public void afterIndexRemoved(Index index, IndexSettings indexSettings, IndexRemovalReason reason) {
// once an index is removed due to deletion or closing, we can just clean up all the pending search context information
@@ -984,7 +1001,8 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc
context.indexShard().shardId().hashCode(),
context::getRelativeTimeInMillis,
context::isCancelled,
context::buildFilteredQuery
context::buildFilteredQuery,
enableRewriteAggsToFilterByFilter
);
context.addReleasable(aggContext);
try {
@@ -353,6 +353,9 @@ public static FromFilters<?> adaptIntoFiltersOrNull(
if (false == FiltersAggregator.canUseFilterByFilter(parent, null)) {
return null;
}
if (false == context.enableRewriteToFilterByFilter()) {
return null;
}
boolean wholeNumbersOnly = false == ((ValuesSource.Numeric) valuesSourceConfig.getValuesSource()).isFloatingPoint();
List<QueryToFilterAdapter<?>> filters = new ArrayList<>(ranges.length);
for (int i = 0; i < ranges.length; i++) {
@@ -284,7 +284,7 @@ static class LowCardinality extends GlobalOrdinalsStringTermsAggregator {
order,
format,
bucketCountThresholds,
l -> true,
ALWAYS_TRUE,
context,
parent,
remapGlobalOrds,
@@ -360,7 +360,7 @@ Aggregator create(String name,
.getValuesSource();
SortedSetDocValues values = globalOrdsValues(context, ordinalsValuesSource);
long maxOrd = values.getValueCount();
if (maxOrd > 0 && maxOrd <= MAX_ORDS_TO_TRY_FILTERS) {
if (maxOrd > 0 && maxOrd <= MAX_ORDS_TO_TRY_FILTERS && context.enableRewriteToFilterByFilter()) {
StringTermsAggregatorFromFilters adapted = StringTermsAggregatorFromFilters.adaptIntoFiltersOrNull(
name,
factories,
@@ -25,13 +25,14 @@
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.ObjectMapper;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.SearchExecutionContext;
import org.elasticsearch.index.query.Rewriteable;
import org.elasticsearch.index.query.SearchExecutionContext;
import org.elasticsearch.index.query.support.NestedScope;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptContext;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.MultiBucketConsumerService.MultiBucketConsumer;
import org.elasticsearch.search.aggregations.bucket.filter.FiltersAggregator.FilterByFilter;
import org.elasticsearch.search.internal.SubSearchContext;
import org.elasticsearch.search.lookup.SearchLookup;
import org.elasticsearch.search.profile.aggregation.AggregationProfiler;
@@ -247,6 +248,16 @@ public final AggregationUsageService getUsageService() {
*/
public abstract boolean isCacheable();

/**
* Are aggregations allowed to try to rewrite themselves into
* {@link FilterByFilter} aggregations? <strong>Often</strong>
* {@linkplain FilterByFilter} is faster to execute, but it isn't
* always. For now this just hooks into a cluster-level setting
* so users can disable the behavior when the existing heuristics
* don't detect cases where it's slower.
*/
public abstract boolean enableRewriteToFilterByFilter();

/**
* Implementation of {@linkplain AggregationContext} for production usage
* that wraps our ubiquitous {@link SearchExecutionContext} and anything else
@@ -267,6 +278,7 @@ public static class ProductionAggregationContext extends AggregationContext {
private final LongSupplier relativeTimeInMillis;
private final Supplier<Boolean> isCancelled;
private final Function<Query, Query> filterQuery;
private final boolean enableRewriteToFilterByFilter;

private final List<Aggregator> releaseMe = new ArrayList<>();

@@ -282,7 +294,8 @@ public ProductionAggregationContext(
int randomSeed,
LongSupplier relativeTimeInMillis,
Supplier<Boolean> isCancelled,
Function<Query, Query> filterQuery
Function<Query, Query> filterQuery,
boolean enableRewriteToFilterByFilter
) {
this.context = context;
if (bytesToPreallocate == 0) {
@@ -313,6 +326,7 @@ public ProductionAggregationContext(
this.relativeTimeInMillis = relativeTimeInMillis;
this.isCancelled = isCancelled;
this.filterQuery = filterQuery;
this.enableRewriteToFilterByFilter = enableRewriteToFilterByFilter;
}

@Override
@@ -474,6 +488,11 @@ public boolean isCacheable() {
return context.isCacheable();
}

@Override
public boolean enableRewriteToFilterByFilter() {
return enableRewriteToFilterByFilter;
}

@Override
public void close() {
/*
@@ -22,11 +22,14 @@
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.RandomIndexWriter;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.BooleanClause.Occur;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.DocValuesFieldExistsQuery;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.TermInSetQuery;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TotalHits;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.BytesRef;
@@ -841,7 +844,7 @@ private <T> void termsAggregator(ValueType valueType, MappedFieldType fieldType,
.size(numTerms)
.collectMode(randomFrom(Aggregator.SubAggCollectionMode.values()))
.field("field"));
context = createAggregationContext(indexSearcher, null, fieldType, filterFieldType);
context = createAggregationContext(indexSearcher, new MatchAllDocsQuery(), fieldType, filterFieldType);
aggregator = createAggregator(aggregationBuilder, context);
aggregator.preCollection();
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
@@ -1026,7 +1029,7 @@ public void testUnmappedWithMissing() throws Exception {
TermsAggregationBuilder aggregationBuilder = new TermsAggregationBuilder("_name")
.userValueTypeHint(valueTypes[i])
.field(fieldNames[i]).missing(missingValues[i]);
AggregationContext context = createAggregationContext(indexSearcher, null, fieldType1);
AggregationContext context = createAggregationContext(indexSearcher, new MatchAllDocsQuery(), fieldType1);
Aggregator aggregator = createAggregator(aggregationBuilder, context);
aggregator.preCollection();
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
@@ -1776,7 +1779,10 @@ public void testWithFilterAndPreciseSize() throws IOException {
* would trigger that bug.
*/
builder.size(2).order(BucketOrder.key(true));
Query topLevel = new TermInSetQuery("k", new BytesRef[] {new BytesRef("b"), new BytesRef("c")});
Query topLevel = new BooleanQuery.Builder()
.add(new TermQuery(new Term("k", "b")), Occur.SHOULD)
.add(new TermQuery(new Term("k", "c")), Occur.SHOULD)
.build();
testCase(builder, topLevel, buildIndex, (StringTerms terms) -> {
assertThat(
terms.getBuckets().stream().map(StringTerms.Bucket::getKey).collect(toList()),
@@ -492,6 +492,11 @@ public boolean isCacheable() {
throw new UnsupportedOperationException();
}

@Override
public boolean enableRewriteToFilterByFilter() {
throw new UnsupportedOperationException();
}

@Override
public void close() {
throw new UnsupportedOperationException();