Skip to content

Commit

Permalink
Add geo_shape support for the geo_centroid aggregation (#55602)
Browse files Browse the repository at this point in the history
this commit leverages the new geo_shape doc values
to register a new geo_centroid aggregator that works
on geo_shape field.
  • Loading branch information
talevy authored Apr 27, 2020
1 parent 029a925 commit 3b74015
Show file tree
Hide file tree
Showing 17 changed files with 541 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,6 @@
@FunctionalInterface
public interface GeoCentroidAggregatorSupplier extends AggregatorSupplier {

GeoCentroidAggregator build(String name, SearchContext context, Aggregator parent,
MetricsAggregator build(String name, SearchContext context, Aggregator parent,
ValuesSource valuesSource, Map<String, Object> metadata) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public static double decodeLongitude(long encodedLatLon) {
return GeoEncodingUtils.decodeLongitude((int) (encodedLatLon & 0xFFFFFFFFL));
}

InternalGeoCentroid(String name, GeoPoint centroid, long count, Map<String, Object> metadata) {
public InternalGeoCentroid(String name, GeoPoint centroid, long count, Map<String, Object> metadata) {
super(name, metadata);
assert (centroid == null) == (count == 0);
this.centroid = centroid;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ public enum Feature {
SECURITY_TOKEN_SERVICE(OperationMode.GOLD, false),
SECURITY_API_KEY_SERVICE(OperationMode.MISSING, false),
SECURITY_AUTHORIZATION_REALM(OperationMode.PLATINUM, true),
SECURITY_AUTHORIZATION_ENGINE(OperationMode.PLATINUM, true);
SECURITY_AUTHORIZATION_ENGINE(OperationMode.PLATINUM, true),
SPATIAL_GEO_CENTROID(OperationMode.GOLD, true);

final OperationMode minimumOperationMode;
final boolean needsActive;
Expand Down
3 changes: 2 additions & 1 deletion x-pack/plugin/spatial/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@ dependencies {

restResources {
restApi {
includeCore '_common', 'indices', 'index', 'search'
includeCore '_common', 'bulk', 'indices', 'index', 'search'
}
restTests {
includeCore 'geo_shape'
}
}

testClusters.integTest {
setting 'xpack.license.self_generated.type', 'trial'
testDistribution = 'DEFAULT'
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,27 @@
import org.elasticsearch.geo.GeoPlugin;
import org.elasticsearch.index.mapper.Mapper;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.plugins.MapperPlugin;
import org.elasticsearch.plugins.SearchPlugin;
import org.elasticsearch.search.aggregations.metrics.GeoBoundsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.GeoBoundsAggregatorSupplier;
import org.elasticsearch.search.aggregations.metrics.GeoCentroidAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.GeoCentroidAggregatorSupplier;
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.action.XPackInfoFeatureAction;
import org.elasticsearch.xpack.core.action.XPackUsageFeatureAction;
import org.elasticsearch.xpack.spatial.search.aggregations.metrics.GeoShapeBoundsAggregator;
import org.elasticsearch.xpack.spatial.aggregations.metrics.GeoShapeCentroidAggregator;
import org.elasticsearch.xpack.spatial.index.mapper.GeoShapeWithDocValuesFieldMapper;
import org.elasticsearch.xpack.spatial.index.mapper.PointFieldMapper;
import org.elasticsearch.xpack.spatial.index.mapper.ShapeFieldMapper;
import org.elasticsearch.xpack.spatial.index.query.ShapeQueryBuilder;
import org.elasticsearch.xpack.spatial.ingest.CircleProcessor;
import org.elasticsearch.xpack.spatial.search.aggregations.metrics.GeoShapeBoundsAggregator;
import org.elasticsearch.xpack.spatial.search.aggregations.support.GeoShapeValuesSource;
import org.elasticsearch.xpack.spatial.search.aggregations.support.GeoShapeValuesSourceType;

Expand All @@ -39,6 +45,11 @@

public class SpatialPlugin extends GeoPlugin implements ActionPlugin, MapperPlugin, SearchPlugin, IngestPlugin {

// to be overriden by tests
protected XPackLicenseState getLicenseState() {
return XPackPlugin.getSharedLicenseState();
}

@Override
public List<ActionPlugin.ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
return Arrays.asList(
Expand All @@ -62,18 +73,29 @@ public List<QuerySpec<?>> getQueries() {

@Override
public List<Consumer<ValuesSourceRegistry.Builder>> getAggregationExtentions() {
return List.of(SpatialPlugin::registerGeoShapeBoundsAggregator);
return List.of(this::registerGeoShapeBoundsAggregator, this::registerGeoShapeCentroidAggregator);
}

@Override
public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
return Map.of(CircleProcessor.TYPE, new CircleProcessor.Factory());
}

public static void registerGeoShapeBoundsAggregator(ValuesSourceRegistry.Builder builder) {
public void registerGeoShapeBoundsAggregator(ValuesSourceRegistry.Builder builder) {
builder.register(GeoBoundsAggregationBuilder.NAME, GeoShapeValuesSourceType.instance(),
(GeoBoundsAggregatorSupplier) (name, aggregationContext, parent, valuesSource, wrapLongitude, metadata)
-> new GeoShapeBoundsAggregator(name, aggregationContext, parent, (GeoShapeValuesSource) valuesSource,
wrapLongitude, metadata));
}

public void registerGeoShapeCentroidAggregator(ValuesSourceRegistry.Builder builder) {
builder.register(GeoCentroidAggregationBuilder.NAME, GeoShapeValuesSourceType.instance(),
(GeoCentroidAggregatorSupplier) (name, aggregationContext, parent, valuesSource, metadata)
-> {
if (getLicenseState().isAllowed(XPackLicenseState.Feature.SPATIAL_GEO_CENTROID)) {
return new GeoShapeCentroidAggregator(name, aggregationContext, parent, (GeoShapeValuesSource) valuesSource, metadata);
}
throw LicenseUtils.newComplianceException("geo_centroid aggregation on geo_shape fields");
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/


package org.elasticsearch.xpack.spatial.aggregations.metrics;

import org.apache.lucene.index.LeafReaderContext;
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ByteArray;
import org.elasticsearch.common.util.DoubleArray;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.metrics.CompensatedSum;
import org.elasticsearch.search.aggregations.metrics.InternalGeoCentroid;
import org.elasticsearch.search.aggregations.metrics.MetricsAggregator;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.xpack.spatial.index.fielddata.DimensionalShapeType;
import org.elasticsearch.xpack.spatial.index.fielddata.MultiGeoShapeValues;
import org.elasticsearch.xpack.spatial.search.aggregations.support.GeoShapeValuesSource;

import java.io.IOException;
import java.util.Map;

/**
* A geo metric aggregator that computes a geo-centroid from a {@code geo_shape} type field
*/
public final class GeoShapeCentroidAggregator extends MetricsAggregator {
private final GeoShapeValuesSource valuesSource;
private DoubleArray lonSum, lonCompensations, latSum, latCompensations, weightSum, weightCompensations;
private LongArray counts;
private ByteArray dimensionalShapeTypes;

public GeoShapeCentroidAggregator(String name, SearchContext context, Aggregator parent,
GeoShapeValuesSource valuesSource, Map<String, Object> metadata) throws IOException {
super(name, context, parent, metadata);
this.valuesSource = valuesSource;
if (valuesSource != null) {
final BigArrays bigArrays = context.bigArrays();
lonSum = bigArrays.newDoubleArray(1, true);
lonCompensations = bigArrays.newDoubleArray(1, true);
latSum = bigArrays.newDoubleArray(1, true);
latCompensations = bigArrays.newDoubleArray(1, true);
weightSum = bigArrays.newDoubleArray(1, true);
weightCompensations = bigArrays.newDoubleArray(1, true);
counts = bigArrays.newLongArray(1, true);
dimensionalShapeTypes = bigArrays.newByteArray(1, true);
}
}

@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
if (valuesSource == null) {
return LeafBucketCollector.NO_OP_COLLECTOR;
}
final BigArrays bigArrays = context.bigArrays();
final MultiGeoShapeValues values = valuesSource.geoShapeValues(ctx);
final CompensatedSum compensatedSumLat = new CompensatedSum(0, 0);
final CompensatedSum compensatedSumLon = new CompensatedSum(0, 0);
final CompensatedSum compensatedSumWeight = new CompensatedSum(0, 0);

return new LeafBucketCollectorBase(sub, values) {
@Override
public void collect(int doc, long bucket) throws IOException {
latSum = bigArrays.grow(latSum, bucket + 1);
lonSum = bigArrays.grow(lonSum, bucket + 1);
weightSum = bigArrays.grow(weightSum, bucket + 1);
lonCompensations = bigArrays.grow(lonCompensations, bucket + 1);
latCompensations = bigArrays.grow(latCompensations, bucket + 1);
weightCompensations = bigArrays.grow(weightCompensations, bucket + 1);
counts = bigArrays.grow(counts, bucket + 1);
dimensionalShapeTypes = bigArrays.grow(dimensionalShapeTypes, bucket + 1);

if (values.advanceExact(doc)) {
final int valueCount = values.docValueCount();
// increment by the number of points for this document
counts.increment(bucket, valueCount);
// Compute the sum of double values with Kahan summation algorithm which is more
// accurate than naive summation.
DimensionalShapeType shapeType = DimensionalShapeType.fromOrdinalByte(dimensionalShapeTypes.get(bucket));
double sumLat = latSum.get(bucket);
double compensationLat = latCompensations.get(bucket);
double sumLon = lonSum.get(bucket);
double compensationLon = lonCompensations.get(bucket);
double sumWeight = weightSum.get(bucket);
double compensatedWeight = weightCompensations.get(bucket);

compensatedSumLat.reset(sumLat, compensationLat);
compensatedSumLon.reset(sumLon, compensationLon);
compensatedSumWeight.reset(sumWeight, compensatedWeight);

// update the sum
for (int i = 0; i < valueCount; ++i) {
MultiGeoShapeValues.GeoShapeValue value = values.nextValue();
int compares = shapeType.compareTo(value.dimensionalShapeType());
if (compares < 0) {
double coordinateWeight = value.weight();
compensatedSumLat.reset(coordinateWeight * value.lat(), 0.0);
compensatedSumLon.reset(coordinateWeight * value.lon(), 0.0);
compensatedSumWeight.reset(coordinateWeight, 0.0);
dimensionalShapeTypes.set(bucket, (byte) value.dimensionalShapeType().ordinal());
} else if (compares == 0) {
double coordinateWeight = value.weight();
// weighted latitude
compensatedSumLat.add(coordinateWeight * value.lat());
// weighted longitude
compensatedSumLon.add(coordinateWeight * value.lon());
// weight
compensatedSumWeight.add(coordinateWeight);
}
// else (compares > 0)
// do not modify centroid calculation since shape is of lower dimension than the running dimension

}
lonSum.set(bucket, compensatedSumLon.value());
lonCompensations.set(bucket, compensatedSumLon.delta());
latSum.set(bucket, compensatedSumLat.value());
latCompensations.set(bucket, compensatedSumLat.delta());
weightSum.set(bucket, compensatedSumWeight.value());
weightCompensations.set(bucket, compensatedSumWeight.delta());
}
}
};
}

@Override
public InternalAggregation buildAggregation(long bucket) {
if (valuesSource == null || bucket >= counts.size()) {
return buildEmptyAggregation();
}
final long bucketCount = counts.get(bucket);
final double bucketWeight = weightSum.get(bucket);
final GeoPoint bucketCentroid = (bucketWeight > 0)
? new GeoPoint(latSum.get(bucket) / bucketWeight, lonSum.get(bucket) / bucketWeight)
: null;
return new InternalGeoCentroid(name, bucketCentroid , bucketCount, metadata());
}

@Override
public InternalAggregation buildEmptyAggregation() {
return new InternalGeoCentroid(name, null, 0L, metadata());
}

@Override
public void doClose() {
Releasables.close(latSum, latCompensations, lonSum, lonCompensations, counts, weightSum, weightCompensations,
dimensionalShapeTypes);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.spatial;

import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.license.License;
import org.elasticsearch.license.TestUtils;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.test.VersionUtils;

/**
* This class overrides the {@link SpatialPlugin} in order
* to provide the integration test clusters a hook into a real
* {@link XPackLicenseState}. In the cases that this is used, the
* actual license's operation mode is not important
*/
public class LocalStateSpatialPlugin extends SpatialPlugin {
protected XPackLicenseState getLicenseState() {
TestUtils.UpdatableLicenseState licenseState = new TestUtils.UpdatableLicenseState();
License.OperationMode operationMode = License.OperationMode.TRIAL;
licenseState.update(operationMode, true, VersionUtils.randomVersion(LuceneTestCase.random()));
return licenseState;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.spatial;

import org.elasticsearch.ElasticsearchSecurityException;
import org.elasticsearch.license.License;
import org.elasticsearch.license.TestUtils;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.search.aggregations.metrics.GeoCentroidAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.GeoCentroidAggregatorSupplier;
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.xpack.spatial.search.aggregations.support.GeoShapeValuesSourceType;

import java.util.List;
import java.util.function.Consumer;

import static org.hamcrest.Matchers.equalTo;

public class SpatialPluginTests extends ESTestCase {

public void testGeoCentroidLicenseCheck() {
for (License.OperationMode operationMode : License.OperationMode.values()) {
SpatialPlugin plugin = getPluginWithOperationMode(operationMode);
ValuesSourceRegistry.Builder registryBuilder = new ValuesSourceRegistry.Builder();
List<Consumer<ValuesSourceRegistry.Builder>> registrar = plugin.getAggregationExtentions();
registrar.forEach(c -> c.accept(registryBuilder));
ValuesSourceRegistry registry = registryBuilder.build();
GeoCentroidAggregatorSupplier centroidSupplier = (GeoCentroidAggregatorSupplier) registry.getAggregator(
GeoShapeValuesSourceType.instance(), GeoCentroidAggregationBuilder.NAME);
if (License.OperationMode.TRIAL != operationMode &&
License.OperationMode.compare(operationMode, License.OperationMode.GOLD) < 0) {
ElasticsearchSecurityException exception = expectThrows(ElasticsearchSecurityException.class,
() -> centroidSupplier.build(null, null, null, null, null));
assertThat(exception.getMessage(),
equalTo("current license is non-compliant for [geo_centroid aggregation on geo_shape fields]"));
}
}
}

private SpatialPlugin getPluginWithOperationMode(License.OperationMode operationMode) {
return new SpatialPlugin() {
protected XPackLicenseState getLicenseState() {
TestUtils.UpdatableLicenseState licenseState = new TestUtils.UpdatableLicenseState();
licenseState.update(operationMode, true, VersionUtils.randomVersion(random()));
return licenseState;
}
};
}
}
Loading

0 comments on commit 3b74015

Please sign in to comment.