Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add setting to disable aggs optimization (backport of #73620) #73670

Merged
merged 2 commits into from
Jun 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,11 @@ public Version indexVersionCreated() {
return Version.CURRENT;
}

@Override
public boolean enableRewriteToFilterByFilter() {
return true;
}

@Override
public void close() {
List<Releasable> releaseMe = new ArrayList<>(this.releaseMe);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
setup:
- do:
cluster.put_settings:
body:
persistent:
search.aggs.rewrite_to_filter_by_filter: false

---
teardown:
- do:
cluster.put_settings:
body:
persistent:
search.aggs.rewrite_to_filter_by_filter: null

---
does not use optimization:
- skip:
version: " - 7.13.2"
reason: setting to disable optimization added in 7.13.2
- do:
bulk:
index: test
refresh: true
body: |
{ "index": {} }
{ "str": "sheep" }
{ "index": {} }
{ "str": "sheep" }
{ "index": {} }
{ "str": "cow" }
{ "index": {} }
{ "str": "pig" }

- do:
search:
index: test
body:
profile: true
size: 0
aggs:
str_terms:
terms:
field: str.keyword
- match: { aggregations.str_terms.buckets.0.key: sheep }
- match: { aggregations.str_terms.buckets.1.key: cow }
- match: { aggregations.str_terms.buckets.2.key: pig }
- match: { profile.shards.0.aggregations.0.type: GlobalOrdinalsStringTermsAggregator }
- match: { profile.shards.0.aggregations.0.description: str_terms }
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.aggregations.Aggregator.SubAggCollectionMode;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
Expand All @@ -29,6 +31,8 @@
import java.util.Set;
import java.util.stream.Collectors;

import static io.github.nik9000.mapmatcher.MapMatcher.assertMap;
import static io.github.nik9000.mapmatcher.MapMatcher.matchesMap;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.search.aggregations.AggregationBuilders.avg;
import static org.elasticsearch.search.aggregations.AggregationBuilders.diversifiedSampler;
Expand Down Expand Up @@ -601,7 +605,7 @@ public void testFilterByFilter() throws InterruptedException, IOException {
assertSearchResponse(response);
Map<String, ProfileShardResult> profileResults = response.getProfileResults();
assertThat(profileResults, notNullValue());
assertThat(profileResults.size(), equalTo(getNumShards("idx").numPrimaries));
assertThat(profileResults.size(), equalTo(getNumShards("dateidx").numPrimaries));
for (ProfileShardResult profileShardResult : profileResults.values()) {
assertThat(profileShardResult, notNullValue());
AggregationProfileShardResult aggProfileResults = profileShardResult.getAggregationProfileResults();
Expand Down Expand Up @@ -651,4 +655,88 @@ public void testFilterByFilter() throws InterruptedException, IOException {
assertThat(queryDebug, hasEntry("query", "ConstantScore(DocValuesFieldExistsQuery [field=date])"));
}
}

public void testDateHistogramFilterByFilterDisabled() throws InterruptedException, IOException {
assertAcked(
client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(Settings.builder().put(SearchService.ENABLE_REWRITE_AGGS_TO_FILTER_BY_FILTER.getKey(), false))
);
try {
assertAcked(
client().admin()
.indices()
.prepareCreate("date_filter_by_filter_disabled")
.setSettings(org.elasticsearch.common.collect.Map.of("number_of_shards", 1, "number_of_replicas", 0))
.addMapping("_doc", "date", "type=date", "keyword", "type=keyword")
.get()
);
List<IndexRequestBuilder> builders = new ArrayList<>();
for (int i = 0; i < RangeAggregator.DOCS_PER_RANGE_TO_USE_FILTERS * 2; i++) {
String date = Instant.ofEpochSecond(i).toString();
builders.add(
client().prepareIndex("date_filter_by_filter_disabled", "_doc")
.setSource(
jsonBuilder().startObject()
.field("date", date)
.endObject()
)
);
}
indexRandom(true, false, builders);

SearchResponse response = client().prepareSearch("date_filter_by_filter_disabled")
.setProfile(true)
.addAggregation(new DateHistogramAggregationBuilder("histo").field("date").calendarInterval(DateHistogramInterval.MONTH))
.get();
assertSearchResponse(response);
Map<String, ProfileShardResult> profileResults = response.getProfileResults();
assertThat(profileResults, notNullValue());
assertThat(profileResults.size(), equalTo(getNumShards("date_filter_by_filter_disabled").numPrimaries));
for (ProfileShardResult profileShardResult : profileResults.values()) {
assertThat(profileShardResult, notNullValue());
AggregationProfileShardResult aggProfileResults = profileShardResult.getAggregationProfileResults();
assertThat(aggProfileResults, notNullValue());
List<ProfileResult> aggProfileResultsList = aggProfileResults.getProfileResults();
assertThat(aggProfileResultsList, notNullValue());
assertThat(aggProfileResultsList.size(), equalTo(1));
ProfileResult histoAggResult = aggProfileResultsList.get(0);
assertThat(histoAggResult, notNullValue());
assertThat(histoAggResult.getQueryName(), equalTo("DateHistogramAggregator.FromDateRange"));
assertThat(histoAggResult.getLuceneDescription(), equalTo("histo"));
assertThat(histoAggResult.getProfiledChildren().size(), equalTo(0));
assertThat(histoAggResult.getTime(), greaterThan(0L));
Map<String, Long> breakdown = histoAggResult.getTimeBreakdown();
assertMap(
breakdown,
matchesMap().entry(INITIALIZE, greaterThan(0L))
.entry(INITIALIZE + "_count", greaterThan(0L))
.entry(BUILD_LEAF_COLLECTOR, greaterThan(0L))
.entry(BUILD_LEAF_COLLECTOR + "_count", greaterThan(0L))
.entry(COLLECT, greaterThan(0L))
.entry(COLLECT + "_count", greaterThan(0L))
.entry(POST_COLLECTION, greaterThan(0L))
.entry(POST_COLLECTION + "_count", 1L)
.entry(BUILD_AGGREGATION, greaterThan(0L))
.entry(BUILD_AGGREGATION + "_count", greaterThan(0L))
.entry(REDUCE, 0L)
.entry(REDUCE + "_count", 0L)
);
Map<String, Object> debug = histoAggResult.getDebugInfo();
assertMap(
debug,
matchesMap().entry("delegate", "RangeAggregator.NoOverlap")
.entry("delegate_debug", matchesMap().entry("ranges", 1).entry("average_docs_per_range", 10000.0))
);
}
} finally {
assertAcked(
client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(Settings.builder().putNull(SearchService.ENABLE_REWRITE_AGGS_TO_FILTER_BY_FILTER.getKey()))
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,7 @@ public void apply(Settings value, Settings current, Settings previous) {
MultiBucketConsumerService.MAX_BUCKET_SETTING,
SearchService.LOW_LEVEL_CANCELLATION_SETTING,
SearchService.MAX_OPEN_SCROLL_CONTEXT,
SearchService.ENABLE_REWRITE_AGGS_TO_FILTER_BY_FILTER,
Node.WRITE_PORTS_FILE_SETTING,
Node.NODE_NAME_SETTING,
Node.NODE_ATTRIBUTES,
Expand Down
20 changes: 19 additions & 1 deletion server/src/main/java/org/elasticsearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,13 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
public static final Setting<Integer> MAX_OPEN_SCROLL_CONTEXT =
Setting.intSetting("search.max_open_scroll_context", 500, 0, Property.Dynamic, Property.NodeScope);

public static final Setting<Boolean> ENABLE_REWRITE_AGGS_TO_FILTER_BY_FILTER = Setting.boolSetting(
"search.aggs.rewrite_to_filter_by_filter",
true,
Property.Dynamic,
Property.NodeScope
);

public static final int DEFAULT_SIZE = 10;
public static final int DEFAULT_FROM = 0;

Expand Down Expand Up @@ -197,6 +204,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv

private volatile int maxOpenScrollContext;

private volatile boolean enableRewriteAggsToFilterByFilter;

private final Cancellable keepAliveReaper;

private final AtomicLong idGenerator = new AtomicLong();
Expand Down Expand Up @@ -243,6 +252,10 @@ public SearchService(ClusterService clusterService, IndicesService indicesServic

lowLevelCancellation = LOW_LEVEL_CANCELLATION_SETTING.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(LOW_LEVEL_CANCELLATION_SETTING, this::setLowLevelCancellation);

enableRewriteAggsToFilterByFilter = ENABLE_REWRITE_AGGS_TO_FILTER_BY_FILTER.get(settings);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(ENABLE_REWRITE_AGGS_TO_FILTER_BY_FILTER, this::setEnableRewriteAggsToFilterByFilter);
}

private void validateKeepAlives(TimeValue defaultKeepAlive, TimeValue maxKeepAlive) {
Expand Down Expand Up @@ -279,6 +292,10 @@ private void setLowLevelCancellation(Boolean lowLevelCancellation) {
this.lowLevelCancellation = lowLevelCancellation;
}

private void setEnableRewriteAggsToFilterByFilter(boolean enableRewriteAggsToFilterByFilter) {
this.enableRewriteAggsToFilterByFilter = enableRewriteAggsToFilterByFilter;
}

@Override
public void afterIndexRemoved(Index index, IndexSettings indexSettings, IndexRemovalReason reason) {
// once an index is removed due to deletion or closing, we can just clean up all the pending search context information
Expand Down Expand Up @@ -984,7 +1001,8 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc
context.indexShard().shardId().hashCode(),
context::getRelativeTimeInMillis,
context::isCancelled,
context::buildFilteredQuery
context::buildFilteredQuery,
enableRewriteAggsToFilterByFilter
);
context.addReleasable(aggContext);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,9 @@ public static FromFilters<?> adaptIntoFiltersOrNull(
if (false == FiltersAggregator.canUseFilterByFilter(parent, null)) {
return null;
}
if (false == context.enableRewriteToFilterByFilter()) {
return null;
}
boolean wholeNumbersOnly = false == ((ValuesSource.Numeric) valuesSourceConfig.getValuesSource()).isFloatingPoint();
List<QueryToFilterAdapter<?>> filters = new ArrayList<>(ranges.length);
for (int i = 0; i < ranges.length; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ static class LowCardinality extends GlobalOrdinalsStringTermsAggregator {
order,
format,
bucketCountThresholds,
l -> true,
ALWAYS_TRUE,
context,
parent,
remapGlobalOrds,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ Aggregator create(String name,
.getValuesSource();
SortedSetDocValues values = globalOrdsValues(context, ordinalsValuesSource);
long maxOrd = values.getValueCount();
if (maxOrd > 0 && maxOrd <= MAX_ORDS_TO_TRY_FILTERS) {
if (maxOrd > 0 && maxOrd <= MAX_ORDS_TO_TRY_FILTERS && context.enableRewriteToFilterByFilter()) {
StringTermsAggregatorFromFilters adapted = StringTermsAggregatorFromFilters.adaptIntoFiltersOrNull(
name,
factories,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,14 @@
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.ObjectMapper;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.SearchExecutionContext;
import org.elasticsearch.index.query.Rewriteable;
import org.elasticsearch.index.query.SearchExecutionContext;
import org.elasticsearch.index.query.support.NestedScope;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptContext;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.MultiBucketConsumerService.MultiBucketConsumer;
import org.elasticsearch.search.aggregations.bucket.filter.FiltersAggregator.FilterByFilter;
import org.elasticsearch.search.internal.SubSearchContext;
import org.elasticsearch.search.lookup.SearchLookup;
import org.elasticsearch.search.profile.aggregation.AggregationProfiler;
Expand Down Expand Up @@ -247,6 +248,16 @@ public final AggregationUsageService getUsageService() {
*/
public abstract boolean isCacheable();

/**
* Are aggregations allowed to try to rewrite themselves into
* {@link FilterByFilter} aggregations? <strong>Often</strong>
* {@linkplain FilterByFilter} is faster to execute, but it isn't
* always. For now this just hooks into a cluster level setting
* so users can disable the behavior when the existing heuristics
* don't detect cases where its slower.
*/
public abstract boolean enableRewriteToFilterByFilter();

/**
* Implementation of {@linkplain AggregationContext} for production usage
* that wraps our ubiquitous {@link SearchExecutionContext} and anything else
Expand All @@ -267,6 +278,7 @@ public static class ProductionAggregationContext extends AggregationContext {
private final LongSupplier relativeTimeInMillis;
private final Supplier<Boolean> isCancelled;
private final Function<Query, Query> filterQuery;
private final boolean enableRewriteToFilterByFilter;

private final List<Aggregator> releaseMe = new ArrayList<>();

Expand All @@ -282,7 +294,8 @@ public ProductionAggregationContext(
int randomSeed,
LongSupplier relativeTimeInMillis,
Supplier<Boolean> isCancelled,
Function<Query, Query> filterQuery
Function<Query, Query> filterQuery,
boolean enableRewriteToFilterByFilter
) {
this.context = context;
if (bytesToPreallocate == 0) {
Expand Down Expand Up @@ -313,6 +326,7 @@ public ProductionAggregationContext(
this.relativeTimeInMillis = relativeTimeInMillis;
this.isCancelled = isCancelled;
this.filterQuery = filterQuery;
this.enableRewriteToFilterByFilter = enableRewriteToFilterByFilter;
}

@Override
Expand Down Expand Up @@ -474,6 +488,11 @@ public boolean isCacheable() {
return context.isCacheable();
}

@Override
public boolean enableRewriteToFilterByFilter() {
return enableRewriteToFilterByFilter;
}

@Override
public void close() {
/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,14 @@
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.RandomIndexWriter;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.BooleanClause.Occur;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.DocValuesFieldExistsQuery;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.TermInSetQuery;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TotalHits;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.BytesRef;
Expand Down Expand Up @@ -841,7 +844,7 @@ private <T> void termsAggregator(ValueType valueType, MappedFieldType fieldType,
.size(numTerms)
.collectMode(randomFrom(Aggregator.SubAggCollectionMode.values()))
.field("field"));
context = createAggregationContext(indexSearcher, null, fieldType, filterFieldType);
context = createAggregationContext(indexSearcher, new MatchAllDocsQuery(), fieldType, filterFieldType);
aggregator = createAggregator(aggregationBuilder, context);
aggregator.preCollection();
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
Expand Down Expand Up @@ -1026,7 +1029,7 @@ public void testUnmappedWithMissing() throws Exception {
TermsAggregationBuilder aggregationBuilder = new TermsAggregationBuilder("_name")
.userValueTypeHint(valueTypes[i])
.field(fieldNames[i]).missing(missingValues[i]);
AggregationContext context = createAggregationContext(indexSearcher, null, fieldType1);
AggregationContext context = createAggregationContext(indexSearcher, new MatchAllDocsQuery(), fieldType1);
Aggregator aggregator = createAggregator(aggregationBuilder, context);
aggregator.preCollection();
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
Expand Down Expand Up @@ -1776,7 +1779,10 @@ public void testWithFilterAndPreciseSize() throws IOException {
* would trigger that bug.
*/
builder.size(2).order(BucketOrder.key(true));
Query topLevel = new TermInSetQuery("k", new BytesRef[] {new BytesRef("b"), new BytesRef("c")});
Query topLevel = new BooleanQuery.Builder()
.add(new TermQuery(new Term("k", "b")), Occur.SHOULD)
.add(new TermQuery(new Term("k", "c")), Occur.SHOULD)
.build();
testCase(builder, topLevel, buildIndex, (StringTerms terms) -> {
assertThat(
terms.getBuckets().stream().map(StringTerms.Bucket::getKey).collect(toList()),
Expand Down
1 change: 1 addition & 0 deletions test/framework/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ dependencies {
api "commons-codec:commons-codec:${versions.commonscodec}"
api "org.elasticsearch:securemock:${versions.securemock}"
api "org.elasticsearch:mocksocket:${versions.mocksocket}"
api "io.github.nik9000:mapmatcher:0.0.2"

// json schema validation dependencies
api "com.fasterxml.jackson.core:jackson-core:${versions.jackson}"
Expand Down
Loading