[SPARK-20233][SQL] Apply star-join filter heuristics to dynamic programming join enumeration

## What changes were proposed in this pull request?

Implements a star-join filter to reduce the search space for dynamic programming join enumeration. Consider the following join graph:

```
T1       D1 - T2 - T3
  \     /
    F1
     |
    D2

star-join: {F1, D1, D2}
non-star: {T1, T2, T3}
```
The following join combinations will be generated:
```
level 0: (F1), (D1), (D2), (T1), (T2), (T3)
level 1: {F1, D1}, {F1, D2}, {T2, T3}
level 2: {F1, D1, D2}
level 3: {F1, D1, D2, T1}, {F1, D1, D2, T2}
level 4: {F1, D1, D2, T1, T2}, {F1, D1, D2, T2, T3}
level 5: {F1, D1, D2, T1, T2, T3}
```
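The pruning rule behind this enumeration can be sketched in Python (hypothetical function and variable names, not Spark's API; the committed implementation is the Scala `starJoinFilter` in `JoinReorderDPFilters`): a candidate pair of plans may be combined only if the union of their item ids stays inside the star-join, already covers the star-join, or stays inside the non-star set.

```python
def star_join_filter(one_side, other_side, star, non_star):
    """Return True if joining two sets of plan ids is allowed."""
    if one_side & other_side:
        return False  # overlapping item sets are never joined
    if not star or not non_star:
        return True   # nothing to filter: only one class of tables exists
    join = one_side | other_side
    return join <= star or star <= join or join <= non_star

# Item ids for the example graph: F1=0, D1=1, D2=2, T1=3, T2=4, T3=5
star, non_star = {0, 1, 2}, {3, 4, 5}

# Level 1: pairs mixing star and non-star tables are pruned.
assert star_join_filter({0}, {1}, star, non_star)      # {F1, D1} kept
assert star_join_filter({4}, {5}, star, non_star)      # {T2, T3} kept
assert not star_join_filter({0}, {4}, star, non_star)  # {F1, T2} pruned

# Level 3: once the star join {F1, D1, D2} is complete,
# non-star tables may be attached to it.
assert star_join_filter({0, 1, 2}, {3}, star, non_star)
```

This is why level 1 above contains {F1, D1}, {F1, D2}, and {T2, T3} but no mixed pairs such as {F1, T1}, even though T1 is connected to F1 in the join graph.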

## How was this patch tested?

New test suite ```StarJoinCostBasedReorderSuite.scala```.

Author: Ioana Delaney <[email protected]>

Closes apache#17546 from ioana-delaney/starSchemaCBOv3.
ioana-delaney authored and Mingjie Tang committed Apr 18, 2017
1 parent f8c93e9 commit 4334fdd
Showing 4 changed files with 571 additions and 9 deletions.
@@ -54,8 +54,6 @@ case class CostBasedJoinReorder(conf: SQLConf) extends Rule[LogicalPlan] with Pr

private def reorder(plan: LogicalPlan, output: Seq[Attribute]): LogicalPlan = {
val (items, conditions) = extractInnerJoins(plan)
- // TODO: Compute the set of star-joins and use them in the join enumeration
- // algorithm to prune un-optimal plan choices.
val result =
// Do reordering if the number of items is appropriate and join conditions exist.
// We also need to check if costs of all items can be evaluated.
@@ -150,12 +148,15 @@ object JoinReorderDP extends PredicateHelper with Logging {
case (item, id) => Set(id) -> JoinPlan(Set(id), item, Set(), Cost(0, 0))
}.toMap)

// Build filters from the join graph to be used by the search algorithm.
val filters = JoinReorderDPFilters.buildJoinGraphInfo(conf, items, conditions, itemIndex)

// Build plans for next levels until the last level has only one plan. This plan contains
// all items that can be joined, so there's no need to continue.
val topOutputSet = AttributeSet(output)
- while (foundPlans.size < items.length && foundPlans.last.size > 1) {
+ while (foundPlans.size < items.length) {
// Build plans for the next level.
- foundPlans += searchLevel(foundPlans, conf, conditions, topOutputSet)
+ foundPlans += searchLevel(foundPlans, conf, conditions, topOutputSet, filters)
}

val durationInMs = (System.nanoTime() - startTime) / (1000 * 1000)
@@ -179,7 +180,8 @@ object JoinReorderDP extends PredicateHelper with Logging {
existingLevels: Seq[JoinPlanMap],
conf: SQLConf,
conditions: Set[Expression],
- topOutput: AttributeSet): JoinPlanMap = {
+ topOutput: AttributeSet,
+ filters: Option[JoinGraphInfo]): JoinPlanMap = {

val nextLevel = mutable.Map.empty[Set[Int], JoinPlan]
var k = 0
@@ -200,7 +202,7 @@ }
}

otherSideCandidates.foreach { otherSidePlan =>
- buildJoin(oneSidePlan, otherSidePlan, conf, conditions, topOutput) match {
+ buildJoin(oneSidePlan, otherSidePlan, conf, conditions, topOutput, filters) match {
case Some(newJoinPlan) =>
// Check if it's the first plan for the item set, or it's a better plan than
// the existing one due to lower cost.
@@ -218,28 +220,48 @@ }
}

/**
- * Builds a new JoinPlan when both conditions hold:
+ * Builds a new JoinPlan if the following conditions hold:
* - the sets of items contained in left and right sides do not overlap.
* - there exists at least one join condition involving references from both sides.
* - if star-join filter is enabled, allow the following combinations:
* 1) (oneJoinPlan U otherJoinPlan) is a subset of star-join
* 2) star-join is a subset of (oneJoinPlan U otherJoinPlan)
* 3) (oneJoinPlan U otherJoinPlan) is a subset of non star-join
*
* @param oneJoinPlan One side JoinPlan for building a new JoinPlan.
* @param otherJoinPlan The other side JoinPlan for building a new join node.
* @param conf SQLConf for statistics computation.
* @param conditions The overall set of join conditions.
* @param topOutput The output attributes of the final plan.
* @param filters Join graph info to be used as filters by the search algorithm.
* @return Builds and returns a new JoinPlan if the above conditions hold. Otherwise, returns None.
*/
private def buildJoin(
oneJoinPlan: JoinPlan,
otherJoinPlan: JoinPlan,
conf: SQLConf,
conditions: Set[Expression],
- topOutput: AttributeSet): Option[JoinPlan] = {
+ topOutput: AttributeSet,
+ filters: Option[JoinGraphInfo]): Option[JoinPlan] = {

if (oneJoinPlan.itemIds.intersect(otherJoinPlan.itemIds).nonEmpty) {
// Should not join two overlapping item sets.
return None
}

if (filters.isDefined) {
// Apply star-join filter, which ensures that tables in a star schema relationship
// are planned together. The star-filter will eliminate joins among star and non-star
// tables until the star joins are built. The following combinations are allowed:
// 1. (oneJoinPlan U otherJoinPlan) is a subset of star-join
// 2. star-join is a subset of (oneJoinPlan U otherJoinPlan)
// 3. (oneJoinPlan U otherJoinPlan) is a subset of non star-join
val isValidJoinCombination =
JoinReorderDPFilters.starJoinFilter(oneJoinPlan.itemIds, otherJoinPlan.itemIds,
filters.get)
if (!isValidJoinCombination) return None
}

val onePlan = oneJoinPlan.plan
val otherPlan = otherJoinPlan.plan
val joinConds = conditions
@@ -327,3 +349,109 @@ object JoinReorderDP extends PredicateHelper with Logging {
case class Cost(card: BigInt, size: BigInt) {
def +(other: Cost): Cost = Cost(this.card + other.card, this.size + other.size)
}

/**
* Implements optional filters to reduce the search space for join enumeration.
*
* 1) Star-join filters: Plan star-joins together since they are assumed
* to have an optimal execution based on their RI relationship.
* 2) Cartesian products: Defer their planning later in the graph to avoid
* large intermediate results (expanding joins, in general).
* 3) Composite inners: Don't generate "bushy tree" plans to avoid materializing
* intermediate results.
*
* Filters (2) and (3) are not implemented.
*/
object JoinReorderDPFilters extends PredicateHelper {
/**
* Builds join graph information to be used by the filtering strategies.
* Currently, it builds the sets of star/non-star joins.
* It can be extended with the sets of connected/unconnected joins, which
* can be used to filter Cartesian products.
*/
def buildJoinGraphInfo(
conf: SQLConf,
items: Seq[LogicalPlan],
conditions: Set[Expression],
itemIndex: Seq[(LogicalPlan, Int)]): Option[JoinGraphInfo] = {

if (conf.joinReorderDPStarFilter) {
// Compute the tables in a star-schema relationship.
val starJoin = StarSchemaDetection(conf).findStarJoins(items, conditions.toSeq)
val nonStarJoin = items.filterNot(starJoin.contains(_))

if (starJoin.nonEmpty && nonStarJoin.nonEmpty) {
val itemMap = itemIndex.toMap
Some(JoinGraphInfo(starJoin.map(itemMap).toSet, nonStarJoin.map(itemMap).toSet))
} else {
// Nothing interesting to return.
None
}
} else {
// Star schema filter is not enabled.
None
}
}

/**
* Applies the star-join filter that eliminates join combinations among star
* and non-star tables until the star join is built.
*
* Given the oneSideJoinPlan/otherSideJoinPlan, which represent all the plan
* permutations generated by the DP join enumeration, and the star/non-star plans,
* the following plan combinations are allowed:
* 1. (oneSideJoinPlan U otherSideJoinPlan) is a subset of star-join
* 2. star-join is a subset of (oneSideJoinPlan U otherSideJoinPlan)
* 3. (oneSideJoinPlan U otherSideJoinPlan) is a subset of non star-join
*
* It assumes the sets are disjoint.
*
* Example query graph:
*
* t1 d1 - t2 - t3
* \ /
* f1
* |
* d2
*
* star: {d1, f1, d2}
* non-star: {t2, t1, t3}
*
* level 0: (f1 ), (d2 ), (t3 ), (d1 ), (t1 ), (t2 )
* level 1: {t3 t2 }, {f1 d2 }, {f1 d1 }
* level 2: {d2 f1 d1 }
* level 3: {t1 d1 f1 d2 }, {t2 d1 f1 d2 }
* level 4: {d1 t2 f1 t1 d2 }, {d1 t3 t2 f1 d2 }
* level 5: {d1 t3 t2 f1 t1 d2 }
*
* @param oneSideJoinPlan One side of the join represented as a set of plan ids.
* @param otherSideJoinPlan The other side of the join represented as a set of plan ids.
* @param filters Star and non-star plans represented as sets of plan ids
*/
def starJoinFilter(
oneSideJoinPlan: Set[Int],
otherSideJoinPlan: Set[Int],
filters: JoinGraphInfo): Boolean = {
val starJoins = filters.starJoins
val nonStarJoins = filters.nonStarJoins
val join = oneSideJoinPlan.union(otherSideJoinPlan)

// Disjoint sets
oneSideJoinPlan.intersect(otherSideJoinPlan).isEmpty &&
// Either star or non-star is empty
(starJoins.isEmpty || nonStarJoins.isEmpty ||
// Join is a subset of the star-join
join.subsetOf(starJoins) ||
// Star-join is a subset of join
starJoins.subsetOf(join) ||
// Join is a subset of non-star
join.subsetOf(nonStarJoins))
}
}

/**
* Helper class that keeps information about the join graph as sets of item/plan ids.
* It currently stores the star/non-star plans. It can be
* extended with the set of connected/unconnected plans.
*/
case class JoinGraphInfo (starJoins: Set[Int], nonStarJoins: Set[Int])
@@ -76,7 +76,7 @@ case class StarSchemaDetection(conf: SQLConf) extends PredicateHelper {

val emptyStarJoinPlan = Seq.empty[LogicalPlan]

- if (!conf.starSchemaDetection || input.size < 2) {
+ if (input.size < 2) {
emptyStarJoinPlan
} else {
// Find if the input plans are eligible for star join detection.
@@ -736,6 +736,12 @@ object SQLConf {
.checkValue(weight => weight >= 0 && weight <= 1, "The weight value must be in [0, 1].")
.createWithDefault(0.7)

val JOIN_REORDER_DP_STAR_FILTER =
buildConf("spark.sql.cbo.joinReorder.dp.star.filter")
.doc("Applies star-join filter heuristics to cost based join enumeration.")
.booleanConf
.createWithDefault(false)

val STARSCHEMA_DETECTION = buildConf("spark.sql.cbo.starSchemaDetection")
.doc("When true, it enables join reordering based on star schema detection. ")
.booleanConf
@@ -1011,6 +1017,8 @@ class SQLConf extends Serializable with Logging {

def joinReorderCardWeight: Double = getConf(SQLConf.JOIN_REORDER_CARD_WEIGHT)

def joinReorderDPStarFilter: Boolean = getConf(SQLConf.JOIN_REORDER_DP_STAR_FILTER)

def windowExecBufferSpillThreshold: Int = getConf(WINDOW_EXEC_BUFFER_SPILL_THRESHOLD)

def sortMergeJoinExecBufferSpillThreshold: Int =
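For context, the new flag ships disabled by default. Assuming a PySpark session (the session setup below is illustrative), it is enabled together with the cost-based optimizer settings it depends on:

```python
from pyspark.sql import SparkSession

# Illustrative session; the star-join DP filter only has an effect when
# cost-based optimization and join reordering are also enabled.
spark = SparkSession.builder.appName("star-join-demo").getOrCreate()

spark.conf.set("spark.sql.cbo.enabled", "true")
spark.conf.set("spark.sql.cbo.joinReorder.enabled", "true")
spark.conf.set("spark.sql.cbo.joinReorder.dp.star.filter", "true")
```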