Skip to content

Commit

Permalink
[SPARK-20094][SQL] Preventing push down of IN subquery to Join operator
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

TPCDS q45 fails becuase:
`ReorderJoin` collects all predicates and try to put them into join condition when creating ordered join. If a predicate with an IN subquery (`ListQuery`) is in a join condition instead of a filter condition, `RewritePredicateSubquery.rewriteExistentialExpr` would fail to convert the subquery to an `ExistenceJoin`, and thus result in error.

We should prevent push down of IN subquery to Join operator.

## How was this patch tested?

Add a new test case in `FilterPushdownSuite`.

Author: wangzhenhua <[email protected]>

Closes #17428 from wzhfy/noSubqueryInJoinCond.
  • Loading branch information
wzhfy authored and hvanhovell committed Mar 28, 2017
1 parent a9abff2 commit 91559d2
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,12 @@ trait PredicateHelper {
* Returns true iff `expr` could be evaluated as a condition within join.
*/
protected def canEvaluateWithinJoin(expr: Expression): Boolean = expr match {
case l: ListQuery =>
// A ListQuery defines the query which we want to search in an IN subquery expression.
// Currently the only way to evaluate an IN subquery is to convert it to a
// LeftSemi/LeftAnti/ExistenceJoin by `RewritePredicateSubquery` rule.
// It cannot be evaluated as part of a Join operator.
false
case e: SubqueryExpression =>
// non-correlated subquery will be replaced as literal
e.children.isEmpty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -836,6 +836,26 @@ class FilterPushdownSuite extends PlanTest {
comparePlans(optimized, answer)
}

test("SPARK-20094: don't push predicate with IN subquery into join condition") {
val x = testRelation.subquery('x)
val z = testRelation.subquery('z)
val w = testRelation1.subquery('w)

val queryPlan = x
.join(z)
.where(("x.b".attr === "z.b".attr) &&
("x.a".attr > 1 || "z.c".attr.in(ListQuery(w.select("w.d".attr)))))
.analyze

val expectedPlan = x
.join(z, Inner, Some("x.b".attr === "z.b".attr))
.where("x.a".attr > 1 || "z.c".attr.in(ListQuery(w.select("w.d".attr))))
.analyze

val optimized = Optimize.execute(queryPlan)
comparePlans(optimized, expectedPlan)
}

test("Window: predicate push down -- basic") {
val winExpr = windowExpr(count('b), windowSpec('a :: Nil, 'b.asc :: Nil, UnspecifiedFrame))

Expand Down

0 comments on commit 91559d2

Please sign in to comment.