From 86b24ab19a21da1cad1fe7f188527783cf12eca7 Mon Sep 17 00:00:00 2001 From: Ignacio Vera Date: Wed, 27 Mar 2024 14:18:33 +0100 Subject: [PATCH] Add test to exercise reduction of terms aggregation order by key and fix pruning bug (#106799) We are not computing the otherDocCounts properly as we are exiting the iteration too early so we are not counting the pruned buckets. This commit make sure we are counting all buckets. --- docs/changelog/106799.yaml | 5 ++ .../bucket/terms/StringTermsIT.java | 89 +++++++++++++++++++ .../bucket/terms/AbstractInternalTerms.java | 32 +++---- 3 files changed, 106 insertions(+), 20 deletions(-) create mode 100644 docs/changelog/106799.yaml diff --git a/docs/changelog/106799.yaml b/docs/changelog/106799.yaml new file mode 100644 index 0000000000000..c75cd5c15e44b --- /dev/null +++ b/docs/changelog/106799.yaml @@ -0,0 +1,5 @@ +pr: 106799 +summary: Add test to exercise reduction of terms aggregation order by key +area: Aggregations +type: bug +issues: [] diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/bucket/terms/StringTermsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/bucket/terms/StringTermsIT.java index 8a2071584b4a0..1b2d66fc12c76 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/bucket/terms/StringTermsIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/bucket/terms/StringTermsIT.java @@ -49,6 +49,7 @@ import java.util.Map; import java.util.Set; import java.util.function.Function; +import java.util.stream.Collectors; import static org.elasticsearch.index.query.QueryBuilders.termQuery; import static org.elasticsearch.search.aggregations.AggregationBuilders.avg; @@ -1287,4 +1288,92 @@ public void testScriptWithValueType() throws Exception { assertThat(ex.getCause().getMessage(), containsString("Unknown value type [foobar]")); } } + + public void testOrderByKey() throws Exception { + Map data = new HashMap<>(); + for (int i = 0; i < 5; i++) { + assertAcked( + indicesAdmin().prepareCreate("idx" + i).setMapping(SINGLE_VALUED_FIELD_NAME, "type=keyword", "filter", "type=boolean") + ); + List builders = new ArrayList<>(); + for (int j = 0; j < 100; j++) { + String val = "val" + random().nextInt(1000); + boolean filter = randomBoolean(); + long[] counter = data.computeIfAbsent(val, s -> new long[] { 0 }); + if (filter == false) { + counter[0]++; + } + builders.add( + prepareIndex("idx" + i).setSource( + jsonBuilder().startObject().field(SINGLE_VALUED_FIELD_NAME, val).field("filter", filter).endObject() + ) + ); + } + indexRandom(true, builders); + } + List allKeys = new ArrayList<>(data.keySet()); + List keysMinDocCount1 = allKeys.stream().filter(key -> data.get(key)[0] > 0).collect(Collectors.toList()); + List keysMinDocCount2 = allKeys.stream().filter(key -> data.get(key)[0] > 1).collect(Collectors.toList()); + // test for different batch sizes to exercise partial reduces + for (int batchReduceSize = 2; batchReduceSize < 6; batchReduceSize++) { + // with min_doc_count = 0 + allKeys.sort(String::compareTo); + assertOrderByKeyResponse(allKeys, data, true, 0, batchReduceSize); + Collections.reverse(allKeys); + assertOrderByKeyResponse(allKeys, data, false, 0, batchReduceSize); + // with min_doc_count = 1 + keysMinDocCount1.sort(String::compareTo); + assertOrderByKeyResponse(keysMinDocCount1, data, true, 1, batchReduceSize); + Collections.reverse(keysMinDocCount1); + assertOrderByKeyResponse(keysMinDocCount1, data, false, 1, batchReduceSize); + // with min_doc_count = 2 + keysMinDocCount2.sort(String::compareTo); + assertOrderByKeyResponse(keysMinDocCount2, data, true, 2, batchReduceSize); + Collections.reverse(keysMinDocCount2); + assertOrderByKeyResponse(keysMinDocCount2, data, false, 2, batchReduceSize); + } + for (int i = 0; i < 5; i++) { + assertAcked(indicesAdmin().prepareDelete("idx" + i)); + } + } + + private void assertOrderByKeyResponse( + List keys, + Map counts, + boolean asc, + int minDocCount, + int batchReduceSize + ) { + int size = randomIntBetween(1, keys.size()); + long sumOtherCount = 0; + for (int i = size; i < keys.size(); i++) { + sumOtherCount += counts.get(keys.get(i))[0]; + } + final long finalSumOtherCount = sumOtherCount; + assertNoFailuresAndResponse( + prepareSearch("idx0", "idx1", "idx2", "idx3", "idx4").setBatchedReduceSize(batchReduceSize) + .setQuery(QueryBuilders.termQuery("filter", false)) + .addAggregation( + new TermsAggregationBuilder("terms").field(SINGLE_VALUED_FIELD_NAME) + .size(size) + .shardSize(500) + .minDocCount(minDocCount) + .order(BucketOrder.key(asc)) + ), + response -> { + StringTerms terms = response.getAggregations().get("terms"); + assertThat(terms, notNullValue()); + assertThat(terms.getName(), equalTo("terms")); + assertThat(terms.getBuckets().size(), equalTo(size)); + assertThat(terms.getSumOfOtherDocCounts(), equalTo(finalSumOtherCount)); + + for (int i = 0; i < size; i++) { + StringTerms.Bucket bucket = terms.getBuckets().get(i); + assertThat(bucket, notNullValue()); + assertThat(bucket.getKeyAsString(), equalTo(keys.get(i))); + assertThat(bucket.getDocCount(), equalTo(counts.get(keys.get(i))[0])); + } + } + ); + } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/AbstractInternalTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/AbstractInternalTerms.java index af966963e43fc..98f5741ad7440 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/AbstractInternalTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/AbstractInternalTerms.java @@ -30,7 +30,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.function.Function; +import java.util.function.Consumer; import static org.elasticsearch.search.aggregations.InternalOrder.isKeyAsc; import static org.elasticsearch.search.aggregations.InternalOrder.isKeyOrder; @@ -153,7 +153,7 @@ private long getDocCountError(A terms) { private BucketOrder reduceBuckets( List aggregations, AggregationReduceContext reduceContext, - Function, Boolean> sink + Consumer> sink ) { /* * Buckets returned by a partial reduce or a shard response are sorted by key since {@link Version#V_7_10_0}. @@ -176,7 +176,7 @@ private void reduceMergeSort( List aggregations, BucketOrder thisReduceOrder, AggregationReduceContext reduceContext, - Function, Boolean> sink + Consumer> sink ) { assert isKeyOrder(thisReduceOrder); final Comparator cmp = thisReduceOrder.comparator(); @@ -201,12 +201,7 @@ protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) { assert lastBucket == null || cmp.compare(top.current(), lastBucket) >= 0; if (lastBucket != null && cmp.compare(top.current(), lastBucket) != 0) { // the key changed so bundle up the last key's worth of buckets - boolean shouldContinue = sink.apply( - new DelayedBucket<>(AbstractInternalTerms.this::reduceBucket, reduceContext, sameTermBuckets) - ); - if (false == shouldContinue) { - return; - } + sink.accept(new DelayedBucket<>(AbstractInternalTerms.this::reduceBucket, reduceContext, sameTermBuckets)); sameTermBuckets = new ArrayList<>(); } lastBucket = top.current(); @@ -226,14 +221,14 @@ protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) { } if (sameTermBuckets.isEmpty() == false) { - sink.apply(new DelayedBucket<>(AbstractInternalTerms.this::reduceBucket, reduceContext, sameTermBuckets)); + sink.accept(new DelayedBucket<>(AbstractInternalTerms.this::reduceBucket, reduceContext, sameTermBuckets)); } } private void reduceLegacy( List aggregations, AggregationReduceContext reduceContext, - Function, Boolean> sink + Consumer> sink ) { Map> bucketMap = new HashMap<>(); for (InternalAggregation aggregation : aggregations) { @@ -246,12 +241,7 @@ private void reduceLegacy( } } for (List sameTermBuckets : bucketMap.values()) { - boolean shouldContinue = sink.apply( - new DelayedBucket<>(AbstractInternalTerms.this::reduceBucket, reduceContext, sameTermBuckets) - ); - if (false == shouldContinue) { - return; - } + sink.accept(new DelayedBucket<>(AbstractInternalTerms.this::reduceBucket, reduceContext, sameTermBuckets)); } } @@ -304,7 +294,6 @@ public InternalAggregation doReduce(List aggregations, Aggr if (bucket.getDocCount() >= getMinDocCount()) { top.add(bucket); } - return true; }); result = top.build(); } else { @@ -316,8 +305,11 @@ public InternalAggregation doReduce(List aggregations, Aggr boolean canPrune = isKeyOrder(getOrder()) && getMinDocCount() == 0; result = new ArrayList<>(); thisReduceOrder = reduceBuckets(aggregations, reduceContext, bucket -> { - result.add(bucket.reduced()); - return false == canPrune || result.size() < getRequiredSize(); + if (canPrune == false || result.size() < getRequiredSize()) { + result.add(bucket.reduced()); + } else { + otherDocCount[0] += bucket.getDocCount(); + } }); } for (B r : result) {