Implement aggregations on aggregate metrics (#53986)
Following the implementation of the aggregate_metric_double field mapper (#49830), we implement the Min, Max, ValueCount, Sum and Average aggregations on aggregate metrics.

The code builds on the excellent work done for #42949 and uses the extensible ValuesSource infrastructure to wire up the common metric aggregations on the aggregate_metric_double field type.

This PR is part of the rollups v2 refactoring, as described in meta issue #42720.
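In practice, a standard metric aggregation request now works unchanged against an aggregate_metric_double field. A minimal usage sketch with the 7.x Java high-level client types; the index name "metrics-index" and field name "latency" are made up:

import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;

// Hypothetical request: "latency" is assumed to be mapped as an
// aggregate_metric_double field (see #49830). The plain "avg" aggregation
// below now transparently resolves to the aggregate-metric-backed
// implementation added in this commit.
SearchRequest request = new SearchRequest("metrics-index").source(
    new SearchSourceBuilder()
        .size(0)
        .aggregation(AggregationBuilders.avg("avg_latency").field("latency"))
);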
csoulios authored May 14, 2020
1 parent 046118c commit d89f9f3
Showing 30 changed files with 2,421 additions and 69 deletions.
InternalSum.java
@@ -32,7 +32,7 @@
 public class InternalSum extends InternalNumericMetricsAggregation.SingleValue implements Sum {
     private final double sum;

     public InternalSum(String name, double sum, DocValueFormat formatter, Map<String, Object> metadata) {
         super(name, metadata);
         this.sum = sum;
         this.format = formatter;
MinAggregator.java
@@ -48,7 +48,7 @@
 import java.util.Map;
 import java.util.function.Function;

-class MinAggregator extends NumericMetricsAggregator.SingleValue {
+public class MinAggregator extends NumericMetricsAggregator.SingleValue {
     private static final int MAX_BKD_LOOKUPS = 1024;

     final ValuesSource.Numeric valuesSource;
@@ -168,7 +168,7 @@ public void doClose() {
      * @param parent The parent aggregator.
      * @param config The config for the values source metric.
      */
-    static Function<byte[], Number> getPointReaderOrNull(SearchContext context, Aggregator parent,
+    public static Function<byte[], Number> getPointReaderOrNull(SearchContext context, Aggregator parent,
                                                           ValuesSourceConfig config) {
         if (context.query() != null &&
             context.query().getClass() != MatchAllDocsQuery.class) {
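getPointReaderOrNull returns a converter from the BKD tree's packed byte representation to a Number when the segment-level minimum can be read directly (roughly: a match-all query and no parent aggregator). Making it public lets code outside the package reuse the same shortcut. A hedged sketch of a caller; the helper method below and its parameters are illustrative, not part of this commit:

// Illustrative only: if the converter is non-null, the per-segment minimum
// can be decoded from the BKD tree's packed min value instead of iterating
// every document's doc values.
static Double segmentMinOrNull(SearchContext context, Aggregator parent, ValuesSourceConfig config,
                               LeafReaderContext ctx, String fieldName) throws IOException {
    Function<byte[], Number> pointReader = MinAggregator.getPointReaderOrNull(context, parent, config);
    if (pointReader == null) {
        return null; // fall back to normal per-document collection
    }
    byte[] minPackedValue = PointValues.getMinPackedValue(ctx.reader(), fieldName);
    return minPackedValue == null ? null : pointReader.apply(minPackedValue).doubleValue();
}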
(Apache license header in another changed file)
@@ -7,7 +7,7 @@
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
6 changes: 5 additions & 1 deletion x-pack/plugin/mapper-aggregate-metric/build.gradle
@@ -7,7 +7,6 @@
 evaluationDependsOn(xpackModule('core'))

 apply plugin: 'elasticsearch.esplugin'
-
 esplugin {
     name 'x-pack-aggregate-metric'
     description 'Module for the aggregate_metric field type, which allows pre-aggregated fields to be stored as a single field.'
@@ -16,7 +15,12 @@ esplugin {
 }
 archivesBaseName = 'x-pack-aggregate-metric'

+compileJava.options.compilerArgs << "-Xlint:-rawtypes"
+compileTestJava.options.compilerArgs << "-Xlint:-rawtypes"
+
 dependencies {
+    compileOnly project(":server")
+
     compileOnly project(path: xpackModule('core'), configuration: 'default')
     testCompile project(path: xpackModule('core'), configuration: 'testArtifacts')
 }
AggregateMetricMapperPlugin.java
@@ -12,17 +12,21 @@
 import org.elasticsearch.plugins.ActionPlugin;
 import org.elasticsearch.plugins.MapperPlugin;
 import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.plugins.SearchPlugin;
+import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
+import org.elasticsearch.xpack.aggregatemetric.aggregations.metrics.AggregateMetricsAggregatorsRegistrar;
 import org.elasticsearch.xpack.aggregatemetric.mapper.AggregateDoubleMetricFieldMapper;
 import org.elasticsearch.xpack.core.action.XPackInfoFeatureAction;
 import org.elasticsearch.xpack.core.action.XPackUsageFeatureAction;

 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Consumer;

 import static java.util.Collections.singletonMap;

-public class AggregateMetricMapperPlugin extends Plugin implements MapperPlugin, ActionPlugin {
+public class AggregateMetricMapperPlugin extends Plugin implements MapperPlugin, ActionPlugin, SearchPlugin {

     @Override
     public Map<String, Mapper.TypeParser> getMappers() {
@@ -37,4 +41,14 @@ public Map<String, Mapper.TypeParser> getMappers() {
         );
     }

+    @Override
+    public List<Consumer<ValuesSourceRegistry.Builder>> getAggregationExtentions() {
+        return List.of(
+            AggregateMetricsAggregatorsRegistrar::registerSumAggregator,
+            AggregateMetricsAggregatorsRegistrar::registerAvgAggregator,
+            AggregateMetricsAggregatorsRegistrar::registerMinAggregator,
+            AggregateMetricsAggregatorsRegistrar::registerMaxAggregator,
+            AggregateMetricsAggregatorsRegistrar::registerValueCountAggregator
+        );
+    }
 }
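Each registrar referenced above binds an already-existing aggregation name to an aggregate-metric-backed implementation in the ValuesSourceRegistry. A minimal sketch of what one registrar might look like, assuming a register(String, ValuesSourceType, supplier) builder shape; AggregateMetricsValuesSourceType, AvgAggregatorSupplier and the lambda signature are assumptions rather than this commit's exact code:

// Hedged sketch (assumed shapes): route the core "avg" aggregation to the
// aggregate-metric-backed aggregator whenever the target field's values
// source is the aggregate_metric_double type.
public static void registerAvgAggregator(ValuesSourceRegistry.Builder builder) {
    builder.register(
        AvgAggregationBuilder.NAME,                        // reuse the standard "avg" name
        AggregateMetricsValuesSourceType.AGGREGATE_METRIC, // assumed custom ValuesSourceType
        (AvgAggregatorSupplier) (name, config, valuesSource, context, parent, metadata) ->
            new AggregateMetricBackedAvgAggregator(
                name,
                (AggregateMetricsValuesSource.AggregateDoubleMetric) valuesSource,
                config.format(),
                context,
                parent,
                metadata
            )
    );
}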
AggregateMetricBackedAvgAggregator.java (new file)
@@ -0,0 +1,135 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */
package org.elasticsearch.xpack.aggregatemetric.aggregations.metrics;

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.ScoreMode;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.DoubleArray;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.metrics.CompensatedSum;
import org.elasticsearch.search.aggregations.metrics.InternalAvg;
import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregator;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.xpack.aggregatemetric.aggregations.support.AggregateMetricsValuesSource;
import org.elasticsearch.xpack.aggregatemetric.mapper.AggregateDoubleMetricFieldMapper.Metric;

import java.io.IOException;
import java.util.Map;

class AggregateMetricBackedAvgAggregator extends NumericMetricsAggregator.SingleValue {

    final AggregateMetricsValuesSource.AggregateDoubleMetric valuesSource;

    LongArray counts;
    DoubleArray sums;
    DoubleArray compensations;
    DocValueFormat format;

    AggregateMetricBackedAvgAggregator(
        String name,
        AggregateMetricsValuesSource.AggregateDoubleMetric valuesSource,
        DocValueFormat formatter,
        SearchContext context,
        Aggregator parent,
        Map<String, Object> metadata
    ) throws IOException {
        super(name, context, parent, metadata);
        this.valuesSource = valuesSource;
        this.format = formatter;
        if (valuesSource != null) {
            final BigArrays bigArrays = context.bigArrays();
            counts = bigArrays.newLongArray(1, true);
            sums = bigArrays.newDoubleArray(1, true);
            compensations = bigArrays.newDoubleArray(1, true);
        }
    }

    @Override
    public ScoreMode scoreMode() {
        return valuesSource != null && valuesSource.needsScores() ? ScoreMode.COMPLETE : ScoreMode.COMPLETE_NO_SCORES;
    }

    @Override
    public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBucketCollector sub) throws IOException {
        if (valuesSource == null) {
            return LeafBucketCollector.NO_OP_COLLECTOR;
        }
        final BigArrays bigArrays = context.bigArrays();
        // Retrieve aggregate values for metrics sum and value_count
        final SortedNumericDoubleValues aggregateSums = valuesSource.getAggregateMetricValues(ctx, Metric.sum);
        final SortedNumericDoubleValues aggregateValueCounts = valuesSource.getAggregateMetricValues(ctx, Metric.value_count);
        final CompensatedSum kahanSummation = new CompensatedSum(0, 0);
        return new LeafBucketCollectorBase(sub, sums) {
            @Override
            public void collect(int doc, long bucket) throws IOException {
                sums = bigArrays.grow(sums, bucket + 1);
                compensations = bigArrays.grow(compensations, bucket + 1);

                // Read aggregate values for sums
                if (aggregateSums.advanceExact(doc)) {
                    // Compute the sum of double values with Kahan summation algorithm which is more
                    // accurate than naive summation.
                    double sum = sums.get(bucket);
                    double compensation = compensations.get(bucket);

                    kahanSummation.reset(sum, compensation);
                    for (int i = 0; i < aggregateSums.docValueCount(); i++) {
                        double value = aggregateSums.nextValue();
                        kahanSummation.add(value);
                    }

                    sums.set(bucket, kahanSummation.value());
                    compensations.set(bucket, kahanSummation.delta());
                }

                counts = bigArrays.grow(counts, bucket + 1);
                // Read aggregate values for value_count
                if (aggregateValueCounts.advanceExact(doc)) {
                    for (int i = 0; i < aggregateValueCounts.docValueCount(); i++) {
                        double d = aggregateValueCounts.nextValue();
                        long value = Double.valueOf(d).longValue();
                        counts.increment(bucket, value);
                    }
                }
            }
        };
    }

    @Override
    public double metric(long owningBucketOrd) {
        if (valuesSource == null || owningBucketOrd >= sums.size()) {
            return Double.NaN;
        }
        return sums.get(owningBucketOrd) / counts.get(owningBucketOrd);
    }

    @Override
    public InternalAggregation buildAggregation(long bucket) {
        if (valuesSource == null || bucket >= sums.size()) {
            return buildEmptyAggregation();
        }
        return new InternalAvg(name, sums.get(bucket), counts.get(bucket), format, metadata());
    }

    @Override
    public InternalAggregation buildEmptyAggregation() {
        return new InternalAvg(name, 0.0, 0L, format, metadata());
    }

    @Override
    public void doClose() {
        Releasables.close(counts, sums, compensations);
    }

}
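The collect callback above relies on CompensatedSum, Elasticsearch's Kahan-summation helper, when folding each document's pre-aggregated sum into the per-bucket total. A self-contained sketch of the underlying pattern, with made-up numbers:

// Compensated (Kahan) summation: the compensation term carries the low-order
// bits that plain double addition drops. Adding 1e-16 to 1.0 ten million
// times naively never changes the total, because each addition rounds back
// to 1.0; the compensated loop recovers the lost increments.
double naive = 1.0;
double sum = 1.0;
double compensation = 0.0;
for (int i = 0; i < 10_000_000; i++) {
    naive += 1e-16;                        // rounds back to 1.0 every time
    double corrected = 1e-16 - compensation;
    double t = sum + corrected;
    compensation = (t - sum) - corrected;  // the low-order bits just lost
    sum = t;
}
// naive is still 1.0, while sum lands close to the true total 1.000000001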
AggregateMetricBackedMaxAggregator.java (new file)
@@ -0,0 +1,118 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */
package org.elasticsearch.xpack.aggregatemetric.aggregations.metrics;

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.CollectionTerminatedException;
import org.apache.lucene.search.ScoreMode;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.DoubleArray;
import org.elasticsearch.index.fielddata.NumericDoubleValues;
import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.MultiValueMode;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.metrics.InternalMax;
import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregator;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.xpack.aggregatemetric.aggregations.support.AggregateMetricsValuesSource;
import org.elasticsearch.xpack.aggregatemetric.mapper.AggregateDoubleMetricFieldMapper.Metric;

import java.io.IOException;
import java.util.Map;

class AggregateMetricBackedMaxAggregator extends NumericMetricsAggregator.SingleValue {

    private final AggregateMetricsValuesSource.AggregateDoubleMetric valuesSource;
    final DocValueFormat formatter;
    DoubleArray maxes;

    AggregateMetricBackedMaxAggregator(
        String name,
        ValuesSourceConfig config,
        AggregateMetricsValuesSource.AggregateDoubleMetric valuesSource,
        SearchContext context,
        Aggregator parent,
        Map<String, Object> metadata
    ) throws IOException {
        super(name, context, parent, metadata);
        this.valuesSource = valuesSource;
        if (valuesSource != null) {
            maxes = context.bigArrays().newDoubleArray(1, false);
            maxes.fill(0, maxes.size(), Double.NEGATIVE_INFINITY);
        }
        this.formatter = config.format();
    }

    @Override
    public ScoreMode scoreMode() {
        return valuesSource != null && valuesSource.needsScores() ? ScoreMode.COMPLETE : ScoreMode.COMPLETE_NO_SCORES;
    }

    @Override
    public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBucketCollector sub) throws IOException {
        if (valuesSource == null) {
            if (parent != null) {
                return LeafBucketCollector.NO_OP_COLLECTOR;
            } else {
                // we have no parent and the values source is empty so we can skip collecting hits.
                throw new CollectionTerminatedException();
            }
        }

        final BigArrays bigArrays = context.bigArrays();
        final SortedNumericDoubleValues allValues = valuesSource.getAggregateMetricValues(ctx, Metric.max);
        final NumericDoubleValues values = MultiValueMode.MAX.select(allValues);
        return new LeafBucketCollectorBase(sub, allValues) {

            @Override
            public void collect(int doc, long bucket) throws IOException {
                if (bucket >= maxes.size()) {
                    long from = maxes.size();
                    maxes = bigArrays.grow(maxes, bucket + 1);
                    maxes.fill(from, maxes.size(), Double.NEGATIVE_INFINITY);
                }
                if (values.advanceExact(doc)) {
                    final double value = values.doubleValue();
                    double max = maxes.get(bucket);
                    max = Math.max(max, value);
                    maxes.set(bucket, max);
                }
            }
        };
    }

    @Override
    public double metric(long owningBucketOrd) {
        if (valuesSource == null || owningBucketOrd >= maxes.size()) {
            return Double.NEGATIVE_INFINITY;
        }
        return maxes.get(owningBucketOrd);
    }

    @Override
    public InternalAggregation buildAggregation(long bucket) {
        if (valuesSource == null || bucket >= maxes.size()) {
            return buildEmptyAggregation();
        }
        return new InternalMax(name, maxes.get(bucket), formatter, metadata());
    }

    @Override
    public InternalAggregation buildEmptyAggregation() {
        return new InternalMax(name, Double.NEGATIVE_INFINITY, formatter, metadata());
    }

    @Override
    public void doClose() {
        Releasables.close(maxes);
    }
}
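Note that max, unlike avg, needs only a single stored metric: taking the maximum of per-document pre-aggregated maxima gives exactly the maximum of the underlying raw values, because max is decomposable. A small self-contained illustration with made-up numbers:

import java.util.Arrays;

// max(max(a), max(b)) == max(a ∪ b): aggregating the stored "max" metric of
// each pre-aggregated document yields the exact overall maximum.
double[] rawDoc1 = { 2.0, 7.5 };          // raw values rolled up into doc 1
double[] rawDoc2 = { 3.0, 9.0, 4.0 };     // raw values rolled up into doc 2
double storedMax1 = Arrays.stream(rawDoc1).max().getAsDouble(); // 7.5
double storedMax2 = Arrays.stream(rawDoc2).max().getAsDouble(); // 9.0
double overall = Math.max(storedMax1, storedMax2);              // 9.0 == true max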
(Diff truncated: the remaining changed files are not shown.)