Skip to content

Commit

Permalink
Change back to single ratio config for SMJ and SHJ, and rebase to lat…
Browse files Browse the repository at this point in the history
…est master
  • Loading branch information
c21 committed Jul 19, 2020
1 parent 6aa17dd commit 4c65c7f
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2657,13 +2657,15 @@ object SQLConf {
"the side with a bigger number of buckets will be coalesced to have the same number " +
"of buckets as the other side. Bigger number of buckets is divisible by the smaller " +
"number of buckets. Bucket coalescing is applied to sort-merge joins and " +
"shuffled hash join. Note: Coalescing bucketed table can avoid unnecessary shuffling " +
"in join, but it also reduces parallelism and could possibly cause OOM for " +
"shuffled hash join.")
.version("3.1.0")
.booleanConf
.createWithDefault(false)

val COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO =
buildConf("spark.sql.bucketing.coalesceBucketsInSortMergeJoin.maxBucketRatio")
val COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO =
buildConf("spark.sql.bucketing.coalesceBucketsInJoin.maxBucketRatio")
.doc("The ratio of the number of two buckets being coalesced should be less than or " +
"equal to this value for bucket coalescing to be applied. This configuration only " +
s"has an effect when '${COALESCE_BUCKETS_IN_JOIN_ENABLED.key}' is set to true.")
Expand All @@ -2672,18 +2674,6 @@ object SQLConf {
.checkValue(_ > 0, "The difference must be positive.")
.createWithDefault(4)

val COALESCE_BUCKETS_IN_SHUFFLED_HASH_JOIN_MAX_BUCKET_RATIO =
buildConf("spark.sql.bucketing.coalesceBucketsInShuffledHashJoin.maxBucketRatio")
.doc("The ratio of the number of two buckets being coalesced should be less than or " +
"equal to this value for bucket coalescing to be applied. This configuration only " +
s"has an effect when '${COALESCE_BUCKETS_IN_JOIN_ENABLED.key}' is set to true. " +
"Note as coalescing reduces parallelism, there might be a higher risk for " +
"out of memory error at shuffled hash join build side.")
.version("3.1.0")
.intConf
.checkValue(_ > 0, "The difference must be positive.")
.createWithDefault(2)

/**
* Holds information about keys that have been deprecated.
*
Expand Down Expand Up @@ -3285,11 +3275,8 @@ class SQLConf extends Serializable with Logging {

def coalesceBucketsInJoinEnabled: Boolean = getConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED)

def coalesceBucketsInSortMergeJoinMaxBucketRatio: Int =
getConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO)

def coalesceBucketsInShuffledHashJoinMaxBucketRatio: Int =
getConf(SQLConf.COALESCE_BUCKETS_IN_SHUFFLED_HASH_JOIN_MAX_BUCKET_RATIO)
def coalesceBucketsInJoinMaxBucketRatio: Int =
getConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO)

/** ********************** SQLConf functionality methods ************ */

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ import org.apache.spark.sql.internal.SQLConf
* - The larger bucket number is divisible by the smaller bucket number.
* - COALESCE_BUCKETS_IN_JOIN_ENABLED is set to true.
* - The ratio of the number of buckets is less than the value set in
* COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO (`SortMergeJoin`) or,
* COALESCE_BUCKETS_IN_SHUFFLED_HASH_JOIN_MAX_BUCKET_RATIO (`ShuffledHashJoin`).
* COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO.
*/
case class CoalesceBucketsInJoin(conf: SQLConf) extends Rule[SparkPlan] {
private def updateNumCoalescedBucketsInScan(
Expand Down Expand Up @@ -89,19 +88,17 @@ case class CoalesceBucketsInJoin(conf: SQLConf) extends Rule[SparkPlan] {
}

plan transform {
case ExtractJoinWithBuckets(join, numLeftBuckets, numRightBuckets) =>
val bucketRatio = math.max(numLeftBuckets, numRightBuckets) /
math.min(numLeftBuckets, numRightBuckets)
case ExtractJoinWithBuckets(join, numLeftBuckets, numRightBuckets)
if math.max(numLeftBuckets, numRightBuckets) / math.min(numLeftBuckets, numRightBuckets) <=
conf.coalesceBucketsInJoinMaxBucketRatio =>
val numCoalescedBuckets = math.min(numLeftBuckets, numRightBuckets)
join match {
case j: SortMergeJoinExec
if bucketRatio <= conf.coalesceBucketsInSortMergeJoinMaxBucketRatio =>
case j: SortMergeJoinExec =>
updateNumCoalescedBuckets(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets)
case j: ShuffledHashJoinExec
// Only coalesce the buckets for shuffled hash join stream side,
// to avoid OOM for build side.
if bucketRatio <= conf.coalesceBucketsInShuffledHashJoinMaxBucketRatio &&
isCoalesceSHJStreamSide(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets) =>
if isCoalesceSHJStreamSide(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets) =>
updateNumCoalescedBuckets(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets)
case other => other
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,16 +179,13 @@ class CoalesceBucketsInJoinSuite extends SQLTestUtils with SharedSparkSession {
}

test("the ratio of the number of buckets is greater than max allowed") {
withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") {
withSQLConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO.key -> "2") {
run(JoinSetting(
RelationSetting(4, None), RelationSetting(16, None), joinOperator = SORT_MERGE_JOIN))
}
withSQLConf(SQLConf.COALESCE_BUCKETS_IN_SHUFFLED_HASH_JOIN_MAX_BUCKET_RATIO.key -> "2") {
run(JoinSetting(
RelationSetting(4, None), RelationSetting(16, None), joinOperator = SHUFFLED_HASH_JOIN,
shjBuildSide = Some(BuildLeft)))
}
withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true",
SQLConf.COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO.key -> "2") {
run(JoinSetting(
RelationSetting(4, None), RelationSetting(16, None), joinOperator = SORT_MERGE_JOIN))
run(JoinSetting(
RelationSetting(4, None), RelationSetting(16, None), joinOperator = SHUFFLED_HASH_JOIN,
shjBuildSide = Some(BuildLeft)))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -922,7 +922,7 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {

withSQLConf(
SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true",
SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO.key -> "2") {
SQLConf.COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO.key -> "2") {
// Coalescing buckets is not applied because the ratio of the number of buckets (3)
// is greater than max allowed (2).
run(
Expand Down

0 comments on commit 4c65c7f

Please sign in to comment.