Commit

add files
gengliangwang committed Jul 16, 2020
1 parent 16c78d6 commit 19fa997
Showing 2 changed files with 192 additions and 0 deletions.
@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.catalyst.expressions.{And, Expression, PredicateHelper}
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreeNodeTag

/**
 * Tries converting the join condition to conjunctive normal form (CNF) so that more predicates
 * can be pushed down. To avoid expanding the join condition, it is kept in its original form
 * even when predicate pushdown happens.
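 *
 * For example, given the join condition (a = 1 AND d = 2) OR (b = 3 AND e = 4), where a and b
 * come from the left side and d and e from the right side, the implied predicate
 * (a = 1 OR b = 3) can be pushed into the left child and (d = 2 OR e = 4) into the right
 * child, while the join itself keeps the original condition.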
*/
object PushExtraPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {

private val processedJoinConditionTag = TreeNodeTag[Expression]("processedJoinCondition")

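  // Join types for which at least one child can safely receive extra Filter nodes. Full outer
  // joins preserve the rows of both sides, so no extra filtering can be applied below them.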
private def canPushThrough(joinType: JoinType): Boolean = joinType match {
case _: InnerLike | LeftSemi | RightOuter | LeftOuter | LeftAnti | ExistenceJoin(_) => true
case _ => false
}

/**
* Splits join condition expressions or filter predicates (on a given join's output) into three
* categories based on the attributes required to evaluate them. Note that we explicitly exclude
* non-deterministic (i.e., stateful) condition expressions in canEvaluateInLeft or
* canEvaluateInRight to prevent pushing these predicates on either side of the join.
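 *
 * For example, given left output {a, b} and right output {c}, the condition
 * (a > 1 AND c < 2) OR (b < 3 AND c > 0) cannot be evaluated on either side alone, but its
 * implied sub-predicates (a > 1 OR b < 3) and (c < 2 OR c > 0) can be evaluated on the left
 * and right side respectively.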
*
* @return (canEvaluateInLeft, canEvaluateInRight, haveToEvaluateInBoth)
*/
protected def extractConvertibleFilters(
condition: Seq[Expression],
left: LogicalPlan,
right: LogicalPlan): (Seq[Expression], Seq[Expression], Seq[Expression]) = {
val (pushDownCandidates, nonDeterministic) = condition.partition(_.deterministic)
val (leftEvaluateCondition, rest) =
pushDownCandidates.partition(_.references.subsetOf(left.outputSet))
val (rightEvaluateCondition, commonCondition) =
rest.partition(expr => expr.references.subsetOf(right.outputSet))

    // For the predicates in `commonCondition`, it may still be possible to extract
    // sub-predicates that can be pushed down.
val leftExtraCondition =
commonCondition.flatMap(convertibleFilter(_, left.outputSet))

val rightExtraCondition =
commonCondition.flatMap(convertibleFilter(_, right.outputSet))

    // To avoid expanding the join condition into conjunctive normal form and blowing up the
    // size of the generated code, `commonCondition` is kept in its original form in the new
    // join condition.
(leftEvaluateCondition ++ leftExtraCondition, rightEvaluateCondition ++ rightExtraCondition,
commonCondition ++ nonDeterministic)
}

def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case j @ Join(left, right, joinType, Some(joinCondition), hint)
if canPushThrough(joinType) =>
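      // Collect the deterministic conjuncts that reference attributes from both sides; only
      // these can yield extra single-side predicates via convertibleFilter.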
val filtersOfBothSide = splitConjunctivePredicates(joinCondition).filter { f =>
f.deterministic && f.references.nonEmpty &&
!f.references.subsetOf(left.outputSet) && !f.references.subsetOf(right.outputSet)
}
val leftExtraCondition =
filtersOfBothSide.flatMap(convertibleFilter(_, left.outputSet))

val rightExtraCondition =
filtersOfBothSide.flatMap(convertibleFilter(_, right.outputSet))

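      // The rewritten join is tagged with the condition it was created from, so a second run
      // of the rule over the same plan does not push the same predicates again.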
val alreadyProcessed = j.getTagValue(processedJoinConditionTag).exists { condition =>
condition.semanticEquals(joinCondition)
}

if ((leftExtraCondition.isEmpty && rightExtraCondition.isEmpty) || alreadyProcessed) {
j
} else {
lazy val newLeft =
leftExtraCondition.reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
lazy val newRight =
rightExtraCondition.reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)

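        // Attach the extra filters only to sides that can safely be filtered: both sides for
        // inner-like and left semi joins, and only the non-preserved side for outer, anti and
        // existence joins.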
val newJoin = joinType match {
case _: InnerLike | LeftSemi =>
Join(newLeft, newRight, joinType, Some(joinCondition), hint)
case RightOuter =>
Join(newLeft, right, RightOuter, Some(joinCondition), hint)
case LeftOuter | LeftAnti | ExistenceJoin(_) =>
Join(left, newRight, joinType, Some(joinCondition), hint)
case other =>
throw new IllegalStateException(s"Unexpected join type: $other")
}
newJoin.setTagValue(processedJoinConditionTag, joinCondition)
newJoin
}
}
}
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.types.BooleanType

class ConvertibleFilterSuite extends SparkFunSuite with PredicateHelper with PlanTest {
private val a = AttributeReference("A", BooleanType)(exprId = ExprId(1))
private val b = AttributeReference("B", BooleanType)(exprId = ExprId(2))
private val c = AttributeReference("C", BooleanType)(exprId = ExprId(3))
private val d = AttributeReference("D", BooleanType)(exprId = ExprId(4))
private val e = AttributeReference("E", BooleanType)(exprId = ExprId(5))
private val f = AttributeReference("F", BooleanType)(exprId = ExprId(6))
private val g = AttributeReference("G", BooleanType)(exprId = ExprId(7))
private val h = AttributeReference("H", BooleanType)(exprId = ExprId(8))
private val i = AttributeReference("I", BooleanType)(exprId = ExprId(9))

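  // Checks that convertibleFilter extracts from `input` the sub-predicate that can be
  // evaluated using only `convertibleAttributes`, or returns None when no such predicate
  // is implied.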
private def checkCondition(
input: Expression,
convertibleAttributes: Seq[Attribute],
expected: Option[Expression]): Unit = {
val result = convertibleFilter(input, AttributeSet(convertibleAttributes))
if (expected.isEmpty) {
assert(result.isEmpty)
} else {
assert(result.isDefined && result.get.semanticEquals(expected.get))
}
}

test("Convertible conjunctive predicates") {
checkCondition(a && b, Seq(a, b), Some(a && b))
checkCondition(a && b, Seq(a), Some(a))
checkCondition(a && b, Seq(b), Some(b))
checkCondition(a && b && c, Seq(a, c), Some(a && c))
checkCondition(a && b && c && d, Seq(b, c), Some(b && c))
}

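  // A disjunction is convertible only if every disjunct is convertible: dropping a
  // non-convertible disjunct would strengthen the predicate and incorrectly filter out rows.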
test("Convertible disjunctive predicates") {
checkCondition(a || b, Seq(a, b), Some(a || b))
checkCondition(a || b, Seq(a), None)
checkCondition(a || b, Seq(b), None)
checkCondition(a || b || c, Seq(a, c), None)
checkCondition(a || b || c || d, Seq(a, b, d), None)
checkCondition(a || b || c || d, Seq(d, c, b, a), Some(a || b || c || d))
}

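  // In mixed AND/OR trees, non-convertible conjuncts inside a disjunct can be dropped, but
  // every disjunct must retain at least one convertible conjunct.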
test("Convertible complex predicates") {
checkCondition((a && b) || (c && d), Seq(a, c), Some(a || c))
checkCondition((a && b) || (c && d), Seq(a, b), None)
checkCondition((a && b) || (c && d), Seq(a, c, d), Some(a || (c && d)))
checkCondition((a && b && c) || (d && e && f), Seq(a, c, d, f), Some((a && c) || (d && f)))
checkCondition((a && b) || (c && d) || (e && f) || (g && h), Seq(a, c, e, g),
Some(a || c || e || g))
checkCondition((a && b) || (c && d) || (e && f) || (g && h), Seq(a, e, g), None)
checkCondition((a || b) || (c && d) || (e && f) || (g && h), Seq(a, c, e, g), None)
checkCondition((a || b) || (c && d) || (e && f) || (g && h), Seq(a, b, c, e, g),
Some(a || b || c || e || g))
checkCondition((a && b && c) || (d && e && f) || (g && h && i), Seq(b, e, h), Some(b || e || h))
checkCondition((a && b && c) || (d && e && f) || (g && h && i), Seq(b, e, d), None)
}
}
