Skip to content

Commit

Permalink
Add test to exercise reduction of terms aggregation order by key and …
Browse files Browse the repository at this point in the history
…fix pruning bug (elastic#106799)

We were not computing the otherDocCounts properly: the iteration exited too early, so the pruned buckets were never 
counted. This commit makes sure that all buckets are counted.
  • Loading branch information
iverase authored Mar 27, 2024
1 parent a2af99c commit 86b24ab
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 20 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/106799.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 106799
summary: Add test to exercise reduction of terms aggregation order by key
area: Aggregations
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.elasticsearch.index.query.QueryBuilders.termQuery;
import static org.elasticsearch.search.aggregations.AggregationBuilders.avg;
Expand Down Expand Up @@ -1287,4 +1288,92 @@ public void testScriptWithValueType() throws Exception {
assertThat(ex.getCause().getMessage(), containsString("Unknown value type [foobar]"));
}
}

/**
 * Checks terms aggregations ordered by key, in both directions, across several batched reduce sizes.
 * <p>
 * Five indices are created and each receives 100 documents holding a random keyword value and a random
 * boolean {@code filter} flag. For every keyword value, the number of documents indexed with
 * {@code filter == false} is tracked; values that only occur with {@code filter == true} are tracked with a
 * count of zero. The expected key lists for {@code min_doc_count} 0, 1 and 2 are derived from those counts
 * and checked through {@link #assertOrderByKeyResponse}. The indices are deleted at the end.
 */
public void testOrderByKey() throws Exception {
    final int numIndices = 5;
    // keyword value -> single-element counter of documents indexed with filter == false
    final Map<String, long[]> data = new HashMap<>();
    for (int index = 0; index < numIndices; index++) {
        final String indexName = "idx" + index;
        assertAcked(
            indicesAdmin().prepareCreate(indexName).setMapping(SINGLE_VALUED_FIELD_NAME, "type=keyword", "filter", "type=boolean")
        );
        final List<IndexRequestBuilder> docs = new ArrayList<>();
        for (int doc = 0; doc < 100; doc++) {
            final String term = "val" + random().nextInt(1000);
            final boolean filtered = randomBoolean();
            // register the term even when this document is filtered out, so that it is expected with min_doc_count = 0
            final long[] matching = data.computeIfAbsent(term, key -> new long[1]);
            if (filtered == false) {
                matching[0]++;
            }
            docs.add(
                prepareIndex(indexName).setSource(
                    jsonBuilder().startObject().field(SINGLE_VALUED_FIELD_NAME, term).field("filter", filtered).endObject()
                )
            );
        }
        indexRandom(true, docs);
    }

    // the position in the outer list is the min_doc_count the inner list of expected keys belongs to
    final List<String> allKeys = new ArrayList<>(data.keySet());
    final List<String> keysMinDocCount1 = new ArrayList<>();
    final List<String> keysMinDocCount2 = new ArrayList<>();
    for (String key : allKeys) {
        final long matching = data.get(key)[0];
        if (matching > 0) {
            keysMinDocCount1.add(key);
        }
        if (matching > 1) {
            keysMinDocCount2.add(key);
        }
    }
    final List<List<String>> expectedKeysByMinDocCount = List.of(allKeys, keysMinDocCount1, keysMinDocCount2);

    // test for different batch sizes to exercise partial reduces
    for (int batchReduceSize = 2; batchReduceSize <= 5; batchReduceSize++) {
        for (int minDocCount = 0; minDocCount < expectedKeysByMinDocCount.size(); minDocCount++) {
            final List<String> expectedKeys = expectedKeysByMinDocCount.get(minDocCount);
            // ascending key order first, then the same keys reversed for descending order
            Collections.sort(expectedKeys);
            assertOrderByKeyResponse(expectedKeys, data, true, minDocCount, batchReduceSize);
            Collections.reverse(expectedKeys);
            assertOrderByKeyResponse(expectedKeys, data, false, minDocCount, batchReduceSize);
        }
    }

    for (int index = 0; index < numIndices; index++) {
        assertAcked(indicesAdmin().prepareDelete("idx" + index));
    }
}

/**
 * Runs a key-ordered terms aggregation over {@code idx0}..{@code idx4}, restricted to documents with
 * {@code filter == false}, and checks the returned buckets against the expected keys and counts.
 * <p>
 * A random aggregation size between 1 and {@code keys.size()} is requested. The first {@code size} entries
 * of {@code keys} are expected as buckets, in that order; the counts of the remaining keys are expected to
 * add up to the reported sum of other doc counts.
 *
 * @param keys            expected bucket keys, already sorted in the direction given by {@code asc}
 * @param counts          expected doc count per key, held in the first element of each array
 * @param asc             {@code true} to order by key ascending, {@code false} for descending
 * @param minDocCount     the {@code min_doc_count} to set on the aggregation
 * @param batchReduceSize the batched reduce size to set on the search request
 */
