Skip to content

Commit

Permalink
Change back to single ratio config for SMJ and SHJ, and rebase to latest master
Browse files Browse the repository at this point in the history
  • Loading branch information
c21 committed Jul 21, 2020
1 parent d6c9d88 commit 2e9aff9
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2644,13 +2644,15 @@ object SQLConf {
"the side with a bigger number of buckets will be coalesced to have the same number " +
"of buckets as the other side. Bigger number of buckets is divisible by the smaller " +
"number of buckets. Bucket coalescing is applied to sort-merge joins and " +
"shuffled hash join. Note: Coalescing bucketed table can avoid unnecessary shuffling " +
"in join, but it also reduces parallelism and could possibly cause OOM for " +
"shuffled hash join.")
.version("3.1.0")
.booleanConf
.createWithDefault(false)

val COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO =
buildConf("spark.sql.bucketing.coalesceBucketsInSortMergeJoin.maxBucketRatio")
val COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO =
buildConf("spark.sql.bucketing.coalesceBucketsInJoin.maxBucketRatio")
.doc("The ratio of the number of two buckets being coalesced should be less than or " +
"equal to this value for bucket coalescing to be applied. This configuration only " +
s"has an effect when '${COALESCE_BUCKETS_IN_JOIN_ENABLED.key}' is set to true.")
Expand Down Expand Up @@ -2681,7 +2683,7 @@ object SQLConf {
.intConf
.checkValue(_ >= 0, "The value must be non-negative.")
.createWithDefault(8)

/**
* Holds information about keys that have been deprecated.
*
Expand Down Expand Up @@ -3284,11 +3286,8 @@ class SQLConf extends Serializable with Logging {

/** Current value of the boolean config COALESCE_BUCKETS_IN_JOIN_ENABLED. */
def coalesceBucketsInJoinEnabled: Boolean = getConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED)

def coalesceBucketsInSortMergeJoinMaxBucketRatio: Int =
getConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO)

def coalesceBucketsInShuffledHashJoinMaxBucketRatio: Int =
getConf(SQLConf.COALESCE_BUCKETS_IN_SHUFFLED_HASH_JOIN_MAX_BUCKET_RATIO)
/**
 * Current value of COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO: the largest ratio between the
 * two sides' bucket counts for which bucket coalescing is still applied (the ratio must be
 * less than or equal to this value).
 */
def coalesceBucketsInJoinMaxBucketRatio: Int =
getConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO)

/** ********************** SQLConf functionality methods ************ */

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ import org.apache.spark.sql.internal.SQLConf
* - The larger bucket number is divisible by the smaller bucket number.
* - COALESCE_BUCKETS_IN_JOIN_ENABLED is set to true.
* - The ratio of the number of buckets is less than or equal to the value set in
* COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO (`SortMergeJoin`) or,
* COALESCE_BUCKETS_IN_SHUFFLED_HASH_JOIN_MAX_BUCKET_RATIO (`ShuffledHashJoin`).
* COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO.
*/
case class CoalesceBucketsInJoin(conf: SQLConf) extends Rule[SparkPlan] {
private def updateNumCoalescedBucketsInScan(
Expand Down Expand Up @@ -89,19 +88,17 @@ case class CoalesceBucketsInJoin(conf: SQLConf) extends Rule[SparkPlan] {
}

plan transform {
case ExtractJoinWithBuckets(join, numLeftBuckets, numRightBuckets) =>
val bucketRatio = math.max(numLeftBuckets, numRightBuckets) /
math.min(numLeftBuckets, numRightBuckets)
case ExtractJoinWithBuckets(join, numLeftBuckets, numRightBuckets)
if math.max(numLeftBuckets, numRightBuckets) / math.min(numLeftBuckets, numRightBuckets) <=
conf.coalesceBucketsInJoinMaxBucketRatio =>
val numCoalescedBuckets = math.min(numLeftBuckets, numRightBuckets)
join match {
case j: SortMergeJoinExec
if bucketRatio <= conf.coalesceBucketsInSortMergeJoinMaxBucketRatio =>
case j: SortMergeJoinExec =>
updateNumCoalescedBuckets(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets)
case j: ShuffledHashJoinExec
// Only coalesce the buckets for shuffled hash join stream side,
// to avoid OOM for build side.
if bucketRatio <= conf.coalesceBucketsInShuffledHashJoinMaxBucketRatio &&
isCoalesceSHJStreamSide(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets) =>
if isCoalesceSHJStreamSide(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets) =>
updateNumCoalescedBuckets(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets)
case other => other
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,16 +179,13 @@ class CoalesceBucketsInJoinSuite extends SQLTestUtils with SharedSparkSession {
}

test("the ratio of the number of buckets is greater than max allowed") {
withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") {
withSQLConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO.key -> "2") {
run(JoinSetting(
RelationSetting(4, None), RelationSetting(16, None), joinOperator = SORT_MERGE_JOIN))
}
withSQLConf(SQLConf.COALESCE_BUCKETS_IN_SHUFFLED_HASH_JOIN_MAX_BUCKET_RATIO.key -> "2") {
run(JoinSetting(
RelationSetting(4, None), RelationSetting(16, None), joinOperator = SHUFFLED_HASH_JOIN,
shjBuildSide = Some(BuildLeft)))
}
withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true",
SQLConf.COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO.key -> "2") {
run(JoinSetting(
RelationSetting(4, None), RelationSetting(16, None), joinOperator = SORT_MERGE_JOIN))
run(JoinSetting(
RelationSetting(4, None), RelationSetting(16, None), joinOperator = SHUFFLED_HASH_JOIN,
shjBuildSide = Some(BuildLeft)))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -922,7 +922,7 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {

withSQLConf(
SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true",
SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO.key -> "2") {
SQLConf.COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO.key -> "2") {
// Coalescing buckets is not applied because the ratio of the number of buckets (3)
// is greater than max allowed (2).
run(
Expand Down

0 comments on commit 2e9aff9

Please sign in to comment.