Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

[PPL] Support dedup command #485

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
import com.amazon.opendistroforelasticsearch.sql.ast.AbstractNodeVisitor;
import com.amazon.opendistroforelasticsearch.sql.ast.expression.Argument;
import com.amazon.opendistroforelasticsearch.sql.ast.expression.Field;
import com.amazon.opendistroforelasticsearch.sql.ast.expression.In;
import com.amazon.opendistroforelasticsearch.sql.ast.expression.Let;
import com.amazon.opendistroforelasticsearch.sql.ast.expression.UnresolvedExpression;
import com.amazon.opendistroforelasticsearch.sql.ast.tree.Aggregation;
import com.amazon.opendistroforelasticsearch.sql.ast.tree.Dedupe;
import com.amazon.opendistroforelasticsearch.sql.ast.tree.Eval;
import com.amazon.opendistroforelasticsearch.sql.ast.tree.Filter;
import com.amazon.opendistroforelasticsearch.sql.ast.tree.Project;
Expand All @@ -36,6 +38,7 @@
import com.amazon.opendistroforelasticsearch.sql.expression.ReferenceExpression;
import com.amazon.opendistroforelasticsearch.sql.expression.aggregation.Aggregator;
import com.amazon.opendistroforelasticsearch.sql.planner.logical.LogicalAggregation;
import com.amazon.opendistroforelasticsearch.sql.planner.logical.LogicalDedupe;
import com.amazon.opendistroforelasticsearch.sql.planner.logical.LogicalEval;
import com.amazon.opendistroforelasticsearch.sql.planner.logical.LogicalFilter;
import com.amazon.opendistroforelasticsearch.sql.planner.logical.LogicalPlan;
Expand Down Expand Up @@ -189,4 +192,24 @@ public LogicalPlan visitSort(Sort node, AnalysisContext context) {

return new LogicalSort(child, count, sortList);
}

