[SPARK-23405] Generate additional constraints for Join's children
## What changes were proposed in this pull request?

I ran the SQL query `select ls.cs_order_number from ls left semi join catalog_sales cs on ls.cs_order_number = cs.cs_order_number`. The `ls` table is small (a single row), while `catalog_sales` is large (about 10 billion rows), and the task hangs. I found that `cs_order_number` has many null values in `catalog_sales`, and I think those null values should be filtered out in the logical plan (a minimal reproduction sketch follows the two plans below).

Before this patch, the optimized plan is:

```
== Optimized Logical Plan ==
Join LeftSemi, (cs_order_number#1 = cs_order_number#22)
:- Project cs_order_number#1
:  +- Filter isnotnull(cs_order_number#1)
:     +- MetastoreRelation 100t, ls
+- Project cs_order_number#22
   +- MetastoreRelation 100t, catalog_sales
```

With this patch, the plan becomes (note the added `Filter isnotnull(cs_order_number#22)` on the `catalog_sales` side):
```
== Optimized Logical Plan ==
Join LeftSemi, (cs_order_number#1 = cs_order_number#22)
:- Project cs_order_number#1
:  +- Filter isnotnull(cs_order_number#1)
:     +- MetastoreRelation 100t, ls
+- Project cs_order_number#22
   +- Filter isnotnull(cs_order_number#22)
      +- MetastoreRelation 100t, catalog_sales
```
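For context, here is a minimal sketch of how the plan above can be inspected; it is not part of the original patch, and it assumes a running `SparkSession` named `spark` plus Hive tables `ls` and `catalog_sales` that both have a `cs_order_number` column:

```scala
// Hypothetical reproduction: print the optimized logical plan for the left semi join.
val query =
  """select ls.cs_order_number
    |from ls left semi join catalog_sales cs
    |on ls.cs_order_number = cs.cs_order_number""".stripMargin

val df = spark.sql(query)

// Before this patch the catalog_sales side has no isnotnull filter; with the patch,
// a Filter isnotnull(cs_order_number#22) appears above the catalog_sales relation.
println(df.queryExecution.optimizedPlan.treeString)
```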

## How was this patch tested?

A unit test covering the left-semi case was added to `InferFiltersFromConstraintsSuite` (see the diff below).

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: KaiXinXiaoLei <[email protected]>
Author: hanghang <[email protected]>

Closes apache#20670 from KaiXinXiaoLei/Spark-23405.
huleilei authored and cloud-fan committed Mar 1, 2018
1 parent ff14801 commit cdcccd7
Showing 3 changed files with 28 additions and 13 deletions.
Changes to the `InferFiltersFromConstraints` rule:

```diff
@@ -661,7 +661,7 @@ object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelpe
     case join @ Join(left, right, joinType, conditionOpt) =>
       // Only consider constraints that can be pushed down completely to either the left or the
       // right child
-      val constraints = join.constraints.filter { c =>
+      val constraints = join.allConstraints.filter { c =>
         c.references.subsetOf(left.outputSet) || c.references.subsetOf(right.outputSet)
       }
       // Remove those constraints that are already enforced by either the left or the right child
```
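The reasoning behind this one-line change is only implicit in the diff, so the following is an inferred explanation rather than part of the original commit: for a `LeftSemi` join the operator's output contains only the left child's attributes, so the old `join.constraints`, which is restricted to the join's own output set, drops the inferred not-null constraint on the right-side join key before this rule can use it, whereas `join.allConstraints` keeps it and the rule can push a `Filter isnotnull(...)` into the right child. Below is a self-contained toy sketch of that filtering difference, using plain Scala sets instead of Catalyst expressions (all names here are hypothetical):

```scala
// Toy model of constraint propagation for `ls left semi join catalog_sales on x.a = y.a`.
// `refs` stands in for Expression.references; attribute names are made up for illustration.
final case class Constraint(refs: Set[String], text: String)

object ConstraintDemo extends App {
  val joinOutput       = Set("x.a")          // LeftSemi output: left-side attributes only
  val leftChildOutput  = Set("x.a")
  val rightChildOutput = Set("y.a")

  val allConstraints = Set(
    Constraint(Set("x.a"), "isnotnull(x.a)"),
    Constraint(Set("y.a"), "isnotnull(y.a)"),          // inferred from x.a = y.a
    Constraint(Set("x.a", "y.a"), "(x.a = y.a)"))

  // Old behaviour: join.constraints keeps only constraints over the join's output,
  // so isnotnull(y.a) is gone before InferFiltersFromConstraints ever sees it.
  val oldVisible = allConstraints.filter(_.refs.subsetOf(joinOutput))

  // New behaviour: start from allConstraints and keep what either child can evaluate,
  // so isnotnull(y.a) survives and becomes a Filter on the catalog_sales side.
  val pushable = allConstraints.filter(c =>
    c.refs.subsetOf(leftChildOutput) || c.refs.subsetOf(rightChildOutput))

  println(oldVisible.map(_.text))  // Set(isnotnull(x.a))
  println(pushable.map(_.text))    // Set(isnotnull(x.a), isnotnull(y.a))
}
```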
Changes to the `QueryPlanConstraints` trait:

```diff
@@ -23,25 +23,28 @@ import org.apache.spark.sql.catalyst.expressions._
 trait QueryPlanConstraints { self: LogicalPlan =>
 
   /**
-   * An [[ExpressionSet]] that contains invariants about the rows output by this operator. For
-   * example, if this set contains the expression `a = 2` then that expression is guaranteed to
-   * evaluate to `true` for all rows produced.
+   * An [[ExpressionSet]] that contains an additional set of constraints, such as equality
+   * constraints and `isNotNull` constraints, etc.
    */
-  lazy val constraints: ExpressionSet = {
+  lazy val allConstraints: ExpressionSet = {
     if (conf.constraintPropagationEnabled) {
-      ExpressionSet(
-        validConstraints
-          .union(inferAdditionalConstraints(validConstraints))
-          .union(constructIsNotNullConstraints(validConstraints))
-          .filter { c =>
-            c.references.nonEmpty && c.references.subsetOf(outputSet) && c.deterministic
-          }
-      )
+      ExpressionSet(validConstraints
+        .union(inferAdditionalConstraints(validConstraints))
+        .union(constructIsNotNullConstraints(validConstraints)))
     } else {
       ExpressionSet(Set.empty)
     }
   }
 
+  /**
+   * An [[ExpressionSet]] that contains invariants about the rows output by this operator. For
+   * example, if this set contains the expression `a = 2` then that expression is guaranteed to
+   * evaluate to `true` for all rows produced.
+   */
+  lazy val constraints: ExpressionSet = ExpressionSet(allConstraints.filter { c =>
+    c.references.nonEmpty && c.references.subsetOf(outputSet) && c.deterministic
+  })
+
   /**
    * This method can be overridden by any child class of QueryPlan to specify a set of constraints
    * based on the given operator's constraint propagation logic. These constraints are then
```
New test in `InferFiltersFromConstraintsSuite`:

```diff
@@ -192,4 +192,16 @@ class InferFiltersFromConstraintsSuite extends PlanTest {
 
     comparePlans(Optimize.execute(original.analyze), correct.analyze)
   }
+
+  test("SPARK-23405: left-semi equal-join should filter out null join keys on both sides") {
+    val x = testRelation.subquery('x)
+    val y = testRelation.subquery('y)
+    val condition = Some("x.a".attr === "y.a".attr)
+    val originalQuery = x.join(y, LeftSemi, condition).analyze
+    val left = x.where(IsNotNull('a))
+    val right = y.where(IsNotNull('a))
+    val correctAnswer = left.join(right, LeftSemi, condition).analyze
+    val optimized = Optimize.execute(originalQuery)
+    comparePlans(optimized, correctAnswer)
+  }
 }
```
