Skip to content

Commit

Permalink
[ML] adds geo_centroid aggregation support to data frames (elastic#42088)
Browse files Browse the repository at this point in the history
  • Loading branch information
benwtrent authored and Gurkan Kaymak committed May 27, 2019
1 parent 61ce90e commit db765c0
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,56 @@ public void testPivotWithBucketScriptAgg() throws Exception {
assertEquals(3.878048780, actual.doubleValue(), 0.000001);
}

/**
 * Creates a pivot transform that groups the reviews index by {@code user_id} and computes
 * both an {@code avg} of {@code stars} and a {@code geo_centroid} of {@code location},
 * then checks the destination index contents for one reviewer.
 *
 * @throws Exception if any REST request or response parsing fails
 */
public void testPivotWithGeoCentroidAgg() throws Exception {
    String transformId = "geoCentroidPivot";
    String dataFrameIndex = "geo_centroid_pivot_reviews";
    setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, dataFrameIndex);

    final Request createDataframeTransformRequest = createRequestWithAuth("PUT", DATAFRAME_ENDPOINT + transformId,
        BASIC_AUTH_VALUE_DATA_FRAME_ADMIN_WITH_SOME_DATA_ACCESS);

    String config = "{"
        + " \"source\": {\"index\":\"" + REVIEWS_INDEX_NAME + "\"},"
        + " \"dest\": {\"index\":\"" + dataFrameIndex + "\"},";

    config += " \"pivot\": {"
        + " \"group_by\": {"
        + " \"reviewer\": {"
        + " \"terms\": {"
        + " \"field\": \"user_id\""
        + " } } },"
        + " \"aggregations\": {"
        + " \"avg_rating\": {"
        + " \"avg\": {"
        + " \"field\": \"stars\""
        + " } },"
        + " \"location\": {"
        + " \"geo_centroid\": {\"field\": \"location\"}"
        + " } } }"
        + "}";

    createDataframeTransformRequest.setJsonEntity(config);
    Map<String, Object> createDataframeTransformResponse = entityAsMap(client().performRequest(createDataframeTransformRequest));
    assertThat(createDataframeTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));

    startAndWaitForTransform(transformId, dataFrameIndex, BASIC_AUTH_VALUE_DATA_FRAME_ADMIN_WITH_SOME_DATA_ACCESS);
    assertTrue(indexExists(dataFrameIndex));

    // One destination document per distinct user_id; the reviews fixture is expected to hold 27 of them.
    Map<String, Object> indexStats = getAsMap(dataFrameIndex + "/_stats");
    assertEquals(27, XContentMapValues.extractValue("_all.total.docs.count", indexStats));

    // Spot-check a single reviewer.
    Map<String, Object> searchResult = getAsMap(dataFrameIndex + "/_search?q=reviewer:user_4");
    assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
    Number actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.avg_rating", searchResult)).get(0);
    assertEquals(3.878048780, actual.doubleValue(), 0.000001);

    // The centroid is read back as a single comma-separated string, first component then second.
    String actualString = (String) ((List<?>) XContentMapValues.extractValue("hits.hits._source.location", searchResult)).get(0);
    String[] latlon = actualString.split(",");
    // Fail with a readable message instead of an ArrayIndexOutOfBoundsException if the format is unexpected.
    assertEquals("expected a comma-separated coordinate pair but got [" + actualString + "]", 2, latlon.length);

    // NOTE(review): expected values presumably mirror the fixture, which writes (user + 10) + "," + (user + 15)
    // as the location of every review; for user_4 that is 14,19 - confirm against createReviewsIndex.
    // Double.parseDouble ignores surrounding whitespace, so a "lat, lon" rendering parses as well.
    assertEquals(4 + 10, Double.parseDouble(latlon[0]), 0.000001);
    assertEquals(4 + 15, Double.parseDouble(latlon[1]), 0.000001);
}

private void assertOnePivotValue(String query, double expected) throws IOException {
Map<String, Object> searchResult = getAsMap(query);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ protected void createReviewsIndex() throws IOException {
.startObject("stars")
.field("type", "integer")
.endObject()
.startObject("location")
.field("type", "geo_point")
.endObject()
.endObject()
.endObject();
}
Expand Down Expand Up @@ -104,6 +107,7 @@ protected void createReviewsIndex() throws IOException {
min = 10 + (i % 49);
}
int sec = 10 + (i % 49);
String location = (user + 10) + "," + (user + 15);

String date_string = "2017-01-" + day + "T" + hour + ":" + min + ":" + sec + "Z";
bulk.append("{\"user_id\":\"")
Expand All @@ -114,7 +118,9 @@ protected void createReviewsIndex() throws IOException {
.append(business)
.append("\",\"stars\":")
.append(stars)
.append(",\"timestamp\":\"")
.append(",\"location\":\"")
.append(location)
.append("\",\"timestamp\":\"")
.append(date_string)
.append("\"}\n");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation;
import org.elasticsearch.search.aggregations.metrics.GeoCentroid;
import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation;
import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation.SingleValue;
import org.elasticsearch.search.aggregations.metrics.ScriptedMetric;
Expand Down Expand Up @@ -84,6 +85,8 @@ public static Stream<Map<String, Object>> extractCompositeAggregationResults(Com
}
} else if (aggResult instanceof ScriptedMetric) {
updateDocument(document, aggName, ((ScriptedMetric) aggResult).aggregation());
} else if (aggResult instanceof GeoCentroid) {
updateDocument(document, aggName, ((GeoCentroid) aggResult).centroid().toString());
} else {
// Execution should never reach this point!
// Creating transforms with unsupported aggregations shall not be possible
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ enum AggregationType {
MAX("max", SOURCE),
MIN("min", SOURCE),
SUM("sum", SOURCE),
GEO_CENTROID("geo_centroid", "geo_point"),
SCRIPTED_METRIC("scripted_metric", DYNAMIC),
BUCKET_SCRIPT("bucket_script", DYNAMIC);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,15 @@ public void testResolveTargetMapping() {
assertEquals("double", Aggregations.resolveTargetMapping("sum", "double"));
assertEquals("half_float", Aggregations.resolveTargetMapping("sum", "half_float"));

// geo_centroid
assertEquals("geo_point", Aggregations.resolveTargetMapping("geo_centroid", "geo_point"));
assertEquals("geo_point", Aggregations.resolveTargetMapping("geo_centroid", null));

// scripted_metric
assertEquals("_dynamic", Aggregations.resolveTargetMapping("scripted_metric", null));
assertEquals("_dynamic", Aggregations.resolveTargetMapping("scripted_metric", "int"));

// bucket_script
assertEquals("_dynamic", Aggregations.resolveTargetMapping("bucket_script", null));
assertEquals("_dynamic", Aggregations.resolveTargetMapping("bucket_script", "int"));
}
Expand Down

0 comments on commit db765c0

Please sign in to comment.