
Commit

wip
vkorukanti committed Jan 12, 2024
1 parent 1f7a475 commit 39c1502
Showing 5 changed files with 274 additions and 132 deletions.
3 changes: 2 additions & 1 deletion kernel/kernel-api/src/main/java/io/delta/kernel/Scan.java
@@ -93,9 +93,10 @@ public interface Scan {
* Get the remaining filter that is not guaranteed to be satisfied for the data Delta Kernel
* returns. This filter is used by Delta Kernel to do data skipping when possible.
*
* @param tableClient {@link TableClient} instance to use in Delta Kernel.
* @return the remaining filter as a {@link Predicate}.
*/
Optional<Predicate> getRemainingFilter();
Optional<Predicate> getRemainingFilter(TableClient tableClient);
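
A hedged sketch of how a connector might consume the changed API. The Snapshot/ScanBuilder wiring and the class name below are assumptions for illustration, not part of this diff; the point is that the remaining filter is whatever Kernel could not guarantee and must be re-applied by the connector:

import java.util.Optional;

import io.delta.kernel.Scan;
import io.delta.kernel.Snapshot;
import io.delta.kernel.client.TableClient;
import io.delta.kernel.expressions.Predicate;

public class RemainingFilterSketch {
    // Build a scan with a pushed-down predicate, then ask Kernel which part of it the
    // connector still has to enforce itself on the data it reads back.
    static Optional<Predicate> remainingFilter(
            Snapshot snapshot, TableClient tableClient, Predicate predicate) {
        Scan scan = snapshot.getScanBuilder(tableClient)
                .withFilter(tableClient, predicate)
                .build();
        // With this change the TableClient is passed in, so Kernel can consult the
        // connector's ExpressionHandler when deciding what it can guarantee.
        return scan.getRemainingFilter(tableClient);
    }
}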

/**
* Get the scan state associated with the current scan. This state is common across all
@@ -60,11 +60,18 @@ public class ScanImpl implements Scan {
private final Metadata metadata;
private final LogReplay logReplay;
private final Path dataPath;
private final Optional<Tuple2<Predicate, Predicate>> partitionAndDataFilters;
private final Optional<Predicate> filter;
// Partition column names in lower case.
private final Set<String> partitionColumnNames;
private boolean accessedScanFiles;

private boolean areFiltersSplit;
// Subset of the partition predicate that the expression handler supports evaluating
private Optional<Predicate> metadataPredicate;
// Subset of the given predicate that Kernel can't guarantee to completely satisfy.
// It could be a predicate on data columns and/or an unsupported predicate on partition columns
private Optional<Predicate> remainingPredicate;

public ScanImpl(
StructType snapshotSchema,
StructType readSchema,
@@ -78,8 +85,8 @@ public ScanImpl(
this.protocol = protocol;
this.metadata = metadata;
this.logReplay = logReplay;
this.partitionColumnNames = loadPartitionColNames(); // must be called before `splitFilters`
this.partitionAndDataFilters = splitFilters(filter);
this.filter = filter;
this.partitionColumnNames = loadPartitionColNames();
this.dataPath = dataPath;
}

@@ -96,7 +103,7 @@ public CloseableIterator<FilteredColumnarBatch> getScanFiles(TableClient tableCl
accessedScanFiles = true;

// Generate data skipping filter and decide if we should read the stats column
Optional<Predicate> dataSkippingFilter = getDataSkippingFilter();
Optional<Predicate> dataSkippingFilter = getDataSkippingFilter(tableClient);
boolean shouldReadStats = dataSkippingFilter.isPresent();

// Get active AddFiles via log replay
@@ -150,21 +157,38 @@ public Row getScanState(TableClient tableClient) {
}

@Override
public Optional<Predicate> getRemainingFilter() {
return getDataFilters();
public Optional<Predicate> getRemainingFilter(TableClient tableClient) {
splitFilters(tableClient);
return remainingPredicate;
}

private Optional<Tuple2<Predicate, Predicate>> splitFilters(Optional<Predicate> filter) {
return filter.map(predicate ->
PartitionUtils.splitMetadataAndDataPredicates(predicate, partitionColumnNames));
private void splitFilters(TableClient tableClient) {
if (areFiltersSplit) {
return;
}
filter.map(predicate -> {
Tuple2<Predicate, Predicate> metadataAndNonMetadataFilters =
PartitionUtils.splitPredicates(
tableClient.getExpressionHandler(),
metadata.getSchema(),
predicate,
partitionColumnNames);

metadataPredicate = removeAlwaysTrue(Optional.of(metadataAndNonMetadataFilters._1));
remainingPredicate = removeAlwaysTrue(Optional.of(metadataAndNonMetadataFilters._2));
return null;
});
areFiltersSplit = true;
}

private Optional<Predicate> getDataFilters() {
return removeAlwaysTrue(partitionAndDataFilters.map(filters -> filters._2));
private Optional<Predicate> getDataFilters(TableClient tableClient) {
splitFilters(tableClient);
return remainingPredicate;
}

private Optional<Predicate> getPartitionsFilters() {
return removeAlwaysTrue(partitionAndDataFilters.map(filters -> filters._1));
private Optional<Predicate> getPartitionsFilters(TableClient tableClient) {
splitFilters(tableClient);
return metadataPredicate;
}

/**
@@ -178,7 +202,7 @@ private Optional<Predicate> removeAlwaysTrue(Optional<Predicate> predicate) {
private CloseableIterator<FilteredColumnarBatch> applyPartitionPruning(
TableClient tableClient,
CloseableIterator<FilteredColumnarBatch> scanFileIter) {
Optional<Predicate> partitionPredicate = getPartitionsFilters();
Optional<Predicate> partitionPredicate = getPartitionsFilters(tableClient);
if (!partitionPredicate.isPresent()) {
// There is no partition filter, return the scan file iterator as is.
return scanFileIter;
@@ -227,8 +251,8 @@ public void close() throws IOException {
};
}

private Optional<Predicate> getDataSkippingFilter() {
return getDataFilters().flatMap(dataFilters ->
private Optional<Predicate> getDataSkippingFilter(TableClient tableClient) {
return getDataFilters(tableClient).flatMap(dataFilters ->
DataSkippingUtils.constructDataSkippingFilter(dataFilters, metadata.getSchema())
);
}
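
For context, a minimal stand-alone sketch of the lazy split-and-cache pattern the ScanImpl changes above follow: the filter can no longer be split at construction time because the split now needs the TableClient's expression handler, which only arrives with the first call. The names and types below are simplified stand-ins, not the actual Kernel classes:

import java.util.Optional;
import java.util.function.Function;

// Simplified stand-in: split an optional filter once, on first use, and cache both halves.
class LazySplitFilter<T> {
    private final Optional<T> filter;
    private final Function<T, T[]> splitter;   // stands in for PartitionUtils.splitPredicates
    private boolean areFiltersSplit;
    private Optional<T> guaranteedPart = Optional.empty();
    private Optional<T> remainingPart = Optional.empty();

    LazySplitFilter(Optional<T> filter, Function<T, T[]> splitter) {
        this.filter = filter;
        this.splitter = splitter;
    }

    Optional<T> getGuaranteedPart() {
        splitOnce();
        return guaranteedPart;
    }

    Optional<T> getRemainingPart() {
        splitOnce();
        return remainingPart;
    }

    private void splitOnce() {
        if (areFiltersSplit) {
            return; // already split; serve the cached halves
        }
        filter.ifPresent(f -> {
            T[] parts = splitter.apply(f); // assumed to return exactly two parts
            guaranteedPart = Optional.ofNullable(parts[0]);
            remainingPart = Optional.ofNullable(parts[1]);
        });
        areFiltersSplit = true;
    }
}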
@@ -18,8 +18,9 @@
import java.util.List;
import static java.lang.String.format;

import io.delta.kernel.expressions.Expression;
import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.expressions.*;
import static io.delta.kernel.expressions.AlwaysFalse.ALWAYS_FALSE;

import static io.delta.kernel.internal.util.Preconditions.checkArgument;

public class ExpressionUtils {
@@ -53,4 +54,23 @@ public static Expression getRight(Expression expression) {
format("%s: expected two inputs, but got %s", expression, children.size()));
return children.get(1);
}

/**
* Utility method to combine the given predicates with AND, simplifying away
* ALWAYS_TRUE and ALWAYS_FALSE operands.
* @return the combined {@link Predicate}
*/
public static Predicate combineWithAndOp(Predicate left, Predicate right) {
String leftName = left.getName().toUpperCase();
String rightName = right.getName().toUpperCase();
if (leftName.equals("ALWAYS_FALSE") || rightName.equals("ALWAYS_FALSE")) {
return ALWAYS_FALSE;
}
if (leftName.equals("ALWAYS_TRUE")) {
return right;
}
if (rightName.equals("ALWAYS_TRUE")) {
return left;
}
return new And(left, right);
}
}
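
A hedged usage example for the helper above; the Predicate/Column/Literal constructions reflect the kernel expression API as understood here, so treat the exact constructors as assumptions:

import static io.delta.kernel.expressions.AlwaysFalse.ALWAYS_FALSE;
import static io.delta.kernel.expressions.AlwaysTrue.ALWAYS_TRUE;

import java.util.Arrays;

import io.delta.kernel.expressions.Column;
import io.delta.kernel.expressions.Expression;
import io.delta.kernel.expressions.Literal;
import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.internal.util.ExpressionUtils;

public class CombineWithAndOpExample {
    public static void main(String[] args) {
        Predicate partEq1 = new Predicate(
            "=", Arrays.<Expression>asList(new Column("part"), Literal.ofInt(1)));

        // ALWAYS_TRUE is the identity for AND: the other operand comes back unchanged.
        Predicate p1 = ExpressionUtils.combineWithAndOp(ALWAYS_TRUE, partEq1);   // == partEq1

        // ALWAYS_FALSE absorbs the whole conjunction.
        Predicate p2 = ExpressionUtils.combineWithAndOp(partEq1, ALWAYS_FALSE);  // == ALWAYS_FALSE

        // Otherwise the operands are wrapped in a regular And predicate.
        Predicate p3 = ExpressionUtils.combineWithAndOp(partEq1, partEq1);       // And(partEq1, partEq1)

        System.out.println(p1 + " | " + p2 + " | " + p3);
    }
}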
@@ -28,10 +28,10 @@
import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.expressions.*;
import io.delta.kernel.types.*;
import static io.delta.kernel.expressions.AlwaysFalse.ALWAYS_FALSE;
import static io.delta.kernel.expressions.AlwaysTrue.ALWAYS_TRUE;

import io.delta.kernel.internal.InternalScanFileUtils;
import static io.delta.kernel.internal.util.ExpressionUtils.*;

public class PartitionUtils {
private PartitionUtils() {}
@@ -102,30 +102,37 @@ public static ColumnarBatch withPartitionColumns(
}

/**
* Split the given predicate into predicate on partition columns and predicate on data columns.
* Split the given predicate into two parts: a predicate that Kernel guarantees to satisfy
* when returning scan files, and a best-effort predicate that Kernel uses for data skipping
* but does not guarantee; the returned scan files may contain data that doesn't satisfy it.
*
* @param exprHandler Expression handler from the connector, used to check predicate support.
* @param tableSchema Schema of the table.
* @param predicate Predicate to split.
* @param partitionColNames Partition column names in lower case.
* @return Tuple of partition column predicate and data column predicate.
* @return Tuple of guaranteed predicate and best effort predicate.
*/
public static Tuple2<Predicate, Predicate> splitMetadataAndDataPredicates(
Predicate predicate,
Set<String> partitionColNames) {
public static Tuple2<Predicate, Predicate> splitPredicates(
ExpressionHandler exprHandler,
StructType tableSchema,
Predicate predicate,
Set<String> partitionColNames) {
String predicateName = predicate.getName();
List<Expression> children = predicate.getChildren();
if ("AND".equalsIgnoreCase(predicateName)) {
Predicate left = (Predicate) children.get(0);
Predicate right = (Predicate) children.get(1);
Predicate left = asPredicate(getLeft(predicate));
Predicate right = asPredicate(getRight(predicate));
Tuple2<Predicate, Predicate> leftResult =
splitMetadataAndDataPredicates(left, partitionColNames);
splitPredicates(exprHandler, tableSchema, left, partitionColNames);
Tuple2<Predicate, Predicate> rightResult =
splitMetadataAndDataPredicates(right, partitionColNames);
splitPredicates(exprHandler, tableSchema, right, partitionColNames);

return new Tuple2<>(
combineWithAndOp(leftResult._1, rightResult._1),
combineWithAndOp(leftResult._2, rightResult._2));
}
if (hasNonPartitionColumns(children, partitionColNames)) {
if (hasNonPartitionColumns(children, partitionColNames) ||
hasUnsupportedExpr(exprHandler, tableSchema, predicate)) {
return new Tuple2(ALWAYS_TRUE, predicate);
} else {
return new Tuple2<>(predicate, ALWAYS_TRUE);
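
A worked illustration of the intended split, written as a helper so it compiles on its own. The ExpressionHandler is taken as a parameter (in Kernel it would come from tableClient.getExpressionHandler()); the column names and schema are made up for the example:

import java.util.Arrays;
import java.util.Collections;
import java.util.Set;

import io.delta.kernel.client.ExpressionHandler;
import io.delta.kernel.expressions.And;
import io.delta.kernel.expressions.Column;
import io.delta.kernel.expressions.Expression;
import io.delta.kernel.expressions.Literal;
import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.internal.util.PartitionUtils;
import io.delta.kernel.internal.util.Tuple2;
import io.delta.kernel.types.IntegerType;
import io.delta.kernel.types.StructType;

public class SplitPredicatesExample {
    // Table partitioned on `part`, with a data column `value`; filter: part = 1 AND value > 5.
    static Tuple2<Predicate, Predicate> split(ExpressionHandler exprHandler) {
        StructType tableSchema = new StructType()
            .add("part", IntegerType.INTEGER)
            .add("value", IntegerType.INTEGER);
        Set<String> partitionCols = Collections.singleton("part");

        Predicate filter = new And(
            new Predicate("=", Arrays.<Expression>asList(new Column("part"), Literal.ofInt(1))),
            new Predicate(">", Arrays.<Expression>asList(new Column("value"), Literal.ofInt(5))));

        // Expected outcome: _1 = (part = 1)  -> guaranteed, used for partition pruning;
        //                   _2 = (value > 5) -> best effort, the connector must still apply it.
        // If the handler can't evaluate (part = 1), that conjunct moves to the best-effort side too.
        return PartitionUtils.splitPredicates(exprHandler, tableSchema, filter, partitionCols);
    }
}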
@@ -211,19 +218,20 @@ private static boolean hasNonPartitionColumns(
return false;
}

private static Predicate combineWithAndOp(Predicate left, Predicate right) {
String leftName = left.getName().toUpperCase();
String rightName = right.getName().toUpperCase();
if (leftName.equals("ALWAYS_FALSE") || rightName.equals("ALWAYS_FALSE")) {
return ALWAYS_FALSE;
}
if (leftName.equals("ALWAYS_TRUE")) {
return right;
}
if (rightName.equals("ALWAYS_TRUE")) {
return left;
/**
* Utility method to check whether the given predicate has any expressions that are not
* supported by the given expression handler.
*/
private static boolean hasUnsupportedExpr(
ExpressionHandler exprHandler,
StructType tableSchema,
Predicate predicate) {
try {
exprHandler.getPredicateEvaluator(tableSchema, predicate);
return false;
} catch (UnsupportedOperationException ex) {
return true;
}
return new And(left, right);
}
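
The probe above relies on the convention that a connector's expression handler throws UnsupportedOperationException for predicates it cannot evaluate. A generic, hedged form of that pattern, independent of the Kernel types, looks like this:

import java.util.function.Supplier;

// Generic "probe for support" helper: try to create an evaluator and treat
// UnsupportedOperationException as "not supported" rather than failing the caller.
final class SupportProbe {
    static boolean isSupported(Supplier<?> evaluatorFactory) {
        try {
            // e.g. () -> exprHandler.getPredicateEvaluator(tableSchema, predicate)
            evaluatorFactory.get();
            return true;
        } catch (UnsupportedOperationException ex) {
            return false;
        }
    }
}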

private static Literal literalForPartitionValue(DataType dataType, String partitionValue) {