[SPARK-17791][SQL] Join reordering using star schema detection #15363
Conversation
ok to test
The design doc can be downloaded from: https://issues.apache.org/jira/secure/attachment/12831827/StarJoinReordering1005.doc
Below are the slides with the performance numbers. Performance testing using a 1TB TPC-DS workload shows an overall improvement of 19% compared to the baseline (negative = improvement; positive = degradation).
ok to test
Test build #66416 has finished for PR 15363 at commit
//   |
// s2 - d3
// Uses Local Relations to easily control the size of the tables.
// e.g. f1 > s2 > d1 > d2 > d3
Here, you might need a description of the snowflake schema.
@gatorsmile I’ve updated the comments and made some changes to the schema.
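For illustration, a description along these lines could accompany the test (a sketch only; the layout is inferred from the f1/s2/d1-d3 names in the diff above):

```scala
// Star schema: fact table f1 joins directly to dimensions d1 and d2.
// Snowflake extension: dimension s2 is further normalized into d3,
// so d3 joins s2 rather than the fact table:
//
//   d1 - f1 - d2
//         |
//        s2 - d3
//
// Uses LocalRelations to easily control the size of the tables,
// e.g. f1 > s2 > d1 > d2 > d3.
```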
 */
private def isSelectiveStarJoin(
    starJoinPlan: Seq[LogicalPlan],
    conditions: Seq[Expression]): Boolean = {
How about changing the function signature to the following?
private def isSelectiveStarJoin(
    factTable: LogicalPlan,
    dimTables: Seq[LogicalPlan],
    conditions: Seq[Expression]): Boolean = {
@gatorsmile Thank you for reviewing the changes. I agree with your suggestions. It’s clearer if we pass the fact and dimension tables separately.
Force-pushed from 518d8e5 to 9bddb86
Test build #67474 has finished for PR 15363 at commit
This PR looks great overall, and the benchmark result is great. Currently we do not have good cost estimation (and may not have it long term, since Spark SQL is an open engine for many different data sources), which limits the effect of this optimization. I think we should be more defensive to avoid potential regressions (users will see any regression as a blocker to using this feature or upgrading). Have you checked the queries that regressed in the benchmark? It would be good to know in which cases it makes a bad assumption.
      val predicates = splitConjunctivePredicates(filterCond).filter(canEvaluate(_, t))
      Some(t, predicates)

    case Filter(filterCond, p @ Project(_, t: LeafNode)) =>
Can we use the pattern recursively to avoid these combinations?
case t: LeafNode =>
case Project(_, BaseTableAccess(t, cons)) =>
case Filter(c, BaseTableAccess(t, cons)) =>
Yes, I will look into that. Thank you.
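For reference, a minimal sketch of what such a recursive extractor could look like (the BaseTableAccess name follows the suggestion above; the Catalyst imports are assumed from the Spark 2.x codebase):

```scala
import org.apache.spark.sql.catalyst.expressions.{Expression, PredicateHelper}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LeafNode, LogicalPlan, Project}

// Recursive extractor: peels Project/Filter nodes off a base table access,
// accumulating the filter conditions that apply to the underlying leaf relation.
object BaseTableAccess extends PredicateHelper {
  def unapply(plan: LogicalPlan): Option[(LogicalPlan, Seq[Expression])] = plan match {
    case t: LeafNode =>
      Some((t, Nil))
    case Project(_, BaseTableAccess(t, conds)) =>
      Some((t, conds))
    case Filter(cond, BaseTableAccess(t, conds)) =>
      Some((t, conds ++ splitConjunctivePredicates(cond)))
    case _ =>
      None
  }
}
```

A single extractor like this collapses the Filter/Project/LeafNode case combinations into one pattern per call site.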
val STARJOIN_OPTIMIZATION = SQLConfigBuilder("spark.sql.starJoinOptimization")
  .doc("When true, it enables join reordering based on star schema detection.")
  .booleanConf
  .createWithDefault(false)
Should we use an internal config and enable this by default to have better test coverage?
Do you suggest having another internal config that will be enabled for testing purposes?
I mean making this config internal, and true by default, if we have enough confidence.
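A sketch of that suggestion (assuming the .internal() marker used by other SQLConf entries):

```scala
// Hidden from docs, on by default; users can still disable it by name.
val STARJOIN_OPTIMIZATION = SQLConfigBuilder("spark.sql.starJoinOptimization")
  .internal()
  .doc("When true, it enables join reordering based on star schema detection.")
  .booleanConf
  .createWithDefault(true)
```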
@ioana-delaney Could you address @davies's comment? Thanks!
    createOrderedJoin(input, conditions)
  case ExtractFiltersAndInnerJoins(input, conditions)
      if input.size >= 2 && conditions.nonEmpty =>
    val starJoinPlan = findEligibleStarJoinPlan(input, input, conditions)
Should we put this behind the feature flag (in case that we have a bug in it, we could use the feature flag to workaround it)?
I agree. I will change that.
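A sketch of the guarded call (the conf.starJoinOptimization accessor is hypothetical):

```scala
case ExtractFiltersAndInnerJoins(input, conditions)
    if input.size >= 2 && conditions.nonEmpty =>
  // Only attempt star-join detection when the feature flag is on, so a
  // planning bug can be worked around by disabling the feature.
  val starJoinPlan =
    if (conf.starJoinOptimization) {
      findEligibleStarJoinPlan(input, input, conditions)
    } else {
      Seq.empty[(LogicalPlan, InnerLike)]
    }
```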
    // Return an empty plan list and fall back to the default join reordering.
    Seq.empty[(LogicalPlan, InnerLike)]

  case table1 :: table2 :: _ if table1.size == table2.size =>
Should we replace this equality check with an approximate one?
The size in bytes or the cardinality is usually not accurate; should we just use the scale (log of the size) instead of the exact size?
The “size” should represent the table cardinality after applying the pushed-down local predicates, i.e. num_rows * selectivity. Temporarily, I used the sizeInBytes value since the join strategies also use this value to make planning decisions. Long term, a fact table will be determined based on the referential integrity constraints with the other tables. Then, the star join will be planned based on the joins’ selectivity.
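A sketch of the log-scale comparison suggested above (the one-order-of-magnitude threshold is an assumption):

```scala
// Two tables are "comparable" when their sizes are within one order of
// magnitude of each other, rather than exactly equal.
def comparableSize(size1: BigInt, size2: BigInt): Boolean =
  math.abs(math.log10(size1.toDouble) - math.log10(size2.toDouble)) < 1.0
```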
    Seq.empty[(LogicalPlan, InnerLike)]

  case table1 :: table2 :: _ if table1.size == table2.size =>
    // There are more tables with the same size. Conservatively, fall back to the
same size => similar size?
Or should we assume that the fact table should be much larger (one or more orders of magnitude) than the others?
@davies Yesterday, I forgot to reply to this comment.
Here, we are considering the case when we have multiple fact tables in the query, or a fact table is referenced multiple times. For example, if we have multiple star joins with the fact table referencing the same base table, we cannot make good planning decisions. Therefore, I am conservatively falling back to the positional join.
Similarly, if the query references multiple fact tables that have comparable sizes, we might want to fall back to the positional join. For this case, I also thought of introducing some scale factor, but it's hard to come up with an estimate. I can follow up with some people who have more experience with warehouse db design and find out what they think.
// This is a selective star join and all dimensions are base table access.
// Compute the size of the dimensions and return the star join
// with the most selective dimensions joined lower in the plan.
val sortedDims = eligibleDimPlans.map { plan =>
I think the order of the dimension tables should be based on selectivity rather than on the size of the dimension table.
Without a good approximation of selectivity, I'd preserve their order so users have the ability to adjust them.
Another thing that could be useful is placing the selective broadcast join before the shuffle join.
I agree that the order of the dimensions should be determined based on the join selectivity. Using table size is a temporary approximation. But preserving the order of the dimensions would be too conservative an approach. Based on our performance results, in most cases this heuristic led to a good join ordering. In case we make a mistake, we can always switch to the default, positional join order.
I will look into the broadcast vs. shuffle join ordering and get back to you.
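A sketch of the temporary heuristic described above (the stats/sizeInBytes accessors are assumed from the Spark 2.x statistics API):

```scala
// Order the eligible dimensions by estimated size, smallest first, so the
// most selective dimensions are joined lowest in the plan.
val sortedDims = eligibleDimPlans
  .map(plan => (plan, plan.stats(conf).sizeInBytes))
  .sortBy { case (_, size) => size }
  .map { case (plan, _) => plan }
```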
@davies Sorry for the delay in replying. Regarding the broadcast vs. shuffle join comment, I’ve looked at the join strategies. The broadcast join is the default strategy and applies if the inner table is smaller than the recommended threshold. Given that the algorithm reorders the dimensions with the smallest dimension lower in the plan, the broadcast join is favored over the repartition/shuffle join. In the future, I assume that the two join alternatives will be evaluated as part of the CBO cost model.
@davies Thank you for reviewing the code! I see this work as evolving and improving with the support of CBO. Without statistics and features such as cardinality and selectivity estimation, we cannot provide an optimal join reordering.
There were two types of regressions. The first type was caused by reordering a non-selective star join: the query did not apply any local predicates on the dimension tables, and the join between two large fact tables happened to be very selective. To fix this category of queries, the algorithm will not attempt to reorder a non-selective join, i.e. a join that does not apply local predicates on the dimension tables. The other category of problems was caused by the more general issue of lacking predicate selectivity. To overcome this problem, we introduced the “predicate selectivity hint” feature, which allows the user to specify the selectivity of a predicate. With that, we are able to plan the selective dimensions first. The JIRA for predicate selectivity has not yet been opened. Then, to further guard against bad plans, we put the feature under the starJoinOptimization option. I was thinking that, to be more conservative, I could further enforce a certain number of joins in the star. In general, a star join consists of a fact table and at least two dimensions; I can add this restriction to the algorithm.
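A sketch of those conservative guards (names are illustrative; assumes an enclosing rule that mixes in PredicateHelper):

```scala
// A star join consists of a fact table plus at least two dimensions, and
// every dimension must be restricted by at least one local predicate.
private def isSelectiveStarJoin(
    factTable: LogicalPlan,
    dimTables: Seq[LogicalPlan],
    conditions: Seq[Expression]): Boolean = {
  dimTables.size >= 2 && dimTables.forall { dim =>
    // canEvaluate (from PredicateHelper) holds only for predicates whose
    // references all come from this one dimension, i.e. local filters.
    conditions.exists(cond => canEvaluate(cond, dim))
  }
}
```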
Force-pushed from 9bddb86 to c21de3e
Test build #68089 has finished for PR 15363 at commit
retest this please
Force-pushed from c21de3e to cca4b9f
Test build #68099 has finished for PR 15363 at commit
Force-pushed from cca4b9f to 9151a13
The following updates were made:
Test build #70159 has finished for PR 15363 at commit
Force-pushed from 9151a13 to ed46536
Test build #70839 has finished for PR 15363 at commit
@@ -42,7 +366,7 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper {
    * @param conditions a list of condition for join.
    */
   @tailrec
-  def createOrderedJoin(input: Seq[(LogicalPlan, InnerLike)], conditions: Seq[Expression])
+  private def createOrderedJoin(input: Seq[(LogicalPlan, InnerLike)], conditions: Seq[Expression])
Can we avoid making it private? In the SnappyData plugin we have external rules for join order and we utilize this method from that other rule. I suppose there might be others too.
The compiler will complain if the method is public. But we can keep it final.
Force-pushed from ed46536 to 072e3a9
Test build #74068 has finished for PR 15363 at commit
@gatorsmile @wzhfy Would you please review this PR? Thank you.
 * 2) If the top largest tables have comparable number of rows, fall back to the default
 *    join reordering. This will prevent changing the position of the large tables in the join.
 */
def findStarJoinPlan(
Nit: -> private def
@gatorsmile The star join is called from join reordering.
case Nil =>
  emptyStarJoinPlan
case table1 :: table2 :: _ if table2.size.get.toDouble >
    conf.starJoinFactTableRatio * table1.size.get.toDouble =>
Nit: style issue.
case table1 :: table2 :: _
    if table2.size.get.toDouble > conf.starJoinFactTableRatio * table1.size.get.toDouble =>
  // The largest tables have comparable number of rows.
  emptyStarJoinPlan
@gatorsmile Done.
// Verify if the join columns have valid statistics
val areStatsAvailable = allFactJoins.forall { plan =>
  val dimTable = plan._1
I found you used plan._1 multiple times. We prefer another way:
val areStatsAvailable = allFactJoins.forall { case (dimTable, _) =>
@gatorsmile Done.
/**
 * Computes table cardinality after applying the predicates.
 * Currently, the function returns table cardinality.
 * When predicate selectivity is implemented in Catalyst,
Is it possible to use the work in the resolved JIRA SPARK-17075: Cardinality Estimation of Predicate Expressions?
@gatorsmile Yes, thank you. I forgot about the recent CBO cardinality changes. I've incorporated them.
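A sketch of how that could look (the stats(conf).rowCount API is assumed from the CBO work of that era):

```scala
// With SPARK-17075, the statistics of a filtered plan already reflect
// predicate selectivity when CBO is enabled, so the cardinality after
// predicates can be read directly from the plan statistics.
private def computeTableCardinality(plan: LogicalPlan, conf: CatalystConf): Option[BigInt] =
  plan.stats(conf).rowCount
```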
val distinctCount = colStats.get.distinctCount
val relDiff = math.abs((distinctCount.toDouble / rowCount.toDouble) - 1.0d)
// ndvMaxErr adjusted based on TPCDS 1TB data results
if (relDiff <= conf.ndvMaxError * 2) true else false
This line can be simplified to relDiff <= conf.ndvMaxError * 2
Done
Test build #74804 has finished for PR 15363 at commit
… dropped

### What changes were proposed in this pull request?

This PR is to fix the following test failure in maven and the PR apache#15363.

> org.apache.spark.sql.hive.orc.OrcSourceSuite SPARK-19459/SPARK-18220: read char/varchar column written by Hive

The [test history](https://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.sql.hive.orc.OrcSourceSuite&test_name=SPARK-19459%2FSPARK-18220%3A+read+char%2Fvarchar+column+written+by+Hive) shows all the maven builds failed this test case with the same error message.

```
FAILED: SemanticException [Error 10072]: Database does not exist: db2
org.apache.spark.sql.execution.QueryExecutionException: FAILED: SemanticException [Error 10072]: Database does not exist: db2
  at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$runHive$1.apply(HiveClientImpl.scala:637)
  at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$runHive$1.apply(HiveClientImpl.scala:621)
  at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$withHiveState$1.apply(HiveClientImpl.scala:288)
  at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:229)
  at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:228)
  at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:271)
  at org.apache.spark.sql.hive.client.HiveClientImpl.runHive(HiveClientImpl.scala:621)
  at org.apache.spark.sql.hive.client.HiveClientImpl.runSqlHive(HiveClientImpl.scala:611)
  at org.apache.spark.sql.hive.orc.OrcSuite$$anonfun$7.apply$mcV$sp(OrcSourceSuite.scala:160)
  at org.apache.spark.sql.hive.orc.OrcSuite$$anonfun$7.apply(OrcSourceSuite.scala:155)
  at org.apache.spark.sql.hive.orc.OrcSuite$$anonfun$7.apply(OrcSourceSuite.scala:155)
  at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
  at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
  at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
  at org.scalatest.Transformer.apply(Transformer.scala:22)
  at org.scalatest.Transformer.apply(Transformer.scala:20)
  at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166)
  at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:68)
  at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:163)
  at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
```

### How was this patch tested?

N/A

Author: Xiao Li <[email protected]>

Closes apache#17344 from gatorsmile/testtest.
Force-pushed from 1f6a3d6 to 891813f
@gatorsmile @cloud-fan I rewrote the test cases to align with the join reorder suite. Please take a look. Thanks.
Test build #74842 has finished for PR 15363 at commit
retest this please
Test build #74847 has finished for PR 15363 at commit
// and d3_fk1 = s3_pk1
//
// Default join reordering: d1, f1, d2, d3, s3
// Star join reordering: f1, d1, d3, d2,, d3
2 d3, typo?
@cloud-fan It's a typo. I will fix in my next PR.
// and d3_fk1 = s3_pk1
//
// Default join reordering: d1, f1, d2, d3, s3
// Star join reordering: f1, d1, d3, d2, d3
the last d3 should be s3
@cloud-fan Yes, it's a typo like the one above. I made some small changes to the queries when I rewrote the test suite and didn't update the code comments accordingly. I will fix. Thanks!
thanks, merging to master! The next step is consolidating this with CBO, looking forward to it :)
What changes were proposed in this pull request?
Star schema consists of one or more fact tables referencing a number of dimension tables. In general, queries against a star schema are expected to run fast because of the established RI constraints among the tables. This design proposes a join reordering based on natural, generally accepted heuristics for star schema queries: plan the fact table first, followed by the dimension tables ordered from most to least selective.
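A usage sketch (the config name comes from this PR; the schema and SparkSession are illustrative):

```scala
// Enable the feature and run a star query; with the flag on, the fact table
// f1 is planned first, followed by the most selective dimensions, instead of
// following the positional order of the FROM clause.
spark.conf.set("spark.sql.starJoinOptimization", "true")
spark.sql(
  """SELECT d1.c2, d2.c2, COUNT(*)
    |FROM d1, f1, d2, d3
    |WHERE f1.fk1 = d1.pk1 AND f1.fk2 = d2.pk2 AND f1.fk3 = d3.pk3
    |  AND d1.c2 = 'abc'
    |GROUP BY d1.c2, d2.c2
  """.stripMargin).explain(true)
```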
The design document was included in SPARK-17791.
Link to the google doc: StarSchemaDetection
How was this patch tested?
A new test suite StarJoinSuite.scala was implemented.