From c1bee6851989c1ea769a82ae46a74550002e85b1 Mon Sep 17 00:00:00 2001 From: penghuo Date: Fri, 22 May 2020 10:19:22 -0700 Subject: [PATCH 1/6] dedup operator --- .../sql/planner/physical/DedupeOperator.java | 146 +++++++++++ .../sql/planner/physical/PhysicalPlanDSL.java | 14 + .../physical/PhysicalPlanNodeVisitor.java | 4 + .../planner/physical/DedupeOperatorTest.java | 239 ++++++++++++++++++ .../physical/PhysicalPlanNodeVisitorTest.java | 2 + 5 files changed, 405 insertions(+) create mode 100644 core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/DedupeOperator.java create mode 100644 core/src/test/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/DedupeOperatorTest.java diff --git a/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/DedupeOperator.java b/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/DedupeOperator.java new file mode 100644 index 0000000000..387e5a9226 --- /dev/null +++ b/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/DedupeOperator.java @@ -0,0 +1,146 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.sql.planner.physical; + +import com.amazon.opendistroforelasticsearch.sql.data.model.ExprValue; +import com.amazon.opendistroforelasticsearch.sql.expression.Expression; +import com.amazon.opendistroforelasticsearch.sql.storage.bindingtuple.BindingTuple; +import com.google.common.collect.ImmutableList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Predicate; + +/** + * Dedupe operator. Dedupe the input {@link ExprValue} by using the {@link + * DedupeOperator#dedupeList} + * The result order follow the input order. + * + */ +public class DedupeOperator extends PhysicalPlan { + private final PhysicalPlan input; + private final List dedupeList; + private final Decider decider; + private Integer allowedDuplication = 1; + private Boolean keepEmpty = false; + private Boolean consecutive = false; + private ExprValue next; + + private static final Predicate NULL_OR_MISSING = v -> v.isNull() || v.isMissing(); + + public DedupeOperator(PhysicalPlan input, List dedupeList) { + this.input = input; + this.dedupeList = dedupeList; + this.decider = new Dedupe(); + } + + public DedupeOperator( + PhysicalPlan input, + List dedupeList, + Integer allowedDuplication, + Boolean keepEmpty, + Boolean consecutive) { + this.input = input; + this.dedupeList = dedupeList; + this.allowedDuplication = allowedDuplication; + this.keepEmpty = keepEmpty; + this.consecutive = consecutive; + this.decider = this.consecutive ? new ConsecutiveDedupe() : new Dedupe(); + } + + @Override + public R accept(PhysicalPlanNodeVisitor visitor, C context) { + return visitor.visitDedupe(this, context); + } + + @Override + public List getChild() { + return Collections.singletonList(input); + } + + @Override + public boolean hasNext() { + while (input.hasNext()) { + ExprValue next = input.next(); + if (decider.keep(next)) { + this.next = next; + return true; + } + } + return false; + } + + @Override + public ExprValue next() { + return this.next; + } + + /** + * The Decider test the {@link ExprValue} should be keep (return true) or ignored (return false). + * + *

If any value evaluted by {@link DedupeOperator#dedupeList} is NULL or MISSING, then the + * return value is decided by keepEmpty option, default value is ignore. + */ + abstract class Decider { + public boolean keep(ExprValue value) { + BindingTuple bindingTuple = value.bindingTuples(); + ImmutableList.Builder dedupeKeyBuilder = new ImmutableList.Builder<>(); + for (Expression expression : dedupeList) { + ExprValue exprValue = expression.valueOf(bindingTuple); + if (NULL_OR_MISSING.test(exprValue)) { + return keepEmpty; + } + dedupeKeyBuilder.add(exprValue); + } + List dedupeKey = dedupeKeyBuilder.build(); + int seenTimes = seenTimes(dedupeKey); + return seenTimes <= allowedDuplication; + } + + /** + * Return how many times the dedupeKey has been seen before. The side effect is the seen times + * will add 1 times after calling this function. + */ + abstract int seenTimes(List dedupeKey); + } + + class Dedupe extends Decider { + private final Map, Integer> seenMap = new ConcurrentHashMap<>(); + + @Override + int seenTimes(List dedupeKey) { + seenMap.putIfAbsent(dedupeKey, 0); + return seenMap.computeIfPresent(dedupeKey, (k, v) -> v + 1); + } + } + + class ConsecutiveDedupe extends Decider { + private List lastSeenDedupeKey = null; + private Integer consecutiveCount = 0; + + @Override + int seenTimes(List dedupeKey) { + if (dedupeKey.equals(lastSeenDedupeKey)) { + return ++consecutiveCount; + } else { + lastSeenDedupeKey = dedupeKey; + consecutiveCount = 1; + return consecutiveCount; + } + } + } +} diff --git a/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/PhysicalPlanDSL.java b/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/PhysicalPlanDSL.java index 46a0ea27a7..e91dd29ceb 100644 --- a/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/PhysicalPlanDSL.java +++ b/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/PhysicalPlanDSL.java @@ -61,4 +61,18 @@ public static SortOperator sort(PhysicalPlan input, Integer count, Pair... sorts) { return new SortOperator(input, count, Arrays.asList(sorts)); } + + public static DedupeOperator dedupe(PhysicalPlan input, Expression... expressions) { + return new DedupeOperator(input, Arrays.asList(expressions)); + } + + public static DedupeOperator dedupe( + PhysicalPlan input, + int allowedDuplication, + boolean keepEmpty, + boolean consecutive, + Expression... expressions) { + return new DedupeOperator( + input, Arrays.asList(expressions), allowedDuplication, keepEmpty, consecutive); + } } diff --git a/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/PhysicalPlanNodeVisitor.java b/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/PhysicalPlanNodeVisitor.java index 48e78c28c3..b676e15a15 100644 --- a/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/PhysicalPlanNodeVisitor.java +++ b/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/PhysicalPlanNodeVisitor.java @@ -56,4 +56,8 @@ public R visitRemove(RemoveOperator node, C context) { public R visitEval(EvalOperator node, C context) { return visitNode(node, context); } + + public R visitDedupe(DedupeOperator node, C context) { + return visitNode(node, context); + } } diff --git a/core/src/test/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/DedupeOperatorTest.java b/core/src/test/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/DedupeOperatorTest.java new file mode 100644 index 0000000000..57475ab35d --- /dev/null +++ b/core/src/test/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/DedupeOperatorTest.java @@ -0,0 +1,239 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.sql.planner.physical; + +import static com.amazon.opendistroforelasticsearch.sql.data.model.ExprValueUtils.tupleValue; +import static com.amazon.opendistroforelasticsearch.sql.planner.physical.PhysicalPlanDSL.dedupe; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; +import static org.mockito.Mockito.when; + +import com.amazon.opendistroforelasticsearch.sql.expression.DSL; +import com.google.common.collect.ImmutableMap; +import java.util.HashMap; +import java.util.Map; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class DedupeOperatorTest extends PhysicalPlanTestBase { + @Mock private PhysicalPlan inputPlan; + + /** + * construct the map which contain null value, because {@link ImmutableMap} doesn't support null + * value. + */ + private Map NULL_MAP = + new HashMap() { + { + put("region", null); + put("action", "GET"); + } + }; + + @Test + public void dedupe_one_field() { + when(inputPlan.hasNext()).thenReturn(true, true, true, false); + when(inputPlan.next()) + .thenReturn( + tupleValue(ImmutableMap.of("region", "us-east-1", "action", "GET", "response", 200))) + .thenReturn( + tupleValue(ImmutableMap.of("region", "us-east-1", "action", "POST", "response", 200))) + .thenReturn( + tupleValue(ImmutableMap.of("region", "us-east-1", "action", "PUT", "response", 200))); + + assertThat( + execute(dedupe(inputPlan, DSL.ref("region"))), + contains( + tupleValue(ImmutableMap.of("region", "us-east-1", "action", "GET", "response", 200)))); + } + + @Test + public void dedupe_one_field_no_duplication() { + when(inputPlan.hasNext()).thenReturn(true, true, true, false); + when(inputPlan.next()) + .thenReturn( + tupleValue(ImmutableMap.of("region", "us-east-1", "action", "GET", "response", 200))) + .thenReturn( + tupleValue(ImmutableMap.of("region", "us-east-1", "action", "POST", "response", 200))) + .thenReturn( + tupleValue(ImmutableMap.of("region", "us-east-1", "action", "PUT", "response", 200))); + PhysicalPlan plan = dedupe(inputPlan, DSL.ref("action")); + + assertThat( + execute(plan), + contains( + tupleValue(ImmutableMap.of("region", "us-east-1", "action", "GET", "response", 200)), + tupleValue(ImmutableMap.of("region", "us-east-1", "action", "POST", "response", 200)), + tupleValue(ImmutableMap.of("region", "us-east-1", "action", "PUT", "response", 200)))); + } + + @Test + public void dedupe_one_field_allow_2_duplication() { + when(inputPlan.hasNext()).thenReturn(true, true, true, false); + when(inputPlan.next()) + .thenReturn( + tupleValue(ImmutableMap.of("region", "us-east-1", "action", "GET", "response", 200))) + .thenReturn( + tupleValue(ImmutableMap.of("region", "us-east-1", "action", "POST", "response", 200))) + .thenReturn( + tupleValue(ImmutableMap.of("region", "us-east-1", "action", "PUT", "response", 200))); + + assertThat( + execute(dedupe(inputPlan, 2, false, false, DSL.ref("region"))), + contains( + tupleValue(ImmutableMap.of("region", "us-east-1", "action", "GET", "response", 200)), + tupleValue(ImmutableMap.of("region", "us-east-1", "action", "POST", "response", 200)))); + } + + @Test + public void dedupe_one_field_in_consecutive_mode() { + when(inputPlan.hasNext()).thenReturn(true, true, true, true, false); + when(inputPlan.next()) + .thenReturn( + tupleValue(ImmutableMap.of("region", "us-east-1", "action", "GET", "response", 200))) + .thenReturn( + tupleValue(ImmutableMap.of("region", "us-east-1", "action", "POST", "response", 200))) + .thenReturn( + tupleValue(ImmutableMap.of("region", "us-west-2", "action", "POST", "response", 200))) + .thenReturn( + tupleValue(ImmutableMap.of("region", "us-east-1", "action", "PUT", "response", 200))); + + assertThat( + execute(dedupe(inputPlan, 1, false, true, DSL.ref("region"))), + contains( + tupleValue(ImmutableMap.of("region", "us-east-1", "action", "GET", "response", 200)), + tupleValue(ImmutableMap.of("region", "us-west-2", "action", "POST", "response", 200)), + tupleValue(ImmutableMap.of("region", "us-east-1", "action", "PUT", "response", 200)))); + } + + @Test + public void dedupe_one_field_in_consecutive_mode_all_consecutive() { + when(inputPlan.hasNext()).thenReturn(true, true, true, false); + when(inputPlan.next()) + .thenReturn( + tupleValue(ImmutableMap.of("region", "us-east-1", "action", "GET", "response", 200))) + .thenReturn( + tupleValue(ImmutableMap.of("region", "us-east-1", "action", "POST", "response", 200))) + .thenReturn( + tupleValue(ImmutableMap.of("region", "us-east-1", "action", "PUT", "response", 200))); + + assertThat( + execute(dedupe(inputPlan, 1, false, true, DSL.ref("region"))), + contains( + tupleValue(ImmutableMap.of("region", "us-east-1", "action", "GET", "response", 200)))); + } + + @Test + public void dedupe_one_field_in_consecutive_mode_allow_2_duplication() { + when(inputPlan.hasNext()).thenReturn(true, true, true, true, false); + when(inputPlan.next()) + .thenReturn( + tupleValue(ImmutableMap.of("region", "us-east-1", "action", "GET", "response", 200))) + .thenReturn( + tupleValue(ImmutableMap.of("region", "us-east-1", "action", "POST", "response", 200))) + .thenReturn( + tupleValue(ImmutableMap.of("region", "us-west-2", "action", "POST", "response", 200))) + .thenReturn( + tupleValue(ImmutableMap.of("region", "us-east-1", "action", "PUT", "response", 200))); + + assertThat( + execute(dedupe(inputPlan, 2, false, true, DSL.ref("region"))), + contains( + tupleValue(ImmutableMap.of("region", "us-east-1", "action", "GET", "response", 200)), + tupleValue(ImmutableMap.of("region", "us-east-1", "action", "POST", "response", 200)), + tupleValue(ImmutableMap.of("region", "us-west-2", "action", "POST", "response", 200)), + tupleValue(ImmutableMap.of("region", "us-east-1", "action", "PUT", "response", 200)))); + } + + @Test + public void dedupe_two_field() { + when(inputPlan.hasNext()).thenReturn(true, true, true, false); + when(inputPlan.next()) + .thenReturn( + tupleValue(ImmutableMap.of("region", "us-east-1", "action", "GET", "response", 200))) + .thenReturn( + tupleValue(ImmutableMap.of("region", "us-east-1", "action", "POST", "response", 200))) + .thenReturn( + tupleValue(ImmutableMap.of("region", "us-east-1", "action", "GET", "response", 200))); + + assertThat( + execute(dedupe(inputPlan, DSL.ref("region"), DSL.ref("action"))), + contains( + tupleValue(ImmutableMap.of("region", "us-east-1", "action", "GET", "response", 200)), + tupleValue(ImmutableMap.of("region", "us-east-1", "action", "POST", "response", 200)))); + } + + @Test + public void dedupe_one_field_with_missing_value() { + when(inputPlan.hasNext()).thenReturn(true, true, true, false); + when(inputPlan.next()) + .thenReturn( + tupleValue(ImmutableMap.of("region", "us-east-1", "action", "GET", "response", 200))) + .thenReturn( + tupleValue(ImmutableMap.of("region", "us-east-1", "action", "POST", "response", 200))) + .thenReturn( + tupleValue(ImmutableMap.of( "action", "POST", "response", 200))) + .thenReturn( + tupleValue(ImmutableMap.of("region", "us-east-1", "action", "GET", "response", 200))); + + assertThat( + execute(dedupe(inputPlan, DSL.ref("region"))), + contains( + tupleValue(ImmutableMap.of("region", "us-east-1", "action", "GET", "response", 200)))); + } + + @Test + public void dedupe_one_field_with_missing_value_keep_empty() { + when(inputPlan.hasNext()).thenReturn(true, true, true, false); + when(inputPlan.next()) + .thenReturn( + tupleValue(ImmutableMap.of("region", "us-east-1", "action", "GET", "response", 200))) + .thenReturn( + tupleValue(ImmutableMap.of("region", "us-east-1", "action", "POST", "response", 200))) + .thenReturn( + tupleValue(ImmutableMap.of( "action", "POST", "response", 200))) + .thenReturn( + tupleValue(ImmutableMap.of("region", "us-east-1", "action", "GET", "response", 200))); + + assertThat( + execute(dedupe(inputPlan, 1, true, false, DSL.ref("region"))), + contains( + tupleValue(ImmutableMap.of("region", "us-east-1", "action", "GET", "response", 200)), + tupleValue(ImmutableMap.of( "action", "POST", "response", 200)))); + } + + @Test + public void dedupe_one_field_with_null_value() { + when(inputPlan.hasNext()).thenReturn(true, true, true, false); + when(inputPlan.next()) + .thenReturn( + tupleValue(ImmutableMap.of("region", "us-east-1", "action", "GET", "response", 200))) + .thenReturn( + tupleValue(ImmutableMap.of("region", "us-east-1", "action", "POST", "response", 200))) + .thenReturn( + tupleValue(NULL_MAP)) + .thenReturn( + tupleValue(ImmutableMap.of("region", "us-east-1", "action", "GET", "response", 200))); + + assertThat( + execute(dedupe(inputPlan, DSL.ref("region"))), + contains( + tupleValue(ImmutableMap.of("region", "us-east-1", "action", "GET", "response", 200)))); + } +} diff --git a/core/src/test/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/PhysicalPlanNodeVisitorTest.java b/core/src/test/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/PhysicalPlanNodeVisitorTest.java index 2d50ad2a01..01dcd6b788 100644 --- a/core/src/test/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/PhysicalPlanNodeVisitorTest.java +++ b/core/src/test/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/PhysicalPlanNodeVisitorTest.java @@ -77,6 +77,7 @@ public void test_PhysicalPlanVisitor_should_return_null() { PhysicalPlan remove = PhysicalPlanDSL.remove(plan, ref); PhysicalPlan eval = PhysicalPlanDSL.eval(plan, Pair.of(ref, ref)); PhysicalPlan sort = PhysicalPlanDSL.sort(plan, 100, Pair.of(SortOption.PPL_ASC, ref)); + PhysicalPlan dedupe = PhysicalPlanDSL.dedupe(plan, ref); assertNull(filter.accept(new PhysicalPlanNodeVisitor() {}, null)); assertNull(aggregation.accept(new PhysicalPlanNodeVisitor() {}, null)); @@ -85,6 +86,7 @@ public void test_PhysicalPlanVisitor_should_return_null() { assertNull(remove.accept(new PhysicalPlanNodeVisitor() {}, null)); assertNull(eval.accept(new PhysicalPlanNodeVisitor() {}, null)); assertNull(sort.accept(new PhysicalPlanNodeVisitor() {}, null)); + assertNull(dedupe.accept(new PhysicalPlanNodeVisitor() {}, null)); } public static class PhysicalPlanPrinter extends PhysicalPlanNodeVisitor { From 88b71d75917655c98063ad16a997ac2f71f29b88 Mon Sep 17 00:00:00 2001 From: penghuo Date: Fri, 22 May 2020 08:47:03 -0700 Subject: [PATCH 2/6] update deduper --- .../sql/planner/physical/DedupeOperator.java | 87 ++++++++++--------- .../planner/physical/DedupeOperatorTest.java | 31 +++++++ 2 files changed, 79 insertions(+), 39 deletions(-) diff --git a/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/DedupeOperator.java b/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/DedupeOperator.java index 387e5a9226..ba32f808ff 100644 --- a/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/DedupeOperator.java +++ b/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/DedupeOperator.java @@ -27,25 +27,24 @@ /** * Dedupe operator. Dedupe the input {@link ExprValue} by using the {@link - * DedupeOperator#dedupeList} - * The result order follow the input order. - * + * DedupeOperator#dedupeList} The result order follow the input order. */ public class DedupeOperator extends PhysicalPlan { private final PhysicalPlan input; private final List dedupeList; - private final Decider decider; - private Integer allowedDuplication = 1; - private Boolean keepEmpty = false; - private Boolean consecutive = false; + private final Deduper> deduper; + private final Integer allowedDuplication; + private final Boolean keepEmpty; private ExprValue next; + private static final Integer ALL_ONE_DUPLICATION = 1; + private static final Boolean IGNORE_EMPTY = false; + private static final Boolean NON_CONSECUTIVE = false; + private static final Predicate NULL_OR_MISSING = v -> v.isNull() || v.isMissing(); public DedupeOperator(PhysicalPlan input, List dedupeList) { - this.input = input; - this.dedupeList = dedupeList; - this.decider = new Dedupe(); + this(input, dedupeList, ALL_ONE_DUPLICATION, IGNORE_EMPTY, NON_CONSECUTIVE); } public DedupeOperator( @@ -58,8 +57,7 @@ public DedupeOperator( this.dedupeList = dedupeList; this.allowedDuplication = allowedDuplication; this.keepEmpty = keepEmpty; - this.consecutive = consecutive; - this.decider = this.consecutive ? new ConsecutiveDedupe() : new Dedupe(); + this.deduper = consecutive ? new ConsecutiveDeduper<>() : new HistoricalDeduper<>(); } @Override @@ -76,7 +74,7 @@ public List getChild() { public boolean hasNext() { while (input.hasNext()) { ExprValue next = input.next(); - if (decider.keep(next)) { + if (keep(next)) { this.next = next; return true; } @@ -90,50 +88,61 @@ public ExprValue next() { } /** - * The Decider test the {@link ExprValue} should be keep (return true) or ignored (return false). + * Test the {@link ExprValue} should be keep or ignore * - *

If any value evaluted by {@link DedupeOperator#dedupeList} is NULL or MISSING, then the + *

If any value evaluted by {@link DedupeOperator#dedupeList} is NULL or MISSING, then the * * return value is decided by keepEmpty option, default value is ignore. + * + * @param value + * @return true: keep, false: ignore */ - abstract class Decider { - public boolean keep(ExprValue value) { - BindingTuple bindingTuple = value.bindingTuples(); - ImmutableList.Builder dedupeKeyBuilder = new ImmutableList.Builder<>(); - for (Expression expression : dedupeList) { - ExprValue exprValue = expression.valueOf(bindingTuple); - if (NULL_OR_MISSING.test(exprValue)) { - return keepEmpty; - } - dedupeKeyBuilder.add(exprValue); + public boolean keep(ExprValue value) { + BindingTuple bindingTuple = value.bindingTuples(); + ImmutableList.Builder dedupeKeyBuilder = new ImmutableList.Builder<>(); + for (Expression expression : dedupeList) { + ExprValue exprValue = expression.valueOf(bindingTuple); + if (NULL_OR_MISSING.test(exprValue)) { + return keepEmpty; } - List dedupeKey = dedupeKeyBuilder.build(); - int seenTimes = seenTimes(dedupeKey); - return seenTimes <= allowedDuplication; + dedupeKeyBuilder.add(exprValue); } + List dedupeKey = dedupeKeyBuilder.build(); + int seenTimes = deduper.seenTimes(dedupeKey); + return seenTimes <= allowedDuplication; + } - /** - * Return how many times the dedupeKey has been seen before. The side effect is the seen times - * will add 1 times after calling this function. - */ - abstract int seenTimes(List dedupeKey); + /** + * Return how many times the dedupeKey has been seen before. The side effect is the seen times + * will add 1 times after calling this function. + * + * @param dedupe key + */ + interface Deduper { + + int seenTimes(K dedupeKey); } - class Dedupe extends Decider { - private final Map, Integer> seenMap = new ConcurrentHashMap<>(); + /** The Historical Deduper monitor the duplicated element with all the seen value. */ + static class HistoricalDeduper implements Deduper { + private final Map seenMap = new ConcurrentHashMap<>(); @Override - int seenTimes(List dedupeKey) { + public int seenTimes(K dedupeKey) { seenMap.putIfAbsent(dedupeKey, 0); return seenMap.computeIfPresent(dedupeKey, (k, v) -> v + 1); } } - class ConsecutiveDedupe extends Decider { - private List lastSeenDedupeKey = null; + /** + * The Consecutive Deduper monitor the duplicated element with consecutive seen value. It means + * only the consecutive duplicated value will be counted. + */ + static class ConsecutiveDeduper implements Deduper { + private K lastSeenDedupeKey = null; private Integer consecutiveCount = 0; @Override - int seenTimes(List dedupeKey) { + public int seenTimes(K dedupeKey) { if (dedupeKey.equals(lastSeenDedupeKey)) { return ++consecutiveCount; } else { diff --git a/core/src/test/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/DedupeOperatorTest.java b/core/src/test/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/DedupeOperatorTest.java index 57475ab35d..f9b27ad05b 100644 --- a/core/src/test/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/DedupeOperatorTest.java +++ b/core/src/test/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/DedupeOperatorTest.java @@ -19,9 +19,12 @@ import static com.amazon.opendistroforelasticsearch.sql.planner.physical.PhysicalPlanDSL.dedupe; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.contains; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.Mockito.when; import com.amazon.opendistroforelasticsearch.sql.expression.DSL; +import com.amazon.opendistroforelasticsearch.sql.planner.physical.DedupeOperator.ConsecutiveDeduper; +import com.amazon.opendistroforelasticsearch.sql.planner.physical.DedupeOperator.HistoricalDeduper; import com.google.common.collect.ImmutableMap; import java.util.HashMap; import java.util.Map; @@ -236,4 +239,32 @@ public void dedupe_one_field_with_null_value() { contains( tupleValue(ImmutableMap.of("region", "us-east-1", "action", "GET", "response", 200)))); } + + @Test + public void historical_deduper() { + HistoricalDeduper deduper = new HistoricalDeduper<>(); + + // first time seen 1 + assertEquals(1, deduper.seenTimes(1)); + // second time seen 1 + assertEquals(2, deduper.seenTimes(1)); + // first time seen 2 + assertEquals(1, deduper.seenTimes(2)); + // third time seen 1 + assertEquals(3, deduper.seenTimes(1)); + } + + @Test + public void consecutive_deduper() { + ConsecutiveDeduper deduper = new ConsecutiveDeduper<>(); + + // first time seen 1 + assertEquals(1, deduper.seenTimes(1)); + // consecutive second time seen 1 + assertEquals(2, deduper.seenTimes(1)); + // first time seen 2 + assertEquals(1, deduper.seenTimes(2)); + // first time seen 1 + assertEquals(1, deduper.seenTimes(1)); + } } From bfe17d20097853aae167839cd79c2e4a586fc2fe Mon Sep 17 00:00:00 2001 From: penghuo Date: Fri, 22 May 2020 13:39:00 -0700 Subject: [PATCH 3/6] add dedup ast node --- .../sql/ast/AbstractNodeVisitor.java | 5 ++ .../sql/ast/dsl/AstDSL.java | 5 ++ .../sql/ast/tree/Dedupe.java | 59 +++++++++++++++++++ .../sql/planner/logical/LogicalDedupe.java | 46 +++++++++++++++ .../logical/LogicalPlanNodeVisitor.java | 4 ++ .../sql/ppl/parser/AstBuilder.java | 17 ++---- .../sql/ppl/parser/AstBuilderTest.java | 12 ++-- .../sql/ppl/utils/ArgumentFactoryTest.java | 9 ++- 8 files changed, 134 insertions(+), 23 deletions(-) create mode 100644 core/src/main/java/com/amazon/opendistroforelasticsearch/sql/ast/tree/Dedupe.java create mode 100644 core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/logical/LogicalDedupe.java diff --git a/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/ast/AbstractNodeVisitor.java b/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/ast/AbstractNodeVisitor.java index 5e5a943b7b..fd32458892 100644 --- a/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/ast/AbstractNodeVisitor.java +++ b/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/ast/AbstractNodeVisitor.java @@ -32,6 +32,7 @@ import com.amazon.opendistroforelasticsearch.sql.ast.expression.QualifiedName; import com.amazon.opendistroforelasticsearch.sql.ast.expression.UnresolvedAttribute; import com.amazon.opendistroforelasticsearch.sql.ast.tree.Aggregation; +import com.amazon.opendistroforelasticsearch.sql.ast.tree.Dedupe; import com.amazon.opendistroforelasticsearch.sql.ast.tree.Eval; import com.amazon.opendistroforelasticsearch.sql.ast.tree.Filter; import com.amazon.opendistroforelasticsearch.sql.ast.tree.Project; @@ -155,4 +156,8 @@ public T visitLet(Let node, C context) { public T visitSort(Sort node, C context) { return visitChildren(node, context); } + + public T visitDedupe(Dedupe node, C context) { + return visitChildren(node, context); + } } diff --git a/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/ast/dsl/AstDSL.java b/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/ast/dsl/AstDSL.java index f3cfc1d6f9..b7691f3c73 100644 --- a/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/ast/dsl/AstDSL.java +++ b/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/ast/dsl/AstDSL.java @@ -33,6 +33,7 @@ import com.amazon.opendistroforelasticsearch.sql.ast.expression.UnresolvedAttribute; import com.amazon.opendistroforelasticsearch.sql.ast.expression.UnresolvedExpression; import com.amazon.opendistroforelasticsearch.sql.ast.tree.Aggregation; +import com.amazon.opendistroforelasticsearch.sql.ast.tree.Dedupe; import com.amazon.opendistroforelasticsearch.sql.ast.tree.Eval; import com.amazon.opendistroforelasticsearch.sql.ast.tree.Filter; import com.amazon.opendistroforelasticsearch.sql.ast.tree.Project; @@ -235,4 +236,8 @@ public static List defaultSortFieldArgs() { public static Sort sort(UnresolvedPlan input, List options, Field... sorts) { return new Sort(input, options, Arrays.asList(sorts)); } + + public static Dedupe dedupe(UnresolvedPlan input, List options, Field... fields) { + return new Dedupe(input, options, Arrays.asList(fields)); + } } diff --git a/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/ast/tree/Dedupe.java b/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/ast/tree/Dedupe.java new file mode 100644 index 0000000000..d54b13953b --- /dev/null +++ b/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/ast/tree/Dedupe.java @@ -0,0 +1,59 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.sql.ast.tree; + +import com.amazon.opendistroforelasticsearch.sql.ast.AbstractNodeVisitor; +import com.amazon.opendistroforelasticsearch.sql.ast.expression.Argument; +import com.amazon.opendistroforelasticsearch.sql.ast.expression.Field; +import com.google.common.collect.ImmutableList; +import java.util.List; +import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.Setter; +import lombok.ToString; + +/** + * AST node represent Dedupe operation. + */ +@Getter +@Setter +@ToString +@EqualsAndHashCode(callSuper = false) +@RequiredArgsConstructor +@AllArgsConstructor +public class Dedupe extends UnresolvedPlan { + private UnresolvedPlan child; + private final List options; + private final List sortList; + + @Override + public Dedupe attach(UnresolvedPlan child) { + this.child = child; + return this; + } + + @Override + public List getChild() { + return ImmutableList.of(this.child); + } + + @Override + public T accept(AbstractNodeVisitor nodeVisitor, C context) { + return nodeVisitor.visit(this, context); + } +} diff --git a/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/logical/LogicalDedupe.java b/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/logical/LogicalDedupe.java new file mode 100644 index 0000000000..bfa7261023 --- /dev/null +++ b/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/logical/LogicalDedupe.java @@ -0,0 +1,46 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.sql.planner.logical; + +import com.amazon.opendistroforelasticsearch.sql.expression.Expression; +import java.util.Arrays; +import java.util.List; +import lombok.EqualsAndHashCode; +import lombok.RequiredArgsConstructor; +import lombok.ToString; + +/** + * Logical Dedupe Plan + */ +@ToString +@EqualsAndHashCode(callSuper = false) +@RequiredArgsConstructor +public class LogicalDedupe extends LogicalPlan { + private final LogicalPlan child; + private final List dedupeList; + private final Integer allowedDuplication; + private final Boolean keepEmpty; + + @Override + public List getChild() { + return Arrays.asList(child); + } + + @Override + public R accept(LogicalPlanNodeVisitor visitor, C context) { + return visitor.visitDedupe(this, context); + } +} diff --git a/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/logical/LogicalPlanNodeVisitor.java b/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/logical/LogicalPlanNodeVisitor.java index 2a360b6d88..92629ff71b 100644 --- a/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/logical/LogicalPlanNodeVisitor.java +++ b/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/logical/LogicalPlanNodeVisitor.java @@ -38,6 +38,10 @@ public R visitAggregation(LogicalAggregation plan, C context) { return visitNode(plan, context); } + public R visitDedupe(LogicalDedupe plan, C context) { + return visitNode(plan, context); + } + public R visitRename(LogicalRename plan, C context) { return visitNode(plan, context); } diff --git a/ppl/src/main/java/com/amazon/opendistroforelasticsearch/sql/ppl/parser/AstBuilder.java b/ppl/src/main/java/com/amazon/opendistroforelasticsearch/sql/ppl/parser/AstBuilder.java index 27f5b65183..08d5696cb4 100644 --- a/ppl/src/main/java/com/amazon/opendistroforelasticsearch/sql/ppl/parser/AstBuilder.java +++ b/ppl/src/main/java/com/amazon/opendistroforelasticsearch/sql/ppl/parser/AstBuilder.java @@ -20,6 +20,7 @@ import com.amazon.opendistroforelasticsearch.sql.ast.expression.Map; import com.amazon.opendistroforelasticsearch.sql.ast.expression.UnresolvedExpression; import com.amazon.opendistroforelasticsearch.sql.ast.tree.Aggregation; +import com.amazon.opendistroforelasticsearch.sql.ast.tree.Dedupe; import com.amazon.opendistroforelasticsearch.sql.ast.tree.Eval; import com.amazon.opendistroforelasticsearch.sql.ast.tree.Filter; import com.amazon.opendistroforelasticsearch.sql.ast.tree.Project; @@ -154,21 +155,13 @@ public UnresolvedPlan visitStatsCommand(StatsCommandContext ctx) { /** Dedup command */ @Override public UnresolvedPlan visitDedupCommand(DedupCommandContext ctx) { - List sortList = ctx.sortbyClause() == null ? null : - ctx.sortbyClause() - .sortField() - .stream() - .map(this::visitExpression) - .collect(Collectors.toList()); - return new Aggregation( + return new Dedupe( + ArgumentFactory.getArgumentList(ctx), ctx.fieldList() .fieldExpression() .stream() - .map(this::visitExpression) - .collect(Collectors.toList()), - sortList, - null, - ArgumentFactory.getArgumentList(ctx) + .map(field -> (Field) visitExpression(field)) + .collect(Collectors.toList()) ); } diff --git a/ppl/src/test/java/com/amazon/opendistroforelasticsearch/sql/ppl/parser/AstBuilderTest.java b/ppl/src/test/java/com/amazon/opendistroforelasticsearch/sql/ppl/parser/AstBuilderTest.java index c5f931b103..733cfac064 100644 --- a/ppl/src/test/java/com/amazon/opendistroforelasticsearch/sql/ppl/parser/AstBuilderTest.java +++ b/ppl/src/test/java/com/amazon/opendistroforelasticsearch/sql/ppl/parser/AstBuilderTest.java @@ -18,6 +18,7 @@ import com.amazon.opendistroforelasticsearch.sql.ppl.antlr.PPLSyntaxParser; import com.amazon.opendistroforelasticsearch.sql.ast.Node; import java.util.Collections; +import org.junit.Ignore; import org.junit.Test; import static com.amazon.opendistroforelasticsearch.sql.ast.dsl.AstDSL.agg; @@ -25,6 +26,7 @@ import static com.amazon.opendistroforelasticsearch.sql.ast.dsl.AstDSL.argument; import static com.amazon.opendistroforelasticsearch.sql.ast.dsl.AstDSL.booleanLiteral; import static com.amazon.opendistroforelasticsearch.sql.ast.dsl.AstDSL.compare; +import static com.amazon.opendistroforelasticsearch.sql.ast.dsl.AstDSL.dedupe; import static com.amazon.opendistroforelasticsearch.sql.ast.dsl.AstDSL.defaultDedupArgs; import static com.amazon.opendistroforelasticsearch.sql.ast.dsl.AstDSL.defaultFieldsArgs; import static com.amazon.opendistroforelasticsearch.sql.ast.dsl.AstDSL.defaultSortOptions; @@ -188,16 +190,14 @@ public void testStatsCommandWithAlias() { @Test public void testDedupCommand() { assertEqual("source=t | dedup f1, f2", - agg( + dedupe( relation("t"), - exprList(field("f1"), field("f2")), - null, - null, - defaultDedupArgs() + defaultDedupArgs(), + field("f1"), field("f2") )); } - @Test + @Ignore(value = "disable sortby from the dedup command syntax") public void testDedupCommandWithSortby() { assertEqual("source=t | dedup f1, f2 sortby f3", agg( diff --git a/ppl/src/test/java/com/amazon/opendistroforelasticsearch/sql/ppl/utils/ArgumentFactoryTest.java b/ppl/src/test/java/com/amazon/opendistroforelasticsearch/sql/ppl/utils/ArgumentFactoryTest.java index 66195410f9..ec5d77b524 100644 --- a/ppl/src/test/java/com/amazon/opendistroforelasticsearch/sql/ppl/utils/ArgumentFactoryTest.java +++ b/ppl/src/test/java/com/amazon/opendistroforelasticsearch/sql/ppl/utils/ArgumentFactoryTest.java @@ -21,6 +21,7 @@ import static com.amazon.opendistroforelasticsearch.sql.ast.dsl.AstDSL.aggregate; import static com.amazon.opendistroforelasticsearch.sql.ast.dsl.AstDSL.argument; import static com.amazon.opendistroforelasticsearch.sql.ast.dsl.AstDSL.booleanLiteral; +import static com.amazon.opendistroforelasticsearch.sql.ast.dsl.AstDSL.dedupe; import static com.amazon.opendistroforelasticsearch.sql.ast.dsl.AstDSL.defaultSortOptions; import static com.amazon.opendistroforelasticsearch.sql.ast.dsl.AstDSL.defaultSortFieldArgs; import static com.amazon.opendistroforelasticsearch.sql.ast.dsl.AstDSL.exprList; @@ -75,17 +76,15 @@ public void testStatsCommandDefaultArgument() { @Test public void testDedupCommandArgument() { assertEqual("source=t | dedup 3 field0 keepevents=true keepempty=false consecutive=true", - agg( + dedupe( relation("t"), - exprList(field("field0")), - null, - null, exprList( argument("number", intLiteral(3)), argument("keepevents", booleanLiteral(true)), argument("keepempty", booleanLiteral(false)), argument("consecutive", booleanLiteral(true)) - ) + ), + field("field0") )); } From 9f9ffcc5e362fea7ccef3f389b3be059250fb24d Mon Sep 17 00:00:00 2001 From: penghuo Date: Fri, 22 May 2020 14:01:01 -0700 Subject: [PATCH 4/6] add logical dedup --- .../sql/analysis/Analyzer.java | 23 +++++++ .../sql/ast/dsl/AstDSL.java | 1 - .../sql/ast/tree/Dedupe.java | 4 +- .../sql/planner/logical/LogicalDedupe.java | 1 + .../sql/planner/logical/LogicalPlanDSL.java | 17 ++++- .../planner/logical/LogicalDedupeTest.java | 65 +++++++++++++++++++ .../logical/LogicalPlanNodeVisitorTest.java | 2 + ppl/src/main/antlr/OpenDistroPPLLexer.g4 | 1 - ppl/src/main/antlr/OpenDistroPPLParser.g4 | 2 - .../sql/ppl/utils/ArgumentFactory.java | 3 - .../sql/ppl/utils/ArgumentFactoryTest.java | 5 +- 11 files changed, 109 insertions(+), 15 deletions(-) create mode 100644 core/src/test/java/com/amazon/opendistroforelasticsearch/sql/planner/logical/LogicalDedupeTest.java diff --git a/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/analysis/Analyzer.java b/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/analysis/Analyzer.java index cd56d7cf0f..5499c77ac4 100644 --- a/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/analysis/Analyzer.java +++ b/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/analysis/Analyzer.java @@ -18,9 +18,11 @@ import com.amazon.opendistroforelasticsearch.sql.ast.AbstractNodeVisitor; import com.amazon.opendistroforelasticsearch.sql.ast.expression.Argument; import com.amazon.opendistroforelasticsearch.sql.ast.expression.Field; +import com.amazon.opendistroforelasticsearch.sql.ast.expression.In; import com.amazon.opendistroforelasticsearch.sql.ast.expression.Let; import com.amazon.opendistroforelasticsearch.sql.ast.expression.UnresolvedExpression; import com.amazon.opendistroforelasticsearch.sql.ast.tree.Aggregation; +import com.amazon.opendistroforelasticsearch.sql.ast.tree.Dedupe; import com.amazon.opendistroforelasticsearch.sql.ast.tree.Eval; import com.amazon.opendistroforelasticsearch.sql.ast.tree.Filter; import com.amazon.opendistroforelasticsearch.sql.ast.tree.Project; @@ -36,6 +38,7 @@ import com.amazon.opendistroforelasticsearch.sql.expression.ReferenceExpression; import com.amazon.opendistroforelasticsearch.sql.expression.aggregation.Aggregator; import com.amazon.opendistroforelasticsearch.sql.planner.logical.LogicalAggregation; +import com.amazon.opendistroforelasticsearch.sql.planner.logical.LogicalDedupe; import com.amazon.opendistroforelasticsearch.sql.planner.logical.LogicalEval; import com.amazon.opendistroforelasticsearch.sql.planner.logical.LogicalFilter; import com.amazon.opendistroforelasticsearch.sql.planner.logical.LogicalPlan; @@ -189,4 +192,24 @@ public LogicalPlan visitSort(Sort node, AnalysisContext context) { return new LogicalSort(child, count, sortList); } + + /** Build {@link LogicalDedupe} */ + @Override + public LogicalPlan visitDedupe(Dedupe node, AnalysisContext context) { + LogicalPlan child = node.getChild().get(0).accept(this, context); + List options = node.getOptions(); + // Todo, refactor the option. + Integer allowedDuplication = (Integer) options.get(0).getValue().getValue(); + Boolean keepEmpty = (Boolean) options.get(1).getValue().getValue(); + Boolean consecutive = (Boolean) options.get(2).getValue().getValue(); + + return new LogicalDedupe( + child, + node.getFields().stream() + .map(f -> expressionAnalyzer.analyze(f, context)) + .collect(Collectors.toList()), + allowedDuplication, + keepEmpty, + consecutive); + } } diff --git a/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/ast/dsl/AstDSL.java b/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/ast/dsl/AstDSL.java index b7691f3c73..906b469d5a 100644 --- a/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/ast/dsl/AstDSL.java +++ b/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/ast/dsl/AstDSL.java @@ -216,7 +216,6 @@ public static List defaultStatsArgs() { public static List defaultDedupArgs() { return exprList( argument("number", intLiteral(1)), - argument("keepevents", booleanLiteral(false)), argument("keepempty", booleanLiteral(false)), argument("consecutive", booleanLiteral(false))); } diff --git a/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/ast/tree/Dedupe.java b/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/ast/tree/Dedupe.java index d54b13953b..226c6eda7a 100644 --- a/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/ast/tree/Dedupe.java +++ b/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/ast/tree/Dedupe.java @@ -39,7 +39,7 @@ public class Dedupe extends UnresolvedPlan { private UnresolvedPlan child; private final List options; - private final List sortList; + private final List fields; @Override public Dedupe attach(UnresolvedPlan child) { @@ -54,6 +54,6 @@ public List getChild() { @Override public T accept(AbstractNodeVisitor nodeVisitor, C context) { - return nodeVisitor.visit(this, context); + return nodeVisitor.visitDedupe(this, context); } } diff --git a/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/logical/LogicalDedupe.java b/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/logical/LogicalDedupe.java index bfa7261023..69f380df85 100644 --- a/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/logical/LogicalDedupe.java +++ b/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/logical/LogicalDedupe.java @@ -33,6 +33,7 @@ public class LogicalDedupe extends LogicalPlan { private final List dedupeList; private final Integer allowedDuplication; private final Boolean keepEmpty; + private final Boolean consecutive; @Override public List getChild() { diff --git a/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/logical/LogicalPlanDSL.java b/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/logical/LogicalPlanDSL.java index 85b8f75f2f..bc88739bdf 100644 --- a/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/logical/LogicalPlanDSL.java +++ b/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/logical/LogicalPlanDSL.java @@ -56,12 +56,23 @@ public static LogicalPlan remove(LogicalPlan input, ReferenceExpression... field return new LogicalRemove(input, ImmutableSet.copyOf(fields)); } - public static LogicalPlan eval(LogicalPlan input, Pair... expressions) { + public static LogicalPlan eval( + LogicalPlan input, Pair... expressions) { return new LogicalEval(input, Arrays.asList(expressions)); } - public static LogicalPlan sort(LogicalPlan input, - Integer count, Pair... sorts) { + public static LogicalPlan sort( + LogicalPlan input, Integer count, Pair... sorts) { return new LogicalSort(input, count, Arrays.asList(sorts)); } + + public static LogicalPlan dedup( + LogicalPlan input, + int allowedDuplication, + boolean keepEmpty, + boolean consecutive, + Expression... fields) { + return new LogicalDedupe( + input, Arrays.asList(fields), allowedDuplication, keepEmpty, consecutive); + } } diff --git a/core/src/test/java/com/amazon/opendistroforelasticsearch/sql/planner/logical/LogicalDedupeTest.java b/core/src/test/java/com/amazon/opendistroforelasticsearch/sql/planner/logical/LogicalDedupeTest.java new file mode 100644 index 0000000000..062423696c --- /dev/null +++ b/core/src/test/java/com/amazon/opendistroforelasticsearch/sql/planner/logical/LogicalDedupeTest.java @@ -0,0 +1,65 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.sql.planner.logical; + +import static com.amazon.opendistroforelasticsearch.sql.ast.dsl.AstDSL.argument; +import static com.amazon.opendistroforelasticsearch.sql.ast.dsl.AstDSL.booleanLiteral; +import static com.amazon.opendistroforelasticsearch.sql.ast.dsl.AstDSL.dedupe; +import static com.amazon.opendistroforelasticsearch.sql.ast.dsl.AstDSL.defaultDedupArgs; +import static com.amazon.opendistroforelasticsearch.sql.ast.dsl.AstDSL.exprList; +import static com.amazon.opendistroforelasticsearch.sql.ast.dsl.AstDSL.field; +import static com.amazon.opendistroforelasticsearch.sql.ast.dsl.AstDSL.intLiteral; +import static com.amazon.opendistroforelasticsearch.sql.ast.dsl.AstDSL.relation; + +import com.amazon.opendistroforelasticsearch.sql.analysis.AnalyzerTestBase; +import com.amazon.opendistroforelasticsearch.sql.expression.DSL; +import org.junit.jupiter.api.Test; + +class LogicalDedupeTest extends AnalyzerTestBase { + @Test + public void analyze_dedup_with_two_field_with_default_option() { + assertAnalyzeEqual( + LogicalPlanDSL.dedup( + LogicalPlanDSL.relation("schema"), + 1, false, false, + DSL.ref("integer_value"), + DSL.ref("double_value")), + dedupe( + relation("schema"), + defaultDedupArgs(), + field("integer_value"), field("double_value") + )); + } + + @Test + public void analyze_dedup_with_one_field_with_customize_option() { + assertAnalyzeEqual( + LogicalPlanDSL.dedup( + LogicalPlanDSL.relation("schema"), + 3, false, true, + DSL.ref("integer_value"), + DSL.ref("double_value")), + dedupe( + relation("schema"), + exprList( + argument("number", intLiteral(3)), + argument("keepempty", booleanLiteral(false)), + argument("consecutive", booleanLiteral(true)) + ), + field("integer_value"), field("double_value") + )); + } +} \ No newline at end of file diff --git a/core/src/test/java/com/amazon/opendistroforelasticsearch/sql/planner/logical/LogicalPlanNodeVisitorTest.java b/core/src/test/java/com/amazon/opendistroforelasticsearch/sql/planner/logical/LogicalPlanNodeVisitorTest.java index f53b02438f..2b54b57987 100644 --- a/core/src/test/java/com/amazon/opendistroforelasticsearch/sql/planner/logical/LogicalPlanNodeVisitorTest.java +++ b/core/src/test/java/com/amazon/opendistroforelasticsearch/sql/planner/logical/LogicalPlanNodeVisitorTest.java @@ -64,6 +64,7 @@ public void testAbstractPlanNodeVisitorShouldReturnNull() { LogicalPlan remove = LogicalPlanDSL.remove(relation, ref); LogicalPlan eval = LogicalPlanDSL.eval(relation, Pair.of(ref, expression)); LogicalPlan sort = LogicalPlanDSL.sort(relation, 100, Pair.of(SortOption.PPL_ASC, expression)); + LogicalPlan dedup = LogicalPlanDSL.dedup(relation, 1, false, false, expression); assertNull(relation.accept(new LogicalPlanNodeVisitor() {}, null)); assertNull(filter.accept(new LogicalPlanNodeVisitor() {}, null)); @@ -73,6 +74,7 @@ public void testAbstractPlanNodeVisitorShouldReturnNull() { assertNull(remove.accept(new LogicalPlanNodeVisitor() {}, null)); assertNull(eval.accept(new LogicalPlanNodeVisitor() {}, null)); assertNull(sort.accept(new LogicalPlanNodeVisitor() {}, null)); + assertNull(dedup.accept(new LogicalPlanNodeVisitor() {}, null)); } private static class NodesCount extends LogicalPlanNodeVisitor { diff --git a/ppl/src/main/antlr/OpenDistroPPLLexer.g4 b/ppl/src/main/antlr/OpenDistroPPLLexer.g4 index c543d086cb..f8ff8560ab 100644 --- a/ppl/src/main/antlr/OpenDistroPPLLexer.g4 +++ b/ppl/src/main/antlr/OpenDistroPPLLexer.g4 @@ -47,7 +47,6 @@ IP: 'IP'; NUM: 'NUM'; // ARGUMENT KEYWORDS -KEEPEVENTS: 'KEEPEVENTS'; KEEPEMPTY: 'KEEPEMPTY'; CONSECUTIVE: 'CONSECUTIVE'; DEDUP_SPLITVALUES: 'DEDUP_SPLITVALUES'; diff --git a/ppl/src/main/antlr/OpenDistroPPLParser.g4 b/ppl/src/main/antlr/OpenDistroPPLParser.g4 index 41648ef709..10927b5141 100644 --- a/ppl/src/main/antlr/OpenDistroPPLParser.g4 +++ b/ppl/src/main/antlr/OpenDistroPPLParser.g4 @@ -63,10 +63,8 @@ dedupCommand : DEDUP (number=integerLiteral)? fieldList - (KEEPEVENTS EQUAL keeevents=booleanLiteral)? (KEEPEMPTY EQUAL keepempty=booleanLiteral)? (CONSECUTIVE EQUAL consecutive=booleanLiteral)? - (SORTBY sortbyClause)? ; sortCommand diff --git a/ppl/src/main/java/com/amazon/opendistroforelasticsearch/sql/ppl/utils/ArgumentFactory.java b/ppl/src/main/java/com/amazon/opendistroforelasticsearch/sql/ppl/utils/ArgumentFactory.java index 9b254ae372..e2a20a8b07 100644 --- a/ppl/src/main/java/com/amazon/opendistroforelasticsearch/sql/ppl/utils/ArgumentFactory.java +++ b/ppl/src/main/java/com/amazon/opendistroforelasticsearch/sql/ppl/utils/ArgumentFactory.java @@ -80,9 +80,6 @@ public static List getArgumentList(DedupCommandContext ctx) { ctx.number != null ? new Argument("number", getArgumentValue(ctx.number)) : new Argument("number", new Literal(1, DataType.INTEGER)), - ctx.keeevents != null - ? new Argument("keepevents", getArgumentValue(ctx.keeevents)) - : new Argument("keepevents", new Literal(false, DataType.BOOLEAN)), ctx.keepempty != null ? new Argument("keepempty", getArgumentValue(ctx.keepempty)) : new Argument("keepempty", new Literal(false, DataType.BOOLEAN)), diff --git a/ppl/src/test/java/com/amazon/opendistroforelasticsearch/sql/ppl/utils/ArgumentFactoryTest.java b/ppl/src/test/java/com/amazon/opendistroforelasticsearch/sql/ppl/utils/ArgumentFactoryTest.java index ec5d77b524..60d2866f73 100644 --- a/ppl/src/test/java/com/amazon/opendistroforelasticsearch/sql/ppl/utils/ArgumentFactoryTest.java +++ b/ppl/src/test/java/com/amazon/opendistroforelasticsearch/sql/ppl/utils/ArgumentFactoryTest.java @@ -75,12 +75,11 @@ public void testStatsCommandDefaultArgument() { @Test public void testDedupCommandArgument() { - assertEqual("source=t | dedup 3 field0 keepevents=true keepempty=false consecutive=true", + assertEqual("source=t | dedup 3 field0 keepempty=false consecutive=true", dedupe( relation("t"), exprList( argument("number", intLiteral(3)), - argument("keepevents", booleanLiteral(true)), argument("keepempty", booleanLiteral(false)), argument("consecutive", booleanLiteral(true)) ), @@ -91,7 +90,7 @@ public void testDedupCommandArgument() { @Test public void testDedupCommandDefaultArgument() { assertEqual( - "source=t | dedup 1 field0 keepevents=false keepempty=false consecutive=false", + "source=t | dedup 1 field0 keepempty=false consecutive=false", "source=t | dedup field0" ); } From 0659600ede34751777786623ca206ea5f1cb9ecc Mon Sep 17 00:00:00 2001 From: penghuo Date: Fri, 22 May 2020 15:09:32 -0700 Subject: [PATCH 5/6] add dedup operator to ElasticsearchIndex --- .../sql/planner/logical/LogicalDedupe.java | 2 + .../sql/planner/logical/LogicalPlanDSL.java | 6 +- .../sql/planner/physical/DedupeOperator.java | 11 ++- .../planner/logical/LogicalDedupeTest.java | 5 +- .../logical/LogicalPlanNodeVisitorTest.java | 2 +- .../storage/ElasticsearchIndex.java | 7 ++ .../storage/ElasticsearchIndexTest.java | 70 +++++++++++-------- 7 files changed, 65 insertions(+), 38 deletions(-) diff --git a/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/logical/LogicalDedupe.java b/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/logical/LogicalDedupe.java index 69f380df85..f410e56d46 100644 --- a/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/logical/LogicalDedupe.java +++ b/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/logical/LogicalDedupe.java @@ -19,12 +19,14 @@ import java.util.Arrays; import java.util.List; import lombok.EqualsAndHashCode; +import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.ToString; /** * Logical Dedupe Plan */ +@Getter @ToString @EqualsAndHashCode(callSuper = false) @RequiredArgsConstructor diff --git a/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/logical/LogicalPlanDSL.java b/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/logical/LogicalPlanDSL.java index bc88739bdf..fb1cbb69f4 100644 --- a/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/logical/LogicalPlanDSL.java +++ b/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/logical/LogicalPlanDSL.java @@ -66,7 +66,11 @@ public static LogicalPlan sort( return new LogicalSort(input, count, Arrays.asList(sorts)); } - public static LogicalPlan dedup( + public static LogicalPlan dedupe(LogicalPlan input, Expression... fields) { + return dedupe(input, 1, false, false, fields); + } + + public static LogicalPlan dedupe( LogicalPlan input, int allowedDuplication, boolean keepEmpty, diff --git a/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/DedupeOperator.java b/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/DedupeOperator.java index ba32f808ff..7232893007 100644 --- a/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/DedupeOperator.java +++ b/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/DedupeOperator.java @@ -24,18 +24,22 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Predicate; +import lombok.EqualsAndHashCode; /** * Dedupe operator. Dedupe the input {@link ExprValue} by using the {@link * DedupeOperator#dedupeList} The result order follow the input order. */ +@EqualsAndHashCode public class DedupeOperator extends PhysicalPlan { private final PhysicalPlan input; private final List dedupeList; - private final Deduper> deduper; private final Integer allowedDuplication; private final Boolean keepEmpty; - private ExprValue next; + private final Boolean consecutive; + + @EqualsAndHashCode.Exclude private final Deduper> deduper; + @EqualsAndHashCode.Exclude private ExprValue next; private static final Integer ALL_ONE_DUPLICATION = 1; private static final Boolean IGNORE_EMPTY = false; @@ -57,7 +61,8 @@ public DedupeOperator( this.dedupeList = dedupeList; this.allowedDuplication = allowedDuplication; this.keepEmpty = keepEmpty; - this.deduper = consecutive ? new ConsecutiveDeduper<>() : new HistoricalDeduper<>(); + this.consecutive = consecutive; + this.deduper = this.consecutive ? new ConsecutiveDeduper<>() : new HistoricalDeduper<>(); } @Override diff --git a/core/src/test/java/com/amazon/opendistroforelasticsearch/sql/planner/logical/LogicalDedupeTest.java b/core/src/test/java/com/amazon/opendistroforelasticsearch/sql/planner/logical/LogicalDedupeTest.java index 062423696c..1589c3e41d 100644 --- a/core/src/test/java/com/amazon/opendistroforelasticsearch/sql/planner/logical/LogicalDedupeTest.java +++ b/core/src/test/java/com/amazon/opendistroforelasticsearch/sql/planner/logical/LogicalDedupeTest.java @@ -32,9 +32,8 @@ class LogicalDedupeTest extends AnalyzerTestBase { @Test public void analyze_dedup_with_two_field_with_default_option() { assertAnalyzeEqual( - LogicalPlanDSL.dedup( + LogicalPlanDSL.dedupe( LogicalPlanDSL.relation("schema"), - 1, false, false, DSL.ref("integer_value"), DSL.ref("double_value")), dedupe( @@ -47,7 +46,7 @@ public void analyze_dedup_with_two_field_with_default_option() { @Test public void analyze_dedup_with_one_field_with_customize_option() { assertAnalyzeEqual( - LogicalPlanDSL.dedup( + LogicalPlanDSL.dedupe( LogicalPlanDSL.relation("schema"), 3, false, true, DSL.ref("integer_value"), diff --git a/core/src/test/java/com/amazon/opendistroforelasticsearch/sql/planner/logical/LogicalPlanNodeVisitorTest.java b/core/src/test/java/com/amazon/opendistroforelasticsearch/sql/planner/logical/LogicalPlanNodeVisitorTest.java index 2b54b57987..53157ac01b 100644 --- a/core/src/test/java/com/amazon/opendistroforelasticsearch/sql/planner/logical/LogicalPlanNodeVisitorTest.java +++ b/core/src/test/java/com/amazon/opendistroforelasticsearch/sql/planner/logical/LogicalPlanNodeVisitorTest.java @@ -64,7 +64,7 @@ public void testAbstractPlanNodeVisitorShouldReturnNull() { LogicalPlan remove = LogicalPlanDSL.remove(relation, ref); LogicalPlan eval = LogicalPlanDSL.eval(relation, Pair.of(ref, expression)); LogicalPlan sort = LogicalPlanDSL.sort(relation, 100, Pair.of(SortOption.PPL_ASC, expression)); - LogicalPlan dedup = LogicalPlanDSL.dedup(relation, 1, false, false, expression); + LogicalPlan dedup = LogicalPlanDSL.dedupe(relation, 1, false, false, expression); assertNull(relation.accept(new LogicalPlanNodeVisitor() {}, null)); assertNull(filter.accept(new LogicalPlanNodeVisitor() {}, null)); diff --git a/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/storage/ElasticsearchIndex.java b/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/storage/ElasticsearchIndex.java index ee6f8e9232..569b78fa79 100644 --- a/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/storage/ElasticsearchIndex.java +++ b/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/storage/ElasticsearchIndex.java @@ -20,6 +20,7 @@ import com.amazon.opendistroforelasticsearch.sql.elasticsearch.client.ElasticsearchClient; import com.amazon.opendistroforelasticsearch.sql.elasticsearch.mapping.IndexMapping; import com.amazon.opendistroforelasticsearch.sql.planner.logical.LogicalAggregation; +import com.amazon.opendistroforelasticsearch.sql.planner.logical.LogicalDedupe; import com.amazon.opendistroforelasticsearch.sql.planner.logical.LogicalEval; import com.amazon.opendistroforelasticsearch.sql.planner.logical.LogicalFilter; import com.amazon.opendistroforelasticsearch.sql.planner.logical.LogicalPlan; @@ -30,6 +31,7 @@ import com.amazon.opendistroforelasticsearch.sql.planner.logical.LogicalRename; import com.amazon.opendistroforelasticsearch.sql.planner.logical.LogicalSort; import com.amazon.opendistroforelasticsearch.sql.planner.physical.AggregationOperator; +import com.amazon.opendistroforelasticsearch.sql.planner.physical.DedupeOperator; import com.amazon.opendistroforelasticsearch.sql.planner.physical.EvalOperator; import com.amazon.opendistroforelasticsearch.sql.planner.physical.FilterOperator; import com.amazon.opendistroforelasticsearch.sql.planner.physical.PhysicalPlan; @@ -104,6 +106,11 @@ public PhysicalPlan implement(LogicalPlan plan) { * will accumulate (push down) Elasticsearch query and aggregation DSL on index scan. */ return plan.accept(new LogicalPlanNodeVisitor() { + @Override + public PhysicalPlan visitDedupe(LogicalDedupe node, ElasticsearchIndexScan context) { + return new DedupeOperator(visitChild(node, context), node.getDedupeList(), + node.getAllowedDuplication(), node.getKeepEmpty(), node.getConsecutive()); + } @Override public PhysicalPlan visitProject(LogicalProject node, ElasticsearchIndexScan context) { diff --git a/elasticsearch/src/test/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/storage/ElasticsearchIndexTest.java b/elasticsearch/src/test/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/storage/ElasticsearchIndexTest.java index cea0c57702..2024e6b0f3 100644 --- a/elasticsearch/src/test/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/storage/ElasticsearchIndexTest.java +++ b/elasticsearch/src/test/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/storage/ElasticsearchIndexTest.java @@ -27,6 +27,7 @@ import com.amazon.opendistroforelasticsearch.sql.expression.aggregation.Aggregator; import com.amazon.opendistroforelasticsearch.sql.expression.aggregation.AvgAggregator; import com.amazon.opendistroforelasticsearch.sql.planner.logical.LogicalPlan; +import com.amazon.opendistroforelasticsearch.sql.planner.logical.LogicalPlanDSL; import com.amazon.opendistroforelasticsearch.sql.planner.physical.PhysicalPlanDSL; import com.amazon.opendistroforelasticsearch.sql.storage.Table; import com.google.common.collect.ImmutableMap; @@ -44,6 +45,7 @@ import static com.amazon.opendistroforelasticsearch.sql.expression.DSL.literal; import static com.amazon.opendistroforelasticsearch.sql.expression.DSL.ref; import static com.amazon.opendistroforelasticsearch.sql.planner.logical.LogicalPlanDSL.aggregation; +import static com.amazon.opendistroforelasticsearch.sql.planner.logical.LogicalPlanDSL.dedupe; import static com.amazon.opendistroforelasticsearch.sql.planner.logical.LogicalPlanDSL.eval; import static com.amazon.opendistroforelasticsearch.sql.planner.logical.LogicalPlanDSL.filter; import static com.amazon.opendistroforelasticsearch.sql.planner.logical.LogicalPlanDSL.project; @@ -117,6 +119,7 @@ void implementOtherLogicalOperators() { String indexName = "test"; ReferenceExpression include = ref("age"); ReferenceExpression exclude = ref("name"); + ReferenceExpression dedupeField = ref("name"); Expression filterExpr = literal(ExprBooleanValue.ofTrue()); List groupByExprs = Arrays.asList(ref("age")); List aggregators = Arrays.asList(new AvgAggregator(groupByExprs, ExprType.DOUBLE)); @@ -125,28 +128,32 @@ void implementOtherLogicalOperators() { Integer sortCount = 100; Pair sortField = ImmutablePair.of(SortOption.PPL_ASC, ref("name1")); + LogicalPlan plan = project( - sort( - eval( - remove( - rename( - aggregation( - filter( - relation(indexName), - filterExpr + LogicalPlanDSL.dedupe( + sort( + eval( + remove( + rename( + aggregation( + filter( + relation(indexName), + filterExpr + ), + aggregators, + groupByExprs ), - aggregators, - groupByExprs + mappings ), - mappings + exclude ), - exclude + newEvalField ), - newEvalField + sortCount, + sortField ), - sortCount, - sortField + dedupeField ), include ); @@ -154,26 +161,29 @@ void implementOtherLogicalOperators() { Table index = new ElasticsearchIndex(client, indexName); assertEquals( PhysicalPlanDSL.project( - PhysicalPlanDSL.sort( - PhysicalPlanDSL.eval( - PhysicalPlanDSL.remove( - PhysicalPlanDSL.rename( - PhysicalPlanDSL.agg( - PhysicalPlanDSL.filter( - new ElasticsearchIndexScan(client, indexName), - filterExpr + PhysicalPlanDSL.dedupe( + PhysicalPlanDSL.sort( + PhysicalPlanDSL.eval( + PhysicalPlanDSL.remove( + PhysicalPlanDSL.rename( + PhysicalPlanDSL.agg( + PhysicalPlanDSL.filter( + new ElasticsearchIndexScan(client, indexName), + filterExpr + ), + aggregators, + groupByExprs ), - aggregators, - groupByExprs + mappings ), - mappings + exclude ), - exclude + newEvalField ), - newEvalField + sortCount, + sortField ), - sortCount, - sortField + dedupeField ), include ), From 842837031dd67e5dc4fee637bb5d7d0df2602532 Mon Sep 17 00:00:00 2001 From: penghuo Date: Tue, 26 May 2020 09:12:22 -0700 Subject: [PATCH 6/6] address comments --- .../sql/planner/physical/DedupeOperator.java | 59 ++++++++++--------- .../planner/physical/DedupeOperatorTest.java | 7 +-- 2 files changed, 35 insertions(+), 31 deletions(-) diff --git a/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/DedupeOperator.java b/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/DedupeOperator.java index 7232893007..bdd22d186a 100644 --- a/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/DedupeOperator.java +++ b/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/DedupeOperator.java @@ -23,8 +23,11 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.BiFunction; import java.util.function.Predicate; import lombok.EqualsAndHashCode; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; /** * Dedupe operator. Dedupe the input {@link ExprValue} by using the {@link @@ -44,13 +47,15 @@ public class DedupeOperator extends PhysicalPlan { private static final Integer ALL_ONE_DUPLICATION = 1; private static final Boolean IGNORE_EMPTY = false; private static final Boolean NON_CONSECUTIVE = false; - private static final Predicate NULL_OR_MISSING = v -> v.isNull() || v.isMissing(); + private static final Integer SEEN_FIRST_TIME = 1; + @NonNull public DedupeOperator(PhysicalPlan input, List dedupeList) { this(input, dedupeList, ALL_ONE_DUPLICATION, IGNORE_EMPTY, NON_CONSECUTIVE); } + @NonNull public DedupeOperator( PhysicalPlan input, List dedupeList, @@ -62,7 +67,7 @@ public DedupeOperator( this.allowedDuplication = allowedDuplication; this.keepEmpty = keepEmpty; this.consecutive = consecutive; - this.deduper = this.consecutive ? new ConsecutiveDeduper<>() : new HistoricalDeduper<>(); + this.deduper = this.consecutive ? Deduper.consecutiveDeduper() : Deduper.historicalDeduper(); } @Override @@ -122,38 +127,38 @@ public boolean keep(ExprValue value) { * * @param dedupe key */ - interface Deduper { - - int seenTimes(K dedupeKey); - } - - /** The Historical Deduper monitor the duplicated element with all the seen value. */ - static class HistoricalDeduper implements Deduper { + @RequiredArgsConstructor + static class Deduper { + private final BiFunction, K, Integer> seenFirstTime; private final Map seenMap = new ConcurrentHashMap<>(); - @Override - public int seenTimes(K dedupeKey) { - seenMap.putIfAbsent(dedupeKey, 0); - return seenMap.computeIfPresent(dedupeKey, (k, v) -> v + 1); + /** The Historical Deduper monitor the duplicated element with all the seen value. */ + public static Deduper historicalDeduper() { + return new Deduper<>( + (map, key) -> { + map.put(key, SEEN_FIRST_TIME); + return SEEN_FIRST_TIME; + }); } - } - /** - * The Consecutive Deduper monitor the duplicated element with consecutive seen value. It means - * only the consecutive duplicated value will be counted. - */ - static class ConsecutiveDeduper implements Deduper { - private K lastSeenDedupeKey = null; - private Integer consecutiveCount = 0; + /** + * The Consecutive Deduper monitor the duplicated element with consecutive seen value. It means + * only the consecutive duplicated value will be counted. + */ + public static Deduper consecutiveDeduper() { + return new Deduper<>( + (map, key) -> { + map.clear(); + map.put(key, SEEN_FIRST_TIME); + return SEEN_FIRST_TIME; + }); + } - @Override public int seenTimes(K dedupeKey) { - if (dedupeKey.equals(lastSeenDedupeKey)) { - return ++consecutiveCount; + if (seenMap.containsKey(dedupeKey)) { + return seenMap.computeIfPresent(dedupeKey, (k, v) -> v + 1); } else { - lastSeenDedupeKey = dedupeKey; - consecutiveCount = 1; - return consecutiveCount; + return seenFirstTime.apply(seenMap, dedupeKey); } } } diff --git a/core/src/test/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/DedupeOperatorTest.java b/core/src/test/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/DedupeOperatorTest.java index f9b27ad05b..7906f89eb2 100644 --- a/core/src/test/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/DedupeOperatorTest.java +++ b/core/src/test/java/com/amazon/opendistroforelasticsearch/sql/planner/physical/DedupeOperatorTest.java @@ -23,8 +23,7 @@ import static org.mockito.Mockito.when; import com.amazon.opendistroforelasticsearch.sql.expression.DSL; -import com.amazon.opendistroforelasticsearch.sql.planner.physical.DedupeOperator.ConsecutiveDeduper; -import com.amazon.opendistroforelasticsearch.sql.planner.physical.DedupeOperator.HistoricalDeduper; +import com.amazon.opendistroforelasticsearch.sql.planner.physical.DedupeOperator.Deduper; import com.google.common.collect.ImmutableMap; import java.util.HashMap; import java.util.Map; @@ -242,7 +241,7 @@ public void dedupe_one_field_with_null_value() { @Test public void historical_deduper() { - HistoricalDeduper deduper = new HistoricalDeduper<>(); + Deduper deduper = Deduper.historicalDeduper(); // first time seen 1 assertEquals(1, deduper.seenTimes(1)); @@ -256,7 +255,7 @@ public void historical_deduper() { @Test public void consecutive_deduper() { - ConsecutiveDeduper deduper = new ConsecutiveDeduper<>(); + Deduper deduper = Deduper.consecutiveDeduper(); // first time seen 1 assertEquals(1, deduper.seenTimes(1));