Skip to content

Commit

Permalink
address more comments
Browse files Browse the repository at this point in the history
  • Loading branch information
gengliangwang committed Jul 16, 2020
1 parent 543bd69 commit 452f3c9
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -202,13 +202,13 @@ trait PredicateHelper extends Logging {
}

/*
* Returns a filter that it's output is a subset of `outputSet` and it contains all possible
* Returns a filter whose references are a subset of `outputSet` and which contains the maximum
* constraints from `condition`. This is used for predicate pushdown.
* When there is no such convertible filter, `None` is returned.
* When there is no such filter, `None` is returned.
*/
protected def extractPredicatesWithinOutputSet(
condition: Expression,
outputSet: AttributeSet): Option[Expression] = condition match {
condition: Expression,
outputSet: AttributeSet): Option[Expression] = condition match {
case And(left, right) =>
val leftResultOptional = extractPredicatesWithinOutputSet(left, outputSet)
val rightResultOptional = extractPredicatesWithinOutputSet(right, outputSet)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreeNodeTag

/**
* Try pushing down convertible disjunctive join condition into left and right child.
* Try pushing down the disjunctive join condition into the left and right children.
* To avoid expanding the join condition, the join condition will be kept in the original form even
* when predicate pushdown happens.
*/
Expand All @@ -37,39 +37,6 @@ object PushExtraPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHel
case _ => false
}

/**
 * Partitions the given join condition expressions by the side of the join that is able to
 * evaluate them, and additionally derives extra pushable predicates from the expressions that
 * reference both sides.
 *
 * Non-deterministic expressions are never treated as pushdown candidates; they are returned
 * unchanged in the third element of the result.
 *
 * @param condition the join condition expressions to classify
 * @param left the left child of the join
 * @param right the right child of the join
 * @return a triple of (predicates evaluable on the left child, predicates evaluable on the
 *         right child, predicates that stay in the join condition)
 */
protected def extractConvertibleFilters(
    condition: Seq[Expression],
    left: LogicalPlan,
    right: LogicalPlan): (Seq[Expression], Seq[Expression], Seq[Expression]) = {
  // Only deterministic predicates may be considered for pushdown.
  val (deterministicPredicates, nonDeterministicPredicates) =
    condition.partition(predicate => predicate.deterministic)

  // Predicates whose references all come from the left child; everything else is tried against
  // the right child next.
  val (leftOnly, notLeftOnly) =
    deterministicPredicates.partition(_.references.subsetOf(left.outputSet))
  val (rightOnly, spanningBoth) =
    notLeftOnly.partition(_.references.subsetOf(right.outputSet))

  // A predicate that references both children may still contain sub-predicates that can be
  // evaluated against a single child's output.
  def extraPredicatesFor(child: LogicalPlan): Seq[Expression] =
    spanningBoth.flatMap(extractPredicatesWithinOutputSet(_, child.outputSet))

  val pushableToLeft = leftOnly ++ extraPredicatesFor(left)
  val pushableToRight = rightOnly ++ extraPredicatesFor(right)

  // `spanningBoth` is returned in its original form rather than being expanded into
  // conjunctive normal form, which would make the size of codegen much larger.
  (pushableToLeft, pushableToRight, spanningBoth ++ nonDeterministicPredicates)
}

def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case j @ Join(left, right, joinType, Some(joinCondition), hint)
if canPushThrough(joinType) =>
Expand Down

0 comments on commit 452f3c9

Please sign in to comment.