Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable #29079

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2638,21 +2638,24 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED =
buildConf("spark.sql.bucketing.coalesceBucketsInSortMergeJoin.enabled")
val COALESCE_BUCKETS_IN_JOIN_ENABLED =
buildConf("spark.sql.bucketing.coalesceBucketsInJoin.enabled")
.doc("When true, if two bucketed tables with the different number of buckets are joined, " +
"the side with a bigger number of buckets will be coalesced to have the same number " +
"of buckets as the other side. Bucket coalescing is applied only to sort-merge joins " +
"and only when the bigger number of buckets is divisible by the smaller number of buckets.")
"of buckets as the other side. Bigger number of buckets is divisible by the smaller " +
"number of buckets. Bucket coalescing is applied to sort-merge joins and " +
"shuffled hash join. Note: Coalescing bucketed table can avoid unnecessary shuffling " +
"in join, but it also reduces parallelism and could possibly cause OOM for " +
"shuffled hash join.")
c21 marked this conversation as resolved.
Show resolved Hide resolved
.version("3.1.0")
.booleanConf
.createWithDefault(false)

val COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO =
buildConf("spark.sql.bucketing.coalesceBucketsInSortMergeJoin.maxBucketRatio")
val COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO =
buildConf("spark.sql.bucketing.coalesceBucketsInJoin.maxBucketRatio")
c21 marked this conversation as resolved.
Show resolved Hide resolved
.doc("The ratio of the number of two buckets being coalesced should be less than or " +
"equal to this value for bucket coalescing to be applied. This configuration only " +
s"has an effect when '${COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key}' is set to true.")
s"has an effect when '${COALESCE_BUCKETS_IN_JOIN_ENABLED.key}' is set to true.")
.version("3.1.0")
.intConf
.checkValue(_ > 0, "The difference must be positive.")
Expand Down Expand Up @@ -3269,6 +3272,11 @@ class SQLConf extends Serializable with Logging {

def metadataCacheTTL: Long = getConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS)

def coalesceBucketsInJoinEnabled: Boolean = getConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED)

def coalesceBucketsInJoinMaxBucketRatio: Int =
getConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO)

/** ********************** SQLConf functionality methods ************ */

/** Set Spark SQL configuration properties. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution.adaptive.{AdaptiveExecutionContext, InsertAdaptiveSparkPlan}
import org.apache.spark.sql.execution.bucketing.CoalesceBucketsInSortMergeJoin
import org.apache.spark.sql.execution.bucketing.CoalesceBucketsInJoin
import org.apache.spark.sql.execution.dynamicpruning.PlanDynamicPruningFilters
import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
import org.apache.spark.sql.execution.streaming.{IncrementalExecution, OffsetSeqMetadata}
Expand Down Expand Up @@ -332,7 +332,7 @@ object QueryExecution {
// as the original plan is hidden behind `AdaptiveSparkPlanExec`.
adaptiveExecutionRule.toSeq ++
Seq(
CoalesceBucketsInSortMergeJoin(sparkSession.sessionState.conf),
CoalesceBucketsInJoin(sparkSession.sessionState.conf),
PlanDynamicPruningFilters(sparkSession),
PlanSubqueries(sparkSession),
EnsureRequirements(sparkSession.sessionState.conf),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.bucketing

import scala.annotation.tailrec

import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{FileSourceScanExec, FilterExec, ProjectExec, SparkPlan}
import org.apache.spark.sql.execution.joins.{BaseJoinExec, ShuffledHashJoinExec, SortMergeJoinExec}
import org.apache.spark.sql.internal.SQLConf

/**
* This rule coalesces one side of the `SortMergeJoin` and `ShuffledHashJoin`
* if the following conditions are met:
* - Two bucketed tables are joined.
* - Join keys match with output partition expressions on their respective sides.
* - The larger bucket number is divisible by the smaller bucket number.
* - COALESCE_BUCKETS_IN_JOIN_ENABLED is set to true.
* - The ratio of the number of buckets is less than the value set in
* COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO.
*/
case class CoalesceBucketsInJoin(conf: SQLConf) extends Rule[SparkPlan] {
private def updateNumCoalescedBucketsInScan(
plan: SparkPlan,
numCoalescedBuckets: Int): SparkPlan = {
plan transformUp {
case f: FileSourceScanExec =>
f.copy(optionalNumCoalescedBuckets = Some(numCoalescedBuckets))
}
}

private def updateNumCoalescedBuckets(
join: BaseJoinExec,
numLeftBuckets: Int,
numRightBucket: Int,
numCoalescedBuckets: Int): BaseJoinExec = {
if (numCoalescedBuckets != numLeftBuckets) {
val leftCoalescedChild =
updateNumCoalescedBucketsInScan(join.left, numCoalescedBuckets)
join match {
case j: SortMergeJoinExec => j.copy(left = leftCoalescedChild)
case j: ShuffledHashJoinExec => j.copy(left = leftCoalescedChild)
}
} else {
val rightCoalescedChild =
updateNumCoalescedBucketsInScan(join.right, numCoalescedBuckets)
join match {
case j: SortMergeJoinExec => j.copy(right = rightCoalescedChild)
case j: ShuffledHashJoinExec => j.copy(right = rightCoalescedChild)
}
}
}

private def isCoalesceSHJStreamSide(
join: ShuffledHashJoinExec,
numLeftBuckets: Int,
numRightBucket: Int,
numCoalescedBuckets: Int): Boolean = {
if (numCoalescedBuckets == numLeftBuckets) {
join.buildSide != BuildRight
} else {
join.buildSide != BuildLeft
}
}

def apply(plan: SparkPlan): SparkPlan = {
if (!conf.coalesceBucketsInJoinEnabled) {
return plan
}

plan transform {
case ExtractJoinWithBuckets(join, numLeftBuckets, numRightBuckets)
if math.max(numLeftBuckets, numRightBuckets) / math.min(numLeftBuckets, numRightBuckets) <=
conf.coalesceBucketsInJoinMaxBucketRatio =>
val numCoalescedBuckets = math.min(numLeftBuckets, numRightBuckets)
join match {
case j: SortMergeJoinExec =>
updateNumCoalescedBuckets(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets)
case j: ShuffledHashJoinExec
// Only coalesce the buckets for shuffled hash join stream side,
// to avoid OOM for build side.
if isCoalesceSHJStreamSide(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets) =>
updateNumCoalescedBuckets(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets)
case other => other
}
case other => other
}
}
}