private void assertOrderByKeyResponse(
    List<String> keys,
    Map<String, long[]> counts,
    boolean asc,
    int minDocCount,
    int batchReduceSize
) {
    final int size = randomIntBetween(1, keys.size());
    // every key beyond the requested size contributes its doc count to sum_other_doc_count
    final long expectedSumOtherCount = keys.subList(size, keys.size()).stream().mapToLong(key -> counts.get(key)[0]).sum();
    assertNoFailuresAndResponse(
        prepareSearch("idx0", "idx1", "idx2", "idx3", "idx4").setBatchedReduceSize(batchReduceSize)
            .setQuery(QueryBuilders.termQuery("filter", false))
            .addAggregation(
                new TermsAggregationBuilder("terms").field(SINGLE_VALUED_FIELD_NAME)
                    .size(size)
                    .shardSize(500)
                    .minDocCount(minDocCount)
                    .order(BucketOrder.key(asc))
            ),
        response -> {
            StringTerms terms = response.getAggregations().get("terms");
            assertThat(terms, notNullValue());
            assertThat(terms.getName(), equalTo("terms"));
            assertThat(terms.getBuckets().size(), equalTo(size));
            assertThat(terms.getSumOfOtherDocCounts(), equalTo(expectedSumOtherCount));

            int position = 0;
            while (position < size) {
                final String expectedKey = keys.get(position);
                final StringTerms.Bucket bucket = terms.getBuckets().get(position);
                assertThat(bucket, notNullValue());
                assertThat(bucket.getKeyAsString(), equalTo(expectedKey));
                assertThat(bucket.getDocCount(), equalTo(counts.get(expectedKey)[0]));
                position++;
            }
        }
    );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Consumer;

import static org.elasticsearch.search.aggregations.InternalOrder.isKeyAsc;
import static org.elasticsearch.search.aggregations.InternalOrder.isKeyOrder;
Expand Down Expand Up @@ -153,7 +153,7 @@ private long getDocCountError(A terms) {
private BucketOrder reduceBuckets(
List<InternalAggregation> aggregations,
AggregationReduceContext reduceContext,
Function<DelayedBucket<B>, Boolean> sink
Consumer<DelayedBucket<B>> sink
) {
/*
* Buckets returned by a partial reduce or a shard response are sorted by key since {@link Version#V_7_10_0}.
Expand All @@ -176,7 +176,7 @@ private void reduceMergeSort(
List<InternalAggregation> aggregations,
BucketOrder thisReduceOrder,
AggregationReduceContext reduceContext,
Function<DelayedBucket<B>, Boolean> sink
Consumer<DelayedBucket<B>> sink
) {
assert isKeyOrder(thisReduceOrder);
final Comparator<Bucket> cmp = thisReduceOrder.comparator();
Expand All @@ -201,12 +201,7 @@ protected boolean lessThan(IteratorAndCurrent<B> a, IteratorAndCurrent<B> b) {
assert lastBucket == null || cmp.compare(top.current(), lastBucket) >= 0;
if (lastBucket != null && cmp.compare(top.current(), lastBucket) != 0) {
// the key changed so bundle up the last key's worth of buckets
boolean shouldContinue = sink.apply(
new DelayedBucket<>(AbstractInternalTerms.this::reduceBucket, reduceContext, sameTermBuckets)
);
if (false == shouldContinue) {
return;
}
sink.accept(new DelayedBucket<>(AbstractInternalTerms.this::reduceBucket, reduceContext, sameTermBuckets));
sameTermBuckets = new ArrayList<>();
}
lastBucket = top.current();
Expand All @@ -226,14 +221,14 @@ protected boolean lessThan(IteratorAndCurrent<B> a, IteratorAndCurrent<B> b) {
}

if (sameTermBuckets.isEmpty() == false) {
sink.apply(new DelayedBucket<>(AbstractInternalTerms.this::reduceBucket, reduceContext, sameTermBuckets));
sink.accept(new DelayedBucket<>(AbstractInternalTerms.this::reduceBucket, reduceContext, sameTermBuckets));
}
}

private void reduceLegacy(
List<InternalAggregation> aggregations,
AggregationReduceContext reduceContext,
Function<DelayedBucket<B>, Boolean> sink
Consumer<DelayedBucket<B>> sink
) {
Map<Object, List<B>> bucketMap = new HashMap<>();
for (InternalAggregation aggregation : aggregations) {
Expand All @@ -246,12 +241,7 @@ private void reduceLegacy(
}
}
for (List<B> sameTermBuckets : bucketMap.values()) {
boolean shouldContinue = sink.apply(
new DelayedBucket<>(AbstractInternalTerms.this::reduceBucket, reduceContext, sameTermBuckets)
);
if (false == shouldContinue) {
return;
}
sink.accept(new DelayedBucket<>(AbstractInternalTerms.this::reduceBucket, reduceContext, sameTermBuckets));
}
}

Expand Down Expand Up @@ -304,7 +294,6 @@ public InternalAggregation doReduce(List<InternalAggregation> aggregations, Aggr
if (bucket.getDocCount() >= getMinDocCount()) {
top.add(bucket);
}
return true;
});
result = top.build();
} else {
Expand All @@ -316,8 +305,11 @@ public InternalAggregation doReduce(List<InternalAggregation> aggregations, Aggr
boolean canPrune = isKeyOrder(getOrder()) && getMinDocCount() == 0;
result = new ArrayList<>();
thisReduceOrder = reduceBuckets(aggregations, reduceContext, bucket -> {
result.add(bucket.reduced());
return false == canPrune || result.size() < getRequiredSize();
if (canPrune == false || result.size() < getRequiredSize()) {
result.add(bucket.reduced());
} else {
otherDocCount[0] += bucket.getDocCount();
}
});
}
for (B r : result) {
Expand Down

0 comments on commit 86b24ab

Please sign in to comment.