diff --git a/docs/reference/aggregations/bucket/composite-aggregation.asciidoc b/docs/reference/aggregations/bucket/composite-aggregation.asciidoc index 5d46a7f4c4a99..688cf20c5320f 100644 --- a/docs/reference/aggregations/bucket/composite-aggregation.asciidoc +++ b/docs/reference/aggregations/bucket/composite-aggregation.asciidoc @@ -545,88 +545,3 @@ GET /_search } -------------------------------------------------- // TESTRESPONSE[s/\.\.\.//] - -==== Index sorting - -By default this aggregation runs on every document that match the query. -Though if the index sort matches the composite sort this aggregation can optimize -the execution and can skip documents that contain composite buckets that would not -be part of the response. - -For instance the following aggregations: - -[source,js] --------------------------------------------------- -GET /_search -{ - "aggs" : { - "my_buckets": { - "composite" : { - "size": 2, - "sources" : [ - { "date": { "date_histogram": { "field": "timestamp", "interval": "1d", "order": "asc" } } }, - { "product": { "terms": { "field": "product", "order": "asc" } } } - ] - } - } - } -} --------------------------------------------------- -// CONSOLE - -\... is much faster on an index that uses the following sort: - -[source,js] --------------------------------------------------- -PUT twitter -{ - "settings" : { - "index" : { - "sort.field" : ["timestamp", "product"], - "sort.order" : ["asc", "asc"] - } - }, - "mappings": { - "sales": { - "properties": { - "timestamp": { - "type": "date" - }, - "product": { - "type": "keyword" - } - } - } - } -} --------------------------------------------------- -// CONSOLE - -WARNING: The optimization takes effect only if the fields used for sorting are single-valued and follow -the same order as the aggregation (`desc` or `asc`). - -If only the aggregation results are needed it is also better to set the size of the query to 0 -and `track_total_hits` to false in order to remove other slowing factors: - -[source,js] --------------------------------------------------- -GET /_search -{ - "size": 0, - "track_total_hits": false, - "aggs" : { - "my_buckets": { - "composite" : { - "size": 2, - "sources" : [ - { "date": { "date_histogram": { "field": "timestamp", "interval": "1d" } } }, - { "product": { "terms": { "field": "product" } } } - ] - } - } - } -} --------------------------------------------------- -// CONSOLE - -See <> for more details. diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/230_composite.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/230_composite.yml index b8c89517ec119..10fc9f33beb8b 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/230_composite.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/230_composite.yml @@ -99,6 +99,7 @@ setup: - do: search: index: test + allow_partial_search_results: false body: aggregations: test: diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/BinaryValuesSource.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/BinaryValuesSource.java new file mode 100644 index 0000000000000..cd46b90889d49 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/BinaryValuesSource.java @@ -0,0 +1,131 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.bucket.composite; + +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.Query; +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.CheckedFunction; +import org.elasticsearch.index.fielddata.SortedBinaryDocValues; +import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.search.aggregations.LeafBucketCollector; + +import java.io.IOException; + +/** + * A {@link SingleDimensionValuesSource} for binary source ({@link BytesRef}). + */ +class BinaryValuesSource extends SingleDimensionValuesSource { + private final CheckedFunction docValuesFunc; + private final BytesRef[] values; + private BytesRef currentValue; + + BinaryValuesSource(MappedFieldType fieldType, CheckedFunction docValuesFunc, + int size, int reverseMul) { + super(fieldType, size, reverseMul); + this.docValuesFunc = docValuesFunc; + this.values = new BytesRef[size]; + } + + @Override + public void copyCurrent(int slot) { + values[slot] = BytesRef.deepCopyOf(currentValue); + } + + @Override + public int compare(int from, int to) { + return compareValues(values[from], values[to]); + } + + @Override + int compareCurrent(int slot) { + return compareValues(currentValue, values[slot]); + } + + @Override + int compareCurrentWithAfter() { + return compareValues(currentValue, afterValue); + } + + int compareValues(BytesRef v1, BytesRef v2) { + return v1.compareTo(v2) * reverseMul; + } + + @Override + void setAfter(Comparable value) { + if (value.getClass() == BytesRef.class) { + afterValue = (BytesRef) value; + } else if (value.getClass() == String.class) { + afterValue = new BytesRef((String) value); + } else { + throw new IllegalArgumentException("invalid value, expected string, got " + value.getClass().getSimpleName()); + } + } + + @Override + BytesRef toComparable(int slot) { + return values[slot]; + } + + @Override + LeafBucketCollector getLeafCollector(LeafReaderContext context, LeafBucketCollector next) throws IOException { + final SortedBinaryDocValues dvs = docValuesFunc.apply(context); + return new LeafBucketCollector() { + @Override + public void collect(int doc, long bucket) throws IOException { + if (dvs.advanceExact(doc)) { + int num = dvs.docValueCount(); + for (int i = 0; i < num; i++) { + currentValue = dvs.nextValue(); + next.collect(doc, bucket); + } + } + } + }; + } + + @Override + LeafBucketCollector getLeafCollector(Comparable value, LeafReaderContext context, LeafBucketCollector next) { + if (value.getClass() != BytesRef.class) { + throw new IllegalArgumentException("Expected BytesRef, got " + value.getClass()); + } + currentValue = (BytesRef) value; + return new LeafBucketCollector() { + 
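+                // The leading source value was forced for this whole segment (currentValue above),
+                // so this collector simply forwards each visited document to the next collector.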
@Override + public void collect(int doc, long bucket) throws IOException { + next.collect(doc, bucket); + } + }; + } + + @Override + SortedDocsProducer createSortedDocsProducerOrNull(IndexReader reader, Query query) { + if (checkIfSortedDocsIsApplicable(reader, fieldType) == false || + (query != null && query.getClass() != MatchAllDocsQuery.class)) { + return null; + } + return new TermsSortedDocsProducer(fieldType.name()); + } + + @Override + public void close() {} +} diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregationBuilder.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregationBuilder.java index b54371dce62ed..0912555ea711b 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregationBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregationBuilder.java @@ -19,16 +19,12 @@ package org.elasticsearch.search.aggregations.bucket.composite; -import org.apache.lucene.search.Sort; -import org.apache.lucene.search.SortField; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.index.IndexSortConfig; -import org.elasticsearch.index.query.QueryShardContext; import org.elasticsearch.search.aggregations.AbstractAggregationBuilder; import org.elasticsearch.search.aggregations.AggregationBuilder; import org.elasticsearch.search.aggregations.AggregatorFactories; @@ -154,16 +150,9 @@ protected AggregatorFactory doBuild(SearchContext context, AggregatorFactory< if (parent != null) { throw new IllegalArgumentException("[composite] aggregation cannot be used with a parent aggregation"); } - final QueryShardContext shardContext = context.getQueryShardContext(); CompositeValuesSourceConfig[] configs = new CompositeValuesSourceConfig[sources.size()]; - SortField[] sortFields = new SortField[configs.length]; - IndexSortConfig indexSortConfig = shardContext.getIndexSettings().getIndexSortConfig(); - if (indexSortConfig.hasIndexSort()) { - Sort sort = indexSortConfig.buildIndexSort(shardContext::fieldMapper, shardContext::getForField); - System.arraycopy(sort.getSort(), 0, sortFields, 0, sortFields.length); - } for (int i = 0; i < configs.length; i++) { - configs[i] = sources.get(i).build(context, i, configs.length, sortFields[i]); + configs[i] = sources.get(i).build(context); if (configs[i].valuesSource().needsScores()) { throw new IllegalArgumentException("[sources] cannot access _score"); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregationPlugin.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregationPlugin.java deleted file mode 100644 index bd49acbe4da0b..0000000000000 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregationPlugin.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. 
Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.search.aggregations.bucket.composite; - -import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.plugins.SearchPlugin; - -import java.util.Arrays; -import java.util.List; - -public class CompositeAggregationPlugin extends Plugin implements SearchPlugin { - @Override - public List getAggregations() { - return Arrays.asList( - new AggregationSpec(CompositeAggregationBuilder.NAME, CompositeAggregationBuilder::new, CompositeAggregationBuilder::parse) - .addResultReader(InternalComposite::new) - ); - } -} diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregator.java index 830aba3bcf1e1..04864e7419def 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregator.java @@ -19,22 +19,29 @@ package org.elasticsearch.search.aggregations.bucket.composite; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.search.CollectionTerminatedException; import org.apache.lucene.search.DocIdSet; import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.search.MultiCollector; import org.apache.lucene.search.Query; import org.apache.lucene.search.Scorer; import org.apache.lucene.search.Weight; import org.apache.lucene.util.RoaringDocIdSet; +import org.elasticsearch.common.lease.Releasables; +import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.BucketCollector; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.aggregations.LeafBucketCollector; import org.elasticsearch.search.aggregations.bucket.BucketsAggregator; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; +import org.elasticsearch.search.aggregations.support.ValuesSource; import org.elasticsearch.search.internal.SearchContext; import java.io.IOException; @@ -43,97 +50,74 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.TreeMap; import java.util.stream.Collectors; final class CompositeAggregator extends BucketsAggregator { private final int size; - private final CompositeValuesSourceConfig[] sources; + private final SortedDocsProducer sortedDocsProducer; private final List sourceNames; + private final int[] reverseMuls; private final List formats; - private final boolean canEarlyTerminate; - private 
final TreeMap keys; - private final CompositeValuesComparator array; + private final CompositeValuesCollectorQueue queue; - private final List contexts = new ArrayList<>(); - private LeafContext leaf; - private RoaringDocIdSet.Builder builder; + private final List entries; + private LeafReaderContext currentLeaf; + private RoaringDocIdSet.Builder docIdSetBuilder; + private BucketCollector deferredCollectors; CompositeAggregator(String name, AggregatorFactories factories, SearchContext context, Aggregator parent, - List pipelineAggregators, Map metaData, - int size, CompositeValuesSourceConfig[] sources, CompositeKey rawAfterKey) throws IOException { + List pipelineAggregators, Map metaData, + int size, CompositeValuesSourceConfig[] sourceConfigs, CompositeKey rawAfterKey) throws IOException { super(name, factories, context, parent, pipelineAggregators, metaData); this.size = size; - this.sources = sources; - this.sourceNames = Arrays.stream(sources).map(CompositeValuesSourceConfig::name).collect(Collectors.toList()); - this.formats = Arrays.stream(sources).map(CompositeValuesSourceConfig::format).collect(Collectors.toList()); - // we use slot 0 to fill the current document (size+1). - this.array = new CompositeValuesComparator(context.searcher().getIndexReader(), sources, size+1); + this.sourceNames = Arrays.stream(sourceConfigs).map(CompositeValuesSourceConfig::name).collect(Collectors.toList()); + this.reverseMuls = Arrays.stream(sourceConfigs).mapToInt(CompositeValuesSourceConfig::reverseMul).toArray(); + this.formats = Arrays.stream(sourceConfigs).map(CompositeValuesSourceConfig::format).collect(Collectors.toList()); + final SingleDimensionValuesSource[] sources = + createValuesSources(context.bigArrays(), context.searcher().getIndexReader(), context.query(), sourceConfigs, size); + this.queue = new CompositeValuesCollectorQueue(sources, size); + this.sortedDocsProducer = sources[0].createSortedDocsProducerOrNull(context.searcher().getIndexReader(), context.query()); if (rawAfterKey != null) { - array.setTop(rawAfterKey.values()); + queue.setAfter(rawAfterKey.values()); } - this.keys = new TreeMap<>(array::compare); - this.canEarlyTerminate = Arrays.stream(sources) - .allMatch(CompositeValuesSourceConfig::canEarlyTerminate); + this.entries = new ArrayList<>(); } - boolean canEarlyTerminate() { - return canEarlyTerminate; + @Override + protected void doClose() { + Releasables.close(queue); + } + + @Override + protected void doPreCollection() throws IOException { + List collectors = Arrays.asList(subAggregators); + deferredCollectors = BucketCollector.wrap(collectors); + collectableSubAggregators = BucketCollector.NO_OP_COLLECTOR; } - private int[] getReverseMuls() { - return Arrays.stream(sources).mapToInt(CompositeValuesSourceConfig::reverseMul).toArray(); + @Override + protected void doPostCollection() throws IOException { + finishLeaf(); } @Override public InternalAggregation buildAggregation(long zeroBucket) throws IOException { assert zeroBucket == 0L; - consumeBucketsAndMaybeBreak(keys.size()); + consumeBucketsAndMaybeBreak(queue.size()); - // Replay all documents that contain at least one top bucket (collected during the first pass). 
- grow(keys.size()+1); - final boolean needsScores = needsScores(); - Weight weight = null; - if (needsScores) { - Query query = context.query(); - weight = context.searcher().createNormalizedWeight(query, true); - } - for (LeafContext context : contexts) { - DocIdSetIterator docIdSetIterator = context.docIdSet.iterator(); - if (docIdSetIterator == null) { - continue; - } - final CompositeValuesSource.Collector collector = - array.getLeafCollector(context.ctx, getSecondPassCollector(context.subCollector)); - int docID; - DocIdSetIterator scorerIt = null; - if (needsScores) { - Scorer scorer = weight.scorer(context.ctx); - // We don't need to check if the scorer is null - // since we are sure that there are documents to replay (docIdSetIterator it not empty). - scorerIt = scorer.iterator(); - context.subCollector.setScorer(scorer); - } - while ((docID = docIdSetIterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) { - if (needsScores) { - assert scorerIt.docID() < docID; - scorerIt.advance(docID); - // aggregations should only be replayed on matching documents - assert scorerIt.docID() == docID; - } - collector.collect(docID); - } + if (deferredCollectors != NO_OP_COLLECTOR) { + // Replay all documents that contain at least one top bucket (collected during the first pass). + runDeferredCollections(); } - int num = Math.min(size, keys.size()); + int num = Math.min(size, queue.size()); final InternalComposite.InternalBucket[] buckets = new InternalComposite.InternalBucket[num]; - final int[] reverseMuls = getReverseMuls(); int pos = 0; - for (int slot : keys.keySet()) { - CompositeKey key = array.toCompositeKey(slot); + for (int slot : queue.getSortedSlot()) { + CompositeKey key = queue.toCompositeKey(slot); InternalAggregations aggs = bucketAggregations(slot); - int docCount = bucketDocCount(slot); + int docCount = queue.getDocCount(slot); buckets[pos++] = new InternalComposite.InternalBucket(sourceNames, formats, key, reverseMuls, docCount, aggs); } CompositeKey lastBucket = num > 0 ? 
buckets[num-1].getRawKey() : null; @@ -143,125 +127,179 @@ public InternalAggregation buildAggregation(long zeroBucket) throws IOException @Override public InternalAggregation buildEmptyAggregation() { - final int[] reverseMuls = getReverseMuls(); return new InternalComposite(name, size, sourceNames, formats, Collections.emptyList(), null, reverseMuls, pipelineAggregators(), metaData()); } - @Override - protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException { - if (leaf != null) { - leaf.docIdSet = builder.build(); - contexts.add(leaf); + private void finishLeaf() { + if (currentLeaf != null) { + DocIdSet docIdSet = docIdSetBuilder.build(); + entries.add(new Entry(currentLeaf, docIdSet)); + currentLeaf = null; + docIdSetBuilder = null; } - leaf = new LeafContext(ctx, sub); - builder = new RoaringDocIdSet.Builder(ctx.reader().maxDoc()); - final CompositeValuesSource.Collector inner = array.getLeafCollector(ctx, getFirstPassCollector()); - return new LeafBucketCollector() { - @Override - public void collect(int doc, long zeroBucket) throws IOException { - assert zeroBucket == 0L; - inner.collect(doc); - } - }; } @Override - protected void doPostCollection() throws IOException { - if (leaf != null) { - leaf.docIdSet = builder.build(); - contexts.add(leaf); + protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException { + finishLeaf(); + boolean fillDocIdSet = deferredCollectors != NO_OP_COLLECTOR; + if (sortedDocsProducer != null) { + /** + * The producer will visit documents sorted by the leading source of the composite definition + * and terminates when the leading source value is guaranteed to be greater than the lowest + * composite bucket in the queue. + */ + DocIdSet docIdSet = sortedDocsProducer.processLeaf(context.query(), queue, ctx, fillDocIdSet); + if (fillDocIdSet) { + entries.add(new Entry(ctx, docIdSet)); + } + + /** + * We can bypass search entirely for this segment, all the processing has been done in the previous call. + * Throwing this exception will terminate the execution of the search for this root aggregation, + * see {@link MultiCollector} for more details on how we handle early termination in aggregations. + */ + throw new CollectionTerminatedException(); + } else { + if (fillDocIdSet) { + currentLeaf = ctx; + docIdSetBuilder = new RoaringDocIdSet.Builder(ctx.reader().maxDoc()); + } + final LeafBucketCollector inner = queue.getLeafCollector(ctx, getFirstPassCollector(docIdSetBuilder)); + return new LeafBucketCollector() { + @Override + public void collect(int doc, long zeroBucket) throws IOException { + assert zeroBucket == 0L; + inner.collect(doc); + } + }; } } /** - * The first pass selects the top N composite buckets from all matching documents. - * It also records all doc ids that contain a top N composite bucket in a {@link RoaringDocIdSet} in order to be - * able to replay the collection filtered on the best buckets only. + * The first pass selects the top composite buckets from all matching documents. */ - private CompositeValuesSource.Collector getFirstPassCollector() { - return new CompositeValuesSource.Collector() { + private LeafBucketCollector getFirstPassCollector(RoaringDocIdSet.Builder builder) { + return new LeafBucketCollector() { int lastDoc = -1; @Override - public void collect(int doc) throws IOException { - - // Checks if the candidate key in slot 0 is competitive. 
- if (keys.containsKey(0)) { - // This key is already in the top N, skip it for now. - if (doc != lastDoc) { + public void collect(int doc, long bucket) throws IOException { + int slot = queue.addIfCompetitive(); + if (slot != -1) { + if (builder != null && lastDoc != doc) { builder.add(doc); lastDoc = doc; } - return; - } - if (array.hasTop() && array.compareTop(0) <= 0) { - // This key is greater than the top value collected in the previous round. - if (canEarlyTerminate) { - // The index sort matches the composite sort, we can early terminate this segment. - throw new CollectionTerminatedException(); - } - // just skip this key for now - return; - } - if (keys.size() >= size) { - // The tree map is full, check if the candidate key should be kept. - if (array.compare(0, keys.lastKey()) > 0) { - // The candidate key is not competitive - if (canEarlyTerminate) { - // The index sort matches the composite sort, we can early terminate this segment. - throw new CollectionTerminatedException(); - } - // just skip this key - return; - } } + } + }; + } - // The candidate key is competitive - final int newSlot; - if (keys.size() >= size) { - // the tree map is full, we replace the last key with this candidate. - int slot = keys.pollLastEntry().getKey(); - // and we recycle the deleted slot - newSlot = slot; - } else { - newSlot = keys.size() + 1; + /** + * Replay the documents that might contain a top bucket and pass top buckets to + * the {@link this#deferredCollectors}. + */ + private void runDeferredCollections() throws IOException { + final boolean needsScores = needsScores(); + Weight weight = null; + if (needsScores) { + Query query = context.query(); + weight = context.searcher().createNormalizedWeight(query, true); + } + deferredCollectors.preCollection(); + for (Entry entry : entries) { + DocIdSetIterator docIdSetIterator = entry.docIdSet.iterator(); + if (docIdSetIterator == null) { + continue; + } + final LeafBucketCollector subCollector = deferredCollectors.getLeafCollector(entry.context); + final LeafBucketCollector collector = queue.getLeafCollector(entry.context, getSecondPassCollector(subCollector)); + DocIdSetIterator scorerIt = null; + if (needsScores) { + Scorer scorer = weight.scorer(entry.context); + if (scorer != null) { + scorerIt = scorer.iterator(); + subCollector.setScorer(scorer); } - // move the candidate key to its new slot. - array.move(0, newSlot); - keys.put(newSlot, newSlot); - if (doc != lastDoc) { - builder.add(doc); - lastDoc = doc; + } + int docID; + while ((docID = docIdSetIterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) { + if (needsScores) { + assert scorerIt != null && scorerIt.docID() < docID; + scorerIt.advance(docID); + // aggregations should only be replayed on matching documents + assert scorerIt.docID() == docID; } + collector.collect(docID); } - }; + } + deferredCollectors.postCollection(); } - /** - * The second pass delegates the collection to sub-aggregations but only if the collected composite bucket is a top bucket (selected - * in the first pass). + * Replay the top buckets from the matching documents. */ - private CompositeValuesSource.Collector getSecondPassCollector(LeafBucketCollector subCollector) throws IOException { - return doc -> { - Integer bucket = keys.get(0); - if (bucket != null) { - // The candidate key in slot 0 is a top bucket. 
- // We can defer the collection of this document/bucket to the sub collector - collectExistingBucket(subCollector, doc, bucket); + private LeafBucketCollector getSecondPassCollector(LeafBucketCollector subCollector) { + return new LeafBucketCollector() { + @Override + public void collect(int doc, long zeroBucket) throws IOException { + assert zeroBucket == 0; + Integer slot = queue.compareCurrent(); + if (slot != null) { + // The candidate key is a top bucket. + // We can defer the collection of this document/bucket to the sub collector + subCollector.collect(doc, slot); + } } }; } - static class LeafContext { - final LeafReaderContext ctx; - final LeafBucketCollector subCollector; - DocIdSet docIdSet; + private static SingleDimensionValuesSource[] createValuesSources(BigArrays bigArrays, IndexReader reader, Query query, + CompositeValuesSourceConfig[] configs, int size) { + final SingleDimensionValuesSource[] sources = new SingleDimensionValuesSource[configs.length]; + for (int i = 0; i < sources.length; i++) { + final int reverseMul = configs[i].reverseMul(); + if (configs[i].valuesSource() instanceof ValuesSource.Bytes.WithOrdinals && reader instanceof DirectoryReader) { + ValuesSource.Bytes.WithOrdinals vs = (ValuesSource.Bytes.WithOrdinals) configs[i].valuesSource(); + sources[i] = new GlobalOrdinalValuesSource(bigArrays, configs[i].fieldType(), vs::globalOrdinalsValues, size, reverseMul); + if (i == 0 && sources[i].createSortedDocsProducerOrNull(reader, query) != null) { + // this the leading source and we can optimize it with the sorted docs producer but + // we don't want to use global ordinals because the number of visited documents + // should be low and global ordinals need one lookup per visited term. + Releasables.close(sources[i]); + sources[i] = new BinaryValuesSource(configs[i].fieldType(), vs::bytesValues, size, reverseMul); + } + } else if (configs[i].valuesSource() instanceof ValuesSource.Bytes) { + ValuesSource.Bytes vs = (ValuesSource.Bytes) configs[i].valuesSource(); + sources[i] = new BinaryValuesSource(configs[i].fieldType(), vs::bytesValues, size, reverseMul); + } else if (configs[i].valuesSource() instanceof ValuesSource.Numeric) { + final ValuesSource.Numeric vs = (ValuesSource.Numeric) configs[i].valuesSource(); + if (vs.isFloatingPoint()) { + sources[i] = new DoubleValuesSource(bigArrays, configs[i].fieldType(), vs::doubleValues, size, reverseMul); + } else { + if (vs instanceof RoundingValuesSource) { + sources[i] = new LongValuesSource(bigArrays, configs[i].fieldType(), vs::longValues, + ((RoundingValuesSource) vs)::round, configs[i].format(), size, reverseMul); + } else { + sources[i] = new LongValuesSource(bigArrays, configs[i].fieldType(), vs::longValues, + (value) -> value, configs[i].format(), size, reverseMul); + } + } + } + } + return sources; + } + + private static class Entry { + final LeafReaderContext context; + final DocIdSet docIdSet; - LeafContext(LeafReaderContext ctx, LeafBucketCollector subCollector) { - this.ctx = ctx; - this.subCollector = subCollector; + Entry(LeafReaderContext context, DocIdSet docIdSet) { + this.context = context; + this.docIdSet = docIdSet; } } } + diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java new file mode 100644 index 0000000000000..5be4508612ece --- /dev/null +++ 
b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java @@ -0,0 +1,247 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.bucket.composite; + +import org.apache.lucene.index.LeafReaderContext; +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.lease.Releasables; +import org.elasticsearch.search.aggregations.LeafBucketCollector; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Set; +import java.util.TreeMap; + +/** + * A specialized queue implementation for composite buckets + */ +final class CompositeValuesCollectorQueue implements Releasable { + // the slot for the current candidate + private static final int CANDIDATE_SLOT = Integer.MAX_VALUE; + + private final int maxSize; + private final TreeMap keys; + private final SingleDimensionValuesSource[] arrays; + private final int[] docCounts; + private boolean afterValueSet = false; + + /** + * Constructs a composite queue with the specified size and sources. + * + * @param sources The list of {@link CompositeValuesSourceConfig} to build the composite buckets. + * @param size The number of composite buckets to keep. + */ + CompositeValuesCollectorQueue(SingleDimensionValuesSource[] sources, int size) { + this.maxSize = size; + this.arrays = sources; + this.docCounts = new int[size]; + this.keys = new TreeMap<>(this::compare); + } + + void clear() { + keys.clear(); + Arrays.fill(docCounts, 0); + afterValueSet = false; + } + + /** + * The current size of the queue. + */ + int size() { + return keys.size(); + } + + /** + * Whether the queue is full or not. + */ + boolean isFull() { + return keys.size() == maxSize; + } + + /** + * Returns a sorted {@link Set} view of the slots contained in this queue. + */ + Set getSortedSlot() { + return keys.keySet(); + } + + /** + * Compares the current candidate with the values in the queue and returns + * the slot if the candidate is already in the queue or null if the candidate is not present. + */ + Integer compareCurrent() { + return keys.get(CANDIDATE_SLOT); + } + + /** + * Returns the lowest value (exclusive) of the leading source. + */ + Comparable getLowerValueLeadSource() { + return afterValueSet ? arrays[0].getAfter() : null; + } + + /** + * Returns the upper value (inclusive) of the leading source. + */ + Comparable getUpperValueLeadSource() throws IOException { + return size() >= maxSize ? arrays[0].toComparable(keys.lastKey()) : null; + } + /** + * Returns the document count in slot. + */ + int getDocCount(int slot) { + return docCounts[slot]; + } + + /** + * Copies the current value in slot. 
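+     * The doc count of the slot is reset to 1.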
+ */ + private void copyCurrent(int slot) { + for (int i = 0; i < arrays.length; i++) { + arrays[i].copyCurrent(slot); + } + docCounts[slot] = 1; + } + + /** + * Compares the values in slot1 with slot2. + */ + int compare(int slot1, int slot2) { + for (int i = 0; i < arrays.length; i++) { + int cmp = (slot1 == CANDIDATE_SLOT) ? arrays[i].compareCurrent(slot2) : + arrays[i].compare(slot1, slot2); + if (cmp != 0) { + return cmp; + } + } + return 0; + } + + /** + * Sets the after values for this comparator. + */ + void setAfter(Comparable[] values) { + assert values.length == arrays.length; + afterValueSet = true; + for (int i = 0; i < arrays.length; i++) { + arrays[i].setAfter(values[i]); + } + } + + /** + * Compares the after values with the values in slot. + */ + private int compareCurrentWithAfter() { + for (int i = 0; i < arrays.length; i++) { + int cmp = arrays[i].compareCurrentWithAfter(); + if (cmp != 0) { + return cmp; + } + } + return 0; + } + + /** + * Builds the {@link CompositeKey} for slot. + */ + CompositeKey toCompositeKey(int slot) throws IOException { + assert slot < maxSize; + Comparable[] values = new Comparable[arrays.length]; + for (int i = 0; i < values.length; i++) { + values[i] = arrays[i].toComparable(slot); + } + return new CompositeKey(values); + } + + /** + * Creates the collector that will visit the composite buckets of the matching documents. + * The provided collector in is called on each composite bucket. + */ + LeafBucketCollector getLeafCollector(LeafReaderContext context, LeafBucketCollector in) throws IOException { + return getLeafCollector(null, context, in); + } + /** + * Creates the collector that will visit the composite buckets of the matching documents. + * If forceLeadSourceValue is not null, the leading source will use this value + * for each document. + * The provided collector in is called on each composite bucket. + */ + LeafBucketCollector getLeafCollector(Comparable forceLeadSourceValue, + LeafReaderContext context, LeafBucketCollector in) throws IOException { + int last = arrays.length - 1; + LeafBucketCollector collector = in; + while (last > 0) { + collector = arrays[last--].getLeafCollector(context, collector); + } + if (forceLeadSourceValue != null) { + collector = arrays[last].getLeafCollector(forceLeadSourceValue, context, collector); + } else { + collector = arrays[last].getLeafCollector(context, collector); + } + return collector; + } + + /** + * Check if the current candidate should be added in the queue. + * @return The target slot of the candidate or -1 is the candidate is not competitive. 
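+     * (When the queue is full, a competitive candidate reuses the slot freed by evicting the least competitive key.)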
+ */ + int addIfCompetitive() { + // checks if the candidate key is competitive + Integer topSlot = compareCurrent(); + if (topSlot != null) { + // this key is already in the top N, skip it + docCounts[topSlot] += 1; + return topSlot; + } + if (afterValueSet && compareCurrentWithAfter() <= 0) { + // this key is greater than the top value collected in the previous round, skip it + return -1; + } + if (keys.size() >= maxSize) { + // the tree map is full, check if the candidate key should be kept + if (compare(CANDIDATE_SLOT, keys.lastKey()) > 0) { + // the candidate key is not competitive, skip it + return -1; + } + } + + // the candidate key is competitive + final int newSlot; + if (keys.size() >= maxSize) { + // the tree map is full, we replace the last key with this candidate + int slot = keys.pollLastEntry().getKey(); + // and we recycle the deleted slot + newSlot = slot; + } else { + newSlot = keys.size(); + assert newSlot < maxSize; + } + // move the candidate key to its new slot + copyCurrent(newSlot); + keys.put(newSlot, newSlot); + return newSlot; + } + + + @Override + public void close() { + Releasables.close(arrays); + } +} diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesComparator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesComparator.java deleted file mode 100644 index 0ce87460a5429..0000000000000 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesComparator.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.search.aggregations.bucket.composite; - -import org.apache.lucene.index.DirectoryReader; -import org.apache.lucene.index.IndexReader; -import org.apache.lucene.index.LeafReaderContext; -import org.elasticsearch.search.aggregations.LeafBucketCollector; - -import java.io.IOException; - -import static org.elasticsearch.search.aggregations.support.ValuesSource.Numeric; -import static org.elasticsearch.search.aggregations.support.ValuesSource.Bytes; -import static org.elasticsearch.search.aggregations.support.ValuesSource.Bytes.WithOrdinals; - -final class CompositeValuesComparator { - private final int size; - private final CompositeValuesSource[] arrays; - private boolean topValueSet = false; - - /** - * - * @param sources The list of {@link CompositeValuesSourceConfig} to build the composite buckets. - * @param size The number of composite buckets to keep. 
- */ - CompositeValuesComparator(IndexReader reader, CompositeValuesSourceConfig[] sources, int size) { - this.size = size; - this.arrays = new CompositeValuesSource[sources.length]; - for (int i = 0; i < sources.length; i++) { - final int reverseMul = sources[i].reverseMul(); - if (sources[i].valuesSource() instanceof WithOrdinals && reader instanceof DirectoryReader) { - WithOrdinals vs = (WithOrdinals) sources[i].valuesSource(); - arrays[i] = CompositeValuesSource.wrapGlobalOrdinals(vs, size, reverseMul); - } else if (sources[i].valuesSource() instanceof Bytes) { - Bytes vs = (Bytes) sources[i].valuesSource(); - arrays[i] = CompositeValuesSource.wrapBinary(vs, size, reverseMul); - } else if (sources[i].valuesSource() instanceof Numeric) { - final Numeric vs = (Numeric) sources[i].valuesSource(); - if (vs.isFloatingPoint()) { - arrays[i] = CompositeValuesSource.wrapDouble(vs, size, reverseMul); - } else { - arrays[i] = CompositeValuesSource.wrapLong(vs, sources[i].format(), size, reverseMul); - } - } - } - } - - /** - * Moves the values in slot1 to slot2. - */ - void move(int slot1, int slot2) { - assert slot1 < size && slot2 < size; - for (int i = 0; i < arrays.length; i++) { - arrays[i].move(slot1, slot2); - } - } - - /** - * Compares the values in slot1 with slot2. - */ - int compare(int slot1, int slot2) { - assert slot1 < size && slot2 < size; - for (int i = 0; i < arrays.length; i++) { - int cmp = arrays[i].compare(slot1, slot2); - if (cmp != 0) { - return cmp; - } - } - return 0; - } - - /** - * Returns true if a top value has been set for this comparator. - */ - boolean hasTop() { - return topValueSet; - } - - /** - * Sets the top values for this comparator. - */ - void setTop(Comparable[] values) { - assert values.length == arrays.length; - topValueSet = true; - for (int i = 0; i < arrays.length; i++) { - arrays[i].setTop(values[i]); - } - } - - /** - * Compares the top values with the values in slot. - */ - int compareTop(int slot) { - assert slot < size; - for (int i = 0; i < arrays.length; i++) { - int cmp = arrays[i].compareTop(slot); - if (cmp != 0) { - return cmp; - } - } - return 0; - } - - /** - * Builds the {@link CompositeKey} for slot. - */ - CompositeKey toCompositeKey(int slot) throws IOException { - assert slot < size; - Comparable[] values = new Comparable[arrays.length]; - for (int i = 0; i < values.length; i++) { - values[i] = arrays[i].toComparable(slot); - } - return new CompositeKey(values); - } - - /** - * Gets the {@link LeafBucketCollector} that will record the composite buckets of the visited documents. - */ - CompositeValuesSource.Collector getLeafCollector(LeafReaderContext context, CompositeValuesSource.Collector in) throws IOException { - int last = arrays.length - 1; - CompositeValuesSource.Collector next = arrays[last].getLeafCollector(context, in); - for (int i = last - 1; i >= 0; i--) { - next = arrays[i].getLeafCollector(context, next); - } - return next; - } -} diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesSource.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesSource.java deleted file mode 100644 index 2d0368dfd4d28..0000000000000 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesSource.java +++ /dev/null @@ -1,400 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.search.aggregations.bucket.composite; - -import org.apache.lucene.index.LeafReaderContext; -import org.apache.lucene.index.SortedNumericDocValues; -import org.apache.lucene.index.SortedSetDocValues; -import org.apache.lucene.search.LeafCollector; -import org.apache.lucene.util.BytesRef; -import org.elasticsearch.common.joda.FormatDateTimeFormatter; -import org.elasticsearch.index.fielddata.SortedBinaryDocValues; -import org.elasticsearch.index.fielddata.SortedNumericDoubleValues; -import org.elasticsearch.search.DocValueFormat; -import org.elasticsearch.search.aggregations.support.ValuesSource; -import org.elasticsearch.search.sort.SortOrder; - -import java.io.IOException; - -import static org.apache.lucene.index.SortedSetDocValues.NO_MORE_ORDS; - -/** - * A wrapper for {@link ValuesSource} that can record and compare values produced during a collection. - */ -abstract class CompositeValuesSource> { - interface Collector { - void collect(int doc) throws IOException; - } - - protected final VS vs; - protected final int size; - protected final int reverseMul; - protected T topValue; - - /** - * - * @param vs The original {@link ValuesSource}. - * @param size The number of values to record. - * @param reverseMul -1 if the natural order ({@link SortOrder#ASC} should be reversed. - */ - CompositeValuesSource(VS vs, int size, int reverseMul) { - this.vs = vs; - this.size = size; - this.reverseMul = reverseMul; - } - - /** - * The type of this source. - */ - abstract String type(); - - /** - * Moves the value in from in to. - * The value present in to is overridden. - */ - abstract void move(int from, int to); - - /** - * Compares the value in from with the value in to. - */ - abstract int compare(int from, int to); - - /** - * Compares the value in slot with the top value in this source. - */ - abstract int compareTop(int slot); - - /** - * Sets the top value for this source. Values that compares smaller should not be recorded. - */ - abstract void setTop(Comparable value); - - /** - * Transforms the value in slot to a {@link Comparable} object. - */ - abstract Comparable toComparable(int slot) throws IOException; - - /** - * Gets the {@link LeafCollector} that will record the values of the visited documents. - */ - abstract Collector getLeafCollector(LeafReaderContext context, Collector next) throws IOException; - - /** - * Creates a {@link CompositeValuesSource} that generates long values. - */ - static CompositeValuesSource wrapLong(ValuesSource.Numeric vs, DocValueFormat format, - int size, int reverseMul) { - return new LongValuesSource(vs, format, size, reverseMul); - } - - /** - * Creates a {@link CompositeValuesSource} that generates double values. 
- */ - static CompositeValuesSource wrapDouble(ValuesSource.Numeric vs, int size, int reverseMul) { - return new DoubleValuesSource(vs, size, reverseMul); - } - - /** - * Creates a {@link CompositeValuesSource} that generates binary values. - */ - static CompositeValuesSource wrapBinary(ValuesSource.Bytes vs, int size, int reverseMul) { - return new BinaryValuesSource(vs, size, reverseMul); - } - - /** - * Creates a {@link CompositeValuesSource} that generates global ordinal values. - */ - static CompositeValuesSource wrapGlobalOrdinals(ValuesSource.Bytes.WithOrdinals vs, - int size, - int reverseMul) { - return new GlobalOrdinalValuesSource(vs, size, reverseMul); - } - - /** - * A {@link CompositeValuesSource} for global ordinals - */ - private static class GlobalOrdinalValuesSource extends CompositeValuesSource { - private final long[] values; - private SortedSetDocValues lookup; - private Long topValueGlobalOrd; - private boolean isTopValueInsertionPoint; - - GlobalOrdinalValuesSource(ValuesSource.Bytes.WithOrdinals vs, int size, int reverseMul) { - super(vs, size, reverseMul); - this.values = new long[size]; - } - - @Override - String type() { - return "global_ordinals"; - } - - @Override - void move(int from, int to) { - values[to] = values[from]; - } - - @Override - int compare(int from, int to) { - return Long.compare(values[from], values[to]) * reverseMul; - } - - @Override - int compareTop(int slot) { - int cmp = Long.compare(values[slot], topValueGlobalOrd); - if (cmp == 0 && isTopValueInsertionPoint) { - // the top value is missing in this shard, the comparison is against - // the insertion point of the top value so equality means that the value - // is "after" the insertion point. - return reverseMul; - } - return cmp * reverseMul; - } - - @Override - void setTop(Comparable value) { - if (value instanceof BytesRef) { - topValue = (BytesRef) value; - } else if (value instanceof String) { - topValue = new BytesRef(value.toString()); - } else { - throw new IllegalArgumentException("invalid value, expected string, got " + value.getClass().getSimpleName()); - } - } - - @Override - Comparable toComparable(int slot) throws IOException { - return BytesRef.deepCopyOf(lookup.lookupOrd(values[slot])); - } - - @Override - Collector getLeafCollector(LeafReaderContext context, Collector next) throws IOException { - final SortedSetDocValues dvs = vs.globalOrdinalsValues(context); - if (lookup == null) { - lookup = dvs; - if (topValue != null && topValueGlobalOrd == null) { - topValueGlobalOrd = lookup.lookupTerm(topValue); - if (topValueGlobalOrd < 0) { - // convert negative insert position - topValueGlobalOrd = -topValueGlobalOrd - 1; - isTopValueInsertionPoint = true; - } - } - } - return doc -> { - if (dvs.advanceExact(doc)) { - long ord; - while ((ord = dvs.nextOrd()) != NO_MORE_ORDS) { - values[0] = ord; - next.collect(doc); - } - } - }; - } - } - - /** - * A {@link CompositeValuesSource} for binary source ({@link BytesRef}) - */ - private static class BinaryValuesSource extends CompositeValuesSource { - private final BytesRef[] values; - - BinaryValuesSource(ValuesSource.Bytes vs, int size, int reverseMul) { - super(vs, size, reverseMul); - this.values = new BytesRef[size]; - } - - @Override - String type() { - return "binary"; - } - - @Override - public void move(int from, int to) { - values[to] = BytesRef.deepCopyOf(values[from]); - } - - @Override - public int compare(int from, int to) { - return values[from].compareTo(values[to]) * reverseMul; - } - - @Override - int compareTop(int 
slot) { - return values[slot].compareTo(topValue) * reverseMul; - } - - @Override - void setTop(Comparable value) { - if (value.getClass() == BytesRef.class) { - topValue = (BytesRef) value; - } else if (value.getClass() == String.class) { - topValue = new BytesRef((String) value); - } else { - throw new IllegalArgumentException("invalid value, expected string, got " + value.getClass().getSimpleName()); - } - } - - @Override - Comparable toComparable(int slot) { - return values[slot]; - } - - @Override - Collector getLeafCollector(LeafReaderContext context, Collector next) throws IOException { - final SortedBinaryDocValues dvs = vs.bytesValues(context); - return doc -> { - if (dvs.advanceExact(doc)) { - int num = dvs.docValueCount(); - for (int i = 0; i < num; i++) { - values[0] = dvs.nextValue(); - next.collect(doc); - } - } - }; - } - } - - /** - * A {@link CompositeValuesSource} for longs. - */ - private static class LongValuesSource extends CompositeValuesSource { - private final long[] values; - // handles "format" for date histogram source - private final DocValueFormat format; - - LongValuesSource(ValuesSource.Numeric vs, DocValueFormat format, int size, int reverseMul) { - super(vs, size, reverseMul); - this.format = format; - this.values = new long[size]; - } - - @Override - String type() { - return "long"; - } - - @Override - void move(int from, int to) { - values[to] = values[from]; - } - - @Override - int compare(int from, int to) { - return Long.compare(values[from], values[to]) * reverseMul; - } - - @Override - int compareTop(int slot) { - return Long.compare(values[slot], topValue) * reverseMul; - } - - @Override - void setTop(Comparable value) { - if (value instanceof Number) { - topValue = ((Number) value).longValue(); - } else { - // for date histogram source with "format", the after value is formatted - // as a string so we need to retrieve the original value in milliseconds. - topValue = format.parseLong(value.toString(), false, () -> { - throw new IllegalArgumentException("now() is not supported in [after] key"); - }); - } - } - - @Override - Comparable toComparable(int slot) { - return values[slot]; - } - - @Override - Collector getLeafCollector(LeafReaderContext context, Collector next) throws IOException { - final SortedNumericDocValues dvs = vs.longValues(context); - return doc -> { - if (dvs.advanceExact(doc)) { - int num = dvs.docValueCount(); - for (int i = 0; i < num; i++) { - values[0] = dvs.nextValue(); - next.collect(doc); - } - } - }; - } - } - - /** - * A {@link CompositeValuesSource} for doubles. 
- */ - private static class DoubleValuesSource extends CompositeValuesSource { - private final double[] values; - - DoubleValuesSource(ValuesSource.Numeric vs, int size, int reverseMul) { - super(vs, size, reverseMul); - this.values = new double[size]; - } - - @Override - String type() { - return "long"; - } - - @Override - void move(int from, int to) { - values[to] = values[from]; - } - - @Override - int compare(int from, int to) { - return Double.compare(values[from], values[to]) * reverseMul; - } - - @Override - int compareTop(int slot) { - return Double.compare(values[slot], topValue) * reverseMul; - } - - @Override - void setTop(Comparable value) { - if (value instanceof Number) { - topValue = ((Number) value).doubleValue(); - } else { - topValue = Double.parseDouble(value.toString()); - } - } - - @Override - Comparable toComparable(int slot) { - return values[slot]; - } - - @Override - Collector getLeafCollector(LeafReaderContext context, Collector next) throws IOException { - final SortedNumericDoubleValues dvs = vs.doubleValues(context); - return doc -> { - if (dvs.advanceExact(doc)) { - int num = dvs.docValueCount(); - for (int i = 0; i < num; i++) { - values[0] = dvs.nextValue(); - next.collect(doc); - } - } - }; - } - } -} diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesSourceBuilder.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesSourceBuilder.java index 2e06d7c9fe30b..d19729293a912 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesSourceBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesSourceBuilder.java @@ -19,19 +19,13 @@ package org.elasticsearch.search.aggregations.bucket.composite; -import org.apache.lucene.index.DocValues; -import org.apache.lucene.index.IndexReader; -import org.apache.lucene.index.LeafReaderContext; -import org.apache.lucene.index.SortedNumericDocValues; -import org.apache.lucene.index.SortedSetDocValues; -import org.apache.lucene.search.SortField; import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.ToXContentFragment; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.index.IndexSortConfig; +import org.elasticsearch.index.query.QueryShardException; import org.elasticsearch.script.Script; import org.elasticsearch.search.aggregations.support.ValueType; import org.elasticsearch.search.aggregations.support.ValuesSourceConfig; @@ -291,46 +285,18 @@ public String format() { * * @param context The search context for this source. * @param config The {@link ValuesSourceConfig} for this source. - * @param pos The position of this source in the composite key. - * @param numPos The total number of positions in the composite key. - * @param sortField The {@link SortField} of the index sort at this position or null if not present. 
*/ - protected abstract CompositeValuesSourceConfig innerBuild(SearchContext context, - ValuesSourceConfig config, - int pos, - int numPos, - SortField sortField) throws IOException; + protected abstract CompositeValuesSourceConfig innerBuild(SearchContext context, ValuesSourceConfig config) throws IOException; - public final CompositeValuesSourceConfig build(SearchContext context, int pos, int numPos, SortField sortField) throws IOException { + public final CompositeValuesSourceConfig build(SearchContext context) throws IOException { ValuesSourceConfig config = ValuesSourceConfig.resolve(context.getQueryShardContext(), valueType, field, script, missing, null, format); - return innerBuild(context, config, pos, numPos, sortField); - } - - protected boolean checkCanEarlyTerminate(IndexReader reader, - String fieldName, - boolean reverse, - SortField sortField) throws IOException { - return sortField.getField().equals(fieldName) && - sortField.getReverse() == reverse && - isSingleValued(reader, sortField); - } - - private static boolean isSingleValued(IndexReader reader, SortField field) throws IOException { - SortField.Type type = IndexSortConfig.getSortFieldType(field); - for (LeafReaderContext context : reader.leaves()) { - if (type == SortField.Type.STRING) { - final SortedSetDocValues values = DocValues.getSortedSet(context.reader(), field.getField()); - if (values.cost() > 0 && DocValues.unwrapSingleton(values) == null) { - return false; - } - } else { - final SortedNumericDocValues values = DocValues.getSortedNumeric(context.reader(), field.getField()); - if (values.cost() > 0 && DocValues.unwrapSingleton(values) == null) { - return false; - } - } + if (config.unmapped() && field != null && config.missing() == null) { + // this source cannot produce any values so we refuse to build + // since composite buckets are not created on null values + throw new QueryShardException(context.getQueryShardContext(), + "failed to find field [" + field + "] and [missing] is not provided"); } - return true; + return innerBuild(context, config); } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesSourceConfig.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesSourceConfig.java index ee70d3f39a550..ca4b38dc1f4d5 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesSourceConfig.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesSourceConfig.java @@ -19,22 +19,25 @@ package org.elasticsearch.search.aggregations.bucket.composite; +import org.elasticsearch.common.inject.internal.Nullable; +import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.aggregations.support.ValuesSource; import org.elasticsearch.search.sort.SortOrder; class CompositeValuesSourceConfig { private final String name; + @Nullable + private final MappedFieldType fieldType; private final ValuesSource vs; private final DocValueFormat format; private final int reverseMul; - private final boolean canEarlyTerminate; - CompositeValuesSourceConfig(String name, ValuesSource vs, DocValueFormat format, SortOrder order, boolean canEarlyTerminate) { + CompositeValuesSourceConfig(String name, @Nullable MappedFieldType fieldType, ValuesSource vs, DocValueFormat format, SortOrder order) { this.name = name; + this.fieldType = fieldType; this.vs = vs; this.format = format; - 
this.canEarlyTerminate = canEarlyTerminate; this.reverseMul = order == SortOrder.ASC ? 1 : -1; } @@ -45,6 +48,13 @@ String name() { return name; } + /** + * Returns the {@link MappedFieldType} for this config. + */ + MappedFieldType fieldType() { + return fieldType; + } + /** * Returns the {@link ValuesSource} for this configuration. */ @@ -67,11 +77,4 @@ int reverseMul() { assert reverseMul == -1 || reverseMul == 1; return reverseMul; } - - /** - * Returns whether this {@link ValuesSource} is used to sort the index. - */ - boolean canEarlyTerminate() { - return canEarlyTerminate; - } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/DateHistogramValuesSourceBuilder.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/DateHistogramValuesSourceBuilder.java index caa16b7a9d57f..fb2999bbd0b33 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/DateHistogramValuesSourceBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/DateHistogramValuesSourceBuilder.java @@ -19,7 +19,6 @@ package org.elasticsearch.search.aggregations.bucket.composite; -import org.apache.lucene.search.SortField; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -29,9 +28,9 @@ import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.script.Script; import org.elasticsearch.search.DocValueFormat; -import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; import org.elasticsearch.search.aggregations.support.FieldContext; @@ -39,7 +38,6 @@ import org.elasticsearch.search.aggregations.support.ValuesSource; import org.elasticsearch.search.aggregations.support.ValuesSourceConfig; import org.elasticsearch.search.internal.SearchContext; -import org.elasticsearch.search.sort.SortOrder; import org.joda.time.DateTimeZone; import java.io.IOException; @@ -217,11 +215,7 @@ private Rounding createRounding() { } @Override - protected CompositeValuesSourceConfig innerBuild(SearchContext context, - ValuesSourceConfig config, - int pos, - int numPos, - SortField sortField) throws IOException { + protected CompositeValuesSourceConfig innerBuild(SearchContext context, ValuesSourceConfig config) throws IOException { Rounding rounding = createRounding(); ValuesSource orig = config.toValuesSource(context.getQueryShardContext()); if (orig == null) { @@ -230,19 +224,10 @@ protected CompositeValuesSourceConfig innerBuild(SearchContext context, if (orig instanceof ValuesSource.Numeric) { ValuesSource.Numeric numeric = (ValuesSource.Numeric) orig; RoundingValuesSource vs = new RoundingValuesSource(numeric, rounding); - boolean canEarlyTerminate = false; - final FieldContext fieldContext = config.fieldContext(); - if (sortField != null && - pos == numPos-1 && - fieldContext != null) { - canEarlyTerminate = checkCanEarlyTerminate(context.searcher().getIndexReader(), - fieldContext.field(), order() == SortOrder.ASC ? 
false : true, sortField); - } - // dates are returned as timestamp in milliseconds-since-the-epoch unless a specific date format // is specified in the builder. final DocValueFormat docValueFormat = format() == null ? DocValueFormat.RAW : config.format(); - return new CompositeValuesSourceConfig(name, vs, docValueFormat, - order(), canEarlyTerminate); + final MappedFieldType fieldType = config.fieldContext() != null ? config.fieldContext().fieldType() : null; + return new CompositeValuesSourceConfig(name, fieldType, vs, docValueFormat, order()); } else { throw new IllegalArgumentException("invalid source, expected numeric, got " + orig.getClass().getSimpleName()); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/DoubleValuesSource.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/DoubleValuesSource.java new file mode 100644 index 0000000000000..baf63a8d65fee --- /dev/null +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/DoubleValuesSource.java @@ -0,0 +1,129 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.bucket.composite; + +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.search.Query; +import org.elasticsearch.common.CheckedFunction; +import org.elasticsearch.common.lease.Releasables; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.DoubleArray; +import org.elasticsearch.index.fielddata.SortedNumericDoubleValues; +import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.search.aggregations.LeafBucketCollector; + +import java.io.IOException; + +/** + * A {@link SingleDimensionValuesSource} for doubles. 
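+ * Values are recorded in a {@link BigArrays}-backed {@link DoubleArray} sized to the requested
+ * number of composite buckets. Unlike the long source, no {@link SortedDocsProducer} optimization
+ * is attempted for doubles ({@code createSortedDocsProducerOrNull} below always returns {@code null}).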
+ */ +class DoubleValuesSource extends SingleDimensionValuesSource { + private final CheckedFunction docValuesFunc; + private final DoubleArray values; + private double currentValue; + + DoubleValuesSource(BigArrays bigArrays, MappedFieldType fieldType, + CheckedFunction docValuesFunc, + int size, int reverseMul) { + super(fieldType, size, reverseMul); + this.docValuesFunc = docValuesFunc; + this.values = bigArrays.newDoubleArray(size, false); + } + + @Override + void copyCurrent(int slot) { + values.set(slot, currentValue); + } + + @Override + int compare(int from, int to) { + return compareValues(values.get(from), values.get(to)); + } + + @Override + int compareCurrent(int slot) { + return compareValues(currentValue, values.get(slot)); + } + + @Override + int compareCurrentWithAfter() { + return compareValues(currentValue, afterValue); + } + + private int compareValues(double v1, double v2) { + return Double.compare(v1, v2) * reverseMul; + } + + @Override + void setAfter(Comparable value) { + if (value instanceof Number) { + afterValue = ((Number) value).doubleValue(); + } else { + afterValue = Double.parseDouble(value.toString()); + } + } + + @Override + Double toComparable(int slot) { + return values.get(slot); + } + + @Override + LeafBucketCollector getLeafCollector(LeafReaderContext context, LeafBucketCollector next) throws IOException { + final SortedNumericDoubleValues dvs = docValuesFunc.apply(context); + return new LeafBucketCollector() { + @Override + public void collect(int doc, long bucket) throws IOException { + if (dvs.advanceExact(doc)) { + int num = dvs.docValueCount(); + for (int i = 0; i < num; i++) { + currentValue = dvs.nextValue(); + next.collect(doc, bucket); + } + } + } + }; + } + + @Override + LeafBucketCollector getLeafCollector(Comparable value, LeafReaderContext context, LeafBucketCollector next) { + if (value.getClass() != Double.class) { + throw new IllegalArgumentException("Expected Double, got " + value.getClass()); + } + currentValue = (Double) value; + return new LeafBucketCollector() { + @Override + public void collect(int doc, long bucket) throws IOException { + next.collect(doc, bucket); + } + }; + } + + @Override + SortedDocsProducer createSortedDocsProducerOrNull(IndexReader reader, Query query) { + return null; + } + + @Override + public void close() { + Releasables.close(values); + } +} diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/GlobalOrdinalValuesSource.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/GlobalOrdinalValuesSource.java new file mode 100644 index 0000000000000..e3ae3dca1bd63 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/GlobalOrdinalValuesSource.java @@ -0,0 +1,189 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.bucket.composite; + +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.SortedSetDocValues; +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.Query; +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.CheckedFunction; +import org.elasticsearch.common.lease.Releasables; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.LongArray; +import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.search.aggregations.LeafBucketCollector; + +import java.io.IOException; + +import static org.apache.lucene.index.SortedSetDocValues.NO_MORE_ORDS; + +/** + * A {@link SingleDimensionValuesSource} for global ordinals. + */ +class GlobalOrdinalValuesSource extends SingleDimensionValuesSource { + private final CheckedFunction docValuesFunc; + private final LongArray values; + private SortedSetDocValues lookup; + private long currentValue; + private Long afterValueGlobalOrd; + private boolean isTopValueInsertionPoint; + + private long lastLookupOrd = -1; + private BytesRef lastLookupValue; + + GlobalOrdinalValuesSource(BigArrays bigArrays, + MappedFieldType type, CheckedFunction docValuesFunc, + int size, int reverseMul) { + super(type, size, reverseMul); + this.docValuesFunc = docValuesFunc; + this.values = bigArrays.newLongArray(size, false); + } + + @Override + void copyCurrent(int slot) { + values.set(slot, currentValue); + } + + @Override + int compare(int from, int to) { + return Long.compare(values.get(from), values.get(to)) * reverseMul; + } + + @Override + int compareCurrent(int slot) { + return Long.compare(currentValue, values.get(slot)) * reverseMul; + } + + @Override + int compareCurrentWithAfter() { + int cmp = Long.compare(currentValue, afterValueGlobalOrd); + if (cmp == 0 && isTopValueInsertionPoint) { + // the top value is missing in this shard, the comparison is against + // the insertion point of the top value so equality means that the value + // is "after" the insertion point. 
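+ // For example, with shard ordinals {"a"=0, "c"=1} and an after key of "b",
+ // lookupTerm("b") returns -2, which initLookup converts to the insertion point
+ // -(-2) - 1 = 1 and flags isTopValueInsertionPoint; ordinal 1 ("c") then compares
+ // as "after" even though the raw ordinal comparison is equal.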
+ return reverseMul; + } + return cmp * reverseMul; + } + + @Override + void setAfter(Comparable value) { + if (value instanceof BytesRef) { + afterValue = (BytesRef) value; + } else if (value instanceof String) { + afterValue = new BytesRef(value.toString()); + } else { + throw new IllegalArgumentException("invalid value, expected string, got " + value.getClass().getSimpleName()); + } + } + + @Override + BytesRef toComparable(int slot) throws IOException { + long globalOrd = values.get(slot); + if (globalOrd == lastLookupOrd) { + return lastLookupValue; + } else { + lastLookupOrd= globalOrd; + lastLookupValue = BytesRef.deepCopyOf(lookup.lookupOrd(values.get(slot))); + return lastLookupValue; + } + } + + @Override + LeafBucketCollector getLeafCollector(LeafReaderContext context, LeafBucketCollector next) throws IOException { + final SortedSetDocValues dvs = docValuesFunc.apply(context); + if (lookup == null) { + initLookup(dvs); + } + return new LeafBucketCollector() { + @Override + public void collect(int doc, long bucket) throws IOException { + if (dvs.advanceExact(doc)) { + long ord; + while ((ord = dvs.nextOrd()) != NO_MORE_ORDS) { + currentValue = ord; + next.collect(doc, bucket); + } + } + } + }; + } + + @Override + LeafBucketCollector getLeafCollector(Comparable value, LeafReaderContext context, LeafBucketCollector next) throws IOException { + if (value.getClass() != BytesRef.class) { + throw new IllegalArgumentException("Expected BytesRef, got " + value.getClass()); + } + BytesRef term = (BytesRef) value; + final SortedSetDocValues dvs = docValuesFunc.apply(context); + if (lookup == null) { + initLookup(dvs); + } + return new LeafBucketCollector() { + boolean currentValueIsSet = false; + + @Override + public void collect(int doc, long bucket) throws IOException { + if (!currentValueIsSet) { + if (dvs.advanceExact(doc)) { + long ord; + while ((ord = dvs.nextOrd()) != NO_MORE_ORDS) { + if (term.equals(lookup.lookupOrd(ord))) { + currentValueIsSet = true; + currentValue = ord; + break; + } + } + } + } + assert currentValueIsSet; + next.collect(doc, bucket); + } + }; + } + + @Override + SortedDocsProducer createSortedDocsProducerOrNull(IndexReader reader, Query query) { + if (checkIfSortedDocsIsApplicable(reader, fieldType) == false || + (query != null && query.getClass() != MatchAllDocsQuery.class)) { + return null; + } + return new TermsSortedDocsProducer(fieldType.name()); + } + + @Override + public void close() { + Releasables.close(values); + } + + private void initLookup(SortedSetDocValues dvs) throws IOException { + lookup = dvs; + if (afterValue != null && afterValueGlobalOrd == null) { + afterValueGlobalOrd = lookup.lookupTerm(afterValue); + if (afterValueGlobalOrd < 0) { + // convert negative insert position + afterValueGlobalOrd = -afterValueGlobalOrd - 1; + isTopValueInsertionPoint = true; + } + } + } +} diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/HistogramValuesSourceBuilder.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/HistogramValuesSourceBuilder.java index 83ada5dbbc3c3..1dc0aa596d790 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/HistogramValuesSourceBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/HistogramValuesSourceBuilder.java @@ -19,19 +19,17 @@ package org.elasticsearch.search.aggregations.bucket.composite; -import org.apache.lucene.search.SortField; import 
org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; -import org.elasticsearch.search.aggregations.support.FieldContext; import org.elasticsearch.search.aggregations.support.ValueType; import org.elasticsearch.search.aggregations.support.ValuesSource; import org.elasticsearch.search.aggregations.support.ValuesSourceConfig; import org.elasticsearch.search.internal.SearchContext; -import org.elasticsearch.search.sort.SortOrder; import java.io.IOException; import java.util.Objects; @@ -108,27 +106,16 @@ public HistogramValuesSourceBuilder interval(double interval) { } @Override - protected CompositeValuesSourceConfig innerBuild(SearchContext context, - ValuesSourceConfig config, - int pos, - int numPos, - SortField sortField) throws IOException { + protected CompositeValuesSourceConfig innerBuild(SearchContext context, ValuesSourceConfig config) throws IOException { ValuesSource orig = config.toValuesSource(context.getQueryShardContext()); if (orig == null) { orig = ValuesSource.Numeric.EMPTY; } if (orig instanceof ValuesSource.Numeric) { ValuesSource.Numeric numeric = (ValuesSource.Numeric) orig; - HistogramValuesSource vs = new HistogramValuesSource(numeric, interval); - boolean canEarlyTerminate = false; - final FieldContext fieldContext = config.fieldContext(); - if (sortField != null && - pos == numPos-1 && - fieldContext != null) { - canEarlyTerminate = checkCanEarlyTerminate(context.searcher().getIndexReader(), - fieldContext.field(), order() == SortOrder.ASC ? false : true, sortField); - } - return new CompositeValuesSourceConfig(name, vs, config.format(), order(), canEarlyTerminate); + final HistogramValuesSource vs = new HistogramValuesSource(numeric, interval); + final MappedFieldType fieldType = config.fieldContext() != null ? config.fieldContext().fieldType() : null; + return new CompositeValuesSourceConfig(name, fieldType, vs, config.format(), order()); } else { throw new IllegalArgumentException("invalid source, expected numeric, got " + orig.getClass().getSimpleName()); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/LongValuesSource.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/LongValuesSource.java new file mode 100644 index 0000000000000..96d0b02780948 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/LongValuesSource.java @@ -0,0 +1,190 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.bucket.composite; + +import org.apache.lucene.document.IntPoint; +import org.apache.lucene.document.LongPoint; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.SortedNumericDocValues; +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.PointRangeQuery; +import org.apache.lucene.search.Query; +import org.elasticsearch.common.CheckedFunction; +import org.elasticsearch.common.lease.Releasables; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.LongArray; +import org.elasticsearch.index.mapper.DateFieldMapper; +import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.index.mapper.NumberFieldMapper; +import org.elasticsearch.search.DocValueFormat; +import org.elasticsearch.search.aggregations.LeafBucketCollector; + +import java.io.IOException; +import java.util.function.LongUnaryOperator; +import java.util.function.ToLongFunction; + +/** + * A {@link SingleDimensionValuesSource} for longs. + */ +class LongValuesSource extends SingleDimensionValuesSource { + private final CheckedFunction docValuesFunc; + private final LongUnaryOperator rounding; + // handles "format" for date histogram source + private final DocValueFormat format; + + private final LongArray values; + private long currentValue; + + LongValuesSource(BigArrays bigArrays, MappedFieldType fieldType, + CheckedFunction docValuesFunc, + LongUnaryOperator rounding, DocValueFormat format, int size, int reverseMul) { + super(fieldType, size, reverseMul); + this.docValuesFunc = docValuesFunc; + this.rounding = rounding; + this.format = format; + this.values = bigArrays.newLongArray(size, false); + } + + @Override + void copyCurrent(int slot) { + values.set(slot, currentValue); + } + + @Override + int compare(int from, int to) { + return compareValues(values.get(from), values.get(to)); + } + + @Override + int compareCurrent(int slot) { + return compareValues(currentValue, values.get(slot)); + } + + @Override + int compareCurrentWithAfter() { + return compareValues(currentValue, afterValue); + } + + private int compareValues(long v1, long v2) { + return Long.compare(v1, v2) * reverseMul; + } + + @Override + void setAfter(Comparable value) { + if (value instanceof Number) { + afterValue = ((Number) value).longValue(); + } else { + // for date histogram source with "format", the after value is formatted + // as a string so we need to retrieve the original value in milliseconds. 
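+ // e.g. an after key rendered as "2017-10-20" with "format": "yyyy-MM-dd" is parsed
+ // back to the epoch millis of that date, using the source's format and time zone.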
+ afterValue = format.parseLong(value.toString(), false, () -> { + throw new IllegalArgumentException("now() is not supported in [after] key"); + }); + } + } + + @Override + Long toComparable(int slot) { + return values.get(slot); + } + + @Override + LeafBucketCollector getLeafCollector(LeafReaderContext context, LeafBucketCollector next) throws IOException { + final SortedNumericDocValues dvs = docValuesFunc.apply(context); + return new LeafBucketCollector() { + @Override + public void collect(int doc, long bucket) throws IOException { + if (dvs.advanceExact(doc)) { + int num = dvs.docValueCount(); + for (int i = 0; i < num; i++) { + currentValue = dvs.nextValue(); + next.collect(doc, bucket); + } + } + } + }; + } + + @Override + LeafBucketCollector getLeafCollector(Comparable value, LeafReaderContext context, LeafBucketCollector next) { + if (value.getClass() != Long.class) { + throw new IllegalArgumentException("Expected Long, got " + value.getClass()); + } + currentValue = (Long) value; + return new LeafBucketCollector() { + @Override + public void collect(int doc, long bucket) throws IOException { + next.collect(doc, bucket); + } + }; + } + + @Override + SortedDocsProducer createSortedDocsProducerOrNull(IndexReader reader, Query query) { + if (checkIfSortedDocsIsApplicable(reader, fieldType) == false || + (query != null && + query.getClass() != MatchAllDocsQuery.class && + // if the query is a range query over the same field + (query instanceof PointRangeQuery && fieldType.name().equals((((PointRangeQuery) query).getField()))) == false)) { + return null; + } + final byte[] lowerPoint; + final byte[] upperPoint; + if (query instanceof PointRangeQuery) { + final PointRangeQuery rangeQuery = (PointRangeQuery) query; + lowerPoint = rangeQuery.getLowerPoint(); + upperPoint = rangeQuery.getUpperPoint(); + } else { + lowerPoint = null; + upperPoint = null; + } + + if (fieldType instanceof NumberFieldMapper.NumberFieldType) { + NumberFieldMapper.NumberFieldType ft = (NumberFieldMapper.NumberFieldType) fieldType; + final ToLongFunction toBucketFunction; + + switch (ft.typeName()) { + case "long": + toBucketFunction = (value) -> rounding.applyAsLong(LongPoint.decodeDimension(value, 0)); + break; + + case "int": + case "short": + case "byte": + toBucketFunction = (value) -> rounding.applyAsLong(IntPoint.decodeDimension(value, 0)); + break; + + default: + return null; + } + return new PointsSortedDocsProducer(fieldType.name(), toBucketFunction, lowerPoint, upperPoint); + } else if (fieldType instanceof DateFieldMapper.DateFieldType) { + final ToLongFunction toBucketFunction = (value) -> rounding.applyAsLong(LongPoint.decodeDimension(value, 0)); + return new PointsSortedDocsProducer(fieldType.name(), toBucketFunction, lowerPoint, upperPoint); + } else { + return null; + } + } + + @Override + public void close() { + Releasables.close(values); + } +} diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/PointsSortedDocsProducer.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/PointsSortedDocsProducer.java new file mode 100644 index 0000000000000..d0f2d6ef9461a --- /dev/null +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/PointsSortedDocsProducer.java @@ -0,0 +1,181 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. 
Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.bucket.composite; + +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.PointValues; +import org.apache.lucene.search.CollectionTerminatedException; +import org.apache.lucene.search.DocIdSet; +import org.apache.lucene.search.Query; +import org.apache.lucene.util.DocIdSetBuilder; +import org.apache.lucene.util.StringHelper; + +import java.io.IOException; +import java.util.function.ToLongFunction; + +/** + * A {@link SortedDocsProducer} that can sort documents based on numerics indexed in the provided field. + */ +class PointsSortedDocsProducer extends SortedDocsProducer { + private final ToLongFunction bucketFunction; + private final byte[] lowerPointQuery; + private final byte[] upperPointQuery; + + PointsSortedDocsProducer(String field, ToLongFunction bucketFunction, byte[] lowerPointQuery, byte[] upperPointQuery) { + super(field); + this.bucketFunction = bucketFunction; + this.lowerPointQuery = lowerPointQuery; + this.upperPointQuery = upperPointQuery; + } + + @Override + DocIdSet processLeaf(Query query, CompositeValuesCollectorQueue queue, + LeafReaderContext context, boolean fillDocIdSet) throws IOException { + final PointValues values = context.reader().getPointValues(field); + if (values == null) { + // no value for the field + return DocIdSet.EMPTY; + } + long lowerBucket = Long.MIN_VALUE; + Comparable lowerValue = queue.getLowerValueLeadSource(); + if (lowerValue != null) { + if (lowerValue.getClass() != Long.class) { + throw new IllegalStateException("expected Long, got " + lowerValue.getClass()); + } + lowerBucket = (Long) lowerValue; + } + + long upperBucket = Long.MAX_VALUE; + Comparable upperValue = queue.getUpperValueLeadSource(); + if (upperValue != null) { + if (upperValue.getClass() != Long.class) { + throw new IllegalStateException("expected Long, got " + upperValue.getClass()); + } + upperBucket = (Long) upperValue; + } + DocIdSetBuilder builder = fillDocIdSet ? new DocIdSetBuilder(context.reader().maxDoc(), values, field) : null; + Visitor visitor = new Visitor(context, queue, builder, values.getBytesPerDimension(), lowerBucket, upperBucket); + try { + values.intersect(visitor); + visitor.flush(); + } catch (CollectionTerminatedException exc) {} + return fillDocIdSet ? 
builder.build() : DocIdSet.EMPTY; + } + + private class Visitor implements PointValues.IntersectVisitor { + final LeafReaderContext context; + final CompositeValuesCollectorQueue queue; + final DocIdSetBuilder builder; + final int maxDoc; + final int bytesPerDim; + final long lowerBucket; + final long upperBucket; + + DocIdSetBuilder bucketDocsBuilder; + DocIdSetBuilder.BulkAdder adder; + int remaining; + long lastBucket; + boolean first = true; + + Visitor(LeafReaderContext context, CompositeValuesCollectorQueue queue, DocIdSetBuilder builder, + int bytesPerDim, long lowerBucket, long upperBucket) { + this.context = context; + this.maxDoc = context.reader().maxDoc(); + this.queue = queue; + this.builder = builder; + this.lowerBucket = lowerBucket; + this.upperBucket = upperBucket; + this.bucketDocsBuilder = new DocIdSetBuilder(maxDoc); + this.bytesPerDim = bytesPerDim; + } + + @Override + public void grow(int count) { + remaining = count; + adder = bucketDocsBuilder.grow(count); + } + + @Override + public void visit(int docID) throws IOException { + throw new IllegalStateException("should never be called"); + } + + @Override + public void visit(int docID, byte[] packedValue) throws IOException { + if (compare(packedValue, packedValue) != PointValues.Relation.CELL_CROSSES_QUERY) { + remaining --; + return; + } + + long bucket = bucketFunction.applyAsLong(packedValue); + if (first == false && bucket != lastBucket) { + final DocIdSet docIdSet = bucketDocsBuilder.build(); + if (processBucket(queue, context, docIdSet.iterator(), lastBucket, builder) && + // lower bucket is inclusive + lowerBucket != lastBucket) { + // this bucket does not have any competitive composite buckets, + // we can early terminate the collection because the remaining buckets are guaranteed + // to be greater than this bucket. 
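+ // (the exception is caught and swallowed by the try/catch around the
+ // values.intersect call in processLeaf, so only this leaf's traversal stops)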
+ throw new CollectionTerminatedException(); + } + bucketDocsBuilder = new DocIdSetBuilder(maxDoc); + assert remaining > 0; + adder = bucketDocsBuilder.grow(remaining); + } + lastBucket = bucket; + first = false; + adder.add(docID); + remaining --; + } + + @Override + public PointValues.Relation compare(byte[] minPackedValue, byte[] maxPackedValue) { + if ((upperPointQuery != null && StringHelper.compare(bytesPerDim, minPackedValue, 0, upperPointQuery, 0) > 0) || + (lowerPointQuery != null && StringHelper.compare(bytesPerDim, maxPackedValue, 0, lowerPointQuery, 0) < 0)) { + // does not match the query + return PointValues.Relation.CELL_OUTSIDE_QUERY; + } + + // check the current bounds + if (lowerBucket != Long.MIN_VALUE) { + long maxBucket = bucketFunction.applyAsLong(maxPackedValue); + if (maxBucket < lowerBucket) { + return PointValues.Relation.CELL_OUTSIDE_QUERY; + } + } + + if (upperBucket != Long.MAX_VALUE) { + long minBucket = bucketFunction.applyAsLong(minPackedValue); + if (minBucket > upperBucket) { + return PointValues.Relation.CELL_OUTSIDE_QUERY; + } + } + return PointValues.Relation.CELL_CROSSES_QUERY; + } + + public void flush() throws IOException { + if (first == false) { + final DocIdSet docIdSet = bucketDocsBuilder.build(); + processBucket(queue, context, docIdSet.iterator(), lastBucket, builder); + bucketDocsBuilder = null; + } + } + } +} diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/RoundingValuesSource.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/RoundingValuesSource.java index 099f2e5e0fd5a..635690c44f49e 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/RoundingValuesSource.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/RoundingValuesSource.java @@ -51,13 +51,17 @@ public boolean isFloatingPoint() { return false; } + public long round(long value) { + return rounding.round(value); + } + @Override public SortedNumericDocValues longValues(LeafReaderContext context) throws IOException { SortedNumericDocValues values = vs.longValues(context); return new SortedNumericDocValues() { @Override public long nextValue() throws IOException { - return rounding.round(values.nextValue()); + return round(values.nextValue()); } @Override diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/SingleDimensionValuesSource.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/SingleDimensionValuesSource.java new file mode 100644 index 0000000000000..efedce7db2afa --- /dev/null +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/SingleDimensionValuesSource.java @@ -0,0 +1,143 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.aggregations.bucket.composite;
+
+import org.apache.lucene.index.IndexOptions;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.search.Query;
+import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.lease.Releasable;
+import org.elasticsearch.index.mapper.MappedFieldType;
+import org.elasticsearch.search.aggregations.LeafBucketCollector;
+import org.elasticsearch.search.sort.SortOrder;
+
+import java.io.IOException;
+
+/**
+ * A source that can record and compare values of similar type.
+ */
+abstract class SingleDimensionValuesSource<T extends Comparable<T>> implements Releasable {
+    protected final int size;
+    protected final int reverseMul;
+    protected T afterValue;
+    @Nullable
+    protected MappedFieldType fieldType;
+
+    /**
+     * Constructor.
+     *
+     * @param fieldType The fieldType associated with the source.
+     * @param size The number of values to record.
+     * @param reverseMul -1 if the natural order ({@link SortOrder#ASC}) should be reversed.
+     */
+    SingleDimensionValuesSource(@Nullable MappedFieldType fieldType, int size, int reverseMul) {
+        this.fieldType = fieldType;
+        this.size = size;
+        this.reverseMul = reverseMul;
+        this.afterValue = null;
+    }
+
+    /**
+     * The current value is filled by a {@link LeafBucketCollector} that visits all the
+     * values of each document. This method saves this current value in a slot and should only be used
+     * in the context of a collection.
+     * See {@link this#getLeafCollector}.
+     */
+    abstract void copyCurrent(int slot);
+
+    /**
+     * Compares the value in <code>from</code> with the value in <code>to</code>.
+     */
+    abstract int compare(int from, int to);
+
+    /**
+     * The current value is filled by a {@link LeafBucketCollector} that visits all the
+     * values of each document. This method compares this current value with the value present in
+     * the provided slot and should only be used in the context of a collection.
+     * See {@link this#getLeafCollector}.
+     */
+    abstract int compareCurrent(int slot);
+
+    /**
+     * The current value is filled by a {@link LeafBucketCollector} that visits all the
+     * values of each document. This method compares this current value with the after value
+     * set on this source and should only be used in the context of a collection.
+     * See {@link this#getLeafCollector}.
+     */
+    abstract int compareCurrentWithAfter();
+
+    /**
+     * Sets the after value for this source. Values that compare smaller are filtered.
+     */
+    abstract void setAfter(Comparable value);
+
+    /**
+     * Returns the after value set for this source.
+     */
+    T getAfter() {
+        return afterValue;
+    }
+
+    /**
+     * Transforms the value in <code>slot</code> to a {@link Comparable} object.
+     */
+    abstract T toComparable(int slot) throws IOException;
+
+    /**
+     * Creates a {@link LeafBucketCollector} that extracts all values from a document and invokes
+     * {@link LeafBucketCollector#collect} on the provided <code>next</code> collector for each of them.
+     * The current value of this source is set on each call and can be accessed by <code>next</code> via
+     * the {@link this#copyCurrent(int)} and {@link this#compareCurrent(int)} methods. Note that these methods
+     * are only valid when invoked from the {@link LeafBucketCollector} created in this source.
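+     * For instance, chaining two sources over a document that has values [a, b] for the first
+     * source and [1, 2] for the second makes the innermost collector observe the combinations
+     * (a,1), (a,2), (b,1) and (b,2), one composite key candidate per call.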
+ */ + abstract LeafBucketCollector getLeafCollector(LeafReaderContext context, LeafBucketCollector next) throws IOException; + + /** + * Creates a {@link LeafBucketCollector} that sets the current value for each document to the provided + * value and invokes {@link LeafBucketCollector#collect} on the provided next collector. + */ + abstract LeafBucketCollector getLeafCollector(Comparable value, + LeafReaderContext context, LeafBucketCollector next) throws IOException; + + /** + * Returns a {@link SortedDocsProducer} or null if this source cannot produce sorted docs. + */ + abstract SortedDocsProducer createSortedDocsProducerOrNull(IndexReader reader, Query query); + + /** + * Returns true if a {@link SortedDocsProducer} should be used to optimize the execution. + */ + protected boolean checkIfSortedDocsIsApplicable(IndexReader reader, MappedFieldType fieldType) { + if (fieldType == null || + fieldType.indexOptions() == IndexOptions.NONE || + // inverse of the natural order + reverseMul == -1) { + return false; + } + + if (reader.hasDeletions() && + (reader.numDocs() == 0 || (double) reader.numDocs() / (double) reader.maxDoc() < 0.5)) { + // do not use the index if it has more than 50% of deleted docs + return false; + } + return true; + } +} diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/SortedDocsProducer.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/SortedDocsProducer.java new file mode 100644 index 0000000000000..d9d927ff66061 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/SortedDocsProducer.java @@ -0,0 +1,108 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.bucket.composite; + +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.search.DocIdSet; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.search.Query; +import org.apache.lucene.util.Bits; +import org.apache.lucene.util.DocIdSetBuilder; +import org.elasticsearch.common.inject.internal.Nullable; +import org.elasticsearch.search.aggregations.LeafBucketCollector; + +import java.io.IOException; + +/** + * A producer that visits composite buckets in the order of the value indexed in the leading source of the composite + * definition. It can be used to control which documents should be collected to produce the top composite buckets + * without visiting all documents in an index. + */ +abstract class SortedDocsProducer { + protected final String field; + + SortedDocsProducer(String field) { + this.field = field; + } + + /** + * Visits all non-deleted documents in iterator and fills the provided queue + * with the top composite buckets extracted from the collection. 
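+ * Internally, each matching document is pushed through the queue's collector with the lead
+ * source value pre-set, and {@code addIfCompetitive} decides whether its composite key enters
+ * the queue.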
+ * Documents that contain a top composite bucket are added in the provided builder if it is not null. + * + * Returns true if the queue is full and the current leadSourceBucket did not produce any competitive + * composite buckets. + */ + protected boolean processBucket(CompositeValuesCollectorQueue queue, LeafReaderContext context, DocIdSetIterator iterator, + Comparable leadSourceBucket, @Nullable DocIdSetBuilder builder) throws IOException { + final int[] topCompositeCollected = new int[1]; + final boolean[] hasCollected = new boolean[1]; + final LeafBucketCollector queueCollector = new LeafBucketCollector() { + int lastDoc = -1; + + // we need to add the matching document in the builder + // so we build a bulk adder from the approximate cost of the iterator + // and rebuild the adder during the collection if needed + int remainingBits = (int) Math.min(iterator.cost(), Integer.MAX_VALUE); + DocIdSetBuilder.BulkAdder adder = builder == null ? null : builder.grow(remainingBits); + + @Override + public void collect(int doc, long bucket) throws IOException { + hasCollected[0] = true; + int slot = queue.addIfCompetitive(); + if (slot != -1) { + topCompositeCollected[0]++; + if (adder != null && doc != lastDoc) { + if (remainingBits == 0) { + // the cost approximation was lower than the real size, we need to grow the adder + // by some numbers (128) to ensure that we can add the extra documents + adder = builder.grow(128); + remainingBits = 128; + } + adder.add(doc); + remainingBits --; + lastDoc = doc; + } + } + } + }; + final Bits liveDocs = context.reader().getLiveDocs(); + final LeafBucketCollector collector = queue.getLeafCollector(leadSourceBucket, context, queueCollector); + while (iterator.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) { + if (liveDocs == null || liveDocs.get(iterator.docID())) { + collector.collect(iterator.docID()); + } + } + if (queue.isFull() && + hasCollected[0] && + topCompositeCollected[0] == 0) { + return true; + } + return false; + } + + /** + * Populates the queue with the composite buckets present in the context. + * Returns the {@link DocIdSet} of the documents that contain a top composite bucket in this leaf or + * {@link DocIdSet#EMPTY} if fillDocIdSet is false. + */ + abstract DocIdSet processLeaf(Query query, CompositeValuesCollectorQueue queue, + LeafReaderContext context, boolean fillDocIdSet) throws IOException; +} diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/TermsSortedDocsProducer.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/TermsSortedDocsProducer.java new file mode 100644 index 0000000000000..f9d9877e320b4 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/TermsSortedDocsProducer.java @@ -0,0 +1,79 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.bucket.composite; + +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.PostingsEnum; +import org.apache.lucene.index.Terms; +import org.apache.lucene.index.TermsEnum; +import org.apache.lucene.search.DocIdSet; +import org.apache.lucene.search.Query; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.DocIdSetBuilder; + +import java.io.IOException; + +/** + * A {@link SortedDocsProducer} that can sort documents based on terms indexed in the provided field. + */ +class TermsSortedDocsProducer extends SortedDocsProducer { + TermsSortedDocsProducer(String field) { + super(field); + } + + @Override + DocIdSet processLeaf(Query query, CompositeValuesCollectorQueue queue, + LeafReaderContext context, boolean fillDocIdSet) throws IOException { + final Terms terms = context.reader().terms(field); + if (terms == null) { + // no value for the field + return DocIdSet.EMPTY; + } + BytesRef lowerValue = (BytesRef) queue.getLowerValueLeadSource(); + BytesRef upperValue = (BytesRef) queue.getUpperValueLeadSource(); + final TermsEnum te = terms.iterator(); + if (lowerValue != null) { + if (te.seekCeil(lowerValue) == TermsEnum.SeekStatus.END) { + return DocIdSet.EMPTY ; + } + } else { + if (te.next() == null) { + return DocIdSet.EMPTY; + } + } + DocIdSetBuilder builder = fillDocIdSet ? new DocIdSetBuilder(context.reader().maxDoc(), terms) : null; + PostingsEnum reuse = null; + boolean first = true; + do { + if (upperValue != null && upperValue.compareTo(te.term()) < 0) { + break; + } + reuse = te.postings(reuse, PostingsEnum.NONE); + if (processBucket(queue, context, reuse, te.term(), builder) && !first) { + // this bucket does not have any competitive composite buckets, + // we can early terminate the collection because the remaining buckets are guaranteed + // to be greater than this bucket. + break; + } + first = false; + } while (te.next() != null); + return fillDocIdSet ? 
builder.build() : DocIdSet.EMPTY; + } +} diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/TermsValuesSourceBuilder.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/TermsValuesSourceBuilder.java index 6ca5cdbcb6230..21ab14fe27e21 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/TermsValuesSourceBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/TermsValuesSourceBuilder.java @@ -19,18 +19,16 @@ package org.elasticsearch.search.aggregations.bucket.composite; -import org.apache.lucene.search.SortField; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.search.aggregations.support.FieldContext; +import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.search.aggregations.support.ValuesSource; import org.elasticsearch.search.aggregations.support.ValuesSourceConfig; import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.script.Script; -import org.elasticsearch.search.sort.SortOrder; import java.io.IOException; @@ -80,21 +78,12 @@ public String type() { } @Override - protected CompositeValuesSourceConfig innerBuild(SearchContext context, - ValuesSourceConfig config, - int pos, - int numPos, - SortField sortField) throws IOException { + protected CompositeValuesSourceConfig innerBuild(SearchContext context, ValuesSourceConfig config) throws IOException { ValuesSource vs = config.toValuesSource(context.getQueryShardContext()); if (vs == null) { vs = ValuesSource.Numeric.EMPTY; } - boolean canEarlyTerminate = false; - final FieldContext fieldContext = config.fieldContext(); - if (sortField != null && config.fieldContext() != null) { - canEarlyTerminate = checkCanEarlyTerminate(context.searcher().getIndexReader(), - fieldContext.field(), order() == SortOrder.ASC ? false : true, sortField); - } - return new CompositeValuesSourceConfig(name, vs, config.format(), order(), canEarlyTerminate); + final MappedFieldType fieldType = config.fieldContext() != null ? 
config.fieldContext().fieldType() : null; + return new CompositeValuesSourceConfig(name, fieldType, vs, config.format(), order()); } } diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java index 094457a8bf4f6..e52a4b7bbbc9e 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java @@ -19,42 +19,44 @@ package org.elasticsearch.search.aggregations.bucket.composite; -import org.apache.lucene.analysis.MockAnalyzer; import org.apache.lucene.document.Document; +import org.apache.lucene.document.DoublePoint; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.IntPoint; +import org.apache.lucene.document.LongPoint; import org.apache.lucene.document.SortedNumericDocValuesField; import org.apache.lucene.document.SortedSetDocValuesField; +import org.apache.lucene.document.StringField; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexReader; -import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.MultiReader; import org.apache.lucene.index.RandomIndexWriter; +import org.apache.lucene.search.DocValuesFieldExistsQuery; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.Query; -import org.apache.lucene.search.Sort; -import org.apache.lucene.search.SortField; -import org.apache.lucene.search.SortedNumericSortField; -import org.apache.lucene.search.SortedSetSortField; import org.apache.lucene.store.Directory; import org.apache.lucene.util.BytesRef; -import org.apache.lucene.util.LuceneTestCase; import org.apache.lucene.util.NumericUtils; -import org.apache.lucene.util.TestUtil; import org.elasticsearch.ElasticsearchParseException; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.Index; -import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.mapper.ContentPath; import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.index.mapper.KeywordFieldMapper; import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.Mapper; import org.elasticsearch.index.mapper.NumberFieldMapper; +import org.elasticsearch.index.query.QueryShardException; +import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.AggregatorTestCase; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; +import org.elasticsearch.search.aggregations.bucket.terms.StringTerms; +import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; +import org.elasticsearch.search.aggregations.metrics.max.InternalMax; +import org.elasticsearch.search.aggregations.metrics.max.MaxAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.tophits.TopHits; import org.elasticsearch.search.aggregations.metrics.tophits.TopHitsAggregationBuilder; +import org.elasticsearch.search.aggregations.support.ValueType; import org.elasticsearch.search.sort.SortOrder; -import org.elasticsearch.test.IndexSettingsModule; import org.joda.time.DateTimeZone; import org.junit.After; import org.junit.Before; @@ -64,12 +66,18 @@ import java.util.Arrays; import 
java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; +import java.util.function.Function; import java.util.function.Supplier; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; public class CompositeAggregatorTests extends AggregatorTestCase { @@ -79,7 +87,7 @@ public class CompositeAggregatorTests extends AggregatorTestCase { @Before public void setUp() throws Exception { super.setUp(); - FIELD_TYPES = new MappedFieldType[5]; + FIELD_TYPES = new MappedFieldType[6]; FIELD_TYPES[0] = new KeywordFieldMapper.KeywordFieldType(); FIELD_TYPES[0].setName("keyword"); FIELD_TYPES[0].setHasDocValues(true); @@ -101,6 +109,10 @@ public void setUp() throws Exception { FIELD_TYPES[4] = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.INTEGER); FIELD_TYPES[4].setName("price"); FIELD_TYPES[4].setHasDocValues(true); + + FIELD_TYPES[5] = new KeywordFieldMapper.KeywordFieldType(); + FIELD_TYPES[5].setName("terms"); + FIELD_TYPES[5].setHasDocValues(true); } @Override @@ -110,6 +122,19 @@ public void tearDown() throws Exception { FIELD_TYPES = null; } + public void testUnmappedField() throws Exception { + TermsValuesSourceBuilder terms = new TermsValuesSourceBuilder(randomAlphaOfLengthBetween(5, 10)) + .field("unknown"); + CompositeAggregationBuilder builder = new CompositeAggregationBuilder("test", Collections.singletonList(terms)); + IndexSearcher searcher = new IndexSearcher(new MultiReader()); + QueryShardException exc = + expectThrows(QueryShardException.class, () -> createAggregatorFactory(builder, searcher)); + assertThat(exc.getMessage(), containsString("failed to find field [unknown] and [missing] is not provided")); + // should work when missing is provided + terms.missing("missing"); + createAggregatorFactory(builder, searcher); + } + public void testWithKeyword() throws Exception { final List>> dataset = new ArrayList<>(); dataset.addAll( @@ -121,8 +146,7 @@ public void testWithKeyword() throws Exception { createDocument("keyword", "c") ) ); - final Sort sort = new Sort(new SortedSetSortField("keyword", false)); - testSearchCase(new MatchAllDocsQuery(), sort, dataset, + testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset, () -> { TermsValuesSourceBuilder terms = new TermsValuesSourceBuilder("keyword") .field("keyword"); @@ -139,7 +163,7 @@ public void testWithKeyword() throws Exception { } ); - testSearchCase(new MatchAllDocsQuery(), sort, dataset, + testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset, () -> { TermsValuesSourceBuilder terms = new TermsValuesSourceBuilder("keyword") .field("keyword"); @@ -168,8 +192,7 @@ public void testWithKeywordMissingAfter() throws Exception { createDocument("keyword", "delta") ) ); - final Sort sort = new Sort(new SortedSetSortField("keyword", false)); - testSearchCase(new MatchAllDocsQuery(), sort, dataset, + testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset, () -> { TermsValuesSourceBuilder terms = new TermsValuesSourceBuilder("keyword") .field("keyword"); @@ -188,7 +211,7 @@ public void testWithKeywordMissingAfter() throws Exception { } ); - testSearchCase(new 
     public void testWithKeyword() throws Exception {
         final List<Map<String, List<Object>>> dataset = new ArrayList<>();
         dataset.addAll(
@@ -121,8 +146,7 @@ public void testWithKeyword() throws Exception {
                 createDocument("keyword", "c")
             )
         );
-        final Sort sort = new Sort(new SortedSetSortField("keyword", false));
-        testSearchCase(new MatchAllDocsQuery(), sort, dataset,
+        testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset,
             () -> {
                 TermsValuesSourceBuilder terms = new TermsValuesSourceBuilder("keyword")
                     .field("keyword");
@@ -139,7 +163,7 @@ public void testWithKeyword() throws Exception {
             }
         );
 
-        testSearchCase(new MatchAllDocsQuery(), sort, dataset,
+        testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset,
             () -> {
                 TermsValuesSourceBuilder terms = new TermsValuesSourceBuilder("keyword")
                     .field("keyword");
@@ -168,8 +192,7 @@ public void testWithKeywordMissingAfter() throws Exception {
                 createDocument("keyword", "delta")
             )
         );
-        final Sort sort = new Sort(new SortedSetSortField("keyword", false));
-        testSearchCase(new MatchAllDocsQuery(), sort, dataset,
+        testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset,
             () -> {
                 TermsValuesSourceBuilder terms = new TermsValuesSourceBuilder("keyword")
                     .field("keyword");
@@ -188,7 +211,7 @@ public void testWithKeywordMissingAfter() throws Exception {
             }
         );
 
-        testSearchCase(new MatchAllDocsQuery(), sort, dataset,
+        testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset,
             () -> {
                 TermsValuesSourceBuilder terms = new TermsValuesSourceBuilder("keyword")
                     .field("keyword");
@@ -206,7 +229,7 @@ public void testWithKeywordMissingAfter() throws Exception {
             }
         );
 
-        testSearchCase(new MatchAllDocsQuery(), null, dataset,
+        testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset,
             () -> {
                 TermsValuesSourceBuilder terms = new TermsValuesSourceBuilder("keyword")
                     .field("keyword").order(SortOrder.DESC);
@@ -236,8 +259,7 @@ public void testWithKeywordDesc() throws Exception {
                 createDocument("keyword", "c")
             )
         );
-        final Sort sort = new Sort(new SortedSetSortField("keyword", true));
-        testSearchCase(new MatchAllDocsQuery(), sort, dataset,
+        testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset,
             () -> {
                 TermsValuesSourceBuilder terms = new TermsValuesSourceBuilder("keyword")
                     .field("keyword")
@@ -255,7 +277,7 @@ public void testWithKeywordDesc() throws Exception {
             }
         );
 
-        testSearchCase(new MatchAllDocsQuery(), sort, dataset,
+        testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset,
             () -> {
                 TermsValuesSourceBuilder terms = new TermsValuesSourceBuilder("keyword")
                     .field("keyword")
@@ -285,7 +307,7 @@ public void testMultiValuedWithKeyword() throws Exception {
             )
         );
 
-        testSearchCase(new MatchAllDocsQuery(), null, dataset,
+        testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset,
             () -> {
                 TermsValuesSourceBuilder terms = new TermsValuesSourceBuilder("keyword")
                     .field("keyword");
@@ -307,7 +329,7 @@ public void testMultiValuedWithKeyword() throws Exception {
             }
         );
 
-        testSearchCase(new MatchAllDocsQuery(), null, dataset,
+        testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset,
             () -> {
                 TermsValuesSourceBuilder terms = new TermsValuesSourceBuilder("keyword")
                     .field("keyword");
@@ -339,7 +361,7 @@ public void testMultiValuedWithKeywordDesc() throws Exception {
             )
         );
 
-        testSearchCase(new MatchAllDocsQuery(), null, dataset,
+        testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset,
             () -> {
                 TermsValuesSourceBuilder terms = new TermsValuesSourceBuilder("keyword")
                     .field("keyword")
@@ -362,7 +384,7 @@ public void testMultiValuedWithKeywordDesc() throws Exception {
             }
         );
 
-        testSearchCase(new MatchAllDocsQuery(), null, dataset,
+        testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset,
             () -> {
                 TermsValuesSourceBuilder terms = new TermsValuesSourceBuilder("keyword")
                     .field("keyword")
@@ -394,11 +416,7 @@ public void testWithKeywordAndLong() throws Exception {
                 createDocument("long", 100L)
             )
         );
-        final Sort sort = new Sort(
-            new SortedSetSortField("keyword", false),
-            new SortedNumericSortField("long", SortField.Type.LONG)
-        );
-        testSearchCase(new MatchAllDocsQuery(), sort, dataset,
+        testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset,
             () -> new CompositeAggregationBuilder("name",
                 Arrays.asList(
                     new TermsValuesSourceBuilder("keyword").field("keyword"),
@@ -419,7 +437,7 @@ public void testWithKeywordAndLong() throws Exception {
             }
         );
 
-        testSearchCase(new MatchAllDocsQuery(), sort, dataset,
+        testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset,
             () -> new CompositeAggregationBuilder("name",
                 Arrays.asList(
                     new TermsValuesSourceBuilder("keyword").field("keyword"),
@@ -451,11 +469,7 @@ public void testWithKeywordAndLongDesc() throws Exception {
                 createDocument("long", 100L)
             )
         );
-        final Sort sort = new Sort(
-            new SortedSetSortField("keyword", true),
-            new SortedNumericSortField("long", SortField.Type.LONG, true)
-        );
-        testSearchCase(new MatchAllDocsQuery(), sort, dataset,
+        testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset,
             () -> new CompositeAggregationBuilder("name",
                 Arrays.asList(
@@ -477,7 +491,7 @@ public void testWithKeywordAndLongDesc() throws Exception {
             }
         );
 
-        testSearchCase(new MatchAllDocsQuery(), sort, dataset,
+        testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset,
             () -> new CompositeAggregationBuilder("name",
                 Arrays.asList(
@@ -510,7 +524,7 @@ public void testMultiValuedWithKeywordAndLong() throws Exception {
             )
         );
 
-        testSearchCase(new MatchAllDocsQuery(), null, dataset,
+        testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset,
             () -> new CompositeAggregationBuilder("name",
                 Arrays.asList(
@@ -543,7 +557,7 @@ public void testMultiValuedWithKeywordAndLong() throws Exception {
             }
         );
 
-        testSearchCase(new MatchAllDocsQuery(), null, dataset,
+        testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset,
             () -> new CompositeAggregationBuilder("name",
                 Arrays.asList(
@@ -580,11 +594,10 @@ public void testMultiValuedWithKeywordAndLongDesc() throws Exception {
                 createDocument("keyword", Arrays.asList("d", "d"), "long", Arrays.asList(10L, 100L, 1000L)),
                 createDocument("keyword", "c"),
                 createDocument("long", 100L)
-            )
             )
         );
 
-        testSearchCase(new MatchAllDocsQuery(), null, dataset,
+        testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset,
             () -> new CompositeAggregationBuilder("name",
                 Arrays.asList(
@@ -619,7 +632,7 @@ public void testMultiValuedWithKeywordAndLongDesc() throws Exception {
             }
         );
 
-        testSearchCase(new MatchAllDocsQuery(), null, dataset,
+        testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset,
             () -> new CompositeAggregationBuilder("name",
                 Arrays.asList(
@@ -653,7 +666,7 @@ public void testMultiValuedWithKeywordLongAndDouble() throws Exception {
             )
         );
 
-        testSearchCase(new MatchAllDocsQuery(), null, dataset,
+        testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset,
             () -> new CompositeAggregationBuilder("name",
                 Arrays.asList(
@@ -688,7 +701,7 @@ public void testMultiValuedWithKeywordLongAndDouble() throws Exception {
             }
         );
 
-        testSearchCase(new MatchAllDocsQuery(), null, dataset,
+        testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset,
             () -> new CompositeAggregationBuilder("name",
                 Arrays.asList(
@@ -723,7 +736,7 @@ public void testMultiValuedWithKeywordLongAndDouble() throws Exception {
             }
         );
 
-        testSearchCase(new MatchAllDocsQuery(), null, dataset,
+        testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset,
             () -> new CompositeAggregationBuilder("name",
                 Arrays.asList(
@@ -751,8 +764,12 @@ public void testWithDateHistogram() throws IOException {
                 createDocument("long", 4L)
             )
         );
-        final Sort sort = new Sort(new SortedNumericSortField("date", SortField.Type.LONG));
-        testSearchCase(new MatchAllDocsQuery(), sort, dataset,
+        testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("date"),
+            LongPoint.newRangeQuery(
+                "date",
+                asLong("2016-09-20T09:00:34"),
+                asLong("2017-10-20T06:09:24")
+            )), dataset,
             () -> {
                 DateHistogramValuesSourceBuilder histo = new DateHistogramValuesSourceBuilder("date")
                     .field("date")
@@ -771,7 +788,12 @@ public void testWithDateHistogram() throws IOException {
             }
         );
 
-        testSearchCase(new MatchAllDocsQuery(), sort, dataset,
+        testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("date"),
+            LongPoint.newRangeQuery(
+                "date",
+                asLong("2016-09-20T11:34:00"),
+                asLong("2017-10-20T06:09:24")
+            )), dataset,
             () -> {
                 DateHistogramValuesSourceBuilder histo = new DateHistogramValuesSourceBuilder("date")
                     .field("date")
@@ -802,8 +824,7 @@ public void testWithDateHistogramAndFormat() throws IOException {
                 createDocument("long", 4L)
             )
         );
-        final Sort sort = new Sort(new SortedNumericSortField("date", SortField.Type.LONG));
-        testSearchCase(new MatchAllDocsQuery(), sort, dataset,
+        testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("date")), dataset,
             () -> {
                 DateHistogramValuesSourceBuilder histo = new DateHistogramValuesSourceBuilder("date")
                     .field("date")
@@ -823,7 +844,7 @@ public void testWithDateHistogramAndFormat() throws IOException {
             }
         );
 
-        testSearchCase(new MatchAllDocsQuery(), sort, dataset,
+        testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("date")), dataset,
             () -> {
                 DateHistogramValuesSourceBuilder histo = new DateHistogramValuesSourceBuilder("date")
                     .field("date")
@@ -845,7 +866,7 @@ public void testWithDateHistogramAndFormat() throws IOException {
 
     public void testThatDateHistogramFailsFormatAfter() throws IOException {
         ElasticsearchParseException exc = expectThrows(ElasticsearchParseException.class,
-            () -> testSearchCase(new MatchAllDocsQuery(), null, Collections.emptyList(),
+            () -> testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("date")), Collections.emptyList(),
                 () -> {
                     DateHistogramValuesSourceBuilder histo = new DateHistogramValuesSourceBuilder("date")
                         .field("date")
@@ -860,7 +881,7 @@ public void testThatDateHistogramFailsFormatAfter() throws IOException {
         assertThat(exc.getCause().getMessage(), containsString("now() is not supported in [after] key"));
 
         exc = expectThrows(ElasticsearchParseException.class,
-            () -> testSearchCase(new MatchAllDocsQuery(), null, Collections.emptyList(),
+            () -> testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("date")), Collections.emptyList(),
                 () -> {
                     DateHistogramValuesSourceBuilder histo = new DateHistogramValuesSourceBuilder("date")
                         .field("date")
@@ -887,8 +908,7 @@ public void testWithDateHistogramAndTimeZone() throws IOException {
                 createDocument("long", 4L)
             )
         );
-        final Sort sort = new Sort(new SortedNumericSortField("date", SortField.Type.LONG));
-        testSearchCase(new MatchAllDocsQuery(), sort, dataset,
+        testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("date")), dataset,
             () -> {
                 DateHistogramValuesSourceBuilder histo = new DateHistogramValuesSourceBuilder("date")
                     .field("date")
@@ -908,7 +928,7 @@ public void testWithDateHistogramAndTimeZone() throws IOException {
             }
         );
 
-        testSearchCase(new MatchAllDocsQuery(), sort, dataset,
+        testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("date")), dataset,
             () -> {
                 DateHistogramValuesSourceBuilder histo = new DateHistogramValuesSourceBuilder("date")
                     .field("date")
@@ -940,7 +960,12 @@ public void testWithDateHistogramAndKeyword() throws IOException {
                 createDocument("long", 4L)
             )
         );
-        testSearchCase(new MatchAllDocsQuery(), null, dataset,
+        testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("date"),
+            LongPoint.newRangeQuery(
+                "date",
+                asLong("2016-09-20T09:00:34"),
+                asLong("2017-10-20T06:09:24")
+            )), dataset,
             () -> new CompositeAggregationBuilder("name",
                 Arrays.asList(
@@ -971,7 +996,12 @@ public void testWithDateHistogramAndKeyword() throws IOException {
             }
         );
 
-        testSearchCase(new MatchAllDocsQuery(), null, dataset,
+        testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("date"),
+            LongPoint.newRangeQuery(
+                "date",
+                asLong("2016-09-20T11:34:00"),
+                asLong("2017-10-20T06:09:24")
+            )), dataset,
             () -> new CompositeAggregationBuilder("name",
                 Arrays.asList(
@@ -1007,7 +1037,7 @@ public void testWithKeywordAndHistogram() throws IOException {
                 createDocument("long", 4L)
             )
         );
-        testSearchCase(new MatchAllDocsQuery(), null, dataset,
+        testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("price")), dataset,
             () -> new CompositeAggregationBuilder("name",
                 Arrays.asList(
@@ -1035,7 +1065,7 @@ public void testWithKeywordAndHistogram() throws IOException {
             }
         );
 
-        testSearchCase(new MatchAllDocsQuery(), null, dataset,
+        testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("price")), dataset,
             () -> new CompositeAggregationBuilder("name",
                 Arrays.asList(
@@ -1075,7 +1105,7 @@ public void testWithHistogramAndKeyword() throws IOException {
                 createDocument("long", 4L)
             )
         );
-        testSearchCase(new MatchAllDocsQuery(), null, dataset,
+        testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("double")), dataset,
             () -> new CompositeAggregationBuilder("name",
                 Arrays.asList(
@@ -1105,7 +1135,7 @@ public void testWithHistogramAndKeyword() throws IOException {
             }
         );
 
-        testSearchCase(new MatchAllDocsQuery(), null, dataset,
+        testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("double")), dataset,
             () -> new CompositeAggregationBuilder("name",
                 Arrays.asList(
@@ -1138,7 +1168,7 @@ public void testWithKeywordAndDateHistogram() throws IOException {
                 createDocument("long", 4L)
             )
         );
-        testSearchCase(new MatchAllDocsQuery(), null, dataset,
+        testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset,
             () -> new CompositeAggregationBuilder("name",
                 Arrays.asList(
@@ -1167,7 +1197,7 @@ public void testWithKeywordAndDateHistogram() throws IOException {
             }
         );
 
-        testSearchCase(new MatchAllDocsQuery(), null, dataset,
+        testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset,
             () -> new CompositeAggregationBuilder("name",
                 Arrays.asList(
@@ -1202,8 +1232,7 @@ public void testWithKeywordAndTopHits() throws Exception {
                 createDocument("keyword", "c")
             )
         );
-        final Sort sort = new Sort(new SortedSetSortField("keyword", false));
-        testSearchCase(new MatchAllDocsQuery(), sort, dataset,
+        testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset,
             () -> {
                 TermsValuesSourceBuilder terms = new TermsValuesSourceBuilder("keyword")
                     .field("keyword");
@@ -1232,7 +1261,7 @@ public void testWithKeywordAndTopHits() throws Exception {
             }
         );
 
-        testSearchCase(new MatchAllDocsQuery(), sort, dataset,
+        testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset,
             () -> {
                 TermsValuesSourceBuilder terms = new TermsValuesSourceBuilder("keyword")
                     .field("keyword");
@@ -1257,36 +1286,174 @@ public void testWithKeywordAndTopHits() throws Exception {
         );
     }
 
-    private void testSearchCase(Query query, Sort sort,
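+    // The composite aggregation drives its sub-aggregations, so both sub-aggregation
+    // collection modes (DEPTH_FIRST and BREADTH_FIRST) must produce identical buckets
+    // and identical max values in the test below.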
+    public void testWithTermsSubAggExecutionMode() throws Exception {
+        // test with no bucket
+        for (Aggregator.SubAggCollectionMode mode : Aggregator.SubAggCollectionMode.values()) {
+            testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")),
+                Collections.singletonList(createDocument()),
+                () -> {
+                    TermsValuesSourceBuilder terms = new TermsValuesSourceBuilder("keyword")
+                        .field("keyword");
+                    return new CompositeAggregationBuilder("name", Collections.singletonList(terms))
+                        .subAggregation(
+                            new TermsAggregationBuilder("terms", ValueType.STRING)
+                                .field("terms")
+                                .collectMode(mode)
+                                .subAggregation(new MaxAggregationBuilder("max").field("long"))
+                        );
+                }, (result) -> {
+                    assertEquals(0, result.getBuckets().size());
+                }
+            );
+        }
+
+        final List<Map<String, List<Object>>> dataset = new ArrayList<>();
+        dataset.addAll(
+            Arrays.asList(
+                createDocument("keyword", "a", "terms", "a", "long", 50L),
+                createDocument("keyword", "c", "terms", "d", "long", 78L),
+                createDocument("keyword", "a", "terms", "w", "long", 78L),
+                createDocument("keyword", "d", "terms", "y", "long", 76L),
+                createDocument("keyword", "c", "terms", "y", "long", 70L)
+            )
+        );
+        for (Aggregator.SubAggCollectionMode mode : Aggregator.SubAggCollectionMode.values()) {
+            testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset,
+                () -> {
+                    TermsValuesSourceBuilder terms = new TermsValuesSourceBuilder("keyword")
+                        .field("keyword");
+                    return new CompositeAggregationBuilder("name", Collections.singletonList(terms))
+                        .subAggregation(
+                            new TermsAggregationBuilder("terms", ValueType.STRING)
+                                .field("terms")
+                                .collectMode(mode)
+                                .subAggregation(new MaxAggregationBuilder("max").field("long"))
+                        );
+                }, (result) -> {
+                    assertEquals(3, result.getBuckets().size());
+
+                    assertEquals("{keyword=a}", result.getBuckets().get(0).getKeyAsString());
+                    assertEquals(2L, result.getBuckets().get(0).getDocCount());
+                    StringTerms subTerms = result.getBuckets().get(0).getAggregations().get("terms");
+                    assertEquals(2, subTerms.getBuckets().size());
+                    assertEquals("a", subTerms.getBuckets().get(0).getKeyAsString());
+                    assertEquals("w", subTerms.getBuckets().get(1).getKeyAsString());
+                    InternalMax max = subTerms.getBuckets().get(0).getAggregations().get("max");
+                    assertEquals(50L, (long) max.getValue());
+                    max = subTerms.getBuckets().get(1).getAggregations().get("max");
+                    assertEquals(78L, (long) max.getValue());
+
+                    assertEquals("{keyword=c}", result.getBuckets().get(1).getKeyAsString());
+                    assertEquals(2L, result.getBuckets().get(1).getDocCount());
+                    subTerms = result.getBuckets().get(1).getAggregations().get("terms");
+                    assertEquals(2, subTerms.getBuckets().size());
+                    assertEquals("d", subTerms.getBuckets().get(0).getKeyAsString());
+                    assertEquals("y", subTerms.getBuckets().get(1).getKeyAsString());
+                    max = subTerms.getBuckets().get(0).getAggregations().get("max");
+                    assertEquals(78L, (long) max.getValue());
+                    max = subTerms.getBuckets().get(1).getAggregations().get("max");
+                    assertEquals(70L, (long) max.getValue());
+
+                    assertEquals("{keyword=d}", result.getBuckets().get(2).getKeyAsString());
+                    assertEquals(1L, result.getBuckets().get(2).getDocCount());
+                    subTerms = result.getBuckets().get(2).getAggregations().get("terms");
+                    assertEquals(1, subTerms.getBuckets().size());
+                    assertEquals("y", subTerms.getBuckets().get(0).getKeyAsString());
+                    max = subTerms.getBuckets().get(0).getAggregations().get("max");
+                    assertEquals(76L, (long) max.getValue());
+                }
+            );
+        }
+    }
+
+    public void testRandomStrings() throws IOException {
+        testRandomTerms("keyword", () -> randomAlphaOfLengthBetween(5, 50), (v) -> (String) v);
+    }
+
+    public void testRandomLongs() throws IOException {
+        testRandomTerms("long", () -> randomLong(), (v) -> (long) v);
+    }
+
+    public void testRandomInts() throws IOException {
+        testRandomTerms("price", () -> randomInt(), (v) -> ((Number) v).intValue());
+    }
+
+    private <T extends Comparable<T>, V extends Comparable<T>> void testRandomTerms(String field,
+                                                                                    Supplier<T> randomSupplier,
+                                                                                    Function<Object, V> transformKey) throws IOException {
+        int numTerms = randomIntBetween(10, 500);
+        List<T> terms = new ArrayList<>();
+        for (int i = 0; i < numTerms; i++) {
+            terms.add(randomSupplier.get());
+        }
+        int numDocs = randomIntBetween(100, 200);
+        List<Map<String, List<Object>>> dataset = new ArrayList<>();
+
+        Set<T> valuesSet = new HashSet<>();
+        Map<Comparable<T>, AtomicLong> expectedDocCounts = new HashMap<>();
+        for (int i = 0; i < numDocs; i++) {
+            int numValues = randomIntBetween(1, 5);
+            Set<T> values = new HashSet<>();
+            for (int j = 0; j < numValues; j++) {
+                int rand = randomIntBetween(0, terms.size() - 1);
+                if (values.add(terms.get(rand))) {
+                    AtomicLong count = expectedDocCounts.computeIfAbsent(terms.get(rand),
+                        (k) -> new AtomicLong(0));
+                    count.incrementAndGet();
+                    valuesSet.add(terms.get(rand));
+                }
+            }
+            dataset.add(Collections.singletonMap(field, new ArrayList<>(values)));
+        }
+        List<T> expected = new ArrayList<>(valuesSet);
+        Collections.sort(expected);
+
+        List<Comparable<T>> seen = new ArrayList<>();
+        AtomicBoolean finish = new AtomicBoolean(false);
+        int size = randomIntBetween(1, expected.size());
+        while (finish.get() == false) {
+            testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery(field)), dataset,
+                () -> {
+                    Map<String, Object> afterKey = null;
+                    if (seen.size() > 0) {
+                        afterKey = Collections.singletonMap(field, seen.get(seen.size() - 1));
+                    }
+                    TermsValuesSourceBuilder source = new TermsValuesSourceBuilder(field).field(field);
+                    return new CompositeAggregationBuilder("name", Collections.singletonList(source))
+                        .subAggregation(new TopHitsAggregationBuilder("top_hits").storedField("_none_"))
+                        .aggregateAfter(afterKey)
+                        .size(size);
+                }, (result) -> {
+                    if (result.getBuckets().size() == 0) {
+                        finish.set(true);
+                    }
+                    for (InternalComposite.InternalBucket bucket : result.getBuckets()) {
+                        V term = transformKey.apply(bucket.getKey().get(field));
+                        seen.add(term);
+                        assertThat(bucket.getDocCount(), equalTo(expectedDocCounts.get(term).get()));
+                    }
+                });
+        }
+        assertEquals(expected, seen);
+    }
+
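+    // The paging loop above mimics a client: each round restarts the request with the
+    // previous page's last bucket key as the [after] key and stops at the first empty
+    // page.
+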
+    private void testSearchCase(List<Query> queries,
                                List<Map<String, List<Object>>> dataset,
                                Supplier<CompositeAggregationBuilder> create,
                                Consumer<InternalComposite> verify) throws IOException {
-        executeTestCase(false, null, query, dataset, create, verify);
-        executeTestCase(true, null, query, dataset, create, verify);
-        if (sort != null) {
-            executeTestCase(false, sort, query, dataset, create, verify);
-            executeTestCase(true, sort, query, dataset, create, verify);
+        for (Query query : queries) {
+            executeTestCase(false, query, dataset, create, verify);
+            executeTestCase(true, query, dataset, create, verify);
         }
     }
 
     private void executeTestCase(boolean reduced,
-                                 Sort sort,
                                  Query query,
                                  List<Map<String, List<Object>>> dataset,
                                  Supplier<CompositeAggregationBuilder> create,
                                  Consumer<InternalComposite> verify) throws IOException {
-        IndexSettings indexSettings = createIndexSettings(sort);
         try (Directory directory = newDirectory()) {
-            IndexWriterConfig config = LuceneTestCase.newIndexWriterConfig(random(), new MockAnalyzer(random()));
-            if (sort != null) {
-                config.setIndexSort(sort);
-                /**
-                 * Forces the default codec because {@link CompositeValuesSourceBuilder#checkCanEarlyTerminate}
-                 * cannot detect single-valued field with the asserting-codec.
-                 **/
-                config.setCodec(TestUtil.getDefaultCodec());
-            }
-            try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory, config)) {
+            try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) {
                 Document document = new Document();
                 for (Map<String, List<Object>> fields : dataset) {
                     addToDocument(document, fields);
@@ -1295,12 +1462,8 @@ private void executeTestCase(boolean reduced,
             }
         }
         try (IndexReader indexReader = DirectoryReader.open(directory)) {
-            IndexSearcher indexSearcher = newSearcher(indexReader, sort == null, sort == null);
+            IndexSearcher indexSearcher = new IndexSearcher(indexReader);
             CompositeAggregationBuilder aggregationBuilder = create.get();
-            if (sort != null) {
-                CompositeAggregator aggregator = createAggregator(query, aggregationBuilder, indexSearcher, indexSettings, FIELD_TYPES);
-                assertTrue(aggregator.canEarlyTerminate());
-            }
             final InternalComposite composite;
             if (reduced) {
                 composite = searchAndReduce(indexSearcher, query, aggregationBuilder, FIELD_TYPES);
@@ -1312,31 +1475,22 @@ private void executeTestCase(boolean reduced,
             }
         }
     }
 
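+    // With the index-sort early-termination logic gone (the new code builds a
+    // SortedDocsProducer instead), the tests no longer set up an index sort, so
+    // createIndexSettings goes away entirely.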
"desc" : "asc") - .toArray(String[]::new); - builder.putList("index.sort.field", fields); - builder.putList("index.sort.order", orders); - } - return IndexSettingsModule.newIndexSettings(new Index("_index", "0"), builder.build()); - } - private void addToDocument(Document doc, Map> keys) { for (Map.Entry> entry : keys.entrySet()) { final String name = entry.getKey(); for (Object value : entry.getValue()) { - if (value instanceof Long) { + if (value instanceof Integer) { + doc.add(new SortedNumericDocValuesField(name, (int) value)); + doc.add(new IntPoint(name, (int) value)); + } else if (value instanceof Long) { doc.add(new SortedNumericDocValuesField(name, (long) value)); + doc.add(new LongPoint(name, (long) value)); } else if (value instanceof Double) { doc.add(new SortedNumericDocValuesField(name, NumericUtils.doubleToSortableLong((double) value))); + doc.add(new DoublePoint(name, (double) value)); } else if (value instanceof String) { doc.add(new SortedSetDocValuesField(name, new BytesRef((String) value))); + doc.add(new StringField(name, new BytesRef((String) value), Field.Store.NO)); } else { throw new AssertionError("invalid object: " + value.getClass().getSimpleName()); } diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueueTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueueTests.java new file mode 100644 index 0000000000000..122c8185e2e70 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueueTests.java @@ -0,0 +1,330 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.elasticsearch.search.aggregations.bucket.composite; + +import org.apache.lucene.analysis.core.KeywordAnalyzer; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.LongPoint; +import org.apache.lucene.document.SortedNumericDocValuesField; +import org.apache.lucene.document.SortedSetDocValuesField; +import org.apache.lucene.document.TextField; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexOptions; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.RandomIndexWriter; +import org.apache.lucene.search.DocIdSet; +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.store.Directory; +import org.apache.lucene.util.Bits; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.NumericUtils; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.index.fielddata.FieldData; +import org.elasticsearch.index.mapper.KeywordFieldMapper; +import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.index.mapper.NumberFieldMapper; +import org.elasticsearch.search.DocValueFormat; +import org.elasticsearch.search.aggregations.AggregatorTestCase; +import org.elasticsearch.search.aggregations.LeafBucketCollector; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static org.elasticsearch.index.mapper.NumberFieldMapper.NumberType.DOUBLE; +import static org.elasticsearch.index.mapper.NumberFieldMapper.NumberType.LONG; +import static org.hamcrest.Matchers.equalTo; + +public class CompositeValuesCollectorQueueTests extends AggregatorTestCase { + static class ClassAndName { + final MappedFieldType fieldType; + final Class> clazz; + + ClassAndName(MappedFieldType fieldType, Class> clazz) { + this.fieldType = fieldType; + this.clazz = clazz; + } + } + + public void testRandomLong() throws IOException { + testRandomCase(new ClassAndName(createNumber("long", LONG) , Long.class)); + } + + public void testRandomDouble() throws IOException { + testRandomCase(new ClassAndName(createNumber("double", DOUBLE) , Double.class)); + } + + public void testRandomDoubleAndLong() throws IOException { + testRandomCase(new ClassAndName(createNumber("double", DOUBLE), Double.class), + new ClassAndName(createNumber("long", LONG), Long.class)); + } + + public void testRandomDoubleAndKeyword() throws IOException { + testRandomCase(new ClassAndName(createNumber("double", DOUBLE), Double.class), + new ClassAndName(createKeyword("keyword"), BytesRef.class)); + } + + public void testRandomKeyword() throws IOException { + testRandomCase(new ClassAndName(createKeyword("keyword"), BytesRef.class)); + } + + public void testRandomLongAndKeyword() throws IOException { + testRandomCase(new ClassAndName(createNumber("long", LONG), Long.class), + new ClassAndName(createKeyword("keyword"), BytesRef.class)); + } + + public void testRandomLongAndDouble() throws IOException { + testRandomCase(new ClassAndName(createNumber("long", LONG), Long.class), + new ClassAndName(createNumber("double", DOUBLE) , Double.class)); + } + + public void testRandomKeywordAndLong() throws IOException { + testRandomCase(new ClassAndName(createKeyword("keyword"), BytesRef.class), + new ClassAndName(createNumber("long", LONG), Long.class)); + } + + public void 
testRandomKeywordAndDouble() throws IOException { + testRandomCase(new ClassAndName(createKeyword("keyword"), BytesRef.class), + new ClassAndName(createNumber("double", DOUBLE), Double.class)); + } + + public void testRandom() throws IOException { + int numTypes = randomIntBetween(3, 8); + ClassAndName[] types = new ClassAndName[numTypes]; + for (int i = 0; i < numTypes; i++) { + int rand = randomIntBetween(0, 2); + switch (rand) { + case 0: + types[i] = new ClassAndName(createNumber(Integer.toString(i), LONG), Long.class); + break; + case 1: + types[i] = new ClassAndName(createNumber(Integer.toString(i), DOUBLE), Double.class); + break; + case 2: + types[i] = new ClassAndName(createKeyword(Integer.toString(i)), BytesRef.class); + break; + default: + assert(false); + } + } + testRandomCase(true, types); + } + + private void testRandomCase(ClassAndName... types) throws IOException { + testRandomCase(true, types); + testRandomCase(false, types); + } + + private void testRandomCase(boolean forceMerge, ClassAndName... types) throws IOException { + final BigArrays bigArrays = BigArrays.NON_RECYCLING_INSTANCE; + int numDocs = randomIntBetween(50, 100); + List[]> possibleValues = new ArrayList<>(); + for (ClassAndName type : types) { + int numValues = randomIntBetween(1, numDocs*2); + Comparable[] values = new Comparable[numValues]; + if (type.clazz == Long.class) { + for (int i = 0; i < numValues; i++) { + values[i] = randomLong(); + } + } else if (type.clazz == Double.class) { + for (int i = 0; i < numValues; i++) { + values[i] = randomDouble(); + } + } else if (type.clazz == BytesRef.class) { + for (int i = 0; i < numValues; i++) { + values[i] = new BytesRef(randomAlphaOfLengthBetween(5, 50)); + } + } else { + assert(false); + } + possibleValues.add(values); + } + + Set keys = new HashSet<>(); + try (Directory directory = newDirectory()) { + try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory, new KeywordAnalyzer())) { + for (int i = 0; i < numDocs; i++) { + Document document = new Document(); + List>> docValues = new ArrayList<>(); + boolean hasAllField = true; + for (int j = 0; j < types.length; j++) { + int numValues = randomIntBetween(0, 5); + if (numValues == 0) { + hasAllField = false; + } + List> values = new ArrayList<>(); + for (int k = 0; k < numValues; k++) { + values.add(possibleValues.get(j)[randomIntBetween(0, possibleValues.get(j).length-1)]); + if (types[j].clazz == Long.class) { + long value = (Long) values.get(k); + document.add(new SortedNumericDocValuesField(types[j].fieldType.name(), value)); + document.add(new LongPoint(types[j].fieldType.name(), value)); + } else if (types[j].clazz == Double.class) { + document.add(new SortedNumericDocValuesField(types[j].fieldType.name(), + NumericUtils.doubleToSortableLong((Double) values.get(k)))); + } else if (types[j].clazz == BytesRef.class) { + BytesRef value = (BytesRef) values.get(k); + document.add(new SortedSetDocValuesField(types[j].fieldType.name(), (BytesRef) values.get(k))); + document.add(new TextField(types[j].fieldType.name(), value.utf8ToString(), Field.Store.NO)); + } else { + assert(false); + } + } + docValues.add(values); + } + if (hasAllField) { + List comb = createListCombinations(docValues); + keys.addAll(comb); + } + indexWriter.addDocument(document); + } + if (forceMerge) { + indexWriter.forceMerge(1); + } + } + IndexReader reader = DirectoryReader.open(directory); + int size = randomIntBetween(1, keys.size()); + SingleDimensionValuesSource[] sources = new 
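+            // Build the sources the way the composite aggregator presumably would: one
+            // SingleDimensionValuesSource per field, in the order the sources were
+            // requested, each reading the field's doc values.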
+            SingleDimensionValuesSource<?>[] sources = new SingleDimensionValuesSource[types.length];
+            for (int i = 0; i < types.length; i++) {
+                final MappedFieldType fieldType = types[i].fieldType;
+                if (types[i].clazz == Long.class) {
+                    sources[i] = new LongValuesSource(bigArrays, fieldType,
+                        context -> context.reader().getSortedNumericDocValues(fieldType.name()), value -> value,
+                        DocValueFormat.RAW, size, 1);
+                } else if (types[i].clazz == Double.class) {
+                    sources[i] = new DoubleValuesSource(bigArrays, fieldType,
+                        context -> FieldData.sortableLongBitsToDoubles(context.reader().getSortedNumericDocValues(fieldType.name())),
+                        size, 1);
+                } else if (types[i].clazz == BytesRef.class) {
+                    if (forceMerge) {
+                        // we don't create global ordinals but we test this mode when the reader has a single segment
+                        // since ordinals are global in this case.
+                        sources[i] = new GlobalOrdinalValuesSource(bigArrays, fieldType,
+                            context -> context.reader().getSortedSetDocValues(fieldType.name()), size, 1);
+                    } else {
+                        sources[i] = new BinaryValuesSource(fieldType,
+                            context -> FieldData.toString(context.reader().getSortedSetDocValues(fieldType.name())), size, 1);
+                    }
+                } else {
+                    assert(false);
+                }
+            }
+            CompositeKey[] expected = keys.toArray(new CompositeKey[0]);
+            Arrays.sort(expected, (a, b) -> compareKey(a, b));
+            CompositeValuesCollectorQueue queue = new CompositeValuesCollectorQueue(sources, size);
+            final SortedDocsProducer docsProducer = sources[0].createSortedDocsProducerOrNull(reader, new MatchAllDocsQuery());
+            for (boolean withProducer : new boolean[] {true, false}) {
+                if (withProducer && docsProducer == null) {
+                    continue;
+                }
+                int pos = 0;
+                CompositeKey last = null;
+                while (pos < size) {
+                    queue.clear();
+                    if (last != null) {
+                        queue.setAfter(last.values());
+                    }
+
+                    for (LeafReaderContext leafReaderContext : reader.leaves()) {
+                        final LeafBucketCollector leafCollector = new LeafBucketCollector() {
+                            @Override
+                            public void collect(int doc, long bucket) throws IOException {
+                                queue.addIfCompetitive();
+                            }
+                        };
+                        if (withProducer) {
+                            assertEquals(DocIdSet.EMPTY,
+                                docsProducer.processLeaf(new MatchAllDocsQuery(), queue, leafReaderContext, false));
+                        } else {
+                            final LeafBucketCollector queueCollector = queue.getLeafCollector(leafReaderContext, leafCollector);
+                            final Bits liveDocs = leafReaderContext.reader().getLiveDocs();
+                            for (int i = 0; i < leafReaderContext.reader().maxDoc(); i++) {
+                                if (liveDocs == null || liveDocs.get(i)) {
+                                    queueCollector.collect(i);
+                                }
+                            }
+                        }
+                    }
+                    assertEquals(size, Math.min(queue.size(), expected.length - pos));
+                    int ptr = 0;
+                    for (int slot : queue.getSortedSlot()) {
+                        CompositeKey key = queue.toCompositeKey(slot);
+                        assertThat(key, equalTo(expected[ptr++]));
+                        last = key;
+                    }
+                    pos += queue.size();
+                }
+            }
+            reader.close();
+        }
+    }
+
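+    // The two collection paths above must agree: when the SortedDocsProducer is used,
+    // processLeaf visits the competitive documents itself, and with its last boolean
+    // argument set to false (apparently "do not fill a doc id set") it is expected to
+    // return DocIdSet.EMPTY.
+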
+    private static MappedFieldType createNumber(String name, NumberFieldMapper.NumberType type) {
+        MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType(type);
+        fieldType.setIndexOptions(IndexOptions.DOCS);
+        fieldType.setName(name);
+        fieldType.setHasDocValues(true);
+        fieldType.freeze();
+        return fieldType;
+    }
+
+    private static MappedFieldType createKeyword(String name) {
+        MappedFieldType fieldType = new KeywordFieldMapper.KeywordFieldType();
+        fieldType.setIndexOptions(IndexOptions.DOCS);
+        fieldType.setName(name);
+        fieldType.setHasDocValues(true);
+        fieldType.freeze();
+        return fieldType;
+    }
+
+    private static int compareKey(CompositeKey key1, CompositeKey key2) {
+        assert key1.size() == key2.size();
+        for (int i = 0; i < key1.size(); i++) {
+            Comparable<Object> cmp1 = (Comparable<Object>) key1.get(i);
+            int cmp = cmp1.compareTo(key2.get(i));
+            if (cmp != 0) {
+                return cmp;
+            }
+        }
+        return 0;
+    }
+
+    private static List<CompositeKey> createListCombinations(List<List<Comparable<?>>> values) {
+        List<CompositeKey> keys = new ArrayList<>();
+        createListCombinations(new Comparable[values.size()], values, 0, values.size(), keys);
+        return keys;
+    }
+
+    private static void createListCombinations(Comparable[] key, List<List<Comparable<?>>> values,
+                                               int pos, int maxPos, List<CompositeKey> keys) {
+        if (pos == maxPos) {
+            keys.add(new CompositeKey(key.clone()));
+        } else {
+            for (Comparable val : values.get(pos)) {
+                key[pos] = val;
+                createListCombinations(key, values, pos + 1, maxPos, keys);
+            }
+        }
+    }
+}
diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/SingleDimensionValuesSourceTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/SingleDimensionValuesSourceTests.java
new file mode 100644
index 0000000000000..2fd14fe6b697d
--- /dev/null
+++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/SingleDimensionValuesSourceTests.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.aggregations.bucket.composite;
+
+import org.apache.lucene.document.LongPoint;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.search.TermQuery;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.index.mapper.KeywordFieldMapper;
+import org.elasticsearch.index.mapper.MappedFieldType;
+import org.elasticsearch.index.mapper.NumberFieldMapper;
+import org.elasticsearch.search.DocValueFormat;
+import org.elasticsearch.test.ESTestCase;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class SingleDimensionValuesSourceTests extends ESTestCase {
+    public void testBinarySorted() {
+        MappedFieldType keyword = new KeywordFieldMapper.KeywordFieldType();
+        keyword.setName("keyword");
+        BinaryValuesSource source = new BinaryValuesSource(keyword, context -> null, 1, 1);
+        assertNull(source.createSortedDocsProducerOrNull(mockIndexReader(100, 49), null));
+        IndexReader reader = mockIndexReader(1, 1);
+        assertNotNull(source.createSortedDocsProducerOrNull(reader, new MatchAllDocsQuery()));
+        assertNotNull(source.createSortedDocsProducerOrNull(reader, null));
+        assertNull(source.createSortedDocsProducerOrNull(reader,
+            new TermQuery(new Term("keyword", "toto)"))));
+        source = new BinaryValuesSource(keyword, context -> null, 0, -1);
+        assertNull(source.createSortedDocsProducerOrNull(reader, null));
+    }
+
+    public void testGlobalOrdinalsSorted() {
+        MappedFieldType keyword = new KeywordFieldMapper.KeywordFieldType();
+        keyword.setName("keyword");
+        GlobalOrdinalValuesSource source = new GlobalOrdinalValuesSource(BigArrays.NON_RECYCLING_INSTANCE,
+            keyword, context -> null, 1, 1);
+        assertNull(source.createSortedDocsProducerOrNull(mockIndexReader(100, 49), null));
+        IndexReader reader = mockIndexReader(1, 1);
+        assertNotNull(source.createSortedDocsProducerOrNull(reader, new MatchAllDocsQuery()));
+        assertNotNull(source.createSortedDocsProducerOrNull(reader, null));
+        assertNull(source.createSortedDocsProducerOrNull(reader,
+            new TermQuery(new Term("keyword", "toto)"))));
+        source = new GlobalOrdinalValuesSource(BigArrays.NON_RECYCLING_INSTANCE, keyword, context -> null, 1, -1);
+        assertNull(source.createSortedDocsProducerOrNull(reader, null));
+    }
+
+    public void testNumericSorted() {
+        for (NumberFieldMapper.NumberType numberType : NumberFieldMapper.NumberType.values()) {
+            MappedFieldType number = new NumberFieldMapper.NumberFieldType(numberType);
+            number.setName("number");
+            final SingleDimensionValuesSource<?> source;
+            if (numberType == NumberFieldMapper.NumberType.BYTE ||
+                    numberType == NumberFieldMapper.NumberType.SHORT ||
+                    numberType == NumberFieldMapper.NumberType.INTEGER ||
+                    numberType == NumberFieldMapper.NumberType.LONG) {
+                source = new LongValuesSource(BigArrays.NON_RECYCLING_INSTANCE,
+                    number, context -> null, value -> value, DocValueFormat.RAW, 1, 1);
+                assertNull(source.createSortedDocsProducerOrNull(mockIndexReader(100, 49), null));
+                IndexReader reader = mockIndexReader(1, 1);
+                assertNotNull(source.createSortedDocsProducerOrNull(reader, new MatchAllDocsQuery()));
+                assertNotNull(source.createSortedDocsProducerOrNull(reader, null));
+                assertNotNull(source.createSortedDocsProducerOrNull(reader, LongPoint.newRangeQuery("number", 0, 1)));
+                assertNull(source.createSortedDocsProducerOrNull(reader, new TermQuery(new Term("keyword", "toto)"))));
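+                // A reversed source (reverseMul == -1) refuses to build a producer,
+                // presumably because the index can only be walked in ascending order.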
+                LongValuesSource sourceRev =
+                    new LongValuesSource(BigArrays.NON_RECYCLING_INSTANCE,
+                        number, context -> null, value -> value, DocValueFormat.RAW, 1, -1);
+                assertNull(sourceRev.createSortedDocsProducerOrNull(reader, null));
+            } else if (numberType == NumberFieldMapper.NumberType.HALF_FLOAT ||
+                    numberType == NumberFieldMapper.NumberType.FLOAT ||
+                    numberType == NumberFieldMapper.NumberType.DOUBLE) {
+                source = new DoubleValuesSource(BigArrays.NON_RECYCLING_INSTANCE,
+                    number, context -> null, 1, 1);
+            } else {
+                throw new AssertionError("missing type: " + numberType.typeName());
+            }
+            assertNull(source.createSortedDocsProducerOrNull(mockIndexReader(100, 49), null));
+        }
+    }
+
+    private static IndexReader mockIndexReader(int maxDoc, int numDocs) {
+        IndexReader reader = mock(IndexReader.class);
+        when(reader.hasDeletions()).thenReturn(maxDoc - numDocs > 0);
+        when(reader.maxDoc()).thenReturn(maxDoc);
+        when(reader.numDocs()).thenReturn(numDocs);
+        return reader;
+    }
+}
diff --git a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java
index 720d701e64ced..1940c82438839 100644
--- a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java
@@ -291,7 +291,6 @@ protected A search(IndexSearcher searcher,
         A internalAgg = (A) a.buildAggregation(0L);
         InternalAggregationTestCase.assertMultiBucketConsumer(internalAgg, bucketConsumer);
         return internalAgg;
-
     }
 
     protected A searchAndReduce(IndexSearcher searcher,