From 19fa9979d306b65ae5cfddc5bdf95d4d39b67aaa Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Thu, 16 Jul 2020 14:22:14 +0800
Subject: [PATCH] add files

---
 .../PushExtraPredicateThroughJoin.scala      | 113 ++++++++++++++++++
 .../expressions/ConvertibleFilterSuite.scala |  79 ++++++++++++
 2 files changed, 192 insertions(+)
 create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushExtraPredicateThroughJoin.scala
 create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ConvertibleFilterSuite.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushExtraPredicateThroughJoin.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushExtraPredicateThroughJoin.scala
new file mode 100644
index 0000000000000..2f10a33e70ac8
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushExtraPredicateThroughJoin.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.{And, Expression, PredicateHelper}
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreeNodeTag
+
+/**
+ * Derive extra predicates from the join condition, as if it were converted to conjunctive
+ * normal form (CNF), so that more predicates can be pushed down. To avoid expanding the join
+ * condition, it is kept in its original form on the join even when predicate pushdown happens.
+ */
+object PushExtraPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
+
+  private val processedJoinConditionTag = TreeNodeTag[Expression]("processedJoinCondition")
+
+  private def canPushThrough(joinType: JoinType): Boolean = joinType match {
+    case _: InnerLike | LeftSemi | RightOuter | LeftOuter | LeftAnti | ExistenceJoin(_) => true
+    case _ => false
+  }
+
+  /**
+   * Splits join condition expressions or filter predicates (on a given join's output) into three
+   * categories based on the attributes required to evaluate them. Note that we explicitly
+   * exclude non-deterministic (i.e., stateful) condition expressions from canEvaluateInLeft and
+   * canEvaluateInRight to prevent pushing these predicates to either side of the join.
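+   *
+   * For illustration (hypothetical attributes: the left side outputs a and b, the right side
+   * outputs c and d), the condition `a = 1 AND c = 2 AND (b = 3 OR d = 4)` splits into
+   * canEvaluateInLeft = [a = 1], canEvaluateInRight = [c = 2], and
+   * haveToEvaluateInBoth = [b = 3 OR d = 4].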
+   *
+   * @return (canEvaluateInLeft, canEvaluateInRight, haveToEvaluateInBoth)
+   */
+  protected def extractConvertibleFilters(
+      condition: Seq[Expression],
+      left: LogicalPlan,
+      right: LogicalPlan): (Seq[Expression], Seq[Expression], Seq[Expression]) = {
+    val (pushDownCandidates, nonDeterministic) = condition.partition(_.deterministic)
+    val (leftEvaluateCondition, rest) =
+      pushDownCandidates.partition(_.references.subsetOf(left.outputSet))
+    val (rightEvaluateCondition, commonCondition) =
+      rest.partition(expr => expr.references.subsetOf(right.outputSet))
+
+    // For the predicates in `commonCondition`, it may still be possible to extract convertible
+    // sub-predicates that can be pushed down.
+    val leftExtraCondition =
+      commonCondition.flatMap(convertibleFilter(_, left.outputSet))
+
+    val rightExtraCondition =
+      commonCondition.flatMap(convertibleFilter(_, right.outputSet))
+
+    // To avoid expanding the join condition into conjunctive normal form and significantly
+    // increasing the size of the generated code, `commonCondition` is kept in its original form
+    // in the new join condition.
+    (leftEvaluateCondition ++ leftExtraCondition, rightEvaluateCondition ++ rightExtraCondition,
+      commonCondition ++ nonDeterministic)
+  }
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case j @ Join(left, right, joinType, Some(joinCondition), hint)
+        if canPushThrough(joinType) =>
+      // Consider only deterministic conjuncts that reference both sides of the join; conjuncts
+      // evaluable on a single side are left to the existing pushdown rules.
+      val filtersOfBothSide = splitConjunctivePredicates(joinCondition).filter { f =>
+        f.deterministic && f.references.nonEmpty &&
+          !f.references.subsetOf(left.outputSet) && !f.references.subsetOf(right.outputSet)
+      }
+      val leftExtraCondition =
+        filtersOfBothSide.flatMap(convertibleFilter(_, left.outputSet))
+
+      val rightExtraCondition =
+        filtersOfBothSide.flatMap(convertibleFilter(_, right.outputSet))
+
+      // Since the join keeps its original condition, this rule would otherwise match the same
+      // join again; the tag records conditions that have already been processed.
+      val alreadyProcessed = j.getTagValue(processedJoinConditionTag).exists { condition =>
+        condition.semanticEquals(joinCondition)
+      }
+
+      if ((leftExtraCondition.isEmpty && rightExtraCondition.isEmpty) || alreadyProcessed) {
+        j
+      } else {
+        lazy val newLeft =
+          leftExtraCondition.reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
+        lazy val newRight =
+          rightExtraCondition.reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
+
+        // Inner-like and left-semi joins can filter both sides; outer, anti and existence joins
+        // may only filter the side whose unmatched rows are not preserved in the output.
+        val newJoin = joinType match {
+          case _: InnerLike | LeftSemi =>
+            Join(newLeft, newRight, joinType, Some(joinCondition), hint)
+          case RightOuter =>
+            Join(newLeft, right, RightOuter, Some(joinCondition), hint)
+          case LeftOuter | LeftAnti | ExistenceJoin(_) =>
+            Join(left, newRight, joinType, Some(joinCondition), hint)
+          case other =>
+            throw new IllegalStateException(s"Unexpected join type: $other")
+        }
+        newJoin.setTagValue(processedJoinConditionTag, joinCondition)
+        newJoin
+      }
+  }
+}
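+
+// Worked example (illustrative; the attribute names and constants below are made up):
+// with a left side producing columns a and b, a right side producing c and d, and the
+// join condition (a = 1 AND c = 2) OR (b = 3 AND d = 4), neither disjunct can be
+// evaluated on one side alone, so nothing is pushable as-is. convertibleFilter extracts
+// (a = 1 OR b = 3) for the left side and (c = 2 OR d = 4) for the right side; the rule
+// inserts these as extra Filter nodes below the Join, while the Join keeps the original,
+// unexpanded condition.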
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ConvertibleFilterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ConvertibleFilterSuite.scala
new file mode 100644
index 0000000000000..db94573567f64
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ConvertibleFilterSuite.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.types.BooleanType
+
+class ConvertibleFilterSuite extends SparkFunSuite with PredicateHelper with PlanTest {
+  private val a = AttributeReference("A", BooleanType)(exprId = ExprId(1))
+  private val b = AttributeReference("B", BooleanType)(exprId = ExprId(2))
+  private val c = AttributeReference("C", BooleanType)(exprId = ExprId(3))
+  private val d = AttributeReference("D", BooleanType)(exprId = ExprId(4))
+  private val e = AttributeReference("E", BooleanType)(exprId = ExprId(5))
+  private val f = AttributeReference("F", BooleanType)(exprId = ExprId(6))
+  private val g = AttributeReference("G", BooleanType)(exprId = ExprId(7))
+  private val h = AttributeReference("H", BooleanType)(exprId = ExprId(8))
+  private val i = AttributeReference("I", BooleanType)(exprId = ExprId(9))
+
+  private def checkCondition(
+      input: Expression,
+      convertibleAttributes: Seq[Attribute],
+      expected: Option[Expression]): Unit = {
+    val result = convertibleFilter(input, AttributeSet(convertibleAttributes))
+    if (expected.isEmpty) {
+      assert(result.isEmpty)
+    } else {
+      assert(result.isDefined && result.get.semanticEquals(expected.get))
+    }
+  }
+
+  test("Convertible conjunctive predicates") {
+    checkCondition(a && b, Seq(a, b), Some(a && b))
+    checkCondition(a && b, Seq(a), Some(a))
+    checkCondition(a && b, Seq(b), Some(b))
+    checkCondition(a && b && c, Seq(a, c), Some(a && c))
+    checkCondition(a && b && c && d, Seq(b, c), Some(b && c))
+  }
+
+  test("Convertible disjunctive predicates") {
+    checkCondition(a || b, Seq(a, b), Some(a || b))
+    checkCondition(a || b, Seq(a), None)
+    checkCondition(a || b, Seq(b), None)
+    checkCondition(a || b || c, Seq(a, c), None)
+    checkCondition(a || b || c || d, Seq(a, b, d), None)
+    checkCondition(a || b || c || d, Seq(d, c, b, a), Some(a || b || c || d))
+  }
+
+  test("Convertible complex predicates") {
+    checkCondition((a && b) || (c && d), Seq(a, c), Some(a || c))
+    checkCondition((a && b) || (c && d), Seq(a, b), None)
+    checkCondition((a && b) || (c && d), Seq(a, c, d), Some(a || (c && d)))
+    checkCondition((a && b && c) || (d && e && f), Seq(a, c, d, f), Some((a && c) || (d && f)))
+    checkCondition((a && b) || (c && d) || (e && f) || (g && h), Seq(a, c, e, g),
+      Some(a || c || e || g))
+    checkCondition((a && b) || (c && d) || (e && f) || (g && h), Seq(a, e, g), None)
+    checkCondition((a || b) || (c && d) || (e && f) || (g && h), Seq(a, c, e, g), None)
+    checkCondition((a || b) || (c && d) || (e && f) || (g && h), Seq(a, b, c, e, g),
+      Some(a || b || c || e || g))
+    checkCondition((a && b && c) || (d && e && f) || (g && h && i), Seq(b, e, h), Some(b || e || h))
+    checkCondition((a && b && c) || (d && e && f) || (g && h && i), Seq(b, e, d), None)
+  }
+}
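
For context: the convertibleFilter helper exercised by this suite is defined in
PredicateHelper and does not appear in this diff. The sketch below illustrates the
recursion that the test expectations imply; it is not the actual Spark implementation,
and the name convertibleFilterSketch and its simplified signature are assumptions made
here for clarity.

  import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, Expression, Or}

  // Returns a sub-predicate implied by `condition` that references only `attributes`,
  // or None if no such sub-predicate exists.
  def convertibleFilterSketch(
      condition: Expression,
      attributes: AttributeSet): Option[Expression] = condition match {
    // And(l, r) implies each of its branches, so keeping any convertible branch is sound.
    case And(left, right) =>
      (convertibleFilterSketch(left, attributes),
        convertibleFilterSketch(right, attributes)) match {
        case (Some(l), Some(r)) => Some(And(l, r))
        case (Some(l), None) => Some(l)
        case (None, Some(r)) => Some(r)
        case (None, None) => None
      }
    // Or(l, r) is implied by neither branch alone, so both branches must be convertible;
    // otherwise the whole disjunction stays in the join condition.
    case Or(left, right) =>
      for {
        l <- convertibleFilterSketch(left, attributes)
        r <- convertibleFilterSketch(right, attributes)
      } yield Or(l, r)
    // A leaf predicate is convertible iff every attribute it references is available.
    case other if other.references.subsetOf(attributes) => Some(other)
    case _ => None
  }

This matches the expectations above: (a && b) || (c && d) with convertible attributes
{a, c} yields Some(a || c), while the same predicate with {a, b} yields None, because
nothing in the second disjunct can be evaluated with {a, b}.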