Skip to content

Commit

Permalink
[SC-72790] A new approach to extract all metadata predicates in Delta
Browse files Browse the repository at this point in the history
Converting all the predicates into CNF may result in a very long predicate and the codegen become unnecessarily large.
We should follow the approach of apache/spark#29101, which extracts all convertiable predicates gracefully.

Author: Gengliang Wang <[email protected]>

GitOrigin-RevId: 9ecedbd83f85b8235262c3de0bd83b4540cbc560
  • Loading branch information
gengliangwang authored and mengtong-db committed Apr 6, 2021
1 parent e4c85e6 commit d77b0bf
Showing 1 changed file with 59 additions and 3 deletions.
62 changes: 59 additions & 3 deletions src/main/scala/org/apache/spark/sql/delta/DeltaTable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SessionCatalog}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, NamedExpression, PredicateHelper, SubqueryExpression}
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, Expression, NamedExpression, Or, PredicateHelper, SubqueryExpression}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.{FileIndex, HadoopFsRelation, LogicalRelation}
Expand Down Expand Up @@ -214,8 +214,64 @@ object DeltaTableUtils extends PredicateHelper
condition: Expression,
partitionColumns: Seq[String],
spark: SparkSession): (Seq[Expression], Seq[Expression]) = {
splitConjunctivePredicates(condition).partition(
isPredicateMetadataOnly(_, partitionColumns, spark))
val (metadataPredicates, dataPredicates) =
splitConjunctivePredicates(condition).partition(
isPredicateMetadataOnly(_, partitionColumns, spark))
// Extra metadata predicates that can partially extracted from `dataPredicates`.
val extraMetadataPredicates =
if (dataPredicates.nonEmpty) {
extractMetadataPredicates(dataPredicates.reduce(And), partitionColumns, spark)
.map(splitConjunctivePredicates)
.getOrElse(Seq.empty)
} else {
Seq.empty
}
(metadataPredicates ++ extraMetadataPredicates, dataPredicates)
}

/**
* Returns a predicate that its reference is a subset of `partitionColumns` and it contains the
* maximum constraints from `condition`.
* When there is no such filter, `None` is returned.
*/
private def extractMetadataPredicates(
condition: Expression,
partitionColumns: Seq[String],
spark: SparkSession): Option[Expression] = {
condition match {
case And(left, right) =>
val lhs = extractMetadataPredicates(left, partitionColumns, spark)
val rhs = extractMetadataPredicates(right, partitionColumns, spark)
(lhs.toSeq ++ rhs.toSeq).reduceOption(And)

// The Or predicate is convertible when both of its children can be pushed down.
// That is to say, if one/both of the children can be partially pushed down, the Or
// predicate can be partially pushed down as well.
//
// Here is an example used to explain the reason.
// Let's say we have
// condition: (a1 AND a2) OR (b1 AND b2),
// outputSet: AttributeSet(a1, b1)
// a1 and b1 is convertible, while a2 and b2 is not.
// The predicate can be converted as
// (a1 OR b1) AND (a1 OR b2) AND (a2 OR b1) AND (a2 OR b2)
// As per the logical in And predicate, we can push down (a1 OR b1).
case Or(left, right) =>
for {
lhs <- extractMetadataPredicates(left, partitionColumns, spark)
rhs <- extractMetadataPredicates(right, partitionColumns, spark)
} yield Or(lhs, rhs)

// Here we assume all the `Not` operators is already below all the `And` and `Or` operators
// after the optimization rule `BooleanSimplification`, so that we don't need to handle the
// `Not` operators here.
case other =>
if (isPredicatePartitionColumnsOnly(other, partitionColumns, spark)) {
Some(other)
} else {
None
}
}
}

/**
Expand Down

0 comments on commit d77b0bf

Please sign in to comment.