
[SPARK-20175][SQL] Exists should not be evaluated in Join operator #17491

Closed
wants to merge 4 commits into master from viirya/dont-push-exists-to-join

Conversation

viirya
Member

@viirya viirya commented Mar 31, 2017

What changes were proposed in this pull request?

Similar to ListQuery, Exists should not be evaluated in Join operator too.

How was this patch tested?

Jenkins tests.

Please review http://spark.apache.org/contributing.html before opening a pull request.

@viirya
Member Author

viirya commented Mar 31, 2017

cc @wzhfy @hvanhovell @nsyca

@viirya viirya changed the title [SQL][WIP] Exists should not be evaluated in Join operator [SQL][WIP] Exists should not be evaluated in Join operator and can be converted to ScalarSubquery if no correlated reference Mar 31, 2017
val (withSubquery, withoutSubquery) =
  splitConjunctivePredicates(condition).partition(SubqueryExpression.hasInOrExistsSubquery)
val newWithSubquery = withSubquery.map(_.transform {
  case e @ Exists(sub, conditions, exprId) if conditions.isEmpty && !containsAgg(sub) =>
viirya (Member Author):

Currently this takes a conservative approach: if there is already an Aggregate, it skips the conversion.

viirya (Member Author):

The reason for this conversion is that I assume a count aggregation is cheaper than an existence join.

nsyca (Contributor):

@viirya what we should do is to mark the outer join as an "early out" in the run-time join processing. An aggregation is not cheap as it needs to read the entire table to give the (first) answer. An "early out" logic is just like what we currently have in the LeftSemi join where only the first match of the join value is returned (and the subsequent matches are discarded). A LeftSemi join is a special case of an "early out" inner join where the columns of the right table are not permitted in the output of the join.

IMO, a step forward is to implement the "early out" mechanism in all the run-time join operators, nested-loop, sorted-merge, and hash; and inner, left outer, right outer. Then LeftSemi and LeftAnti will just be special cases of one of those operators.
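The "early out" idea can be illustrated outside Spark. The sketch below is plain Scala over in-memory sequences, not Spark code, and all names are invented for illustration: a nested-loop LeftSemi join stops probing the right side at the first match, while a naive inner join keeps scanning and emits duplicates.

```scala
object EarlyOutJoin {
  // Nested-loop LeftSemi join: keep a left row as soon as ONE right row
  // matches, discarding any further matches ("early out" on the inner loop).
  def leftSemi[A, B](left: Seq[A], right: Seq[B])(matches: (A, B) => Boolean): Seq[A] =
    left.filter(l => right.exists(r => matches(l, r))) // `exists` short-circuits

  // A naive inner join keeps scanning the right side and emits one row per match:
  def inner[A, B](left: Seq[A], right: Seq[B])(matches: (A, B) => Boolean): Seq[(A, B)] =
    for { l <- left; r <- right if matches(l, r) } yield (l, r)

  def main(args: Array[String]): Unit = {
    val left  = Seq(1, 2, 3)
    val right = Seq(2, 2, 3, 3)
    println(leftSemi(left, right)(_ == _)) // List(2, 3): at most one output row per left row
    println(inner(left, right)(_ == _).size) // 4: duplicate matches without early out
  }
}
```

The semi-join variant also shows the point about output columns: only left-side values appear in its result.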

@viirya viirya Mar 31, 2017

For a simple count aggregation, I think it is cheap because it should prune columns and no data from the table will be shuffled.

Contributor:

@viirya @nsyca I am still looking at the code. If we do go the route of turning this into a scalar subquery, would a "limit" help prevent a scan of the inner table (assuming it's very large)? So something like "select Count(1) from (original subquery + limit 1)"?

Contributor:

A simple test will tell: try a count on a 1-billion-row table versus SELECT 1 FROM <tbl> LIMIT 1. Which one is better?

viirya (Member Author):

Yeah, that is a good idea. I think an additional Limit operator will definitely help.
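The effect of the extra Limit can be sketched with plain collections (illustrative only, not Spark code; all names are made up). Counting after taking one row touches at most one element, while a bare count must traverse the whole input to decide the same existence question.

```scala
object LimitSketch {
  // Simulate `SELECT count(1) FROM (subquery LIMIT 1)` versus a count over
  // the full subquery. An iterator stands in for a table scan; the return
  // value is the number of rows actually read to decide existence.
  def rowsReadForExists(n: Int, limitFirst: Boolean): Int = {
    var touched = 0
    val scan = Iterator.tabulate(n) { i => touched += 1; i }
    val exists = if (limitFirst) scan.take(1).size > 0 else scan.size > 0
    require(exists == (n > 0)) // both strategies agree on the answer
    touched
  }

  def main(args: Array[String]): Unit = {
    println(rowsReadForExists(1000000, limitFirst = true))  // 1: LIMIT short-circuits
    println(rowsReadForExists(1000000, limitFirst = false)) // 1000000: full traversal
  }
}
```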

@SparkQA

SparkQA commented Mar 31, 2017

Test build #75419 has finished for PR 17491 at commit b012550.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member Author

viirya commented Mar 31, 2017

retest this please.

@SparkQA

SparkQA commented Mar 31, 2017

Test build #75423 has started for PR 17491 at commit b012550.

@viirya
Member Author

viirya commented Mar 31, 2017

retest this please.

@SparkQA

SparkQA commented Mar 31, 2017

Test build #75424 has finished for PR 17491 at commit b012550.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya viirya changed the title [SQL][WIP] Exists should not be evaluated in Join operator and can be converted to ScalarSubquery if no correlated reference [SPARK-20175][SQL] Exists should not be evaluated in Join operator and can be converted to ScalarSubquery if no correlated reference Mar 31, 2017

// A ListQuery defines the query which we want to search in an IN subquery expression.
// Currently the only way to evaluate an IN subquery is to convert it to a
// LeftSemi/LeftAnti/ExistenceJoin by `RewritePredicateSubquery` rule.
// It cannot be evaluated as part of a Join operator.
// An Exists shouldn't be pushed into a Join operator either.
nsyca (Contributor):

@dilipbiswal Here is another case of a regression from #16954. Would you think we should just say the following?

case _: SubqueryExpression => false

viirya (Member Author):

I think a ScalarSubquery without correlated references can be pushed. Can't it?

Contributor:

I am not sure. The name of this method is canEvaluateWithinJoin, so I assume it asks whether an input Expression can be processed as part of a Join operator. Can a ScalarSubquery be processed inside a Join?

dilipbiswal (Contributor):

@nsyca @viirya I just verified, and the exists test fails the same way in 2.1. So it's not a regression.

dilipbiswal (Contributor):

@nsyca Looking at this further, there is a SubqueryExec operator that can execute a ScalarSubquery and InSubquery (PlanSubqueries). As part of my change, I had removed the case for PredicateSubquery, since we removed PredicateSubquery altogether. I just quickly tried the following and got the query to work. I haven't verified the semantics, just tried something quickly. Basically, if we were to keep the Exists expression as it is, push it down as a join condition, and execute it as an InSubquery (possibly with an additional limit clause), there already seems to be infrastructure for it. Or perhaps we may want to introduce an ExistsSubquery exec operator that can work more efficiently.

case subquery: expressions.Exists =>
  val executedPlan = new QueryExecution(sparkSession, subquery.plan).executedPlan
  InSubquery(Literal.TrueLiteral,
    SubqueryExec(s"subquery${subquery.exprId.id}", executedPlan), subquery.exprId)

What do you think, Natt?

@nsyca nsyca Mar 31, 2017

What this code does is built around the idea of treating an uncorrelated subquery as a black box. The subquery is processed as a self-contained operation and a list of values is returned. After that, the code evaluates it as if it were an IN-list predicate, like expr IN (list). In your code above, expr is represented as a "true" literal. That means the values returned from the subquery must be of Boolean type too.

Putting a LIMIT does help to short-circuit the processing to the first row. I still think putting a LIMIT explicitly as an extra LogicalPlan operator may have some negative side effect, in that it prevents other Optimizer rules from further optimizing the query. I have not thought of a concrete example to back my belief though.

I feel this optimization could be done better in the run-time area, rather than trying to shoehorn it into the Optimizer phase. What we can do is 1) propagate the notion of "early out" deeper to the operator on the RHS of the outer join (if it's a scan, stop scanning on the first row); 2) one step further: cache the result of the RHS without a rescan, because the next row from the parent table will always get the same answer from rescanning the subquery.
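Point 2, evaluating the uncorrelated subquery once and reusing the answer for every outer row, can be sketched with a lazy val (plain Scala; all names are invented for illustration, this is not the Spark implementation):

```scala
object CachedExists {
  var rhsScans = 0 // how many times the subquery ("RHS") was actually evaluated

  // An uncorrelated EXISTS depends on nothing from the outer row, so its
  // result can be computed once and cached; `lazy val` gives exactly that.
  def filterWithCachedExists[A](outer: Seq[A], rhs: () => Boolean): Seq[A] = {
    lazy val exists = { rhsScans += 1; rhs() } // evaluated at most once
    outer.filter(_ => exists)
  }

  def main(args: Array[String]): Unit = {
    val kept = filterWithCachedExists(Seq(1, 2, 3, 4), () => true)
    println(kept)     // List(1, 2, 3, 4)
    println(rhsScans) // 1: the subquery ran once, not once per outer row
  }
}
```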

@viirya viirya Apr 1, 2017

> I am not sure. The name of this is def canEvaluateWithinJoin so I assume it asks whether an input Expression can be processed as part of a Join operator. Can a ScalarSubquery be processed inside a Join?

I recall that a ScalarSubquery without correlated references is evaluated as an individual query plan, and its result comes back as an expression. So there should be no difference at run time compared with other expressions.

A Limit looks good to me for now. I can't think of a negative side effect that would prevent possible optimization of the subquery plan. Isn't it just like a rewritten query with a limit clause added?

I think this is a corner usage case. Addressing it at run time, e.g. by introducing "early out" into the physical join operators, would work, but it may involve a lot of code changes.

> One more step further: cache the result of the RHS without a rescan because the next row from the parent table will always get the same answer from rescanning the subquery.

I quickly scanned the physical SortMergeJoin operator. If the streamed row matches the scanned group of rows, it will reuse the scanned group. It sounds like it does what you said, if I'm not missing something.

I think the current join operators are smart enough that they won't re-scan the subquery if the next row still matches the scanned group of rows.
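The group-reuse behavior described above can be sketched with plain Scala over sorted sequences (illustrative only; names invented, not the SortMergeJoin code): when consecutive streamed rows share a key, the buffered group of matches is rebuilt once and then reused.

```scala
object MergeJoinGroupReuse {
  var groupScans = 0 // how many times a buffered group was (re)built

  // Sort-merge-style join over sorted inputs: consecutive streamed rows with
  // the same key reuse the previously buffered group instead of re-scanning.
  def join(streamed: Seq[Int], buffered: Seq[Int]): Seq[(Int, Int)] = {
    var lastKey: Option[Int] = None
    var group: Seq[Int] = Nil
    streamed.flatMap { k =>
      if (!lastKey.contains(k)) { // only rebuild the group on a new key
        groupScans += 1
        group = buffered.filter(_ == k)
        lastKey = Some(k)
      }
      group.map(b => (k, b))
    }
  }

  def main(args: Array[String]): Unit = {
    println(join(Seq(1, 1, 1, 2), Seq(1, 2))) // List((1,1), (1,1), (1,1), (2,2))
    println(groupScans) // 2: the group for key 1 was built once, reused twice
  }
}
```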

@SparkQA

SparkQA commented Apr 1, 2017

Test build #75439 has finished for PR 17491 at commit 88016bf.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member Author

viirya commented Apr 10, 2017

cc @cloud-fan

@SparkQA

SparkQA commented Apr 11, 2017

Test build #75683 has finished for PR 17491 at commit 329f067.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member Author

viirya commented Apr 11, 2017

retest this please.

@SparkQA

SparkQA commented Apr 11, 2017

Test build #75687 has finished for PR 17491 at commit 329f067.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

val expr = Alias(GreaterThan(countExpr.toAttribute, Literal(0)), e.toString)()
ScalarSubquery(
  Project(Seq(expr),
    Aggregate(Nil, Seq(countExpr), LocalLimit(Literal(1), sub))),
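The quoted rewrite turns EXISTS into a scalar count(*) > 0 computed over the subquery capped at one row by LocalLimit. Its logical effect, sketched over plain collections (illustrative only, not the Catalyst API):

```scala
object ExistsAsScalarCount {
  // EXISTS(sub)  behaves like  (SELECT count(*) FROM (sub LIMIT 1)) > 0.
  // With the limit of 1, the count can only be 0 or 1, so the aggregate
  // never has to read past the first row of the subquery.
  def existsViaCount[A](sub: Seq[A]): Boolean = sub.take(1).size > 0

  def main(args: Array[String]): Unit = {
    println(existsViaCount(Seq(10, 20, 30))) // true
    println(existsViaCount(Seq.empty[Int]))  // false
  }
}
```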
cloud-fan (Contributor):

How useful is this optimization? It only works when Exists has no condition; is that a common case?

I do agree with @nsyca that we should implement the "early-out" mechanism, which is more general.

viirya (Member Author):

I think it is a special case. Then I will remove this optimization and minimize this PR's changes.

viirya (Member Author):

We can address the early-out in other work.

viirya (Member Author):

Btw, I am not very sure this early-out can benefit general usage, except for this kind of special case.

@viirya viirya changed the title [SPARK-20175][SQL] Exists should not be evaluated in Join operator and can be converted to ScalarSubquery if no correlated reference [SPARK-20175][SQL] Exists should not be evaluated in Join operator Apr 11, 2017
@viirya
Member Author

viirya commented Apr 11, 2017

@cloud-fan The optimization rule is removed now. This patch now just makes an Exists subquery without correlated references work. Please take a look again. Thanks.

@SparkQA

SparkQA commented Apr 11, 2017

Test build #75703 has finished for PR 17491 at commit 24ae5ce.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

LGTM, merging to master!

@asfgit asfgit closed this in cd91f96 Apr 11, 2017
@nsyca
Contributor

nsyca commented Apr 11, 2017

@cloud-fan wrote: "How useful is this optimization? It only works when Exists has no condition, is that a common case?"

One of the common cases of this usage is an ACL application, where the application asks the database whether the user has proper authority to access a certain set of data.

Ex:

select ... from controlled_table where exists (select 1 from acl_table where user = CURRENT_USER and role = ...)

From a runtime perspective, an optimal access plan places the ACL_TABLE as the outer of a nested-loop join with semantics to fetch only the first qualified row: once such a row exists, continue to process the inner table, CONTROLLED_TABLE; if no row from the outer qualifies, avoid accessing the inner completely.
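Under that plan the uncorrelated ACL check acts as a gate: if it yields no row, the controlled table is never touched. A minimal sketch in plain Scala (table contents and names invented for illustration; not an actual access plan):

```scala
object AclGate {
  var controlledScans = 0 // how many times the controlled table was scanned

  // select ... from controlled_table
  //   where exists (select 1 from acl_table where user = CURRENT_USER ...)
  // The ACL probe runs first and fetches at most one qualifying row; only if
  // it succeeds do we scan the controlled table at all.
  def query(acl: Seq[String], user: String, controlled: () => Seq[Int]): Seq[Int] = {
    val authorized = acl.iterator.filter(_ == user).take(1).nonEmpty
    if (authorized) { controlledScans += 1; controlled() } else Seq.empty
  }

  def main(args: Array[String]): Unit = {
    val acl = Seq("alice", "bob")
    println(query(acl, "mallory", () => Seq(1, 2, 3))) // List(): inner never scanned
    println(controlledScans)                           // 0
    println(query(acl, "alice", () => Seq(1, 2, 3)))   // List(1, 2, 3)
    println(controlledScans)                           // 1
  }
}
```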

@viirya
Member Author

viirya commented Apr 11, 2017

I think the current approach will produce a LeftSemi join for this Exists subquery. Is it still far from the optimal access plan you described?

peter-toth pushed a commit to peter-toth/spark that referenced this pull request Oct 6, 2018
Author: Liang-Chi Hsieh <[email protected]>

Closes apache#17491 from viirya/dont-push-exists-to-join.
@viirya viirya deleted the dont-push-exists-to-join branch December 27, 2023 18:20
5 participants