diff --git a/docs/reference/ml/functions/geo.asciidoc b/docs/reference/ml/functions/geo.asciidoc index 130e17d85dcfe..ee666c83094f0 100644 --- a/docs/reference/ml/functions/geo.asciidoc +++ b/docs/reference/ml/functions/geo.asciidoc @@ -54,8 +54,9 @@ detects anomalies where the geographic location of a credit card transaction is unusual for a particular customer’s credit card. An anomaly might indicate fraud. IMPORTANT: The `field_name` that you supply must be a single string that contains -two comma-separated numbers of the form `latitude,longitude`. The `latitude` and -`longitude` must be in the range -180 to 180 and represent a point on the +two comma-separated numbers of the form `latitude,longitude`, a `geo_point` field, +a `geo_shape` field that contains point values, or a `geo_centroid` aggregation. +The `latitude` and `longitude` must be in the range -180 to 180 and represent a point on the surface of the Earth. For example, JSON data might contain the following transaction coordinates: @@ -71,34 +72,9 @@ For example, JSON data might contain the following transaction coordinates: // NOTCONSOLE In {es}, location data is likely to be stored in `geo_point` fields. For more -information, see {ref}/geo-point.html[Geo-point datatype]. This data type is not -supported natively in {ml-features}. You can, however, use Painless scripts -in `script_fields` in your {dfeed} to transform the data into an appropriate -format. For example, the following Painless script transforms -`"coords": {"lat" : 41.44, "lon":90.5}` into `"lat-lon": "41.44,90.5"`: - -[source,js] --------------------------------------------------- -PUT _ml/datafeeds/datafeed-test2 -{ - "job_id": "farequote", - "indices": ["farequote"], - "query": { - "match_all": { - "boost": 1 - } - }, - "script_fields": { - "lat-lon": { - "script": { - "source": "doc['coords'].lat + ',' + doc['coords'].lon", - "lang": "painless" - } - } - } -} --------------------------------------------------- -// CONSOLE -// TEST[skip:setup:farequote_job] +information, see {ref}/geo-point.html[Geo-point datatype]. This data type is +supported natively in {ml-features}. Specifically, {dfeed} when pulling data from +a `geo_point` field, will transform the data into the appropriate `lat,lon` string +format before sending to the {ml} job. For more information, see <>. diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsRestIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsRestIT.java index 426b58f686419..114f1ecd0aa83 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsRestIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsRestIT.java @@ -321,6 +321,71 @@ public void testLookbackOnlyWithNestedFields() throws Exception { assertThat(jobStatsResponseAsString, containsString("\"missing_field_count\":0")); } + public void testLookbackWithGeo() throws Exception { + String jobId = "test-lookback-only-with-geo"; + Request createJobRequest = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId); + createJobRequest.setJsonEntity("{\n" + + " \"description\": \"lat_long with geo_point\",\n" + + " \"analysis_config\": {\n" + + " \"bucket_span\": \"15m\",\n" + + " \"detectors\": [\n" + + " {\n" + + " \"function\": \"lat_long\",\n" + + " \"field_name\": \"location\"\n" + + " }\n" + + " ]\n" + + " }," + + " \"data_description\": {\"time_field\": \"time\"}\n" + + "}"); + client().performRequest(createJobRequest); + String datafeedId = jobId + "-datafeed"; + new DatafeedBuilder(datafeedId, jobId, "geo-data").build(); + + StringBuilder bulk = new StringBuilder(); + + Request createGeoData = new Request("PUT", "/geo-data"); + createGeoData.setJsonEntity("{" + + " \"mappings\": {" + + " \"properties\": {" + + " \"time\": { \"type\":\"date\"}," + + " \"location\": { \"type\":\"geo_point\"}" + + " }" + + " }" + + "}"); + client().performRequest(createGeoData); + + bulk.append("{\"index\": {\"_index\": \"geo-data\", \"_id\": 1}}\n"); + bulk.append("{\"time\":\"2016-06-01T00:00:00Z\",\"location\":{\"lat\":38.897676,\"lon\":-77.03653}}\n"); + bulk.append("{\"index\": {\"_index\": \"geo-data\", \"_id\": 2}}\n"); + bulk.append("{\"time\":\"2016-06-01T00:05:00Z\",\"location\":{\"lat\":38.897676,\"lon\":-77.03653}}\n"); + bulk.append("{\"index\": {\"_index\": \"geo-data\", \"_id\": 3}}\n"); + bulk.append("{\"time\":\"2016-06-01T00:10:00Z\",\"location\":{\"lat\":38.897676,\"lon\":-77.03653}}\n"); + bulk.append("{\"index\": {\"_index\": \"geo-data\", \"_id\": 4}}\n"); + bulk.append("{\"time\":\"2016-06-01T00:15:00Z\",\"location\":{\"lat\":38.897676,\"lon\":-77.03653}}\n"); + bulk.append("{\"index\": {\"_index\": \"geo-data\", \"_id\": 5}}\n"); + bulk.append("{\"time\":\"2016-06-01T00:20:00Z\",\"location\":{\"lat\":38.897676,\"lon\":-77.03653}}\n"); + bulk.append("{\"index\": {\"_index\": \"geo-data\", \"_id\": 6}}\n"); + bulk.append("{\"time\":\"2016-06-01T00:25:00Z\",\"location\":{\"lat\":38.897676,\"lon\":-77.03653}}\n"); + bulk.append("{\"index\": {\"_index\": \"geo-data\", \"_id\": 7}}\n"); + bulk.append("{\"time\":\"2016-06-01T00:30:00Z\",\"location\":{\"lat\":38.897676,\"lon\":-77.03653}}\n"); + bulk.append("{\"index\": {\"_index\": \"geo-data\", \"_id\": 8}}\n"); + bulk.append("{\"time\":\"2016-06-01T00:40:00Z\",\"location\":{\"lat\":90.0,\"lon\":-77.03653}}\n"); + bulk.append("{\"index\": {\"_index\": \"geo-data\", \"_id\": 9}}\n"); + bulk.append("{\"time\":\"2016-06-01T00:41:00Z\",\"location\":{\"lat\":38.897676,\"lon\":-77.03653}}\n"); + bulkIndex(bulk.toString()); + + openJob(client(), jobId); + + startDatafeedAndWaitUntilStopped(datafeedId); + waitUntilJobIsClosed(jobId); + Response jobStatsResponse = client().performRequest( + new Request("GET", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats")); + String jobStatsResponseAsString = EntityUtils.toString(jobStatsResponse.getEntity()); + assertThat(jobStatsResponseAsString, containsString("\"input_record_count\":9")); + assertThat(jobStatsResponseAsString, containsString("\"processed_record_count\":9")); + assertThat(jobStatsResponseAsString, containsString("\"missing_field_count\":0")); + } + public void testLookbackOnlyGivenEmptyIndex() throws Exception { new LookbackOnlyTestHelper("test-lookback-only-given-empty-index", "airline-data-empty") .setShouldSucceedInput(false).setShouldSucceedProcessing(false).execute(); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessor.java index 8cf3ed39651d6..365cfc6c54a20 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessor.java @@ -15,6 +15,7 @@ import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation; import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregation; import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; +import org.elasticsearch.search.aggregations.metrics.GeoCentroid; import org.elasticsearch.search.aggregations.metrics.Max; import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation; import org.elasticsearch.search.aggregations.metrics.Percentile; @@ -275,7 +276,7 @@ private void processBucket(MultiBucketsAggregation bucketAgg, boolean addField) } /** - * Adds a leaf key-value. It returns the name of the key added or {@code null} when nothing was added. + * Adds a leaf key-value. It returns {@code true} if the key added or {@code false} when nothing was added. * Non-finite metric values are not added. */ private boolean processLeaf(Aggregation agg) throws IOException { @@ -283,6 +284,8 @@ private boolean processLeaf(Aggregation agg) throws IOException { return processSingleValue((NumericMetricsAggregation.SingleValue) agg); } else if (agg instanceof Percentiles) { return processPercentiles((Percentiles) agg); + } else if (agg instanceof GeoCentroid){ + return processGeoCentroid((GeoCentroid) agg); } else { throw new IllegalArgumentException("Unsupported aggregation type [" + agg.getName() + "]"); } @@ -300,6 +303,14 @@ private boolean addMetricIfFinite(String key, double value) { return false; } + private boolean processGeoCentroid(GeoCentroid agg) { + if (agg.count() > 0) { + keyValuePairs.put(agg.getName(), agg.centroid().getLat() + "," + agg.centroid().getLon()); + return true; + } + return false; + } + private boolean processPercentiles(Percentiles percentiles) throws IOException { Iterator percentileIterator = percentiles.iterator(); boolean aggregationAdded = addMetricIfFinite(percentiles.getName(), percentileIterator.next().getValue()); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/fields/ExtractedField.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/fields/ExtractedField.java index 451f480b27830..049fac12a22ac 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/fields/ExtractedField.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/fields/ExtractedField.java @@ -6,9 +6,17 @@ package org.elasticsearch.xpack.ml.datafeed.extractor.fields; import org.elasticsearch.common.document.DocumentField; +import org.elasticsearch.geo.geometry.Geometry; +import org.elasticsearch.geo.geometry.Point; +import org.elasticsearch.geo.geometry.ShapeType; +import org.elasticsearch.geo.utils.WellKnownText; import org.elasticsearch.search.SearchHit; +import java.io.IOException; +import java.text.ParseException; +import java.util.Arrays; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Objects; @@ -61,6 +69,14 @@ public static ExtractedField newTimeField(String name, ExtractionMethod extracti return new TimeField(name, extractionMethod); } + public static ExtractedField newGeoShapeField(String alias, String name, ExtractionMethod extractionMethod) { + return new GeoShapeField(alias, name, extractionMethod); + } + + public static ExtractedField newGeoPointField(String alias, String name, ExtractionMethod extractionMethod) { + return new GeoPointField(alias, name, extractionMethod); + } + public static ExtractedField newField(String name, ExtractionMethod extractionMethod) { return newField(name, name, extractionMethod); } @@ -88,12 +104,102 @@ public Object[] value(SearchHit hit) { DocumentField keyValue = hit.field(name); if (keyValue != null) { List values = keyValue.getValues(); - return values.toArray(new Object[values.size()]); + return values.toArray(new Object[0]); } return new Object[0]; } } + private static class GeoShapeField extends FromSource { + private static final WellKnownText wkt = new WellKnownText(); + + GeoShapeField(String alias, String name, ExtractionMethod extractionMethod) { + super(alias, name, extractionMethod); + } + + @Override + public Object[] value(SearchHit hit) { + Object[] value = super.value(hit); + if (value.length != 1) { + throw new IllegalStateException("Unexpected values for a geo_shape field: " + Arrays.toString(value)); + } + if (value[0] instanceof String) { + value[0] = handleString((String) value[0]); + } else if (value[0] instanceof Map) { + @SuppressWarnings("unchecked") + Map geoObject = (Map) value[0]; + value[0] = handleObject(geoObject); + } else { + throw new IllegalStateException("Unexpected value type for a geo_shape field: " + value[0].getClass()); + } + return value; + } + + private String handleString(String geoString) { + try { + if (geoString.startsWith("POINT")) { // Entry is of the form "POINT (-77.03653 38.897676)" + Geometry geometry = wkt.fromWKT(geoString); + if (geometry.type() != ShapeType.POINT) { + throw new IllegalArgumentException("Unexpected non-point geo_shape type: " + geometry.type().name()); + } + Point pt = ((Point)geometry); + return pt.getLat() + "," + pt.getLon(); + } else { + throw new IllegalArgumentException("Unexpected value for a geo_shape field: " + geoString); + } + } catch (IOException | ParseException ex) { + throw new IllegalArgumentException("Unexpected value for a geo_shape field: " + geoString); + } + } + + private String handleObject(Map geoObject) { + String geoType = (String) geoObject.get("type"); + if (geoType != null && "point".equals(geoType.toLowerCase(Locale.ROOT))) { + @SuppressWarnings("unchecked") + List coordinates = (List) geoObject.get("coordinates"); + if (coordinates == null || coordinates.size() != 2) { + throw new IllegalArgumentException("Invalid coordinates for geo_shape point: " + geoObject); + } + return coordinates.get(1) + "," + coordinates.get(0); + } else { + throw new IllegalArgumentException("Unexpected value for a geo_shape field: " + geoObject); + } + } + + } + + private static class GeoPointField extends FromFields { + + GeoPointField(String alias, String name, ExtractionMethod extractionMethod) { + super(alias, name, extractionMethod); + if (extractionMethod != ExtractionMethod.DOC_VALUE) { + throw new IllegalArgumentException("cannot use [geo_point] field with disabled doc values"); + } + } + + @Override + public Object[] value(SearchHit hit) { + Object[] value = super.value(hit); + if (value.length != 1) { + throw new IllegalStateException("Unexpected values for a geo_point field: " + Arrays.toString(value)); + } + if (value[0] instanceof String) { + value[0] = handleString((String) value[0]); + } else { + throw new IllegalStateException("Unexpected value type for a geo_point field: " + value[0].getClass()); + } + return value; + } + + private String handleString(String geoString) { + if (geoString.contains(",")) { // Entry is of the form "38.897676, -77.03653" + return geoString.replace(" ", ""); + } else { + throw new IllegalArgumentException("Unexpected value for a geo_point field: " + geoString); + } + } + } + private static class TimeField extends FromFields { private static final String EPOCH_MILLIS_FORMAT = "epoch_millis"; @@ -145,7 +251,7 @@ public Object[] value(SearchHit hit) { if (values instanceof List) { @SuppressWarnings("unchecked") List asList = (List) values; - return asList.toArray(new Object[asList.size()]); + return asList.toArray(new Object[0]); } else { return new Object[]{values}; } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/fields/ExtractedFields.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/fields/ExtractedFields.java index f9b2467fbcfd3..f41b8324b3c21 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/fields/ExtractedFields.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/fields/ExtractedFields.java @@ -87,6 +87,12 @@ protected ExtractedField detect(String field) { : ExtractedField.ExtractionMethod.SOURCE; } } + if (isFieldOfType(field, "geo_point")) { + return ExtractedField.newGeoPointField(field, internalField, method); + } + if (isFieldOfType(field, "geo_shape")) { + return ExtractedField.newGeoShapeField(field, internalField, method); + } return ExtractedField.newField(field, internalField, method); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationTestUtils.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationTestUtils.java index 38202eee0ff06..afdf9c2eaf852 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationTestUtils.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationTestUtils.java @@ -5,12 +5,14 @@ */ package org.elasticsearch.xpack.ml.datafeed.extractor.aggregation; +import org.elasticsearch.common.geo.GeoPoint; import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregation; import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; import org.elasticsearch.search.aggregations.bucket.terms.StringTerms; import org.elasticsearch.search.aggregations.bucket.terms.Terms; +import org.elasticsearch.search.aggregations.metrics.GeoCentroid; import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation; import org.elasticsearch.search.aggregations.metrics.Max; import org.elasticsearch.search.aggregations.metrics.Percentile; @@ -70,6 +72,15 @@ static Max createMax(String name, double value) { return max; } + static GeoCentroid createGeoCentroid(String name, long count, double lat, double lon) { + GeoCentroid centroid = mock(GeoCentroid.class); + when(centroid.count()).thenReturn(count); + when(centroid.getName()).thenReturn(name); + GeoPoint point = count > 0 ? new GeoPoint(lat, lon) : null; + when(centroid.centroid()).thenReturn(point); + return centroid; + } + static NumericMetricsAggregation.SingleValue createSingleValue(String name, double value) { NumericMetricsAggregation.SingleValue singleValue = mock(NumericMetricsAggregation.SingleValue.class); when(singleValue.getName()).thenReturn(name); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessorTests.java index be79b461eeb18..d45f294d1887a 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessorTests.java @@ -27,6 +27,7 @@ import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.Term; import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createAggs; +import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createGeoCentroid; import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createHistogramAggregation; import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createHistogramBucket; import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createMax; @@ -472,6 +473,20 @@ public void testSingleBucketAgg_failureWithSubMultiBucket() throws IOException { () -> aggToString(Sets.newHashSet("my_field"), histogramBuckets)); } + public void testGeoCentroidAgg() throws IOException { + List histogramBuckets = Arrays.asList( + createHistogramBucket(1000L, 4, Arrays.asList( + createMax("time", 1000), + createGeoCentroid("geo_field", 4, 92.1, 93.1))), + createHistogramBucket(2000L, 7, Arrays.asList( + createMax("time", 2000), + createGeoCentroid("geo_field", 0, -1, -1)))); + String json = aggToString(Sets.newHashSet("geo_field"), histogramBuckets); + + assertThat(json, equalTo("{\"time\":1000,\"geo_field\":\"92.1,93.1\",\"doc_count\":4}" + + " {\"time\":2000,\"doc_count\":7}")); + } + private String aggToString(Set fields, Histogram.Bucket bucket) throws IOException { return aggToString(fields, Collections.singletonList(bucket)); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/fields/ExtractedFieldTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/fields/ExtractedFieldTests.java index 098aa48a4f75c..28ad7341751e9 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/fields/ExtractedFieldTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/fields/ExtractedFieldTests.java @@ -62,6 +62,34 @@ public void testValueGivenNestedSource() { assertThat(nested.value(hit), equalTo(new String[] { "bar" })); } + public void testGeoPoint() { + double lat = 38.897676; + double lon = -77.03653; + String[] expected = new String[] {lat + "," + lon}; + + // doc_value field + ExtractedField geo = ExtractedField.newGeoPointField("geo", "geo", ExtractedField.ExtractionMethod.DOC_VALUE); + SearchHit hit = new SearchHitBuilder(42).addField("geo", lat + ", " + lon).build(); + assertThat(geo.value(hit), equalTo(expected)); + } + + public void testGeoShape() { + double lat = 38.897676; + double lon = -77.03653; + String[] expected = new String[] {lat + "," + lon}; + // object format + SearchHit hit = new SearchHitBuilder(42) + .setSource("{\"geo\":{\"type\":\"point\", \"coordinates\": [" + lon + ", " + lat + "]}}") + .build(); + ExtractedField geo = ExtractedField.newGeoShapeField("geo", "geo", ExtractedField.ExtractionMethod.SOURCE); + assertThat(geo.value(hit), equalTo(expected)); + + // WKT format + hit = new SearchHitBuilder(42).setSource("{\"geo\":\"POINT ("+ lon + " " + lat + ")\"}").build(); + geo = ExtractedField.newGeoShapeField("geo", "geo", ExtractedField.ExtractionMethod.SOURCE); + assertThat(geo.value(hit), equalTo(expected)); + } + public void testValueGivenSourceAndHitWithNoSource() { ExtractedField missing = ExtractedField.newField("missing", ExtractedField.ExtractionMethod.SOURCE); assertThat(missing.value(new SearchHitBuilder(3).build()), equalTo(new Object[0]));