Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
vkorukanti committed Mar 13, 2024
1 parent c046547 commit 50da34d
Show file tree
Hide file tree
Showing 7 changed files with 321 additions and 145 deletions.
3 changes: 2 additions & 1 deletion kernel/kernel-api/src/main/java/io/delta/kernel/Scan.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,10 @@ public interface Scan {
* Get the remaining filter that is not guaranteed to be satisfied for the data Delta Kernel
* returns. This filter is used by Delta Kernel to do data skipping when possible.
*
* @param tableClient {@link TableClient} instance to use in Delta Kernel.
* @return the remaining filter as a {@link Predicate}.
*/
Optional<Predicate> getRemainingFilter();
Optional<Predicate> getRemainingFilter(TableClient tableClient);

/**
* Get the scan state associated with the current scan. This state is common across all
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,22 @@ public class ScanImpl implements Scan {
private final Metadata metadata;
private final LogReplay logReplay;
private final Path dataPath;
private final Optional<Tuple2<Predicate, Predicate>> partitionAndDataFilters;

private final Optional<Predicate> filter;

private boolean accessedScanFiles;

private boolean areFiltersSplit;
// Subset of partition predicate the expression handler can support evaluating
private Optional<Predicate> metadataPredicate;
// Subset of the given query predicate the Kernel tries to use prune scan file list as best as
// it can, but can't guarantee that the all the scan files returned contains data on which this
// predicate evaluates to true. The connector needs to apply this filter on the data from the
// returned scan files to completely remove the data that doesn't satisfy given query predicate.
//
// The predicate could be on the data columns and/or unsupported predicate on partition columns
private Optional<Predicate> remainingPredicate;

public ScanImpl(
StructType snapshotSchema,
StructType readSchema,
Expand All @@ -71,7 +84,7 @@ public ScanImpl(
this.protocol = protocol;
this.metadata = metadata;
this.logReplay = logReplay;
this.partitionAndDataFilters = splitFilters(filter);
this.filter = filter;
this.dataPath = dataPath;
}

Expand All @@ -88,7 +101,7 @@ public CloseableIterator<FilteredColumnarBatch> getScanFiles(TableClient tableCl
accessedScanFiles = true;

// Generate data skipping filter and decide if we should read the stats column
Optional<DataSkippingPredicate> dataSkippingFilter = getDataSkippingFilter();
Optional<DataSkippingPredicate> dataSkippingFilter = getDataSkippingFilter(tableClient);
boolean shouldReadStats = dataSkippingFilter.isPresent();

// Get active AddFiles via log replay
Expand Down Expand Up @@ -142,22 +155,38 @@ public Row getScanState(TableClient tableClient) {
}

@Override
public Optional<Predicate> getRemainingFilter() {
return getDataFilters();
public Optional<Predicate> getRemainingFilter(TableClient tableClient) {
splitFilters(tableClient);
return remainingPredicate;
}

private Optional<Tuple2<Predicate, Predicate>> splitFilters(Optional<Predicate> filter) {
return filter.map(predicate ->
PartitionUtils.splitMetadataAndDataPredicates(
predicate, metadata.getPartitionColNames()));
private void splitFilters(TableClient tableClient) {
if (areFiltersSplit) {
return;
}
filter.map(predicate -> {
Tuple2<Predicate, Predicate> metadataAndNonMetadataFilters =
PartitionUtils.splitPredicates(
tableClient.getExpressionHandler(),
metadata.getSchema(),
predicate,
metadata.getPartitionColNames());

metadataPredicate = removeAlwaysTrue(Optional.of(metadataAndNonMetadataFilters._1));
remainingPredicate = removeAlwaysTrue(Optional.of(metadataAndNonMetadataFilters._2));
return null;
});
areFiltersSplit = true;
}

private Optional<Predicate> getDataFilters() {
return removeAlwaysTrue(partitionAndDataFilters.map(filters -> filters._2));
private Optional<Predicate> getDataFilters(TableClient tableClient) {
splitFilters(tableClient);
return remainingPredicate;
}

private Optional<Predicate> getPartitionsFilters() {
return removeAlwaysTrue(partitionAndDataFilters.map(filters -> filters._1));
private Optional<Predicate> getPartitionsFilters(TableClient tableClient) {
splitFilters(tableClient);
return metadataPredicate;
}

/**
Expand All @@ -171,7 +200,7 @@ private Optional<Predicate> removeAlwaysTrue(Optional<Predicate> predicate) {
private CloseableIterator<FilteredColumnarBatch> applyPartitionPruning(
TableClient tableClient,
CloseableIterator<FilteredColumnarBatch> scanFileIter) {
Optional<Predicate> partitionPredicate = getPartitionsFilters();
Optional<Predicate> partitionPredicate = getPartitionsFilters(tableClient);
if (!partitionPredicate.isPresent()) {
// There is no partition filter, return the scan file iterator as is.
return scanFileIter;
Expand Down Expand Up @@ -221,9 +250,9 @@ public void close() throws IOException {
};
}

private Optional<DataSkippingPredicate> getDataSkippingFilter() {
return getDataFilters().flatMap(dataFilters ->
DataSkippingUtils.constructDataSkippingFilter(dataFilters, metadata.getDataSchema())
private Optional<DataSkippingPredicate> getDataSkippingFilter(TableClient tableClient) {
return getDataFilters(tableClient).flatMap(dataFilters ->
DataSkippingUtils.constructDataSkippingFilter(dataFilters, metadata.getSchema())
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
import java.util.List;
import static java.lang.String.format;

import io.delta.kernel.expressions.Expression;
import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.expressions.*;
import static io.delta.kernel.expressions.AlwaysFalse.ALWAYS_FALSE;

import static io.delta.kernel.internal.util.Preconditions.checkArgument;

public class ExpressionUtils {
Expand Down Expand Up @@ -64,4 +65,22 @@ public static Expression getUnaryChild(Expression expression) {
format("%s: expected one inputs, but got %s", expression, children.size()));
return children.get(0);
}

/*
* Utility method to combine the given predicates with AND
*/
public static Predicate combineWithAndOp(Predicate left, Predicate right) {
String leftName = left.getName().toUpperCase();
String rightName = right.getName().toUpperCase();
if (leftName.equals("ALWAYS_FALSE") || rightName.equals("ALWAYS_FALSE")) {
return ALWAYS_FALSE;
}
if (leftName.equals("ALWAYS_TRUE")) {
return right;
}
if (rightName.equals("ALWAYS_TRUE")) {
return left;
}
return new And(left, right);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@
import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.expressions.*;
import io.delta.kernel.types.*;
import static io.delta.kernel.expressions.AlwaysFalse.ALWAYS_FALSE;
import static io.delta.kernel.expressions.AlwaysTrue.ALWAYS_TRUE;

import io.delta.kernel.internal.InternalScanFileUtils;
import static io.delta.kernel.internal.util.ExpressionUtils.*;

public class PartitionUtils {
private PartitionUtils() {}
Expand Down Expand Up @@ -103,30 +103,37 @@ public static ColumnarBatch withPartitionColumns(
}

/**
* Split the given predicate into predicate on partition columns and predicate on data columns.
* Split the given predicate into predicate that a part that can be guaranteed to be satisfied
* by kernel when returning the scan file and the best effort predicate that Kernel uses for
* skipping but doesn't guarantee the returned scan files has data that doesn't satisfy the
* predicate.
*
* @param exprHandler
* @param predicate
* @param partitionColNames
* @return Tuple of partition column predicate and data column predicate.
* @return Tuple of guaranteed predicate and best effort predicate.
*/
public static Tuple2<Predicate, Predicate> splitMetadataAndDataPredicates(
Predicate predicate,
Set<String> partitionColNames) {
public static Tuple2<Predicate, Predicate> splitPredicates(
ExpressionHandler exprHandler,
StructType tableSchema,
Predicate predicate,
Set<String> partitionColNames) {
String predicateName = predicate.getName();
List<Expression> children = predicate.getChildren();
if ("AND".equalsIgnoreCase(predicateName)) {
Predicate left = (Predicate) children.get(0);
Predicate right = (Predicate) children.get(1);
Predicate left = asPredicate(getLeft(predicate));
Predicate right = asPredicate(getRight(predicate));
Tuple2<Predicate, Predicate> leftResult =
splitMetadataAndDataPredicates(left, partitionColNames);
splitPredicates(exprHandler, tableSchema, left, partitionColNames);
Tuple2<Predicate, Predicate> rightResult =
splitMetadataAndDataPredicates(right, partitionColNames);
splitPredicates(exprHandler, tableSchema, right, partitionColNames);

return new Tuple2<>(
combineWithAndOp(leftResult._1, rightResult._1),
combineWithAndOp(leftResult._2, rightResult._2));
}
if (hasNonPartitionColumns(children, partitionColNames)) {
if (hasNonPartitionColumns(children, partitionColNames) ||
!exprHandler.isSupported(tableSchema, predicate, BooleanType.BOOLEAN)) {
return new Tuple2(ALWAYS_TRUE, predicate);
} else {
return new Tuple2<>(predicate, ALWAYS_TRUE);
Expand Down Expand Up @@ -215,21 +222,6 @@ private static boolean hasNonPartitionColumns(
return false;
}

private static Predicate combineWithAndOp(Predicate left, Predicate right) {
String leftName = left.getName().toUpperCase();
String rightName = right.getName().toUpperCase();
if (leftName.equals("ALWAYS_FALSE") || rightName.equals("ALWAYS_FALSE")) {
return ALWAYS_FALSE;
}
if (leftName.equals("ALWAYS_TRUE")) {
return right;
}
if (rightName.equals("ALWAYS_TRUE")) {
return left;
}
return new And(left, right);
}

private static Literal literalForPartitionValue(DataType dataType, String partitionValue) {
if (partitionValue == null) {
return Literal.ofNull(dataType);
Expand Down
Loading

0 comments on commit 50da34d

Please sign in to comment.