From 8aa6b65ce076c99b3f140ed59416929b82d3cb0c Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Tue, 7 May 2019 09:51:08 -0500 Subject: [PATCH 1/3] [ML] properly nesting objects in document source --- .../pivot/AggregationResultUtils.java | 37 +++++++++++++++++-- .../pivot/AggregationResultUtilsTests.java | 23 ++++++++++++ .../test/data_frame/preview_transforms.yml | 12 +++++- 3 files changed, 68 insertions(+), 4 deletions(-) diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/AggregationResultUtils.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/AggregationResultUtils.java index b17a65fc4daf1..0fa7ee9b773d5 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/AggregationResultUtils.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/AggregationResultUtils.java @@ -77,12 +77,12 @@ public static Stream> extractCompositeAggregationResults(Com // gather the `value` type, otherwise utilize `getValueAsString` so we don't lose formatted outputs. if (isNumericType(fieldType) || (aggResultSingleValue.getValueAsString().equals(String.valueOf(aggResultSingleValue.value())))) { - document.put(aggName, aggResultSingleValue.value()); + updateDocument(document, aggName, aggResultSingleValue.value()); } else { - document.put(aggName, aggResultSingleValue.getValueAsString()); + updateDocument(document, aggName, aggResultSingleValue.getValueAsString()); } } else if (aggResult instanceof ScriptedMetric) { - document.put(aggName, ((ScriptedMetric) aggResult).aggregation()); + updateDocument(document, aggName, ((ScriptedMetric) aggResult).aggregation()); } else { // Execution should never reach this point! // Creating transforms with unsupported aggregations shall not be possible @@ -97,4 +97,35 @@ public static Stream> extractCompositeAggregationResults(Com }); } + @SuppressWarnings("unchecked") + static void updateDocument(Map document, String fieldName, Object value) { + String[] fieldTokens = fieldName.split("\\."); + Map internalMap = document; + for (int i = 0; i < fieldTokens.length; i++) { + String token = fieldTokens[i]; + if (i == fieldTokens.length - 1) { + if (internalMap.containsKey(token)) { + logger.error("duplicate key value pairs key {} old value {} duplicate value {}", + token, + internalMap.get(token), + value); + assert false; + } + internalMap.put(token, value); + } else { + if (internalMap.containsKey(token)) { + if (internalMap.get(token) instanceof Map) { + internalMap = (Map)internalMap.get(token); + } else { + logger.error("mixed object types of nested {} and non-nested fields {}", fieldName, token); + assert false; + } + } else { + Map newMap = new HashMap<>(); + internalMap.put(token, newMap); + internalMap = newMap; + } + } + } + } } diff --git a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/pivot/AggregationResultUtilsTests.java b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/pivot/AggregationResultUtilsTests.java index 7eb4295111324..158d37fc59b42 100644 --- a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/pivot/AggregationResultUtilsTests.java +++ b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/pivot/AggregationResultUtilsTests.java @@ -66,6 +66,7 @@ import java.util.stream.Collectors; import static java.util.Arrays.asList; +import static org.hamcrest.CoreMatchers.equalTo; public class AggregationResultUtilsTests extends ESTestCase { @@ -736,6 +737,28 @@ aggTypedName, asMap( assertEquals(documentIdsFirstRun, documentIdsSecondRun); } + @SuppressWarnings("unchecked") + public void testUpdateDocument() { + Map document = new HashMap<>(); + + AggregationResultUtils.updateDocument(document, "foo.bar.baz", 1000L); + AggregationResultUtils.updateDocument(document, "foo.bar.baz2", 2000L); + AggregationResultUtils.updateDocument(document, "bar.field1", 1L); + AggregationResultUtils.updateDocument(document, "metric", 10L); + + assertThat(document.get("metric"), equalTo(10L)); + + Map bar = (Map)document.get("bar"); + + assertThat(bar.get("field1"), equalTo(1L)); + + Map foo = (Map)document.get("foo"); + Map foobar = (Map)foo.get("bar"); + + assertThat(foobar.get("baz"), equalTo(1000L)); + assertThat(foobar.get("baz2"), equalTo(2000L)); + } + private void executeTest(GroupConfig groups, Collection aggregationBuilders, Collection pipelineAggregationBuilders, diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/preview_transforms.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/preview_transforms.yml index 9438878417741..690383ce96ec4 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/preview_transforms.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/preview_transforms.yml @@ -76,18 +76,28 @@ setup: "group_by": { "airline": {"terms": {"field": "airline"}}, "by-hour": {"date_histogram": {"interval": "1h", "field": "time", "format": "yyyy-MM-DD HH"}}}, - "aggs": {"avg_response": {"avg": {"field": "responsetime"}}} + "aggs": { + "avg_response": {"avg": {"field": "responsetime"}}, + "time.max": {"max": {"field": "time"}}, + "time.min": {"min": {"field": "time"}} + } } } - match: { preview.0.airline: foo } - match: { preview.0.by-hour: "2017-02-49 00" } - match: { preview.0.avg_response: 1.0 } + - match: { preview.0.time.max: "2017-02-18T00:30:00.000Z" } + - match: { preview.0.time.min: "2017-02-18T00:00:00.000Z" } - match: { preview.1.airline: bar } - match: { preview.1.by-hour: "2017-02-49 01" } - match: { preview.1.avg_response: 42.0 } + - match: { preview.1.time.max: "2017-02-18T01:00:00.000Z" } + - match: { preview.1.time.min: "2017-02-18T01:00:00.000Z" } - match: { preview.2.airline: foo } - match: { preview.2.by-hour: "2017-02-49 01" } - match: { preview.2.avg_response: 42.0 } + - match: { preview.2.time.max: "2017-02-18T01:01:00.000Z" } + - match: { preview.2.time.min: "2017-02-18T01:01:00.000Z" } --- "Test preview transform with invalid config": From c00c2fa444261a83269b5fdb718a761746bb8954 Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Thu, 9 May 2019 10:07:40 -0500 Subject: [PATCH 2/3] Throw exception on agg extraction failure, cause it to fail df --- .../transforms/DataFrameTransformTask.java | 3 +- .../pivot/AggregationResultUtils.java | 31 ++++++++++++++----- .../pivot/AggregationResultUtilsTests.java | 23 ++++++++++++++ 3 files changed, 48 insertions(+), 9 deletions(-) diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java index 2020300a0cf77..2f4945cbeec47 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java @@ -41,6 +41,7 @@ import org.elasticsearch.xpack.dataframe.checkpoint.DataFrameTransformsCheckpointService; import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor; import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager; +import org.elasticsearch.xpack.dataframe.transforms.pivot.AggregationResultUtils; import java.util.Arrays; import java.util.Map; @@ -636,7 +637,7 @@ protected void createCheckpoint(ActionListener listener) { } private boolean isIrrecoverableFailure(Exception e) { - return e instanceof IndexNotFoundException; + return e instanceof IndexNotFoundException || e instanceof AggregationResultUtils.AggregationExtractionException; } synchronized void handleFailure(Exception e) { diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/AggregationResultUtils.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/AggregationResultUtils.java index 0fa7ee9b773d5..c9b8bd978cde3 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/AggregationResultUtils.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/AggregationResultUtils.java @@ -8,6 +8,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.aggregations.AggregationBuilder; import org.elasticsearch.search.aggregations.PipelineAggregationBuilder; @@ -29,7 +30,7 @@ import static org.elasticsearch.xpack.dataframe.transforms.pivot.SchemaUtil.isNumericType; -final class AggregationResultUtils { +public final class AggregationResultUtils { private static final Logger logger = LogManager.getLogger(AggregationResultUtils.class); /** @@ -100,16 +101,24 @@ public static Stream> extractCompositeAggregationResults(Com @SuppressWarnings("unchecked") static void updateDocument(Map document, String fieldName, Object value) { String[] fieldTokens = fieldName.split("\\."); + if (fieldTokens.length == 1) { + document.put(fieldName, value); + return; + } Map internalMap = document; for (int i = 0; i < fieldTokens.length; i++) { String token = fieldTokens[i]; if (i == fieldTokens.length - 1) { if (internalMap.containsKey(token)) { - logger.error("duplicate key value pairs key {} old value {} duplicate value {}", - token, - internalMap.get(token), - value); - assert false; + if (internalMap.get(token) instanceof Map) { + throw new AggregationExtractionException("mixed object types of nested and non-nested fields [{}]", + fieldName); + } else { + throw new AggregationExtractionException("duplicate key value pairs key [{}] old value [{}] duplicate value [{}]", + fieldName, + internalMap.get(token), + value); + } } internalMap.put(token, value); } else { @@ -117,8 +126,8 @@ static void updateDocument(Map document, String fieldName, Objec if (internalMap.get(token) instanceof Map) { internalMap = (Map)internalMap.get(token); } else { - logger.error("mixed object types of nested {} and non-nested fields {}", fieldName, token); - assert false; + throw new AggregationExtractionException("mixed object types of nested and non-nested fields [{}]", + fieldName); } } else { Map newMap = new HashMap<>(); @@ -128,4 +137,10 @@ static void updateDocument(Map document, String fieldName, Objec } } } + + public static class AggregationExtractionException extends ElasticsearchException { + AggregationExtractionException(String msg, Object... args) { + super(msg, args); + } + } } diff --git a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/pivot/AggregationResultUtilsTests.java b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/pivot/AggregationResultUtilsTests.java index 158d37fc59b42..1a835c9d19b59 100644 --- a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/pivot/AggregationResultUtilsTests.java +++ b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/pivot/AggregationResultUtilsTests.java @@ -759,6 +759,29 @@ public void testUpdateDocument() { assertThat(foobar.get("baz2"), equalTo(2000L)); } + public void testUpdateDocumentWithDuplicate() { + Map document = new HashMap<>(); + + AggregationResultUtils.updateDocument(document, "foo.bar.baz", 1000L); + AggregationResultUtils.AggregationExtractionException exception = + expectThrows(AggregationResultUtils.AggregationExtractionException.class, + () -> AggregationResultUtils.updateDocument(document, "foo.bar.baz", 2000L)); + assertThat(exception.getMessage(), + equalTo("duplicate key value pairs key [foo.bar.baz] old value [1000] duplicate value [2000]")); + } + + public void testUpdateDocumentWithObjectAndNotObject() { + Map document = new HashMap<>(); + + AggregationResultUtils.updateDocument(document, "foo.bar.baz", 1000L); + AggregationResultUtils.AggregationExtractionException exception = + expectThrows(AggregationResultUtils.AggregationExtractionException.class, + () -> AggregationResultUtils.updateDocument(document, "foo.bar", 2000L)); + assertThat(exception.getMessage(), + equalTo("mixed object types of nested and non-nested fields [foo.bar]")); + } + + private void executeTest(GroupConfig groups, Collection aggregationBuilders, Collection pipelineAggregationBuilders, From 66b17da074182d636072e0476573f0401025475d Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Thu, 9 May 2019 15:00:53 -0500 Subject: [PATCH 3/3] throwing error to stop df if unsupported agg is found --- .../dataframe/transforms/pivot/AggregationResultUtils.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/AggregationResultUtils.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/AggregationResultUtils.java index c9b8bd978cde3..8c4fa96a144ec 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/AggregationResultUtils.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/AggregationResultUtils.java @@ -87,8 +87,9 @@ public static Stream> extractCompositeAggregationResults(Com } else { // Execution should never reach this point! // Creating transforms with unsupported aggregations shall not be possible - logger.error("Dataframe Internal Error: unsupported aggregation ["+ aggResult.getName() +"], ignoring"); - assert false; + throw new AggregationExtractionException("unsupported aggregation [{}] with name [{}]", + aggResult.getType(), + aggResult.getName()); } }