Skip to content

Commit

Permalink
Fix composite early termination on sorted
Browse files Browse the repository at this point in the history
I broke composite early termination when reworking how aggregations'
contact for `getLeafCollector` around early termination in elastic#70320. We
didn't see it in our tests because we weren't properly emulating the
aggregation collection stage. This fixes early termination by adhering
to the new contract and adds more tests.

Closes elastic#72078

Co-authored-by: Benjamin Trent <[email protected]>
  • Loading branch information
nik9000 and benwtrent committed Apr 22, 2021
1 parent 9129dcd commit 50a145a
Show file tree
Hide file tree
Showing 4 changed files with 190 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
---
setup:
- do:
indices.create:
index: sorted
body:
mappings:
properties:
date:
type: date
kw:
type: keyword
settings:
index:
number_of_shards: 1
number_of_replicas: 0
sort:
field: date
order: desc

# Index single documents at a time and refresh. That'll create as many
# segments as possible which has revealed bugs in the past.
- do:
index:
index: sorted
body: {"date": "2021-01-01T00:00:00Z", "kw": "cat"}
refresh: true
- do:
index:
index: sorted
body: {"date": "2021-01-02T00:00:00Z", "kw": "cat"}
refresh: true
- do:
index:
index: sorted
body: {"date": "2021-01-03T00:00:00Z", "kw": "cat"}
refresh: true
- do:
index:
index: sorted
body: {"date": "2021-01-04T00:00:00Z", "kw": "cat"}
refresh: true
- do:
index:
index: sorted
body: {"date": "2021-01-05T00:00:00Z", "kw": "cat"}
refresh: true
- do:
index:
index: sorted
body: {"date": "2021-01-06T00:00:00Z", "kw": "cat"}
refresh: true
- do:
index:
index: sorted
body: {"date": "2021-01-07T00:00:00Z", "kw": "cat"}
refresh: true
- do:
index:
index: sorted
body: {"date": "2021-01-08T00:00:00Z", "kw": "cat"}
refresh: true
- do:
index:
index: sorted
body: {"date": "2021-01-09T00:00:00Z", "kw": "not a cat"}
refresh: true
- do:
index:
index: sorted
body: {"date": "2021-01-10T00:00:00Z", "kw": "also not a cat"}
refresh: true

---
one source - first page:
- do:
search:
index: sorted
body:
size: 0
aggs:
c:
composite:
size: 2
sources:
- date:
date_histogram:
field: date
calendar_interval: day
- length: { aggregations.c.buckets: 2 }
- match: { aggregations.c.buckets.0.key.date: 1609459200000 }
- match: { aggregations.c.buckets.0.doc_count: 1 }
- match: { aggregations.c.buckets.1.key.date: 1609545600000 }
- match: { aggregations.c.buckets.1.doc_count: 1 }

---
one source - second page:
- do:
search:
index: sorted
body:
size: 0
aggs:
c:
composite:
size: 2
sources:
- date:
date_histogram:
field: date
calendar_interval: day
after:
date: 1609545600000
- length: { aggregations.c.buckets: 2 }
- match: { aggregations.c.buckets.0.key.date: 1609632000000 }
- match: { aggregations.c.buckets.0.doc_count: 1 }
- match: { aggregations.c.buckets.1.key.date: 1609718400000 }
- match: { aggregations.c.buckets.1.doc_count: 1 }

---
two sources - first page:
- do:
search:
index: sorted
body:
size: 0
aggs:
c:
composite:
size: 2
sources:
- date:
date_histogram:
field: date
calendar_interval: day
- kw:
terms:
field: kw
- length: { aggregations.c.buckets: 2 }
- match: { aggregations.c.buckets.0.key.date: 1609459200000 }
- match: { aggregations.c.buckets.0.key.kw: 1609459200000 }
- match: { aggregations.c.buckets.0.doc_count: 1 }
- match: { aggregations.c.buckets.1.key.date: 1609545600000 }
- match: { aggregations.c.buckets.0.key.kw: 1609459200000 }
- match: { aggregations.c.buckets.1.doc_count: 1 }

---
two sources - second page:
- do:
search:
index: sorted
body:
size: 0
aggs:
c:
composite:
size: 2
sources:
- date:
date_histogram:
field: date
calendar_interval: day
- kw:
terms:
field: kw
after:
date: 1609545600000
kw: cat
- length: { aggregations.c.buckets: 2 }
- match: { aggregations.c.buckets.0.key.date: 1609632000000 }
- match: { aggregations.c.buckets.0.doc_count: 1 }
- match: { aggregations.c.buckets.1.key.date: 1609718400000 }
- match: { aggregations.c.buckets.1.doc_count: 1 }
Original file line number Diff line number Diff line change
Expand Up @@ -406,10 +406,22 @@ protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucket
// We have an after key and index sort is applicable so we jump directly to the doc
// that is after the index sort prefix using the rawAfterKey and we start collecting
// document from there.
processLeafFromQuery(ctx, indexSortPrefix);
try {
processLeafFromQuery(ctx, indexSortPrefix);
} catch (CollectionTerminatedException e) {
/*
* Signal that there isn't anything to collect. We're going
* to return noop collector anyway so we can ignore it.
*/
}
return LeafBucketCollector.NO_OP_COLLECTOR;
} else {
final LeafBucketCollector inner = queue.getLeafCollector(ctx, getFirstPassCollector(docIdSetBuilder, sortPrefixLen));
final LeafBucketCollector inner;
try {
inner = queue.getLeafCollector(ctx, getFirstPassCollector(docIdSetBuilder, sortPrefixLen));
} catch (CollectionTerminatedException e) {
return LeafBucketCollector.NO_OP_COLLECTOR;
}
return new LeafBucketCollector() {
@Override
public void collect(int doc, long zeroBucket) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2657,7 +2657,8 @@ private static long asLong(String dateTime) {
private static Sort buildIndexSort(List<CompositeValuesSourceBuilder<?>> sources, Map<String, MappedFieldType> fieldTypes) {
List<SortField> sortFields = new ArrayList<>();
Map<String, MappedFieldType> remainingFieldTypes = new HashMap<>(fieldTypes);
for (CompositeValuesSourceBuilder<?> source : sources) {
List<CompositeValuesSourceBuilder<?>> sourcesToCreateSorts = randomBoolean() ? sources : sources.subList(0, 1);
for (CompositeValuesSourceBuilder<?> source : sourcesToCreateSorts) {
MappedFieldType type = fieldTypes.remove(source.field());
remainingFieldTypes.remove(source.field());
SortField sortField = sortFieldFrom(type);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduc
}
} else {
root.preCollection();
searcher.search(rewritten, root);
searcher.search(rewritten, MultiBucketCollector.wrap(true, List.of(root)));
root.postCollection();
aggs.add(root.buildTopLevel());
}
Expand Down

0 comments on commit 50a145a

Please sign in to comment.