Optimize the composite aggregation for match_all and range queries #28745

Merged
merged 24 commits into from
Mar 26, 2018
Changes from 20 commits
d46143c
Optimize the composite aggregation for match_all and range queries
jimczi Feb 16, 2018
6a8e866
fix checkstyle
jimczi Feb 20, 2018
0bc679e
handle null point values
jimczi Feb 20, 2018
0581226
restore global ord execution for normal execution
jimczi Feb 20, 2018
8fd25e1
add missing change
jimczi Feb 20, 2018
0d32ab0
fix checkstyle
jimczi Feb 20, 2018
32f0904
fix global ord comparaison
jimczi Feb 20, 2018
0634551
Merge branch 'master' into composite_sort_optim
jimczi Feb 21, 2018
a7f8ffe
add tests for the composite queue and address review comments
jimczi Feb 22, 2018
35c017e
Merge branch 'master' into composite_sort_optim
jimczi Feb 22, 2018
841118c
cosmetics
jimczi Feb 22, 2018
6445f23
adapt heuristic to disable sorted docs producer
jimczi Feb 22, 2018
4437f2e
protect against empty reader
jimczi Feb 22, 2018
cdba4c2
Add missing license
jimczi Feb 22, 2018
93e3345
refactor the composite source to create the sorted docs producer dire…
jimczi Feb 22, 2018
cc6539c
Merge branch 'master' into composite_sort_optim
jimczi Feb 22, 2018
fc91434
fail composite agg that contains an unmapped field and no missing value
jimczi Feb 23, 2018
f9d1eeb
implement deferring collection directly in the collector
jimczi Mar 11, 2018
f2588f2
Merge branch 'master' into composite_sort_optim
jimczi Mar 11, 2018
eb61b02
line len
jimczi Mar 12, 2018
f22dd2a
Merge branch 'master' into composite_sort_optim
jimczi Mar 19, 2018
8bf9703
more javadocs and cleanups
jimczi Mar 19, 2018
e58e540
make sure that the cost is within the integer range when building the…
jimczi Mar 23, 2018
1d71d98
Merge branch 'master' into composite_sort_optim
jimczi Mar 23, 2018
85 changes: 0 additions & 85 deletions docs/reference/aggregations/bucket/composite-aggregation.asciidoc
@@ -545,88 +545,3 @@ GET /_search
}
--------------------------------------------------
// TESTRESPONSE[s/\.\.\.//]

==== Index sorting

By default this aggregation runs on every document that matches the query.
However, if the index sort matches the composite sort, this aggregation can optimize
the execution and skip documents that contain composite buckets that would not
be part of the response.

For instance the following aggregations:

[source,js]
--------------------------------------------------
GET /_search
{
"aggs" : {
"my_buckets": {
"composite" : {
"size": 2,
"sources" : [
{ "date": { "date_histogram": { "field": "timestamp", "interval": "1d", "order": "asc" } } },
{ "product": { "terms": { "field": "product", "order": "asc" } } }
]
}
}
}
}
--------------------------------------------------
// CONSOLE

\... is much faster on an index that uses the following sort:

[source,js]
--------------------------------------------------
PUT twitter
{
"settings" : {
"index" : {
"sort.field" : ["timestamp", "product"],
"sort.order" : ["asc", "asc"]
}
},
"mappings": {
"sales": {
"properties": {
"timestamp": {
"type": "date"
},
"product": {
"type": "keyword"
}
}
}
}
}
--------------------------------------------------
// CONSOLE

WARNING: The optimization takes effect only if the fields used for sorting are single-valued and follow
the same order as the aggregation (`desc` or `asc`).

If only the aggregation results are needed it is also better to set the size of the query to 0
and `track_total_hits` to false in order to remove other slowing factors:

[source,js]
--------------------------------------------------
GET /_search
{
"size": 0,
"track_total_hits": false,
"aggs" : {
"my_buckets": {
"composite" : {
"size": 2,
"sources" : [
{ "date": { "date_histogram": { "field": "timestamp", "interval": "1d" } } },
{ "product": { "terms": { "field": "product" } } }
]
}
}
}
}
--------------------------------------------------
// CONSOLE

See <<index-modules-index-sorting, index sorting>> for more details.
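The skipping behaviour described in this removed documentation section can be illustrated with a simplified sketch. It is not the Elasticsearch implementation: the class `SortedEarlyTerminationSketch`, the method `topBuckets`, and the use of plain strings as bucket keys are all assumptions made for illustration. It only shows why visiting documents in the same order as the composite sort lets collection stop once `size` buckets are complete.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; none of these names exist in Elasticsearch.
class SortedEarlyTerminationSketch {

    /**
     * Returns the first `size` distinct keys of an already sorted key array.
     * visited[0] receives the number of documents that had to be inspected.
     */
    static List<String> topBuckets(String[] sortedKeys, int size, int[] visited) {
        List<String> buckets = new ArrayList<>();
        visited[0] = 0;
        for (String key : sortedKeys) {
            boolean newBucket = buckets.isEmpty()
                || !buckets.get(buckets.size() - 1).equals(key);
            if (newBucket && buckets.size() == size) {
                // Every remaining document sorts after the last kept bucket,
                // so it cannot contribute to the response.
                break;
            }
            visited[0]++;
            if (newBucket) {
                buckets.add(key);
            }
        }
        return buckets;
    }

    public static void main(String[] args) {
        int[] visited = new int[1];
        String[] keys = {"a", "a", "b", "c", "c", "d"};
        System.out.println(topBuckets(keys, 2, visited) + " after " + visited[0] + " docs");
    }
}
```

Without a matching sort order, a document belonging to an early bucket could appear anywhere, so every matching document would have to be visited.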
@@ -99,6 +99,7 @@ setup:
- do:
search:
index: test
allow_partial_search_results: false
body:
aggregations:
test:
@@ -0,0 +1,137 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.search.aggregations.bucket.composite;

import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.index.fielddata.SortedBinaryDocValues;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.search.aggregations.LeafBucketCollector;

import java.io.IOException;

/**
* A {@link SingleDimensionValuesSource} for binary source ({@link BytesRef}).
*/
class BinaryValuesSource extends SingleDimensionValuesSource<BytesRef> {
private final CheckedFunction<LeafReaderContext, SortedBinaryDocValues, IOException> docValuesFunc;
private final BytesRef[] values;
private BytesRef currentValue;

BinaryValuesSource(MappedFieldType fieldType, CheckedFunction<LeafReaderContext, SortedBinaryDocValues, IOException> docValuesFunc,
int size, int reverseMul) {
super(fieldType, size, reverseMul);
this.docValuesFunc = docValuesFunc;
this.values = new BytesRef[size];
}

@Override
String type() {
return "binary";
}

@Override
public void copyCurrent(int slot) {
values[slot] = BytesRef.deepCopyOf(currentValue);
}

@Override
public int compare(int from, int to) {
return compareValues(values[from], values[to]);
}

@Override
int compareCurrent(int slot) {
return compareValues(currentValue, values[slot]);
}

@Override
int compareCurrentWithAfter() {
return compareValues(currentValue, afterValue);
}

int compareValues(BytesRef v1, BytesRef v2) {
return v1.compareTo(v2) * reverseMul;
}

@Override
void setAfter(Comparable<?> value) {
if (value.getClass() == BytesRef.class) {
afterValue = (BytesRef) value;
} else if (value.getClass() == String.class) {
afterValue = new BytesRef((String) value);
} else {
// Review comment (Contributor): do we need to accept both BytesRef and String?
throw new IllegalArgumentException("invalid value, expected string, got " + value.getClass().getSimpleName());
}
}

@Override
BytesRef toComparable(int slot) {
return values[slot];
}

@Override
LeafBucketCollector getLeafCollector(LeafReaderContext context, LeafBucketCollector next) throws IOException {
final SortedBinaryDocValues dvs = docValuesFunc.apply(context);
return new LeafBucketCollector() {
@Override
public void collect(int doc, long bucket) throws IOException {
if (dvs.advanceExact(doc)) {
int num = dvs.docValueCount();
for (int i = 0; i < num; i++) {
currentValue = dvs.nextValue();
// Review comment (Contributor): this means currentValue will always be the higher value in case of a multi-valued field, is that ok?
// Reply (Contributor Author): currentValue is only valid for the current composite bucket. next.collect() below fills the other sources' currentValue, and the last collector in the chain checks whether the final composite bucket should be added to the queue. We don't use currentValue outside of these recursive calls.
// Reply (Contributor): I see, thanks.
next.collect(doc, bucket);
}
}
}
};
}

@Override
LeafBucketCollector getLeafCollector(Comparable<?> value, LeafReaderContext context, LeafBucketCollector next) {
if (value.getClass() != BytesRef.class) {
throw new IllegalArgumentException("Expected BytesRef, got " + value.getClass());
}
final BytesRef filterValue = (BytesRef) value;
return new LeafBucketCollector() {
@Override
public void collect(int doc, long bucket) throws IOException {
currentValue = filterValue;
// Review comment (Contributor): do we need to set it for every doc?
next.collect(doc, bucket);
}
};
}

@Override
SortedDocsProducer createSortedDocsProducerOrNull(IndexReader reader, Query query) {
if (checkIfSortedDocsIsApplicable(reader, fieldType) == false ||
(query != null && query.getClass() != MatchAllDocsQuery.class)) {
return null;
}
return new TermsSortedDocsProducer(fieldType.name());
}

@Override
public void close() {}
}
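
The review thread on `currentValue` describes a chain of collectors in which each source sets its own current value and then delegates to the next one. A minimal sketch of that pattern follows, under stated assumptions: `DocCollector`, `Source`, and `collectBuckets` are hypothetical stand-ins and not the Elasticsearch classes, and values are plain strings held in arrays instead of doc values.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for LeafBucketCollector; not the Elasticsearch class. */
interface DocCollector {
    void collect(int doc);
}

class ChainedCollectorSketch {

    /** Hypothetical source that exposes the value of the bucket being built. */
    static final class Source {
        final String[][] valuesPerDoc; // valuesPerDoc[doc] holds that doc's values
        String currentValue;

        Source(String[][] valuesPerDoc) {
            this.valuesPerDoc = valuesPerDoc;
        }

        DocCollector leafCollector(DocCollector next) {
            return doc -> {
                for (String value : valuesPerDoc[doc]) {
                    // Only meaningful during the nested call below.
                    currentValue = value;
                    next.collect(doc);
                }
            };
        }
    }

    /** Builds every composite key "first|second" produced by the chain. */
    static List<String> collectBuckets(String[][] first, String[][] second, int docCount) {
        Source a = new Source(first);
        Source b = new Source(second);
        List<String> buckets = new ArrayList<>();
        // The last collector sees one complete combination of current values.
        DocCollector last = doc -> buckets.add(a.currentValue + "|" + b.currentValue);
        DocCollector chain = a.leafCollector(b.leafCollector(last));
        for (int doc = 0; doc < docCount; doc++) {
            chain.collect(doc);
        }
        return buckets;
    }

    public static void main(String[] args) {
        String[][] colors = {{"blue", "red"}};
        String[][] sizes = {{"L", "S"}};
        // prints [blue|L, blue|S, red|L, red|S]
        System.out.println(collectBuckets(colors, sizes, 1));
    }
}
```

Because the final collector runs once per combination, a multi-valued field yields one composite bucket per value, and the fact that `currentValue` ends on the last value after the loop has no effect on the result.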
@@ -19,16 +19,12 @@

package org.elasticsearch.search.aggregations.bucket.composite;

import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.IndexSortConfig;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.search.aggregations.AbstractAggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregatorFactories;
@@ -154,16 +150,9 @@ protected AggregatorFactory<?> doBuild(SearchContext context, AggregatorFactory<
if (parent != null) {
throw new IllegalArgumentException("[composite] aggregation cannot be used with a parent aggregation");
}
final QueryShardContext shardContext = context.getQueryShardContext();
CompositeValuesSourceConfig[] configs = new CompositeValuesSourceConfig[sources.size()];
SortField[] sortFields = new SortField[configs.length];
IndexSortConfig indexSortConfig = shardContext.getIndexSettings().getIndexSortConfig();
if (indexSortConfig.hasIndexSort()) {
Sort sort = indexSortConfig.buildIndexSort(shardContext::fieldMapper, shardContext::getForField);
System.arraycopy(sort.getSort(), 0, sortFields, 0, sortFields.length);
}
for (int i = 0; i < configs.length; i++) {
configs[i] = sources.get(i).build(context, i, configs.length, sortFields[i]);
configs[i] = sources.get(i).build(context);
if (configs[i].valuesSource().needsScores()) {
throw new IllegalArgumentException("[sources] cannot access _score");
}

This file was deleted.