Fix wrong error upper bound when performing incremental reductions (elastic#43874)

When performing incremental reductions, a docCountError of 0 may mean either that
the error was not previously calculated, or that it was calculated and was
genuinely 0. We end up rejecting true values of 0 this way, which can lead to a
wrong upper bound of the error in the result. To fix this, this PR makes
docCountError nullable: null means the error has not been calculated yet.

Fixes elastic#40005
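
The heart of the change, as a minimal standalone sketch (the class and method names here are hypothetical, not the actual Elasticsearch code):

// A minimal sketch of the ambiguity this commit removes; all names here
// are made up for illustration.
public class DocCountErrorSketch {

    // Before: a primitive long, so 0 had to mean both "error not yet
    // calculated" and "error calculated, and it is exactly 0".
    static String describeBefore(long docCountError) {
        return docCountError == 0
            ? "unknown or genuinely zero: indistinguishable"
            : "upper bound is " + docCountError;
    }

    // After: a nullable Long keeps the two states distinct, so a genuine
    // zero error is no longer discarded during incremental reduction.
    static String describeAfter(Long docCountError) {
        if (docCountError == null) {
            return "not calculated yet";
        }
        return "upper bound is " + docCountError;
    }

    public static void main(String[] args) {
        System.out.println(describeBefore(0L));  // ambiguous
        System.out.println(describeAfter(null)); // not calculated yet
        System.out.println(describeAfter(0L));   // upper bound is 0
    }
}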

Co-authored-by: Igor Motov <[email protected]>
Co-authored-by: Elastic Machine <[email protected]>
3 people committed Aug 12, 2021
1 parent d3b853d commit b7cc6eb
Showing 21 changed files with 100 additions and 59 deletions.
@@ -154,7 +154,7 @@ private StringTerms newTerms(Random rand, BytesRef[] dict, boolean withNested) {
true,
0,
buckets,
- 0
+ 0L
);
}

@@ -72,7 +72,7 @@ private StringTerms newTerms(boolean withNested) {
false,
100000,
resultBuckets,
- 0
+ 0L
);
}

@@ -577,7 +577,7 @@ public void testSearchWithParentJoin() throws IOException {
assertEquals(Float.NaN, searchResponse.getHits().getMaxScore(), 0f);
assertEquals(1, searchResponse.getAggregations().asList().size());
Terms terms = searchResponse.getAggregations().get("top-tags");
- assertEquals(0, terms.getDocCountError());
+ assertEquals(0, terms.getDocCountError().longValue());
assertEquals(0, terms.getSumOfOtherDocCounts());
assertEquals(3, terms.getBuckets().size());
for (Terms.Bucket bucket : terms.getBuckets()) {
@@ -589,7 +589,7 @@ public void testSearchWithParentJoin() throws IOException {
assertEquals(2, children.getDocCount());
assertEquals(1, children.getAggregations().asList().size());
Terms leafTerms = children.getAggregations().get("top-names");
- assertEquals(0, leafTerms.getDocCountError());
+ assertEquals(0, leafTerms.getDocCountError().longValue());
assertEquals(0, leafTerms.getSumOfOtherDocCounts());
assertEquals(2, leafTerms.getBuckets().size());
assertEquals(2, leafTerms.getBuckets().size());
@@ -20,6 +20,7 @@
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.test.ESIntegTestCase;

+ import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -89,8 +90,6 @@ public void setupSuiteScopeCluster() throws Exception {
.field(DOUBLE_FIELD_NAME, 1.0 * randomInt(numUniqueTerms))
.endObject()));
}
assertAcked(prepareCreate("idx_fixed_docs_0").addMapping("type", STRING_FIELD_NAME, "type=keyword")
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)));
Map<String, Integer> shard0DocsPerTerm = new HashMap<>();
shard0DocsPerTerm.put("A", 25);
shard0DocsPerTerm.put("B", 18);
@@ -102,16 +101,8 @@ public void setupSuiteScopeCluster() throws Exception {
shard0DocsPerTerm.put("H", 2);
shard0DocsPerTerm.put("I", 1);
shard0DocsPerTerm.put("J", 1);
for (Map.Entry<String, Integer> entry : shard0DocsPerTerm.entrySet()) {
for (int i = 0; i < entry.getValue(); i++) {
String term = entry.getKey();
builders.add(client().prepareIndex("idx_fixed_docs_0", "type", term + "-" + i)
.setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, term).endObject()));
}
}
buildIndex(shard0DocsPerTerm, "idx_fixed_docs_0", 0, builders);

assertAcked(prepareCreate("idx_fixed_docs_1").addMapping("type", STRING_FIELD_NAME, "type=keyword")
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)));
Map<String, Integer> shard1DocsPerTerm = new HashMap<>();
shard1DocsPerTerm.put("A", 30);
shard1DocsPerTerm.put("B", 25);
@@ -123,17 +114,8 @@ public void setupSuiteScopeCluster() throws Exception {
shard1DocsPerTerm.put("Q", 6);
shard1DocsPerTerm.put("J", 8);
shard1DocsPerTerm.put("C", 4);
for (Map.Entry<String, Integer> entry : shard1DocsPerTerm.entrySet()) {
for (int i = 0; i < entry.getValue(); i++) {
String term = entry.getKey();
builders.add(client().prepareIndex("idx_fixed_docs_1", "type", term + "-" + i)
.setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, term).field("shard", 1).endObject()));
}
}
buildIndex(shard1DocsPerTerm, "idx_fixed_docs_1", 1, builders);

assertAcked(prepareCreate("idx_fixed_docs_2")
.addMapping("type", STRING_FIELD_NAME, "type=keyword")
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)));
Map<String, Integer> shard2DocsPerTerm = new HashMap<>();
shard2DocsPerTerm.put("A", 45);
shard2DocsPerTerm.put("C", 44);
@@ -143,16 +125,46 @@ public void setupSuiteScopeCluster() throws Exception {
shard2DocsPerTerm.put("H", 28);
shard2DocsPerTerm.put("Q", 2);
shard2DocsPerTerm.put("D", 1);
for (Map.Entry<String, Integer> entry : shard2DocsPerTerm.entrySet()) {
buildIndex(shard2DocsPerTerm, "idx_fixed_docs_2", 2, builders);

Map<String, Integer> shard3DocsPerTerm = new HashMap<>();
shard3DocsPerTerm.put("A", 1);
shard3DocsPerTerm.put("B", 1);
shard3DocsPerTerm.put("C", 1);
buildIndex(shard3DocsPerTerm, "idx_fixed_docs_3", 3, builders);

Map<String, Integer> shard4DocsPerTerm = new HashMap<>();
shard4DocsPerTerm.put("K", 1);
shard4DocsPerTerm.put("L", 1);
shard4DocsPerTerm.put("M", 1);
buildIndex(shard4DocsPerTerm, "idx_fixed_docs_4", 4, builders);

Map<String, Integer> shard5DocsPerTerm = new HashMap<>();
shard5DocsPerTerm.put("X", 1);
shard5DocsPerTerm.put("Y", 1);
shard5DocsPerTerm.put("Z", 1);
buildIndex(shard5DocsPerTerm, "idx_fixed_docs_5", 5, builders);

indexRandom(true, builders);
ensureSearchable();
}

private void buildIndex(Map<String, Integer> docsPerTerm, String index, int shard, List<IndexRequestBuilder> builders)
throws IOException {
assertAcked(
prepareCreate(index).addMapping("type", STRING_FIELD_NAME, "type=keyword")
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1))
);
for (Map.Entry<String, Integer> entry : docsPerTerm.entrySet()) {
for (int i = 0; i < entry.getValue(); i++) {
String term = entry.getKey();
builders.add(client().prepareIndex("idx_fixed_docs_2", "type", term + "-" + i)
.setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, term).field("shard", 2).endObject()));
builders.add(
client().prepareIndex(index, "type")
.setId(term + "-" + i)
.setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, term).field("shard", shard).endObject())
);
}
}

- indexRandom(true, builders);
- ensureSearchable();
}

private void assertDocCountErrorWithinBounds(int size, SearchResponse accurateResponse, SearchResponse testResponse) {
@@ -1015,4 +1027,21 @@ public void testFixedDocs() throws Exception {
assertThat(bucket.getDocCountError(), equalTo(29L));
}

+ /**
+  * Tests that the upper bounds are correct when performing incremental reductions.
+  * See https://github.com/elastic/elasticsearch/issues/40005 for more details.
+  */
+ public void testIncrementalReduction() {
+ SearchResponse response = client().prepareSearch("idx_fixed_docs_3", "idx_fixed_docs_4", "idx_fixed_docs_5")
+ .addAggregation(terms("terms")
+ .executionHint(randomExecutionHint())
+ .field(STRING_FIELD_NAME)
+ .showTermDocCountError(true)
+ .size(5).shardSize(5)
+ .collectMode(randomFrom(SubAggCollectionMode.values())))
+ .get();
+ assertSearchResponse(response);
+ Terms terms = response.getAggregations().get("terms");
+ assertThat(terms.getDocCountError(), equalTo(0L));
+ }
}
@@ -76,7 +76,7 @@ public abstract static class AbstractTermsBucket extends InternalMultiBucketAggr

protected abstract long getSumOfOtherDocCounts();

- protected abstract long getDocCountError();
+ protected abstract Long getDocCountError();

protected abstract void setDocCountError(long docCountError);

@@ -133,7 +133,7 @@ private long getDocCountError(A terms) {
if (size == 0 || size < terms.getShardSize() || isKeyOrder(terms.getOrder())) {
return 0;
} else if (InternalOrder.isCountDesc(terms.getOrder())) {
if (terms.getDocCountError() > 0) {
if (terms.getDocCountError() != null && terms.getDocCountError() > 0) {
// If there is an existing docCountError for this agg then
// use this as the error for this aggregation
return terms.getDocCountError();
@@ -33,7 +33,7 @@ abstract class AbstractStringTermsAggregator extends TermsAggregator {

protected StringTerms buildEmptyTermsAggregation() {
return new StringTerms(name, order, order, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
- metadata(), format, bucketCountThresholds.getShardSize(), showTermDocCountError, 0, emptyList(), 0);
+ metadata(), format, bucketCountThresholds.getShardSize(), showTermDocCountError, 0, emptyList(), 0L);
}

protected SignificantStringTerms buildEmptySignificantTermsAggregation(long subsetSize, SignificanceHeuristic significanceHeuristic) {
@@ -91,7 +91,7 @@ public int hashCode() {

public DoubleTerms(String name, BucketOrder reduceOrder, BucketOrder order, int requiredSize, long minDocCount,
Map<String, Object> metadata, DocValueFormat format, int shardSize, boolean showTermDocCountError, long otherDocCount,
List<Bucket> buckets, long docCountError) {
List<Bucket> buckets, Long docCountError) {
super(name, reduceOrder, order, requiredSize, minDocCount, metadata, format, shardSize, showTermDocCountError,
otherDocCount, buckets, docCountError);
}
@@ -765,7 +765,7 @@ StringTerms buildResult(long owningBucketOrd, long otherDocCount, StringTerms.Bu
}
return new StringTerms(name, reduceOrder, order, bucketCountThresholds.getRequiredSize(),
bucketCountThresholds.getMinDocCount(), metadata(), format, bucketCountThresholds.getShardSize(), showTermDocCountError,
- otherDocCount, Arrays.asList(topBuckets), 0);
+ otherDocCount, Arrays.asList(topBuckets), 0L);
}

@Override
@@ -8,6 +8,7 @@

package org.elasticsearch.search.aggregations.bucket.terms;

+ import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -32,11 +33,11 @@ public abstract class InternalMappedTerms<A extends InternalTerms<A, B>, B exten
protected final List<B> buckets;
protected Map<String, B> bucketMap;

- protected long docCountError;
+ protected Long docCountError;

protected InternalMappedTerms(String name, BucketOrder reduceOrder, BucketOrder order, int requiredSize, long minDocCount,
Map<String, Object> metadata, DocValueFormat format, int shardSize,
- boolean showTermDocCountError, long otherDocCount, List<B> buckets, long docCountError) {
+ boolean showTermDocCountError, long otherDocCount, List<B> buckets, Long docCountError) {
super(name, reduceOrder, order, requiredSize, minDocCount, metadata);
this.format = format;
this.shardSize = shardSize;
@@ -51,7 +52,14 @@ protected InternalMappedTerms(String name, BucketOrder reduceOrder, BucketOrder
*/
protected InternalMappedTerms(StreamInput in, Bucket.Reader<B> bucketReader) throws IOException {
super(in);
- docCountError = in.readZLong();
+ if (in.getVersion().onOrAfter(Version.V_7_15_0)) {
+ docCountError = in.readOptionalLong();
+ } else {
+ docCountError = in.readZLong();
+ if (docCountError == 0) {
+ docCountError = null;
+ }
+ }
format = in.readNamedWriteable(DocValueFormat.class);
shardSize = readSize(in);
showTermDocCountError = in.readBoolean();
@@ -61,7 +69,11 @@ protected InternalMappedTerms(StreamInput in, Bucket.Reader<B> bucketReader) thr

@Override
protected final void writeTermTypeInfoTo(StreamOutput out) throws IOException {
- out.writeZLong(docCountError);
+ if (out.getVersion().onOrAfter(Version.V_7_15_0)) {
+ out.writeOptionalLong(docCountError);
+ } else {
+ out.writeZLong(docCountError == null ? 0 : docCountError);
+ }
out.writeNamedWriteable(format);
writeSize(shardSize, out);
out.writeBoolean(showTermDocCountError);
@@ -80,7 +92,7 @@ protected int getShardSize() {
}

@Override
- public long getDocCountError() {
+ public Long getDocCountError() {
return docCountError;
}
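
The wire-format hunks above follow the usual version-gated backwards-compatibility pattern: nodes on 7.15.0 or later exchange a true optional value, while streams to and from older nodes keep the legacy zlong encoding, mapping the 0 sentinel back to null on read. A minimal standalone sketch of the same pattern, using plain java.io streams rather than the real StreamInput/StreamOutput (the class name is made up):

import java.io.*;

// A sketch of the version-gated wire pattern used in InternalMappedTerms
// above; plain DataInput/DataOutput stand in for Elasticsearch's stream types.
class OptionalDocCountErrorWire {

    // Newer protocol (7.15.0+ in the diff): a presence flag, then the value.
    static void writeOptional(DataOutput out, Long v) throws IOException {
        out.writeBoolean(v != null);
        if (v != null) {
            out.writeLong(v);
        }
    }

    static Long readOptional(DataInput in) throws IOException {
        return in.readBoolean() ? in.readLong() : null;
    }

    // Legacy protocol: a bare long in which 0 doubled as "not calculated".
    // The write side collapses null to 0; the read side maps 0 back to null,
    // mirroring the else-branches in the diff.
    static void writeLegacy(DataOutput out, Long v) throws IOException {
        out.writeLong(v == null ? 0 : v);
    }

    static Long readLegacy(DataInput in) throws IOException {
        long v = in.readLong();
        return v == 0 ? null : v;
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        writeOptional(new DataOutputStream(buf), 0L);
        Long roundTripped = readOptional(
            new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
        // Prints 0: a genuine zero error survives the new encoding, whereas
        // the legacy encoding would have collapsed it into "unknown".
        System.out.println(roundTripped);
    }
}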

@@ -103,7 +103,7 @@ public int hashCode() {

public LongTerms(String name, BucketOrder reduceOrder, BucketOrder order, int requiredSize, long minDocCount,
Map<String, Object> metadata, DocValueFormat format, int shardSize, boolean showTermDocCountError, long otherDocCount,
List<Bucket> buckets, long docCountError) {
List<Bucket> buckets, Long docCountError) {
super(name, reduceOrder, order, requiredSize, minDocCount, metadata, format, shardSize, showTermDocCountError,
otherDocCount, buckets, docCountError);
}
@@ -446,7 +446,7 @@ StringTerms buildResult(long owningBucketOrd, long otherDocCount, StringTerms.Bu
}
return new StringTerms(name, reduceOrder, order, bucketCountThresholds.getRequiredSize(),
bucketCountThresholds.getMinDocCount(), metadata(), format, bucketCountThresholds.getShardSize(), showTermDocCountError,
- otherDocCount, Arrays.asList(topBuckets), 0);
+ otherDocCount, Arrays.asList(topBuckets), 0L);
}

@Override
@@ -372,7 +372,7 @@ LongTerms buildResult(long owningBucketOrd, long otherDocCount, LongTerms.Bucket
showTermDocCountError,
otherDocCount,
List.of(topBuckets),
- 0
+ 0L
);
}

@@ -390,7 +390,7 @@ LongTerms buildEmptyResult() {
showTermDocCountError,
0,
emptyList(),
- 0
+ 0L
);
}
}
@@ -454,7 +454,7 @@ DoubleTerms buildResult(long owningBucketOrd, long otherDocCount, DoubleTerms.Bu
showTermDocCountError,
otherDocCount,
List.of(topBuckets),
- 0
+ 0L
);
}

@@ -472,7 +472,7 @@ DoubleTerms buildEmptyResult() {
showTermDocCountError,
0,
emptyList(),
- 0
+ 0L
);
}
}
@@ -32,7 +32,7 @@ public abstract class ParsedTerms extends ParsedMultiBucketAggregation<ParsedTer
protected long sumOtherDocCount;

@Override
- public long getDocCountError() {
+ public Long getDocCountError() {
return docCountErrorUpperBound;
}

@@ -94,7 +94,7 @@ public int hashCode() {

public StringTerms(String name, BucketOrder reduceOrder, BucketOrder order, int requiredSize, long minDocCount,
Map<String, Object> metadata, DocValueFormat format, int shardSize, boolean showTermDocCountError, long otherDocCount,
List<Bucket> buckets, long docCountError) {
List<Bucket> buckets, Long docCountError) {
super(name, reduceOrder, order, requiredSize, minDocCount, metadata, format,
shardSize, showTermDocCountError, otherDocCount, buckets, docCountError);
}
@@ -227,7 +227,7 @@ protected boolean lessThan(OrdBucket a, OrdBucket b) {
showTermDocCountError,
otherDocsCount,
buckets,
- 0
+ 0L
);
}

@@ -41,7 +41,7 @@ interface Bucket extends MultiBucketsAggregation.Bucket {
/**
* Get an upper bound of the error on document counts in this aggregation.
*/
- long getDocCountError();
+ Long getDocCountError();

/**
* Return the sum of the document counts of all buckets that did not make
@@ -110,8 +110,8 @@ protected int getShardSize() {
}

@Override
- public long getDocCountError() {
- return 0;
+ public Long getDocCountError() {
+ return 0L;
}

@Override
@@ -52,7 +52,7 @@ public void testReduceEmptyAggs() {

public void testNonFinalReduceTopLevelPipelineAggs() {
InternalAggregation terms = new StringTerms("name", BucketOrder.key(true), BucketOrder.key(true),
- 10, 1, Collections.emptyMap(), DocValueFormat.RAW, 25, false, 10, Collections.emptyList(), 0);
+ 10, 1, Collections.emptyMap(), DocValueFormat.RAW, 25, false, 10, Collections.emptyList(), 0L);
List<InternalAggregations> aggs = singletonList(InternalAggregations.from(Collections.singletonList(terms)));
InternalAggregations reducedAggs = InternalAggregations.topLevelReduce(aggs, maxBucketReduceContext().forPartialReduction());
assertEquals(1, reducedAggs.getTopLevelPipelineAggregators().size());
@@ -61,7 +61,7 @@ public void testNonFinalReduceTopLevelPipelineAggs() {

public void testFinalReduceTopLevelPipelineAggs() {
InternalAggregation terms = new StringTerms("name", BucketOrder.key(true), BucketOrder.key(true),
- 10, 1, Collections.emptyMap(), DocValueFormat.RAW, 25, false, 10, Collections.emptyList(), 0);
+ 10, 1, Collections.emptyMap(), DocValueFormat.RAW, 25, false, 10, Collections.emptyList(), 0L);

InternalAggregations aggs = InternalAggregations.from(Collections.singletonList(terms));
InternalAggregations reducedAggs = InternalAggregations.topLevelReduce(Collections.singletonList(aggs),