Add setting to disable aggs optimization #73589

Closed
wants to merge 3 commits
@@ -332,6 +332,11 @@ public boolean isCacheable() {
throw new UnsupportedOperationException();
}

@Override
public boolean topLevelIsExpensiveToPrepare() {
return false;
}

@Override
public void close() {
List<Releasable> releaseMe = new ArrayList<>(this.releaseMe);
@@ -10,6 +10,9 @@

import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.query.TermsQueryBuilder;
import org.elasticsearch.index.query.WildcardQueryBuilder;
import org.elasticsearch.search.aggregations.Aggregator.SubAggCollectionMode;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
@@ -28,8 +31,11 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static io.github.nik9000.mapmatcher.MapMatcher.assertMap;
import static io.github.nik9000.mapmatcher.MapMatcher.matchesMap;
Contributor: Oops, that should not be here

Member Author: No?

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.search.aggregations.AggregationBuilders.avg;
import static org.elasticsearch.search.aggregations.AggregationBuilders.diversifiedSampler;
@@ -598,7 +604,7 @@ public void testFilterByFilter() throws InterruptedException, IOException {
assertSearchResponse(response);
Map<String, ProfileShardResult> profileResults = response.getProfileResults();
assertThat(profileResults, notNullValue());
-assertThat(profileResults.size(), equalTo(getNumShards("idx").numPrimaries));
+assertThat(profileResults.size(), equalTo(getNumShards("dateidx").numPrimaries));
for (ProfileShardResult profileShardResult : profileResults.values()) {
assertThat(profileShardResult, notNullValue());
AggregationProfileShardResult aggProfileResults = profileShardResult.getAggregationProfileResults();
@@ -645,4 +651,140 @@ public void testFilterByFilter() throws InterruptedException, IOException {
assertThat(queryDebug, hasEntry("query", "DocValuesFieldExistsQuery [field=date]"));
}
}

/**
* Make sure that we don't attempt to run date_histogram in filter-by-filter
* if the top level query contains a wildcard because those are very expensive
* to build and all the sub-queries the agg generates would slow down the execution.
*/
public void testDateHistogramTopLevelWildcard() throws InterruptedException, IOException {
assertAcked(client().admin().indices().prepareCreate("datekwd")
.setSettings(Map.of("number_of_shards", 1, "number_of_replicas", 0))
.setMapping("date", "type=date", "keyword", "type=keyword").get());
List<IndexRequestBuilder> builders = new ArrayList<>();
for (int i = 0; i < RangeAggregator.DOCS_PER_RANGE_TO_USE_FILTERS * 2; i++) {
String date = Instant.ofEpochSecond(i).toString();
builders.add(client().prepareIndex("datekwd").setSource(jsonBuilder().startObject()
.field("date", date)
.field("keyword", DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(TimeUnit.SECONDS.toMillis(i)))
.endObject()));
}
indexRandom(true, false, builders);

SearchResponse response = client().prepareSearch("datekwd")
.setProfile(true)
.setQuery(new WildcardQueryBuilder("keyword", "*-01-01*"))
.addAggregation(new DateHistogramAggregationBuilder("histo").field("date").calendarInterval(DateHistogramInterval.MONTH))
.get();
assertSearchResponse(response);
Map<String, ProfileShardResult> profileResults = response.getProfileResults();
assertThat(profileResults, notNullValue());
assertThat(profileResults.size(), equalTo(getNumShards("datekwd").numPrimaries));
for (ProfileShardResult profileShardResult : profileResults.values()) {
assertThat(profileShardResult, notNullValue());
AggregationProfileShardResult aggProfileResults = profileShardResult.getAggregationProfileResults();
assertThat(aggProfileResults, notNullValue());
List<ProfileResult> aggProfileResultsList = aggProfileResults.getProfileResults();
assertThat(aggProfileResultsList, notNullValue());
assertThat(aggProfileResultsList.size(), equalTo(1));
ProfileResult histoAggResult = aggProfileResultsList.get(0);
assertThat(histoAggResult, notNullValue());
assertThat(histoAggResult.getQueryName(), equalTo("DateHistogramAggregator.FromDateRange"));
assertThat(histoAggResult.getLuceneDescription(), equalTo("histo"));
assertThat(histoAggResult.getProfiledChildren().size(), equalTo(0));
assertThat(histoAggResult.getTime(), greaterThan(0L));
Map<String, Long> breakdown = histoAggResult.getTimeBreakdown();
assertMap(
breakdown,
matchesMap().entry(INITIALIZE, greaterThan(0L))
.entry(INITIALIZE + "_count", greaterThan(0L))
.entry(BUILD_LEAF_COLLECTOR, greaterThan(0L))
.entry(BUILD_LEAF_COLLECTOR + "_count", greaterThan(0L))
.entry(COLLECT, greaterThan(0L))
.entry(COLLECT + "_count", greaterThan(0L))
.entry(POST_COLLECTION, greaterThan(0L))
.entry(POST_COLLECTION + "_count", 1L)
.entry(BUILD_AGGREGATION, greaterThan(0L))
.entry(BUILD_AGGREGATION + "_count", greaterThan(0L))
.entry(REDUCE, 0L)
.entry(REDUCE + "_count", 0L)
);
Map<String, Object> debug = histoAggResult.getDebugInfo();
assertMap(
debug,
matchesMap().entry("delegate", "RangeAggregator.NoOverlap")
.entry("delegate_debug", matchesMap().entry("ranges", 1).entry("average_docs_per_range", 10000.0))
);
}
}

/**
* Make sure that we don't attempt to run date_histogram in filter-by-filter
* if the top level query contains a terms query over many terms because those
* are expensive to prepare and all the sub-queries the agg generates would
* slow down the execution.
*/
public void testDateHistogramTopLevelTerms() throws InterruptedException, IOException {
assertAcked(client().admin().indices().prepareCreate("datekwd2")
.setSettings(Map.of("number_of_shards", 1, "number_of_replicas", 0))
.setMapping("date", "type=date", "keyword", "type=keyword").get());
List<IndexRequestBuilder> builders = new ArrayList<>();
for (int i = 0; i < RangeAggregator.DOCS_PER_RANGE_TO_USE_FILTERS * 2; i++) {
String date = Instant.ofEpochSecond(i).toString();
builders.add(client().prepareIndex("datekwd2").setSource(jsonBuilder().startObject()
.field("date", date)
.field("keyword", DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(TimeUnit.SECONDS.toMillis(i)))
.endObject()));
}
indexRandom(true, false, builders);

String[] terms = new String[20]; // Needs to be more than 16 or else we rewrite out of TermInSetQuery
for (int i = 0; i < terms.length; i++) {
terms[i] = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(TimeUnit.SECONDS.toMillis(i));
}
SearchResponse response = client().prepareSearch("datekwd2")
.setProfile(true)
.setQuery(new TermsQueryBuilder("keyword", terms))
.addAggregation(new DateHistogramAggregationBuilder("histo").field("date").calendarInterval(DateHistogramInterval.MONTH))
.get();
assertSearchResponse(response);
Map<String, ProfileShardResult> profileResults = response.getProfileResults();
assertThat(profileResults, notNullValue());
assertThat(profileResults.size(), equalTo(getNumShards("datekwd2").numPrimaries));
for (ProfileShardResult profileShardResult : profileResults.values()) {
assertThat(profileShardResult, notNullValue());
AggregationProfileShardResult aggProfileResults = profileShardResult.getAggregationProfileResults();
assertThat(aggProfileResults, notNullValue());
List<ProfileResult> aggProfileResultsList = aggProfileResults.getProfileResults();
assertThat(aggProfileResultsList, notNullValue());
assertThat(aggProfileResultsList.size(), equalTo(1));
ProfileResult histoAggResult = aggProfileResultsList.get(0);
assertThat(histoAggResult, notNullValue());
assertThat(histoAggResult.getQueryName(), equalTo("DateHistogramAggregator.FromDateRange"));
assertThat(histoAggResult.getLuceneDescription(), equalTo("histo"));
assertThat(histoAggResult.getProfiledChildren().size(), equalTo(0));
assertThat(histoAggResult.getTime(), greaterThan(0L));
Map<String, Long> breakdown = histoAggResult.getTimeBreakdown();
assertMap(
breakdown,
matchesMap().entry(INITIALIZE, greaterThan(0L))
.entry(INITIALIZE + "_count", greaterThan(0L))
.entry(BUILD_LEAF_COLLECTOR, greaterThan(0L))
.entry(BUILD_LEAF_COLLECTOR + "_count", greaterThan(0L))
.entry(COLLECT, greaterThan(0L))
.entry(COLLECT + "_count", greaterThan(0L))
.entry(POST_COLLECTION, greaterThan(0L))
.entry(POST_COLLECTION + "_count", 1L)
.entry(BUILD_AGGREGATION, greaterThan(0L))
.entry(BUILD_AGGREGATION + "_count", greaterThan(0L))
.entry(REDUCE, 0L)
.entry(REDUCE + "_count", 0L)
);
Map<String, Object> debug = histoAggResult.getDebugInfo();
assertMap(
debug,
matchesMap().entry("delegate", "RangeAggregator.NoOverlap")
.entry("delegate_debug", matchesMap().entry("ranges", 1).entry("average_docs_per_range", 10000.0))
);
}
}
}
@@ -397,6 +397,7 @@ public void apply(Settings value, Settings current, Settings previous) {
MultiBucketConsumerService.MAX_BUCKET_SETTING,
SearchService.LOW_LEVEL_CANCELLATION_SETTING,
SearchService.MAX_OPEN_SCROLL_CONTEXT,
SearchService.EXPENSIVE_QUERIES_TO_PREPARE,
Node.WRITE_PORTS_FILE_SETTING,
Node.NODE_NAME_SETTING,
Node.NODE_ATTRIBUTES,
48 changes: 47 additions & 1 deletion server/src/main/java/org/elasticsearch/search/SearchService.java
@@ -11,8 +11,11 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.search.AutomatonQuery;
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.TermInSetQuery;
import org.apache.lucene.search.TopDocs;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
@@ -69,6 +72,7 @@
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.aggregations.AggregationInitializationException;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.ExpensiveQueriesToPrepare;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
@@ -164,6 +168,38 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
public static final Setting<Integer> MAX_OPEN_SCROLL_CONTEXT =
Setting.intSetting("search.max_open_scroll_context", 500, 0, Property.Dynamic, Property.NodeScope);

public static final Setting<List<Class<? extends Query>>> EXPENSIVE_QUERIES_TO_PREPARE = Setting.listSetting(
"search.expensive_to_prepare",
List.of(AutomatonQuery.class.getName(), TermInSetQuery.class.getName()),
s -> {
try {
/*
* If the query isn't on the *server's* classpath we'll fail here. This
* ain't great, but it's ok for this setting because:
* 1. Overriding this setting should be rare and temporary and *hopefully*
* folks will only do it after filing an issue. We'll fix the issue and
* they can remove their override.
* 2. Server has Lucene on the classpath so it should be able to see most
* queries.
*
* If Elasticsearch or Lucene drops a query and you list it in this setting
* then the node won't start. This ain't great either. We aren't going to
* keep a class around for BWC just because it is named in this setting. But
* we think that's ok because this should be a temporary setting, set after
* filing a bug. It's not great behavior, but it seems like the right amount
* of effort for something so rarely used.
*/
return Class.forName(s).asSubclass(Query.class);
} catch (ClassNotFoundException e) {
throw new IllegalArgumentException("Unknown query [" + s + "]", e);
} catch (ClassCastException e) {
throw new IllegalArgumentException("Not a query [" + s + "]", e);
}
},
Property.Dynamic,
Property.NodeScope
);

public static final int DEFAULT_SIZE = 10;
public static final int DEFAULT_FROM = 0;

Expand Down Expand Up @@ -197,6 +233,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv

private volatile int maxOpenScrollContext;

private volatile ExpensiveQueriesToPrepare expensiveQueriesToPrepare;

private final Cancellable keepAliveReaper;

private final AtomicLong idGenerator = new AtomicLong();
@@ -243,6 +281,9 @@ public SearchService(ClusterService clusterService, IndicesService indicesServic

lowLevelCancellation = LOW_LEVEL_CANCELLATION_SETTING.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(LOW_LEVEL_CANCELLATION_SETTING, this::setLowLevelCancellation);

setExpensiveQueriesToPrepare(EXPENSIVE_QUERIES_TO_PREPARE.get(settings));
clusterService.getClusterSettings().addSettingsUpdateConsumer(EXPENSIVE_QUERIES_TO_PREPARE, this::setExpensiveQueriesToPrepare);
}

private void validateKeepAlives(TimeValue defaultKeepAlive, TimeValue maxKeepAlive) {
@@ -279,6 +320,10 @@ private void setLowLevelCancellation(Boolean lowLevelCancellation) {
this.lowLevelCancellation = lowLevelCancellation;
}

private void setExpensiveQueriesToPrepare(List<Class<? extends Query>> expensiveQueriesToPrepare) {
this.expensiveQueriesToPrepare = new ExpensiveQueriesToPrepare(expensiveQueriesToPrepare);
}

@Override
public void afterIndexRemoved(Index index, IndexSettings indexSettings, IndexRemovalReason reason) {
// once an index is removed due to deletion or closing, we can just clean up all the pending search context information
@@ -968,7 +1013,8 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc
context.indexShard().shardId().hashCode(),
context::getRelativeTimeInMillis,
context::isCancelled,
-context::buildFilteredQuery
+context::buildFilteredQuery,
+expensiveQueriesToPrepare
);
context.addReleasable(aggContext);
try {
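Since search.expensive_to_prepare is dynamic, the default list can be extended at runtime. A minimal sketch of what an override might look like through the Java client, assuming a connected client handle; the PhraseQuery entry is a hypothetical addition for illustration, not part of this change:

// Sketch only, assuming a connected Client; the PhraseQuery entry is a
// hypothetical addition, not a recommendation.
client.admin().cluster().prepareUpdateSettings()
    .setTransientSettings(Settings.builder()
        .putList("search.expensive_to_prepare",
            "org.apache.lucene.search.AutomatonQuery",
            "org.apache.lucene.search.TermInSetQuery",
            "org.apache.lucene.search.PhraseQuery"))
    .get();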
@@ -0,0 +1,73 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.search.aggregations;

import org.apache.lucene.index.Term;
import org.apache.lucene.search.BooleanClause.Occur;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.QueryVisitor;
import org.apache.lucene.util.automaton.ByteRunAutomaton;

import java.util.List;
import java.util.function.Supplier;

/**
* Aggregations have optimizations that only work if the top level query is
* fairly cheap to prepare and they use this to detect expensive queries and
* disable the optimization.
*/
public class ExpensiveQueriesToPrepare {
private final List<Class<? extends Query>> expensiveQueries;

public ExpensiveQueriesToPrepare(List<Class<? extends Query>> expensiveQueries) {
this.expensiveQueries = expensiveQueries;
}

public boolean isExpensive(Query query) {
Visitor visitor = new Visitor();
query.visit(visitor);
return visitor.expensive;
}

private class Visitor extends QueryVisitor {
boolean expensive = false;

@Override
public QueryVisitor getSubVisitor(Occur occur, Query parent) {
if (expensive) {
return QueryVisitor.EMPTY_VISITOR;
}
// The default behavior is to ignore MUST_NOT clauses, but we want those too.
return this;
}

@Override
public void visitLeaf(Query query) {
expensive = expensive || isExpensive(query);
}

@Override
public void consumeTerms(Query query, Term... terms) {
visitLeaf(query);
}

@Override
public void consumeTermsMatching(Query query, String field, Supplier<ByteRunAutomaton> automaton) {
visitLeaf(query);
}

private boolean isExpensive(Query query) {
for (Class<? extends Query> clazz : expensiveQueries) {
if (clazz.isAssignableFrom(query.getClass())) {
return true;
}
}
return false;
}
}
}
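A minimal sketch of how this visitor is expected to behave (not part of the diff; the query construction below is an assumption using standard Lucene classes):

import java.util.List;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.AutomatonQuery;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.TermInSetQuery;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.WildcardQuery;

ExpensiveQueriesToPrepare expensive = new ExpensiveQueriesToPrepare(
    List.of(AutomatonQuery.class, TermInSetQuery.class));

// WildcardQuery extends AutomatonQuery, so isAssignableFrom flags it ...
Query wildcard = new WildcardQuery(new Term("keyword", "*-01-01*"));
assert expensive.isExpensive(wildcard);

// ... even nested in a BooleanQuery clause, because getSubVisitor recurses
// into every clause instead of skipping MUST_NOT like the default visitor.
Query bool = new BooleanQuery.Builder()
    .add(wildcard, BooleanClause.Occur.MUST_NOT)
    .build();
assert expensive.isExpensive(bool);

// A plain term query is cheap to prepare and is not flagged.
assert false == expensive.isExpensive(new TermQuery(new Term("keyword", "a")));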
@@ -353,6 +353,9 @@ public static FromFilters<?> adaptIntoFiltersOrNull(
if (false == FiltersAggregator.canUseFilterByFilter(parent, null)) {
return null;
}
if (context.topLevelIsExpensiveToPrepare()) {
return null;
}
boolean wholeNumbersOnly = false == ((ValuesSource.Numeric) valuesSourceConfig.getValuesSource()).isFloatingPoint();
List<QueryToFilterAdapter<?>> filters = new ArrayList<>(ranges.length);
for (int i = 0; i < ranges.length; i++) {
@@ -284,7 +284,7 @@ static class LowCardinality extends GlobalOrdinalsStringTermsAggregator {
order,
format,
bucketCountThresholds,
-l -> true,
+ALWAYS_TRUE,
context,
parent,
remapGlobalOrds,
@@ -360,7 +360,7 @@ Aggregator create(String name,
.getValuesSource();
SortedSetDocValues values = globalOrdsValues(context, ordinalsValuesSource);
long maxOrd = values.getValueCount();
-if (maxOrd > 0 && maxOrd <= MAX_ORDS_TO_TRY_FILTERS) {
+if (maxOrd > 0 && maxOrd <= MAX_ORDS_TO_TRY_FILTERS && false == context.topLevelIsExpensiveToPrepare()) {
StringTermsAggregatorFromFilters adapted = StringTermsAggregatorFromFilters.adaptIntoFiltersOrNull(
name,
factories,