diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketUtils.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketUtils.java index d145e32c45b3e..17b50fa9bef5f 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketUtils.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketUtils.java @@ -31,27 +31,21 @@ private BucketUtils() {} * * @param finalSize * The number of terms required in the final reduce phase. - * @param numberOfShards - * The number of shards being queried. + * @param singleShard + * whether a single shard is being queried, or multiple shards * @return A suggested default for the size of any shard-side PriorityQueues */ - public static int suggestShardSideQueueSize(int finalSize, int numberOfShards) { + public static int suggestShardSideQueueSize(int finalSize, boolean singleShard) { if (finalSize < 1) { throw new IllegalArgumentException("size must be positive, got " + finalSize); } - if (numberOfShards < 1) { - throw new IllegalArgumentException("number of shards must be positive, got " + numberOfShards); - } - - if (numberOfShards == 1) { + if (singleShard) { // In the case of a single shard, we do not need to over-request return finalSize; } - // Request 50% more buckets on the shards in order to improve accuracy // as well as a small constant that should help with small values of 'size' final long shardSampleSize = (long) (finalSize * 1.5 + 10); return (int) Math.min(Integer.MAX_VALUE, shardSampleSize); } - } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregationBuilder.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregationBuilder.java index 569845fcdf0e4..353f391f213d6 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregationBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregationBuilder.java @@ -157,7 +157,7 @@ public int shardSize() { if (shardSize < 0) { // Use default heuristic to avoid any wrong-ranking caused by // distributed counting - shardSize = BucketUtils.suggestShardSideQueueSize(requiredSize, context.numberOfShards()); + shardSize = BucketUtils.suggestShardSideQueueSize(requiredSize, context.numberOfShards() == 1); } if (requiredSize <= 0 || shardSize <= 0) { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTermsAggregatorFactory.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTermsAggregatorFactory.java index df1bd115e2bfc..d612014e0177f 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTermsAggregatorFactory.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTermsAggregatorFactory.java @@ -195,7 +195,7 @@ protected Aggregator doCreateInternal(ValuesSource valuesSource, Aggregator pare // such are impossible to differentiate from non-significant terms // at that early stage. bucketCountThresholds.setShardSize(2 * BucketUtils.suggestShardSideQueueSize(bucketCountThresholds.getRequiredSize(), - context.numberOfShards())); + context.numberOfShards() == 1)); } if (valuesSource instanceof ValuesSource.Bytes) { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTextAggregatorFactory.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTextAggregatorFactory.java index ea9a8a91aea9e..a51a33defdd00 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTextAggregatorFactory.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantTextAggregatorFactory.java @@ -176,7 +176,7 @@ protected Aggregator createInternal(Aggregator parent, boolean collectsFromSingl // such are impossible to differentiate from non-significant terms // at that early stage. bucketCountThresholds.setShardSize(2 * BucketUtils.suggestShardSideQueueSize(bucketCountThresholds.getRequiredSize(), - context.numberOfShards())); + context.numberOfShards() == 1)); } // TODO - need to check with mapping that this is indeed a text field.... diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java index a6481b58ca499..cc2719e5b9678 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java @@ -122,7 +122,7 @@ protected Aggregator doCreateInternal(ValuesSource valuesSource, Aggregator pare // heuristic to avoid any wrong-ranking caused by distributed // counting bucketCountThresholds.setShardSize(BucketUtils.suggestShardSideQueueSize(bucketCountThresholds.getRequiredSize(), - context.numberOfShards())); + context.numberOfShards() == 1)); } bucketCountThresholds.ensureValidity(); if (valuesSource instanceof ValuesSource.Bytes) { diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/BucketUtilsTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/BucketUtilsTests.java index aa9068b651e9b..35f3175f7cfe5 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/BucketUtilsTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/BucketUtilsTests.java @@ -27,18 +27,14 @@ public class BucketUtilsTests extends ESTestCase { public void testBadInput() { IllegalArgumentException e = expectThrows(IllegalArgumentException.class, - () -> BucketUtils.suggestShardSideQueueSize(0, 10)); + () -> BucketUtils.suggestShardSideQueueSize(0, randomBoolean())); assertEquals(e.getMessage(), "size must be positive, got 0"); - - e = expectThrows(IllegalArgumentException.class, - () -> BucketUtils.suggestShardSideQueueSize(10, 0)); - assertEquals(e.getMessage(), "number of shards must be positive, got 0"); } public void testOptimizesSingleShard() { for (int iter = 0; iter < 10; ++iter) { final int size = randomIntBetween(1, Integer.MAX_VALUE); - assertEquals(size, BucketUtils.suggestShardSideQueueSize( size, 1)); + assertEquals(size, BucketUtils.suggestShardSideQueueSize( size, true)); } } @@ -46,7 +42,7 @@ public void testOverFlow() { for (int iter = 0; iter < 10; ++iter) { final int size = Integer.MAX_VALUE - randomInt(10); final int numberOfShards = randomIntBetween(1, 10); - final int shardSize = BucketUtils.suggestShardSideQueueSize( size, numberOfShards); + final int shardSize = BucketUtils.suggestShardSideQueueSize( size, numberOfShards == 1); assertThat(shardSize, greaterThanOrEqualTo(shardSize)); } } @@ -55,7 +51,7 @@ public void testShardSizeIsGreaterThanGlobalSize() { for (int iter = 0; iter < 10; ++iter) { final int size = randomIntBetween(1, Integer.MAX_VALUE); final int numberOfShards = randomIntBetween(1, 10); - final int shardSize = BucketUtils.suggestShardSideQueueSize( size, numberOfShards); + final int shardSize = BucketUtils.suggestShardSideQueueSize( size, numberOfShards == 1); assertThat(shardSize, greaterThanOrEqualTo(size)); } }