[7.x] Fix rate agg with custom _doc_count (elastic#79449)
Backports elastic#79346 to 7.x

    When running a rate aggregation without setting the field parameter, the result is computed based on the bucket doc_count.

    This PR adds support for a custom _doc_count field.

    Closes elastic#77734
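For illustration, a minimal console sketch in the spirit of this change, reusing the index, field, and values from the new YAML test below: with no `field` or `script` set, the rate is now computed from each bucket's effective doc count, including custom `_doc_count` values.

[source,console]
--------------------------------------------------
PUT test2/_doc/1?refresh
{ "timestamp": "2021-09-14T22:33:37.477Z", "_doc_count": 10 }

GET test2/_search?size=0
{
  "aggs": {
    "by_date": {
      "date_histogram": { "field": "timestamp", "fixed_interval": "60s" },
      "aggs": {
        "rate": { "rate": { "unit": "minute" } }
      }
    }
  }
}
--------------------------------------------------

With the fix, the single bucket reports a rate of 10.0 rather than 1.0, because the document declares `_doc_count: 10`.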
csoulios authored Oct 19, 2021
1 parent 74ba6fb commit ffc61a2
Showing 5 changed files with 114 additions and 28 deletions.
8 changes: 4 additions & 4 deletions docs/reference/aggregations/metrics/rate-aggregation.asciidoc
@@ -7,7 +7,7 @@
 ++++
 
 A `rate` metrics aggregation can be used only inside a `date_histogram` or `composite` aggregation. It calculates a rate of documents
-or a field in each bucket. The field values can be generated extracted from specific numeric or
+or a field in each bucket. The field values can be extracted from specific numeric or
 <<histogram,histogram fields>> in the documents.
 
 NOTE: For `composite` aggregations, there must be exactly one `date_histogram` source for the `rate` aggregation to be supported.
@@ -27,7 +27,7 @@ A `rate` aggregation looks like this in isolation:
 --------------------------------------------------
 // NOTCONSOLE
 
-The following request will group all sales records into monthly bucket and than convert the number of sales transaction in each bucket
+The following request will group all sales records into monthly buckets and then convert the number of sales transactions in each bucket
 into per annual sales rate.
 
 [source,console]
@@ -56,8 +56,8 @@ GET sales/_search
 <1> Histogram is grouped by month.
 <2> But the rate is converted into annual rate.
 
-The response will return the annual rate of transaction in each bucket. Since there are 12 months per year, the annual rate will
-be automatically calculated by multiplying monthly rate by 12.
+The response will return the annual rate of transactions in each bucket. Since there are 12 months per year, the annual rate will
+be automatically calculated by multiplying the monthly rate by 12.
 
 [source,console-result]
 --------------------------------------------------
AbstractRateAggregator.java
@@ -29,6 +29,7 @@ public abstract class AbstractRateAggregator extends NumericMetricsAggregator.SingleValue {
     private final Rounding.DateTimeUnit rateUnit;
     protected final RateMode rateMode;
     private final SizedBucketAggregator sizedBucketAggregator;
+    protected final boolean computeWithDocCount;
 
     protected DoubleArray sums;
     protected DoubleArray compensations;
@@ -55,6 +56,8 @@ public AbstractRateAggregator(
         this.rateUnit = rateUnit;
         this.rateMode = rateMode;
         this.sizedBucketAggregator = findSizedBucketAncestor();
+        // If no fields or scripts have been defined in the agg, rate should be computed based on bucket doc_counts
+        this.computeWithDocCount = valuesSourceConfig.fieldContext() == null && valuesSourceConfig.script() == null;
     }
 
     private SizedBucketAggregator findSizedBucketAncestor() {
@@ -112,5 +115,4 @@ public InternalAggregation buildEmptyAggregation() {
     public void doClose() {
         Releasables.close(sums, compensations);
     }
-
 }
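The `sums` and `compensations` arrays released above are the two halves of Kahan (compensated) summation, which the rate collectors use when accumulating doubles. A minimal standalone sketch of the technique, assuming nothing from Elasticsearch beyond the idea behind `CompensatedSum`:

    // Kahan summation: carry a compensation term holding the low-order bits
    // that a naive running sum would round away.
    public final class KahanSketch {
        public static double sum(double[] values) {
            double sum = 0.0;
            double compensation = 0.0;
            for (double value : values) {
                double corrected = value - compensation;
                double newSum = sum + corrected;           // low-order bits of corrected are lost here...
                compensation = (newSum - sum) - corrected; // ...and recovered here for the next iteration
                sum = newSum;
            }
            return sum;
        }

        public static void main(String[] args) {
            double[] values = new double[1_000_001];
            values[0] = 1e16;
            java.util.Arrays.fill(values, 1, values.length, 1.0);
            // Naive left-to-right addition returns 1.0E16 (every +1.0 rounds away);
            // Kahan returns 1.0000000001E16, the exact result.
            System.out.println(sum(values));
        }
    }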
NumericRateAggregator.java
@@ -13,6 +13,7 @@
 import org.elasticsearch.search.aggregations.Aggregator;
 import org.elasticsearch.search.aggregations.LeafBucketCollector;
 import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
+import org.elasticsearch.search.aggregations.bucket.DocCountProvider;
 import org.elasticsearch.search.aggregations.metrics.CompensatedSum;
 import org.elasticsearch.search.aggregations.support.AggregationContext;
 import org.elasticsearch.search.aggregations.support.ValuesSource;
@@ -22,6 +23,9 @@
 import java.util.Map;
 
 public class NumericRateAggregator extends AbstractRateAggregator {
+
+    private final DocCountProvider docCountProvider;
+
     public NumericRateAggregator(
         String name,
         ValuesSourceConfig valuesSourceConfig,
@@ -32,42 +36,68 @@ NumericRateAggregator(
         Map<String, Object> metadata
     ) throws IOException {
         super(name, valuesSourceConfig, rateUnit, rateMode, context, parent, metadata);
+        docCountProvider = computeWithDocCount ? new DocCountProvider() : null;
     }
 
     @Override
     public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBucketCollector sub) throws IOException {
         final CompensatedSum kahanSummation = new CompensatedSum(0, 0);
-        final SortedNumericDoubleValues values = ((ValuesSource.Numeric) valuesSource).doubleValues(ctx);
-        return new LeafBucketCollectorBase(sub, values) {
-            @Override
-            public void collect(int doc, long bucket) throws IOException {
-                sums = bigArrays().grow(sums, bucket + 1);
-                compensations = bigArrays().grow(compensations, bucket + 1);
-
-                if (values.advanceExact(doc)) {
-                    final int valuesCount = values.docValueCount();
+        if (computeWithDocCount) {
+            // No field or script has been set at the rate agg. So, rate will be computed based on the doc_counts.
+            // This implementation hard-wires the DocCountProvider and reads the _doc_count fields when available.
+            // A better approach would be to create a DOC_COUNT ValuesSource type and use that as valuesSource
+            // In that case the computeRateOnDocs variable and this branch of the if-statement are not required.
+            docCountProvider.setLeafReaderContext(ctx);
+            return new LeafBucketCollectorBase(sub, null) {
+                @Override
+                public void collect(int doc, long bucket) throws IOException {
+                    sums = bigArrays().grow(sums, bucket + 1);
+                    compensations = bigArrays().grow(compensations, bucket + 1);
                     // Compute the sum of double values with Kahan summation algorithm which is more
                     // accurate than naive summation.
                     double sum = sums.get(bucket);
                     double compensation = compensations.get(bucket);
                     kahanSummation.reset(sum, compensation);
-                    switch (rateMode) {
-                        case SUM:
-                            for (int i = 0; i < valuesCount; i++) {
-                                kahanSummation.add(values.nextValue());
-                            }
-                            break;
-                        case VALUE_COUNT:
-                            kahanSummation.add(valuesCount);
-                            break;
-                        default:
-                            throw new IllegalArgumentException("Unsupported rate mode " + rateMode);
-                    }
-
+                    final int docCount = docCountProvider.getDocCount(doc);
+                    kahanSummation.add(docCount);
                     compensations.set(bucket, kahanSummation.delta());
                     sums.set(bucket, kahanSummation.value());
                 }
-            }
-        };
+            };
+        } else {
+            final SortedNumericDoubleValues values = ((ValuesSource.Numeric) valuesSource).doubleValues(ctx);
+            return new LeafBucketCollectorBase(sub, values) {
+                @Override
+                public void collect(int doc, long bucket) throws IOException {
+                    sums = bigArrays().grow(sums, bucket + 1);
+                    compensations = bigArrays().grow(compensations, bucket + 1);
+
+                    if (values.advanceExact(doc)) {
+                        final int valuesCount = values.docValueCount();
+                        // Compute the sum of double values with Kahan summation algorithm which is more
+                        // accurate than naive summation.
+                        double sum = sums.get(bucket);
+                        double compensation = compensations.get(bucket);
+                        kahanSummation.reset(sum, compensation);
+                        switch (rateMode) {
+                            case SUM:
+                                for (int i = 0; i < valuesCount; i++) {
+                                    kahanSummation.add(values.nextValue());
+                                }
+                                break;
+                            case VALUE_COUNT:
+                                kahanSummation.add(valuesCount);
+                                break;
+                            default:
+                                throw new IllegalArgumentException("Unsupported rate mode " + rateMode);
+                        }
+
+                        compensations.set(bucket, kahanSummation.delta());
+                        sums.set(bucket, kahanSummation.value());
+                    }
+                }
+            };
+        }
     }
 }
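The doc-count branch above leans on `DocCountProvider`, imported earlier. A rough sketch of the mechanism it relies on, assuming `_doc_count` is indexed as a custom term frequency (as the `CustomTermFreqField` usage in the test below suggests); the class and method names here are hypothetical stand-ins, not the real implementation:

    import java.io.IOException;

    import org.apache.lucene.index.LeafReader;
    import org.apache.lucene.index.PostingsEnum;
    import org.apache.lucene.index.Term;

    // Simplified stand-in for a _doc_count lookup: the count is stored as the
    // term frequency of the single term "_doc_count" in the "_doc_count" field,
    // and a document without it counts as 1.
    final class DocCountLookup {
        private PostingsEnum postings;

        void setLeafReader(LeafReader reader) throws IOException {
            // null when no document in this segment carries a _doc_count
            postings = reader.postings(new Term("_doc_count", "_doc_count"), PostingsEnum.FREQS);
        }

        int getDocCount(int docId) throws IOException {
            if (postings == null) {
                return 1;
            }
            if (postings.docID() < docId) {
                postings.advance(docId);
            }
            // freq() holds the stored count when this doc has one; otherwise it's a plain doc
            return postings.docID() == docId ? postings.freq() : 1;
        }
    }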
RateAggregatorTests.java
@@ -24,6 +24,7 @@
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.CheckedConsumer;
 import org.elasticsearch.index.fielddata.ScriptDocValues;
+import org.elasticsearch.index.mapper.CustomTermFreqField;
 import org.elasticsearch.index.mapper.DateFieldMapper;
 import org.elasticsearch.index.mapper.KeywordFieldMapper;
 import org.elasticsearch.index.mapper.MappedFieldType;
@@ -859,6 +860,18 @@ public void testModeWithoutField() {
         assertEquals("The mode parameter is only supported with field or script", ex.getMessage());
     }
 
+    public void testWithCustomDocCount() throws IOException {
+        testCase(new MatchAllDocsQuery(), "month", true, "month", null, iw -> {
+            iw.addDocument(doc("2010-03-12T01:07:45", new CustomTermFreqField("_doc_count", "_doc_count", 10)));
+            iw.addDocument(doc("2010-04-01T03:43:34"));
+            iw.addDocument(doc("2010-04-27T03:43:34", new CustomTermFreqField("_doc_count", "_doc_count", 5)));
+        }, dh -> {
+            assertThat(dh.getBuckets(), hasSize(2));
+            assertThat(((InternalRate) dh.getBuckets().get(0).getAggregations().asList().get(0)).value(), closeTo(10.0, 0.000001));
+            assertThat(((InternalRate) dh.getBuckets().get(1).getAggregations().asList().get(0)).value(), closeTo(6.0, 0.000001));
+        });
+    }
+
     private static AbstractAggregationBuilder<?> randomValidMultiBucketAggBuilder(
         RateAggregationBuilder rateAggregationBuilder,
         DateHistogramInterval interval
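In `testWithCustomDocCount` above, the `month` rate unit matches the monthly histogram, so each bucket's rate equals its summed doc count: 10.0 for March 2010 (a single document carrying `_doc_count: 10`) and 6.0 for April 2010 (one plain document counting as 1 plus one carrying `_doc_count: 5`).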
@@ -34,3 +34,44 @@ setup:
   - length: { aggregations.by_date.buckets: 2 }
   - match: { aggregations.by_date.buckets.0.rate.value: 1.0 }
   - match: { aggregations.by_date.buckets.1.rate.value: 2.0 }
+
+
+---
+"rate with doc_count":
+  - skip:
+      version: " - 7.15.99"
+      reason: bug fixed in 7.16.0
+  - do:
+      bulk:
+        index: test2
+        refresh: true
+        body:
+          - '{"index": {}}'
+          - '{"timestamp": "2021-09-14T22:33:37.477Z", "_doc_count": 10}'
+          - '{"index": {}}'
+          - '{"timestamp": "2021-09-14T22:35:37.477Z", "_doc_count": 5}'
+          - '{"index": {}}'
+          - '{"timestamp": "2021-09-14T22:35:38.477Z", "_doc_count": 1}'
+          - '{"index": {}}'
+          - '{"timestamp": "2021-09-14T22:36:08.477Z"}'
+  - do:
+      search:
+        size: 0
+        index: "test2"
+        body:
+          aggs:
+            by_date:
+              date_histogram:
+                field: timestamp
+                fixed_interval: 60s
+              aggs:
+                rate:
+                  rate:
+                    unit: minute
+
+  - length: { aggregations.by_date.buckets: 4 }
+  - match: { aggregations.by_date.buckets.0.rate.value: 10.0 }
+  - match: { aggregations.by_date.buckets.1.rate.value: 0.0 }
+  - match: { aggregations.by_date.buckets.2.rate.value: 6.0 }
+  - match: { aggregations.by_date.buckets.3.rate.value: 1.0 }
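For reference, since the 60s `fixed_interval` matches the `minute` rate unit, each bucket's rate is simply its effective doc count: 10.0 (one document with `_doc_count: 10`), 0.0 (an empty minute), 6.0 (5 + 1), and 1.0 for the last document, which has no `_doc_count` and therefore counts as a single document.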