/**
* An extractor that extracts `SortMergeJoinExec` and `ShuffledHashJoin`,
* where both sides of the join have the bucketed tables,
* are consisted of only the scan operation,
* and numbers of buckets are not equal but divisible.
*/
object ExtractJoinWithBuckets {
@tailrec
private def hasScanOperation(plan: SparkPlan): Boolean = plan match {
case f: FilterExec => hasScanOperation(f.child)
case p: ProjectExec => hasScanOperation(p.child)
case _: FileSourceScanExec => true
case _ => false
}

private def getBucketSpec(plan: SparkPlan): Option[BucketSpec] = {
plan.collectFirst {
case f: FileSourceScanExec if f.relation.bucketSpec.nonEmpty &&
f.optionalNumCoalescedBuckets.isEmpty =>
f.relation.bucketSpec.get
}
}

/**
* The join keys should match with expressions for output partitioning. Note that
* the ordering does not matter because it will be handled in `EnsureRequirements`.
viirya marked this conversation as resolved.
Show resolved Hide resolved
*/
private def satisfiesOutputPartitioning(
keys: Seq[Expression],
partitioning: Partitioning): Boolean = {
partitioning match {
case HashPartitioning(exprs, _) if exprs.length == keys.length =>
exprs.forall(e => keys.exists(_.semanticEquals(e)))
case _ => false
}
}

private def isApplicable(j: BaseJoinExec): Boolean = {
(j.isInstanceOf[SortMergeJoinExec] ||
j.isInstanceOf[ShuffledHashJoinExec]) &&
hasScanOperation(j.left) &&
hasScanOperation(j.right) &&
satisfiesOutputPartitioning(j.leftKeys, j.left.outputPartitioning) &&
satisfiesOutputPartitioning(j.rightKeys, j.right.outputPartitioning)
}

private def isDivisible(numBuckets1: Int, numBuckets2: Int): Boolean = {
val (small, large) = (math.min(numBuckets1, numBuckets2), math.max(numBuckets1, numBuckets2))
// A bucket can be coalesced only if the bigger number of buckets is divisible by the smaller
// number of buckets because bucket id is calculated by modding the total number of buckets.
numBuckets1 != numBuckets2 && large % small == 0
}

def unapply(plan: SparkPlan): Option[(BaseJoinExec, Int, Int)] = {
plan match {
case j: BaseJoinExec if isApplicable(j) =>
val leftBucket = getBucketSpec(j.left)
val rightBucket = getBucketSpec(j.right)
if (leftBucket.isDefined && rightBucket.isDefined &&
isDivisible(leftBucket.get.numBuckets, rightBucket.get.numBuckets)) {
Some(j, leftBucket.get.numBuckets, rightBucket.get.numBuckets)
} else {
None
}
case _ => None
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite
test("Coalesced bucket info should be a part of explain string") {
withTable("t1", "t2") {
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0",
SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key -> "true") {
SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") {
Seq(1, 2).toDF("i").write.bucketBy(8, "i").saveAsTable("t1")
Seq(2, 3).toDF("i").write.bucketBy(4, "i").saveAsTable("t2")
val df1 = spark.table("t1")
Expand Down
Loading