Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-17791][SQL] Join reordering using star schema detection #15363

Closed
wants to merge 17 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ case class SimpleCatalystConf(
override val cboEnabled: Boolean = false,
override val joinReorderEnabled: Boolean = false,
override val joinReorderDPThreshold: Int = 12,
override val starSchemaDetection: Boolean = false,
override val warehousePath: String = "/user/hive/warehouse",
override val sessionLocalTimeZone: String = TimeZone.getDefault().getID,
override val maxNestedViewDepth: Int = 100)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ case class CostBasedJoinReorder(conf: SQLConf) extends Rule[LogicalPlan] with Pr

def reorder(plan: LogicalPlan, output: AttributeSet): LogicalPlan = {
val (items, conditions) = extractInnerJoins(plan)
// TODO: Compute the set of star-joins and use them in the join enumeration
// algorithm to prune un-optimal plan choices.
val result =
// Do reordering if the number of items is appropriate and join conditions exist.
// We also need to check if costs of all items can be evaluated.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
Batch("Operator Optimizations", fixedPoint,
// Operator push down
PushProjectionThroughUnion,
ReorderJoin,
ReorderJoin(conf),
EliminateOuterJoin,
PushPredicateThroughJoin,
PushDownPredicate,
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,8 @@ object ExtractFiltersAndInnerJoins extends PredicateHelper {
: (Seq[(LogicalPlan, InnerLike)], Seq[Expression]) = plan match {
case Join(left, right, joinType: InnerLike, cond) =>
val (plans, conditions) = flattenJoin(left, joinType)
(plans ++ Seq((right, joinType)), conditions ++ cond.toSeq)

(plans ++ Seq((right, joinType)), conditions ++
cond.toSeq.flatMap(splitConjunctivePredicates))
case Filter(filterCondition, j @ Join(left, right, _: InnerLike, joinCondition)) =>
val (plans, conditions) = flattenJoin(j)
(plans, conditions ++ splitConjunctivePredicates(filterCondition))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,18 @@ object SQLConf {
.checkValue(weight => weight >= 0 && weight <= 1, "The weight value must be in [0, 1].")
.createWithDefault(0.7)

val STARSCHEMA_DETECTION = buildConf("spark.sql.cbo.starSchemaDetection")
.doc("When true, it enables join reordering based on star schema detection. ")
.booleanConf
.createWithDefault(false)

val STARSCHEMA_FACT_TABLE_RATIO = buildConf("spark.sql.cbo.starJoinFTRatio")
.internal()
.doc("Specifies the upper limit of the ratio between the largest fact tables" +
" for a star join to be considered. ")
.doubleConf
.createWithDefault(0.9)

val SESSION_LOCAL_TIMEZONE =
buildConf("spark.sql.session.timeZone")
.doc("""The ID of session local timezone, e.g. "GMT", "America/Los_Angeles", etc.""")
Expand Down Expand Up @@ -988,6 +1000,10 @@ class SQLConf extends Serializable with Logging {

def maxNestedViewDepth: Int = getConf(SQLConf.MAX_NESTED_VIEW_DEPTH)

def starSchemaDetection: Boolean = getConf(STARSCHEMA_DETECTION)

def starSchemaFTRatio: Double = getConf(STARSCHEMA_FACT_TABLE_RATIO)

/** ********************** SQLConf functionality methods ************ */

/** Set Spark SQL configuration properties. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins
import org.apache.spark.sql.catalyst.plans.{Cross, Inner, InnerLike, PlanTest}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.RuleExecutor

import org.apache.spark.sql.catalyst.SimpleCatalystConf

class JoinOptimizationSuite extends PlanTest {

Expand All @@ -38,7 +38,7 @@ class JoinOptimizationSuite extends PlanTest {
CombineFilters,
PushDownPredicate,
BooleanSimplification,
ReorderJoin,
ReorderJoin(SimpleCatalystConf(true)),
PushPredicateThroughJoin,
ColumnPruning,
CollapseProject) :: Nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,9 @@ import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest}
import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Join, LogicalPlan}
import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.catalyst.statsEstimation.{StatsEstimationTestBase, StatsTestPlan}
import org.apache.spark.sql.catalyst.util._


class JoinReorderSuite extends PlanTest with StatsEstimationTestBase {
Expand All @@ -38,7 +37,7 @@ class JoinReorderSuite extends PlanTest with StatsEstimationTestBase {
Batch("Operator Optimizations", FixedPoint(100),
CombineFilters,
PushDownPredicate,
ReorderJoin,
ReorderJoin(conf),
PushPredicateThroughJoin,
ColumnPruning,
CollapseProject) ::
Expand Down Expand Up @@ -203,27 +202,7 @@ class JoinReorderSuite extends PlanTest with StatsEstimationTestBase {
originalPlan: LogicalPlan,
groundTruthBestPlan: LogicalPlan): Unit = {
val optimized = Optimize.execute(originalPlan.analyze)
val normalized1 = normalizePlan(normalizeExprIds(optimized))
val normalized2 = normalizePlan(normalizeExprIds(groundTruthBestPlan.analyze))
if (!sameJoinPlan(normalized1, normalized2)) {
fail(
s"""
|== FAIL: Plans do not match ===
|${sideBySide(normalized1.treeString, normalized2.treeString).mkString("\n")}
""".stripMargin)
}
}

/** Consider symmetry for joins when comparing plans. */
private def sameJoinPlan(plan1: LogicalPlan, plan2: LogicalPlan): Boolean = {
(plan1, plan2) match {
case (j1: Join, j2: Join) =>
(sameJoinPlan(j1.left, j2.left) && sameJoinPlan(j1.right, j2.right)) ||
(sameJoinPlan(j1.left, j2.right) && sameJoinPlan(j1.right, j2.left))
case _ if plan1.children.nonEmpty && plan2.children.nonEmpty =>
(plan1.children, plan2.children).zipped.forall { case (c1, c2) => sameJoinPlan(c1, c2) }
case _ =>
plan1 == plan2
}
val expected = groundTruthBestPlan.analyze
compareJoinOrder(optimized, expected)
}
}
Loading