/** Build {@link LogicalDedupe} */
@Override
public LogicalPlan visitDedupe(Dedupe node, AnalysisContext context) {
LogicalPlan child = node.getChild().get(0).accept(this, context);
List<Argument> options = node.getOptions();
// Todo, refactor the option.
Integer allowedDuplication = (Integer) options.get(0).getValue().getValue();
Boolean keepEmpty = (Boolean) options.get(1).getValue().getValue();
Boolean consecutive = (Boolean) options.get(2).getValue().getValue();

return new LogicalDedupe(
child,
node.getFields().stream()
.map(f -> expressionAnalyzer.analyze(f, context))
.collect(Collectors.toList()),
allowedDuplication,
keepEmpty,
consecutive);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.amazon.opendistroforelasticsearch.sql.ast.expression.QualifiedName;
import com.amazon.opendistroforelasticsearch.sql.ast.expression.UnresolvedAttribute;
import com.amazon.opendistroforelasticsearch.sql.ast.tree.Aggregation;
import com.amazon.opendistroforelasticsearch.sql.ast.tree.Dedupe;
import com.amazon.opendistroforelasticsearch.sql.ast.tree.Eval;
import com.amazon.opendistroforelasticsearch.sql.ast.tree.Filter;
import com.amazon.opendistroforelasticsearch.sql.ast.tree.Project;
Expand Down Expand Up @@ -155,4 +156,8 @@ public T visitLet(Let node, C context) {
public T visitSort(Sort node, C context) {
return visitChildren(node, context);
}

public T visitDedupe(Dedupe node, C context) {
return visitChildren(node, context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.amazon.opendistroforelasticsearch.sql.ast.expression.UnresolvedAttribute;
import com.amazon.opendistroforelasticsearch.sql.ast.expression.UnresolvedExpression;
import com.amazon.opendistroforelasticsearch.sql.ast.tree.Aggregation;
import com.amazon.opendistroforelasticsearch.sql.ast.tree.Dedupe;
import com.amazon.opendistroforelasticsearch.sql.ast.tree.Eval;
import com.amazon.opendistroforelasticsearch.sql.ast.tree.Filter;
import com.amazon.opendistroforelasticsearch.sql.ast.tree.Project;
Expand Down Expand Up @@ -215,7 +216,6 @@ public static List<Argument> defaultStatsArgs() {
public static List<Argument> defaultDedupArgs() {
return exprList(
argument("number", intLiteral(1)),
argument("keepevents", booleanLiteral(false)),
argument("keepempty", booleanLiteral(false)),
argument("consecutive", booleanLiteral(false)));
}
Expand All @@ -235,4 +235,8 @@ public static List<Argument> defaultSortFieldArgs() {
public static Sort sort(UnresolvedPlan input, List<Argument> options, Field... sorts) {
return new Sort(input, options, Arrays.asList(sorts));
}

public static Dedupe dedupe(UnresolvedPlan input, List<Argument> options, Field... fields) {
return new Dedupe(input, options, Arrays.asList(fields));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.sql.ast.tree;

import com.amazon.opendistroforelasticsearch.sql.ast.AbstractNodeVisitor;
import com.amazon.opendistroforelasticsearch.sql.ast.expression.Argument;
import com.amazon.opendistroforelasticsearch.sql.ast.expression.Field;
import com.google.common.collect.ImmutableList;
import java.util.List;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.ToString;

/**
* AST node represent Dedupe operation.
*/
@Getter
@Setter
@ToString
@EqualsAndHashCode(callSuper = false)
@RequiredArgsConstructor
@AllArgsConstructor
public class Dedupe extends UnresolvedPlan {
private UnresolvedPlan child;
private final List<Argument> options;
private final List<Field> fields;

@Override
public Dedupe attach(UnresolvedPlan child) {
this.child = child;
return this;
}

@Override
public List<UnresolvedPlan> getChild() {
return ImmutableList.of(this.child);
}

@Override
public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) {
return nodeVisitor.visitDedupe(this, context);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.sql.planner.logical;

import com.amazon.opendistroforelasticsearch.sql.expression.Expression;
import java.util.Arrays;
import java.util.List;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;

/**
* Logical Dedupe Plan
*/
@Getter
@ToString
@EqualsAndHashCode(callSuper = false)
@RequiredArgsConstructor
public class LogicalDedupe extends LogicalPlan {
private final LogicalPlan child;
private final List<Expression> dedupeList;
private final Integer allowedDuplication;
private final Boolean keepEmpty;
private final Boolean consecutive;

@Override
public List<LogicalPlan> getChild() {
return Arrays.asList(child);
}

@Override
public <R, C> R accept(LogicalPlanNodeVisitor<R, C> visitor, C context) {
return visitor.visitDedupe(this, context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,27 @@ public static LogicalPlan remove(LogicalPlan input, ReferenceExpression... field
return new LogicalRemove(input, ImmutableSet.copyOf(fields));
}

public static LogicalPlan eval(LogicalPlan input, Pair<ReferenceExpression, Expression>... expressions) {
public static LogicalPlan eval(
LogicalPlan input, Pair<ReferenceExpression, Expression>... expressions) {
return new LogicalEval(input, Arrays.asList(expressions));
}

public static LogicalPlan sort(LogicalPlan input,
Integer count, Pair<SortOption, Expression>... sorts) {
public static LogicalPlan sort(
LogicalPlan input, Integer count, Pair<SortOption, Expression>... sorts) {
return new LogicalSort(input, count, Arrays.asList(sorts));
}

public static LogicalPlan dedupe(LogicalPlan input, Expression... fields) {
return dedupe(input, 1, false, false, fields);
}

public static LogicalPlan dedupe(
LogicalPlan input,
int allowedDuplication,
boolean keepEmpty,
boolean consecutive,
Expression... fields) {
return new LogicalDedupe(
input, Arrays.asList(fields), allowedDuplication, keepEmpty, consecutive);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ public R visitAggregation(LogicalAggregation plan, C context) {
return visitNode(plan, context);
}

public R visitDedupe(LogicalDedupe plan, C context) {
return visitNode(plan, context);
}

public R visitRename(LogicalRename plan, C context) {
return visitNode(plan, context);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.sql.planner.physical;

import com.amazon.opendistroforelasticsearch.sql.data.model.ExprValue;
import com.amazon.opendistroforelasticsearch.sql.expression.Expression;
import com.amazon.opendistroforelasticsearch.sql.storage.bindingtuple.BindingTuple;
import com.google.common.collect.ImmutableList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;
import lombok.EqualsAndHashCode;

/**
* Dedupe operator. Dedupe the input {@link ExprValue} by using the {@link
* DedupeOperator#dedupeList} The result order follow the input order.
*/
@EqualsAndHashCode
public class DedupeOperator extends PhysicalPlan {
private final PhysicalPlan input;
private final List<Expression> dedupeList;
private final Integer allowedDuplication;
private final Boolean keepEmpty;
private final Boolean consecutive;

@EqualsAndHashCode.Exclude private final Deduper<List<ExprValue>> deduper;
@EqualsAndHashCode.Exclude private ExprValue next;

private static final Integer ALL_ONE_DUPLICATION = 1;
private static final Boolean IGNORE_EMPTY = false;
private static final Boolean NON_CONSECUTIVE = false;

private static final Predicate<ExprValue> NULL_OR_MISSING = v -> v.isNull() || v.isMissing();

public DedupeOperator(PhysicalPlan input, List<Expression> dedupeList) {
this(input, dedupeList, ALL_ONE_DUPLICATION, IGNORE_EMPTY, NON_CONSECUTIVE);
}

public DedupeOperator(
PhysicalPlan input,
List<Expression> dedupeList,
Integer allowedDuplication,
Boolean keepEmpty,
Boolean consecutive) {
this.input = input;
this.dedupeList = dedupeList;
this.allowedDuplication = allowedDuplication;
this.keepEmpty = keepEmpty;
this.consecutive = consecutive;
this.deduper = this.consecutive ? new ConsecutiveDeduper<>() : new HistoricalDeduper<>();
}

@Override
public <R, C> R accept(PhysicalPlanNodeVisitor<R, C> visitor, C context) {
return visitor.visitDedupe(this, context);
}

@Override
public List<PhysicalPlan> getChild() {
return Collections.singletonList(input);
}

@Override
public boolean hasNext() {
while (input.hasNext()) {
ExprValue next = input.next();
if (keep(next)) {
this.next = next;
return true;
}
}
return false;
}

@Override
public ExprValue next() {
return this.next;
}

/**
* Test the {@link ExprValue} should be keep or ignore
*
* <p>If any value evaluted by {@link DedupeOperator#dedupeList} is NULL or MISSING, then the *
* return value is decided by keepEmpty option, default value is ignore.
*
* @param value
* @return true: keep, false: ignore
*/
public boolean keep(ExprValue value) {
BindingTuple bindingTuple = value.bindingTuples();
ImmutableList.Builder<ExprValue> dedupeKeyBuilder = new ImmutableList.Builder<>();
for (Expression expression : dedupeList) {
ExprValue exprValue = expression.valueOf(bindingTuple);
if (NULL_OR_MISSING.test(exprValue)) {
return keepEmpty;
penghuo marked this conversation as resolved.
Show resolved Hide resolved
}
dedupeKeyBuilder.add(exprValue);
}
List<ExprValue> dedupeKey = dedupeKeyBuilder.build();
int seenTimes = deduper.seenTimes(dedupeKey);
return seenTimes <= allowedDuplication;
}

/**
* Return how many times the dedupeKey has been seen before. The side effect is the seen times
* will add 1 times after calling this function.
*
* @param <K> dedupe key
*/
interface Deduper<K> {

int seenTimes(K dedupeKey);
}

/** The Historical Deduper monitor the duplicated element with all the seen value. */
static class HistoricalDeduper<K> implements Deduper<K> {
private final Map<K, Integer> seenMap = new ConcurrentHashMap<>();

@Override
public int seenTimes(K dedupeKey) {
seenMap.putIfAbsent(dedupeKey, 0);
return seenMap.computeIfPresent(dedupeKey, (k, v) -> v + 1);
}
}

/**
* The Consecutive Deduper monitor the duplicated element with consecutive seen value. It means
* only the consecutive duplicated value will be counted.
*/
static class ConsecutiveDeduper<K> implements Deduper<K> {
private K lastSeenDedupeKey = null;
private Integer consecutiveCount = 0;

@Override
public int seenTimes(K dedupeKey) {
penghuo marked this conversation as resolved.
Show resolved Hide resolved
if (dedupeKey.equals(lastSeenDedupeKey)) {
return ++consecutiveCount;
} else {
lastSeenDedupeKey = dedupeKey;
consecutiveCount = 1;
return consecutiveCount;
}
}
}
}
Loading