diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index f6e0273ba2ee2..9d2891e4159eb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2657,13 +2657,15 @@ object SQLConf {
         "the side with a bigger number of buckets will be coalesced to have the same number " +
         "of buckets as the other side. Bigger number of buckets is divisible by the smaller " +
         "number of buckets. Bucket coalescing is applied to sort-merge joins and " +
+        "shuffled hash join. Note: Coalescing bucketed table can avoid unnecessary shuffling " +
+        "in join, but it also reduces parallelism and could possibly cause OOM for " +
         "shuffled hash join.")
       .version("3.1.0")
       .booleanConf
       .createWithDefault(false)
 
-  val COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO =
-    buildConf("spark.sql.bucketing.coalesceBucketsInSortMergeJoin.maxBucketRatio")
+  val COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO =
+    buildConf("spark.sql.bucketing.coalesceBucketsInJoin.maxBucketRatio")
       .doc("The ratio of the number of two buckets being coalesced should be less than or " +
         "equal to this value for bucket coalescing to be applied. This configuration only " +
         s"has an effect when '${COALESCE_BUCKETS_IN_JOIN_ENABLED.key}' is set to true.")
@@ -2672,18 +2674,6 @@ object SQLConf {
       .checkValue(_ > 0, "The difference must be positive.")
       .createWithDefault(4)
 
-  val COALESCE_BUCKETS_IN_SHUFFLED_HASH_JOIN_MAX_BUCKET_RATIO =
-    buildConf("spark.sql.bucketing.coalesceBucketsInShuffledHashJoin.maxBucketRatio")
-      .doc("The ratio of the number of two buckets being coalesced should be less than or " +
-        "equal to this value for bucket coalescing to be applied. This configuration only " +
-        s"has an effect when '${COALESCE_BUCKETS_IN_JOIN_ENABLED.key}' is set to true. " +
-        "Note as coalescing reduces parallelism, there might be a higher risk for " +
-        "out of memory error at shuffled hash join build side.")
-      .version("3.1.0")
-      .intConf
-      .checkValue(_ > 0, "The difference must be positive.")
-      .createWithDefault(2)
-
   /**
    * Holds information about keys that have been deprecated.
    *
@@ -3285,11 +3275,8 @@ class SQLConf extends Serializable with Logging {
 
   def coalesceBucketsInJoinEnabled: Boolean = getConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED)
 
-  def coalesceBucketsInSortMergeJoinMaxBucketRatio: Int =
-    getConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO)
-
-  def coalesceBucketsInShuffledHashJoinMaxBucketRatio: Int =
-    getConf(SQLConf.COALESCE_BUCKETS_IN_SHUFFLED_HASH_JOIN_MAX_BUCKET_RATIO)
+  def coalesceBucketsInJoinMaxBucketRatio: Int =
+    getConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO)
 
   /** ********************** SQLConf functionality methods ************ */
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala
index 607eb6fd5661f..258a1a1adc20a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala
@@ -36,8 +36,7 @@ import org.apache.spark.sql.internal.SQLConf
  * - The larger bucket number is divisible by the smaller bucket number.
  * - COALESCE_BUCKETS_IN_JOIN_ENABLED is set to true.
  * - The ratio of the number of buckets is less than the value set in
- *   COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO (`SortMergeJoin`) or,
- *   COALESCE_BUCKETS_IN_SHUFFLED_HASH_JOIN_MAX_BUCKET_RATIO (`ShuffledHashJoin`).
+ *   COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO.
  */
 case class CoalesceBucketsInJoin(conf: SQLConf) extends Rule[SparkPlan] {
   private def updateNumCoalescedBucketsInScan(
@@ -89,19 +88,17 @@ case class CoalesceBucketsInJoin(conf: SQLConf) extends Rule[SparkPlan] {
     }
 
     plan transform {
-      case ExtractJoinWithBuckets(join, numLeftBuckets, numRightBuckets) =>
-        val bucketRatio = math.max(numLeftBuckets, numRightBuckets) /
-          math.min(numLeftBuckets, numRightBuckets)
+      case ExtractJoinWithBuckets(join, numLeftBuckets, numRightBuckets)
+        if math.max(numLeftBuckets, numRightBuckets) / math.min(numLeftBuckets, numRightBuckets) <=
+          conf.coalesceBucketsInJoinMaxBucketRatio =>
         val numCoalescedBuckets = math.min(numLeftBuckets, numRightBuckets)
         join match {
-          case j: SortMergeJoinExec
-            if bucketRatio <= conf.coalesceBucketsInSortMergeJoinMaxBucketRatio =>
+          case j: SortMergeJoinExec =>
             updateNumCoalescedBuckets(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets)
           case j: ShuffledHashJoinExec
             // Only coalesce the buckets for shuffled hash join stream side,
             // to avoid OOM for build side.
-            if bucketRatio <= conf.coalesceBucketsInShuffledHashJoinMaxBucketRatio &&
-              isCoalesceSHJStreamSide(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets) =>
+            if isCoalesceSHJStreamSide(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets) =>
             updateNumCoalescedBuckets(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets)
           case other => other
         }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoinSuite.scala
index 317a34e5157c6..7d59a6bdd4fd6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoinSuite.scala
@@ -179,16 +179,13 @@ class CoalesceBucketsInJoinSuite extends SQLTestUtils with SharedSparkSession {
   test("the ratio of the number of buckets is greater than max allowed") {
-    withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") {
-      withSQLConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO.key -> "2") {
-        run(JoinSetting(
-          RelationSetting(4, None), RelationSetting(16, None), joinOperator = SORT_MERGE_JOIN))
-      }
-      withSQLConf(SQLConf.COALESCE_BUCKETS_IN_SHUFFLED_HASH_JOIN_MAX_BUCKET_RATIO.key -> "2") {
-        run(JoinSetting(
-          RelationSetting(4, None), RelationSetting(16, None), joinOperator = SHUFFLED_HASH_JOIN,
-          shjBuildSide = Some(BuildLeft)))
-      }
+    withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true",
+      SQLConf.COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO.key -> "2") {
+      run(JoinSetting(
+        RelationSetting(4, None), RelationSetting(16, None), joinOperator = SORT_MERGE_JOIN))
+      run(JoinSetting(
+        RelationSetting(4, None), RelationSetting(16, None), joinOperator = SHUFFLED_HASH_JOIN,
+        shjBuildSide = Some(BuildLeft)))
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index 7068c7ca92013..98886d271e977 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -922,7 +922,7 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
       withSQLConf(
         SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true",
-        SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO.key -> "2") {
+        SQLConf.COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO.key -> "2") {
         // Coalescing buckets is not applied because the ratio of the number of buckets (3)
         // is greater than max allowed (2).
         run(