-
Notifications
You must be signed in to change notification settings - Fork 28.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-31705][SQL] Push more possible predicates through Join via CNF conversion #28733
Conversation
As I talked to @wangyum offline, I am taking #28575 over for the CNF implementation and config naming. There have been PRs for CNF conversion, such as #10444, #15558, #28575. The common issue is the recursive implementation can slow, or even cause a stack overflow exception. With this non-recursive implementation, the rule should be faster and more robust. |
@@ -1230,4 +1237,134 @@ class FilterPushdownSuite extends PlanTest { | |||
|
|||
comparePlans(Optimize.execute(query.analyze), expected) | |||
} | |||
|
|||
test("inner join: rewrite filter predicates to conjunctive normal form") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Test cases are copied from #28575
(testRelation.subquery('x), testRelation.subquery('y)) | ||
} else { | ||
(testRelation.subquery('x), | ||
testRelation.where(('c <= 5 || 'c < 1) && ('c <=5 || 'a > 2)).subquery('y)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@wangyum To make it simple, this PR didn't convert the pushed down predicate to a shorter form.
We can have a follow-up PR if you like that feature.
Test build #123556 has finished for PR 28733 at commit
|
node match { | ||
case Not(a And b) => stack.push(Or(Not(a), Not(b))) | ||
case Not(a Or b) => stack.push(And(Not(a), Not(b))) | ||
case Not(Not(a)) => stack.push(a) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to handle these NOT
cases? It seems that the NOT
operator is removed by BooleanSimplification
:
spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
Lines 391 to 394 in 240840f
case Not(a Or b) => And(Not(a), Not(b)) | |
case Not(a And b) => Or(Not(a), Not(b)) | |
case Not(Not(e)) => e |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Keeping here is OK since it is also very straightforward
Test build #123577 has finished for PR 28733 at commit
|
Test build #123578 has finished for PR 28733 at commit
|
test failure from TPCDSQuerySuite:
I will update the PR to simplify the pushed down predicates |
* If the conversion repeatedly expands nondeterministic expressions, return Seq.empty. | ||
* Otherwise, return the converted result as sequence of disjunctive expressions. | ||
*/ | ||
protected def conjunctiveNormalForm(condition: Expression): Seq[Expression] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you add tests for this method? We should have particular tests to verify the CNF conversion.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@viirya Thanks, I will add more test cases
Test build #123585 has finished for PR 28733 at commit
|
*/ | ||
protected def conjunctiveNormalForm(condition: Expression): Seq[Expression] = { | ||
val postOrderNodes = postOrderTraversal(condition) | ||
val resultStack = new scala.collection.mutable.Stack[Seq[Expression]] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
scala.collection.mutable.Stack[Seq[Expression]]
-> mutable.Stack[Seq[Expression]]
?
resultStack.top | ||
} | ||
|
||
private def aggregateExpressionsOfSameReference(expressions: Seq[Expression]): Seq[Expression] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
aggregateExpressionsOfSameReference
-> aggregateExpressionsOfSameQualifier
?
|
||
private def aggregateExpressionsOfSameReference(expressions: Seq[Expression]): Seq[Expression] = { | ||
expressions.groupBy(_.references.map(_.qualifier)).map(_._2.reduceLeft(And)).toSeq | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a new empty line below?
Test build #123601 has finished for PR 28733 at commit
|
private def aggregateExpressionsOfSameQualifiers( | ||
expressions: Seq[Expression]): Seq[Expression] = { | ||
expressions.groupBy(_.references.map(_.qualifier)).map(_._2.reduceLeft(And)).toSeq | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For a test case dt = '1' OR (dt = '2' AND id = 1)
passed to conjunctiveNormalForm, still return dt = '1' OR (dt = '2' AND id = 1)
.
See qualifier when groupby , they are
List(List(spark_catalog, default, t))
List(List(spark_catalog, default, t))
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can try
expressions.groupBy(_.references.flatMap(_.qualifier).toSet).map(_._2.reduceLeft(And)).toSeq
I will update this PR later
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
expressions.groupBy(_.references.flatMap(_.qualifier).toSet).map(_._2.reduceLeft(And)).toSeq
Not work, just
expressions.groupBy(_.references).map(_._2.reduceLeft(And)).toSeq
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The qualifier is the table name which is able to be used for aggregating more expressions
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The qualifier is the table name which is able to be used for aggregating more expressions
Got the point, you did this for split condition to join children, I want convert scan predicate condition to optimize scan predicate.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this PR is complex enough. Let's keep this part in this way for now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yea, I will raise pr for other problem base on your code and change a little after your pr merged.
val rightFilterConditions = | ||
pushDownCandidates.filter(_.references.subsetOf(right.outputSet)) | ||
|
||
val newLeft = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@gengliangwang Question: Can newLeft and newRight be declared lazy ? Seems like we need to compute it conditionally based on join type ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@dilipbiswal sure, thanks for the suggestion.
Test build #123617 has finished for PR 28733 at commit
|
Test build #123618 has finished for PR 28733 at commit
|
Test build #123686 has finished for PR 28733 at commit
|
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
Outdated
Show resolved
Hide resolved
} | ||
resultStack.push(cnf) | ||
} | ||
assert(resultStack.length == 1, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just logWarning()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK
Test build #123705 has finished for PR 28733 at commit
|
@wangyum @maropu @viirya @dilipbiswal @AngersZhuuuu @cloud-fan Thanks for the review. I think this PR is ready to be merged once the tests are passed. Let me know if you still have more comments. |
Test build #123803 has finished for PR 28733 at commit
|
retest this please |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the updates, @gengliangwang. Looks okay to me.
(Just to check) Btw, it seems the previous works about CNF tried to implement this conversion in an independent rule for |
@maropu Yes pushing down predicates through join should be the major scenario. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
LGTM |
Test build #123831 has finished for PR 28733 at commit
|
Merging to master |
### What changes were proposed in this pull request? Spark can't push down scan predicate condition of **Or**: Such as if I have a table `default.test`, it's partition col is `dt`, If we use query : ``` select * from default.test where dt=20190625 or (dt = 20190626 and id in (1,2,3) ) ``` In this case, Spark will resolve **Or** condition as one expression, and since this expr has reference of "id", then it can't been push down. Base on pr #28733, In my PR , for SQL like `select * from default.test` `where dt = 20190626 or (dt = 20190627 and xxx="a") ` For this condition `dt = 20190626 or (dt = 20190627 and xxx="a" )`, it will been converted to CNF ``` (dt = 20190626 or dt = 20190627) and (dt = 20190626 or xxx = "a" ) ``` then condition `dt = 20190626 or dt = 20190627` will be push down when partition pruning ### Why are the changes needed? Optimize partition pruning ### Does this PR introduce _any_ user-facing change? NO ### How was this patch tested? Added UT Closes #28805 from AngersZhuuuu/cnf-for-partition-pruning. Lead-authored-by: angerszhu <[email protected]> Co-authored-by: AngersZhuuuu <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
… Join/Partitions ### What changes were proposed in this pull request? In #28733 and #28805, CNF conversion is used to push down disjunctive predicates through join and partitions pruning. It's a good improvement, however, converting all the predicates in CNF can lead to a very long result, even with grouping functions over expressions. For example, for the following predicate ``` (p0 = '1' AND p1 = '1') OR (p0 = '2' AND p1 = '2') OR (p0 = '3' AND p1 = '3') OR (p0 = '4' AND p1 = '4') OR (p0 = '5' AND p1 = '5') OR (p0 = '6' AND p1 = '6') OR (p0 = '7' AND p1 = '7') OR (p0 = '8' AND p1 = '8') OR (p0 = '9' AND p1 = '9') OR (p0 = '10' AND p1 = '10') OR (p0 = '11' AND p1 = '11') OR (p0 = '12' AND p1 = '12') OR (p0 = '13' AND p1 = '13') OR (p0 = '14' AND p1 = '14') OR (p0 = '15' AND p1 = '15') OR (p0 = '16' AND p1 = '16') OR (p0 = '17' AND p1 = '17') OR (p0 = '18' AND p1 = '18') OR (p0 = '19' AND p1 = '19') OR (p0 = '20' AND p1 = '20') ``` will be converted into a long query(130K characters) in Hive metastore, and there will be error: ``` javax.jdo.JDOException: Exception thrown when executing query : SELECT DISTINCT 'org.apache.hadoop.hive.metastore.model.MPartition' AS NUCLEUS_TYPE,A0.CREATE_TIME,A0.LAST_ACCESS_TIME,A0.PART_NAME,A0.PART_ID,A0.PART_NAME AS NUCORDER0 FROM PARTITIONS A0 LEFT OUTER JOIN TBLS B0 ON A0.TBL_ID = B0.TBL_ID LEFT OUTER JOIN DBS C0 ON B0.DB_ID = C0.DB_ID WHERE B0.TBL_NAME = ? AND C0."NAME" = ? AND ((((((A0.PART_NAME LIKE '%/p1=1' ESCAPE '\' ) OR (A0.PART_NAME LIKE '%/p1=2' ESCAPE '\' )) OR (A0.PART_NAME LIKE '%/p1=3' ESCAPE '\' )) OR ((A0.PART_NAME LIKE '%/p1=4' ESCAPE '\' ) O ... ``` Essentially, we just need to traverse predicate and extract the convertible sub-predicates like what we did in #24598. There is no need to maintain the CNF result set. ### Why are the changes needed? A better implementation for pushing down disjunctive and complex predicates. The pushed down predicates is always equal or shorter than the CNF result. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Unit tests Closes #29101 from gengliangwang/pushJoin. Authored-by: Gengliang Wang <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
What changes were proposed in this pull request?
This PR add a new rule to support push predicate through join by rewriting join condition to CNF(conjunctive normal form). The following example is the steps of this rule:
Why are the changes needed?
Improve query performance. PostgreSQL, Impala and Hive support this feature.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Unit test and benchmark test.