diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 7f63d79a21ed6..a82c4b5a153b9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2617,6 +2617,26 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
+  val COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED =
+    buildConf("spark.sql.bucketing.coalesceBucketsInSortMergeJoin.enabled")
+      .doc("When true, if two bucketed tables with different numbers of buckets are joined, " +
+        "the side with the bigger number of buckets will be coalesced to have the same number " +
+        "of buckets as the other side. Bucket coalescing is applied only to sort-merge joins " +
+        "and only when the bigger number of buckets is divisible by the smaller number of buckets.")
+      .version("3.1.0")
+      .booleanConf
+      .createWithDefault(false)
+
+  val COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO =
+    buildConf("spark.sql.bucketing.coalesceBucketsInSortMergeJoin.maxBucketRatio")
+      .doc("The ratio of the two tables' numbers of buckets should be less than or " +
+        "equal to this value for bucket coalescing to be applied. This configuration only " +
+        s"has an effect when '${COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key}' is set to true.")
+      .version("3.1.0")
+      .intConf
+      .checkValue(_ > 0, "The ratio must be positive.")
+      .createWithDefault(4)
+
   /**
    * Holds information about keys that have been deprecated.
    *
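As a rough illustration of the gating arithmetic the two configs above control (a standalone sketch, not part of the patch; the bucket counts and the `maxRatio` binding are made up for illustration):

```scala
// Illustrative only: the gating arithmetic behind the two configs above.
val (small, large) = (4, 12)
val maxRatio = 4 // stands in for spark.sql.bucketing.coalesceBucketsInSortMergeJoin.maxBucketRatio
val coalescible = large % small == 0 && large / small <= maxRatio
// 12 % 4 == 0 and 12 / 4 = 3 <= 4, so the 12-bucket side would be read as 4 buckets.
// With (small, large) = (4, 7), or with a ratio above maxRatio, the join is left untouched.
```
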
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 0ae39cf8560e6..458e11b97db6b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -155,7 +155,8 @@ case class RowDataSourceScanExec(
  * @param output Output attributes of the scan, including data attributes and partition attributes.
  * @param requiredSchema Required schema of the underlying relation, excluding partition columns.
  * @param partitionFilters Predicates to use for partition pruning.
- * @param optionalBucketSet Bucket ids for bucket pruning
+ * @param optionalBucketSet Bucket ids for bucket pruning.
+ * @param optionalNumCoalescedBuckets Number of coalesced buckets.
  * @param dataFilters Filters on non-partition columns.
  * @param tableIdentifier identifier for the table in the metastore.
  */
@@ -165,6 +166,7 @@ case class FileSourceScanExec(
     requiredSchema: StructType,
     partitionFilters: Seq[Expression],
     optionalBucketSet: Option[BitSet],
+    optionalNumCoalescedBuckets: Option[Int],
     dataFilters: Seq[Expression],
     tableIdentifier: Option[TableIdentifier])
   extends DataSourceScanExec {
@@ -291,7 +293,8 @@ case class FileSourceScanExec(
       // above
       val spec = relation.bucketSpec.get
       val bucketColumns = spec.bucketColumnNames.flatMap(n => toAttribute(n))
-      val partitioning = HashPartitioning(bucketColumns, spec.numBuckets)
+      val numPartitions = optionalNumCoalescedBuckets.getOrElse(spec.numBuckets)
+      val partitioning = HashPartitioning(bucketColumns, numPartitions)
       val sortColumns =
         spec.sortColumnNames.map(x => toAttribute(x)).takeWhile(x => x.isDefined).map(_.get)
       val shouldCalculateSortOrder =
@@ -311,7 +314,8 @@ case class FileSourceScanExec(
           files.map(_.getPath.getName).groupBy(file => BucketingUtils.getBucketId(file))
         val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= 1)
 
-        if (singleFilePartitions) {
+        // TODO SPARK-24528 Sort order is currently ignored if buckets are coalesced.
+        if (singleFilePartitions && optionalNumCoalescedBuckets.isEmpty) {
           // TODO Currently Spark does not support writing columns sorting in descending order
           // so using Ascending order. This can be fixed in future
           sortColumns.map(attribute => SortOrder(attribute, Ascending))
@@ -356,7 +360,8 @@ case class FileSourceScanExec(
         spec.numBuckets
       }
       metadata + ("SelectedBucketsCount" ->
-        s"$numSelectedBuckets out of ${spec.numBuckets}")
+        (s"$numSelectedBuckets out of ${spec.numBuckets}" +
+          optionalNumCoalescedBuckets.map { b => s" (Coalesced to $b)" }.getOrElse("")))
     } getOrElse {
       metadata
     }
@@ -544,8 +549,19 @@ case class FileSourceScanExec(
       filesGroupedToBuckets
     }
 
-    val filePartitions = Seq.tabulate(bucketSpec.numBuckets) { bucketId =>
-      FilePartition(bucketId, prunedFilesGroupedToBuckets.getOrElse(bucketId, Array.empty))
+    val filePartitions = optionalNumCoalescedBuckets.map { numCoalescedBuckets =>
+      logInfo(s"Coalescing to $numCoalescedBuckets buckets")
+      val coalescedBuckets = prunedFilesGroupedToBuckets.groupBy(_._1 % numCoalescedBuckets)
+      Seq.tabulate(numCoalescedBuckets) { bucketId =>
+        val partitionedFiles = coalescedBuckets.get(bucketId).map {
+          _.values.flatten.toArray
+        }.getOrElse(Array.empty)
+        FilePartition(bucketId, partitionedFiles)
+      }
+    }.getOrElse {
+      Seq.tabulate(bucketSpec.numBuckets) { bucketId =>
+        FilePartition(bucketId, prunedFilesGroupedToBuckets.getOrElse(bucketId, Array.empty))
+      }
     }
 
     new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions)
@@ -599,6 +615,7 @@ case class FileSourceScanExec(
       requiredSchema,
       QueryPlan.normalizePredicates(partitionFilters, output),
       optionalBucketSet,
+      optionalNumCoalescedBuckets,
       QueryPlan.normalizePredicates(dataFilters, output),
       None)
   }
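The regrouping in `createBucketedReadRDD` above is a modulo merge of per-bucket file lists. A self-contained sketch of the same idea on plain collections (the file names and counts are made up for illustration):

```scala
// Plain-collections sketch of the bucket regrouping above (illustrative data).
val filesByBucket: Map[Int, Seq[String]] =
  (0 until 8).map(id => id -> Seq(f"part-$id%05d")).toMap
val numCoalescedBuckets = 4

// Bucket b of the 8-bucket side lands in coalesced bucket b % 4. This is safe because
// bucket ids are hash % numBuckets, and hash % 8 == b implies hash % 4 == b % 4.
val coalesced: Map[Int, Seq[String]] =
  filesByBucket.groupBy(_._1 % numCoalescedBuckets).map {
    case (bucketId, group) => bucketId -> group.values.flatten.toSeq
  }
// e.g. coalesced(1) contains the files of buckets 1 and 5 (ordering may vary).
```
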
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 89915d254883d..bf60427e5f3bf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.adaptive.{AdaptiveExecutionContext, InsertAdaptiveSparkPlan}
+import org.apache.spark.sql.execution.bucketing.CoalesceBucketsInSortMergeJoin
 import org.apache.spark.sql.execution.dynamicpruning.PlanDynamicPruningFilters
 import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
 import org.apache.spark.sql.execution.streaming.{IncrementalExecution, OffsetSeqMetadata}
@@ -331,6 +332,7 @@ object QueryExecution {
     // as the original plan is hidden behind `AdaptiveSparkPlanExec`.
     adaptiveExecutionRule.toSeq ++
     Seq(
+      CoalesceBucketsInSortMergeJoin(sparkSession.sessionState.conf),
       PlanDynamicPruningFilters(sparkSession),
       PlanSubqueries(sparkSession),
       EnsureRequirements(sparkSession.sessionState.conf),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInSortMergeJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInSortMergeJoin.scala
new file mode 100644
index 0000000000000..3bb0597ecd87c
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInSortMergeJoin.scala
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.bucketing
+
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{FileSourceScanExec, FilterExec, ProjectExec, SparkPlan}
+import org.apache.spark.sql.execution.joins.SortMergeJoinExec
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule coalesces one side of the `SortMergeJoin` if the following conditions are met:
+ *   - Two bucketed tables are joined.
+ *   - Join keys match with output partition expressions on their respective sides.
+ *   - The larger bucket number is divisible by the smaller bucket number.
+ *   - COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED is set to true.
+ *   - The ratio of the number of buckets is less than or equal to the value set in
+ *     COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO.
+ */
+case class CoalesceBucketsInSortMergeJoin(conf: SQLConf) extends Rule[SparkPlan] {
+  private def mayCoalesce(numBuckets1: Int, numBuckets2: Int, conf: SQLConf): Option[Int] = {
+    assert(numBuckets1 != numBuckets2)
+    val (small, large) = (math.min(numBuckets1, numBuckets2), math.max(numBuckets1, numBuckets2))
+    // A bucket can be coalesced only if the bigger number of buckets is divisible by the smaller
+    // number of buckets, because a bucket id is assigned modulo the total number of buckets.
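+    // For example, joining an 8-bucket table with a 4-bucket table gives small = 4, large = 8:
+    // 8 % 4 == 0 and 8 / 4 = 2 <= 4 (the default max ratio), so Some(4) is returned and the
+    // 8-bucket side is read as 4 coalesced buckets.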
+    if (large % small == 0 &&
+      large / small <= conf.getConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO)) {
+      Some(small)
+    } else {
+      None
+    }
+  }
+
+  private def updateNumCoalescedBuckets(plan: SparkPlan, numCoalescedBuckets: Int): SparkPlan = {
+    plan.transformUp {
+      case f: FileSourceScanExec =>
+        f.copy(optionalNumCoalescedBuckets = Some(numCoalescedBuckets))
+    }
+  }
+
+  def apply(plan: SparkPlan): SparkPlan = {
+    if (!conf.getConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED)) {
+      return plan
+    }
+
+    plan transform {
+      case ExtractSortMergeJoinWithBuckets(smj, numLeftBuckets, numRightBuckets)
+          if numLeftBuckets != numRightBuckets =>
+        mayCoalesce(numLeftBuckets, numRightBuckets, conf).map { numCoalescedBuckets =>
+          if (numCoalescedBuckets != numLeftBuckets) {
+            smj.copy(left = updateNumCoalescedBuckets(smj.left, numCoalescedBuckets))
+          } else {
+            smj.copy(right = updateNumCoalescedBuckets(smj.right, numCoalescedBuckets))
+          }
+        }.getOrElse(smj)
+      case other => other
+    }
+  }
+}
+
+/**
+ * An extractor that matches a `SortMergeJoinExec` whose two sides both scan bucketed tables
+ * and consist only of scan operations (optionally under `Project` or `Filter`).
+ */
+object ExtractSortMergeJoinWithBuckets {
+  private def isScanOperation(plan: SparkPlan): Boolean = plan match {
+    case f: FilterExec => isScanOperation(f.child)
+    case p: ProjectExec => isScanOperation(p.child)
+    case _: FileSourceScanExec => true
+    case _ => false
+  }
+
+  private def getBucketSpec(plan: SparkPlan): Option[BucketSpec] = {
+    plan.collectFirst {
+      case f: FileSourceScanExec if f.relation.bucketSpec.nonEmpty &&
+          f.optionalNumCoalescedBuckets.isEmpty =>
+        f.relation.bucketSpec.get
+    }
+  }
+
+  /**
+   * The join keys should match the expressions of the output partitioning. Note that
+   * the ordering does not matter because it will be handled in `EnsureRequirements`.
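+   * For example, keys (a, b) satisfy HashPartitioning(Seq(b, a), n), whereas (a) alone or
+   * (a, b, c) do not, because the number of keys must equal the number of partitioning
+   * expressions and every expression must appear among the keys.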
+   */
+  private def satisfiesOutputPartitioning(
+      keys: Seq[Expression],
+      partitioning: Partitioning): Boolean = {
+    partitioning match {
+      case HashPartitioning(exprs, _) if exprs.length == keys.length =>
+        exprs.forall(e => keys.exists(_.semanticEquals(e)))
+      case _ => false
+    }
+  }
+
+  private def isApplicable(s: SortMergeJoinExec): Boolean = {
+    isScanOperation(s.left) &&
+      isScanOperation(s.right) &&
+      satisfiesOutputPartitioning(s.leftKeys, s.left.outputPartitioning) &&
+      satisfiesOutputPartitioning(s.rightKeys, s.right.outputPartitioning)
+  }
+
+  def unapply(plan: SparkPlan): Option[(SortMergeJoinExec, Int, Int)] = {
+    plan match {
+      case s: SortMergeJoinExec if isApplicable(s) =>
+        val leftBucket = getBucketSpec(s.left)
+        val rightBucket = getBucketSpec(s.right)
+        if (leftBucket.isDefined && rightBucket.isDefined) {
+          Some((s, leftBucket.get.numBuckets, rightBucket.get.numBuckets))
+        } else {
+          None
+        }
+      case _ => None
+    }
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 477937d66ad9b..512024dff0051 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -208,6 +208,7 @@ object FileSourceStrategy extends Strategy with Logging {
         outputSchema,
         partitionKeyFilters.toSeq,
         bucketSet,
+        None,
         dataFilters,
         table.map(_.identifier))
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index 6b772e53ac184..0b4f43b72366b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -345,7 +345,7 @@ class DataFrameJoinSuite extends QueryTest
       }
       assert(broadcastExchanges.size == 1)
       val tables = broadcastExchanges.head.collect {
-        case FileSourceScanExec(_, _, _, _, _, _, Some(tableIdent)) => tableIdent
+        case FileSourceScanExec(_, _, _, _, _, _, _, Some(tableIdent)) => tableIdent
       }
       assert(tables.size == 1)
       assert(tables.head === TableIdentifier(table1Name, Some(dbName)))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
index 5aeecd2df91e9..1ad97185a564a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
@@ -343,6 +343,23 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite
       assert(getNormalizedExplain(df1, FormattedMode) === getNormalizedExplain(df2, FormattedMode))
     }
   }
+
+  test("Coalesced bucket info should be a part of explain string") {
+    withTable("t1", "t2") {
+      withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0",
+        SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key -> "true") {
+        Seq(1, 2).toDF("i").write.bucketBy(8, "i").saveAsTable("t1")
+        Seq(2, 3).toDF("i").write.bucketBy(4, "i").saveAsTable("t2")
+        val df1 = spark.table("t1")
+        val df2 = spark.table("t2")
+        val joined = df1.join(df2, df1("i") === df2("i"))
+        checkKeywordsExistsInExplain(
+          joined,
+          SimpleMode,
+          "SelectedBucketsCount: 8 out of 8 (Coalesced to 4)" :: Nil: _*)
+      }
+    }
+  }
 }
 
 class ExplainSuiteAE extends ExplainSuiteHelper with EnableAdaptiveExecutionSuite {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index 8118eb4f79c17..347bc735a8b76 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -1313,7 +1313,7 @@ class SubquerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
         // need to execute the query before we can examine fs.inputRDDs()
         assert(stripAQEPlan(df.queryExecution.executedPlan) match {
           case WholeStageCodegenExec(ColumnarToRowExec(InputAdapter(
-              fs @ FileSourceScanExec(_, _, _, partitionFilters, _, _, _)))) =>
+              fs @ FileSourceScanExec(_, _, _, partitionFilters, _, _, _, _)))) =>
             partitionFilters.exists(ExecSubqueryExpression.hasSubquery) &&
               fs.inputRDDs().forall(
                 _.asInstanceOf[FileScanRDD].filePartitions.forall(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInSortMergeJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInSortMergeJoinSuite.scala
new file mode 100644
index 0000000000000..6a70045c55e64
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInSortMergeJoinSuite.scala
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.bucketing
+
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.catalyst.optimizer.BuildLeft
+import org.apache.spark.sql.catalyst.plans.Inner
+import org.apache.spark.sql.execution.{BinaryExecNode, FileSourceScanExec, SparkPlan}
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InMemoryFileIndex, PartitionSpec}
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
+import org.apache.spark.sql.types.{IntegerType, StructType}
+
+class CoalesceBucketsInSortMergeJoinSuite extends SQLTestUtils with SharedSparkSession {
+  case class RelationSetting(
+      cols: Seq[Attribute],
+      numBuckets: Int,
+      expectedCoalescedNumBuckets: Option[Int])
+
+  object RelationSetting {
+    def apply(numBuckets: Int, expectedCoalescedNumBuckets: Option[Int]): RelationSetting = {
+      val cols = Seq(AttributeReference("i", IntegerType)())
+      RelationSetting(cols, numBuckets, expectedCoalescedNumBuckets)
+    }
+  }
+
+  case class JoinSetting(
+      leftKeys: Seq[Attribute],
+      rightKeys: Seq[Attribute],
+      leftRelation: RelationSetting,
+      rightRelation: RelationSetting,
+      isSortMergeJoin: Boolean)
+
+  object JoinSetting {
+    def apply(l: RelationSetting, r: RelationSetting, isSortMergeJoin: Boolean): JoinSetting = {
+      JoinSetting(l.cols, r.cols, l, r, isSortMergeJoin)
+    }
+  }
+
+  private def newFileSourceScanExec(setting: RelationSetting): FileSourceScanExec = {
+    val relation = HadoopFsRelation(
+      location = new InMemoryFileIndex(spark, Nil, Map.empty, None),
+      partitionSchema = PartitionSpec.emptySpec.partitionColumns,
+      dataSchema = StructType.fromAttributes(setting.cols),
+      bucketSpec = Some(BucketSpec(setting.numBuckets, setting.cols.map(_.name), Nil)),
+      fileFormat = new ParquetFileFormat(),
+      options = Map.empty)(spark)
+    FileSourceScanExec(relation, setting.cols, relation.dataSchema, Nil, None, None, Nil, None)
+  }
+
+  private def run(setting: JoinSetting): Unit = {
+    val swappedSetting = setting.copy(
+      leftKeys = setting.rightKeys,
+      rightKeys = setting.leftKeys,
+      leftRelation = setting.rightRelation,
+      rightRelation = setting.leftRelation)
+
+    Seq(setting, swappedSetting).foreach { s =>
+      val lScan = newFileSourceScanExec(s.leftRelation)
+      val rScan = newFileSourceScanExec(s.rightRelation)
+      val join = if (s.isSortMergeJoin) {
+        SortMergeJoinExec(s.leftKeys, s.rightKeys, Inner, None, lScan, rScan)
+      } else {
+        BroadcastHashJoinExec(
+          s.leftKeys, s.rightKeys, Inner, BuildLeft, None, lScan, rScan)
+      }
+
+      val plan = CoalesceBucketsInSortMergeJoin(spark.sessionState.conf)(join)
+
+      def verify(expected: Option[Int], subPlan: SparkPlan): Unit = {
+        val coalesced = subPlan.collect {
+          case f: FileSourceScanExec if f.optionalNumCoalescedBuckets.nonEmpty =>
+            f.optionalNumCoalescedBuckets.get
+        }
+        if (expected.isDefined) {
+          assert(coalesced.size == 1 && coalesced(0) == expected.get)
+        } else {
+          assert(coalesced.isEmpty)
+        }
+      }
+
+      verify(s.leftRelation.expectedCoalescedNumBuckets, plan.asInstanceOf[BinaryExecNode].left)
+      verify(s.rightRelation.expectedCoalescedNumBuckets, plan.asInstanceOf[BinaryExecNode].right)
+    }
+  }
+
+  test("bucket coalescing - basic") {
+    withSQLConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key -> "true") {
+      run(JoinSetting(
+        RelationSetting(4, None), RelationSetting(8, Some(4)), isSortMergeJoin = true))
+    }
+    withSQLConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key -> "false") {
+      run(JoinSetting(RelationSetting(4, None), RelationSetting(8, None), isSortMergeJoin = true))
+    }
+  }
+
+  test("bucket coalescing should work only for sort merge join") {
+    Seq(true, false).foreach { enabled =>
+      withSQLConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key -> enabled.toString) {
+        run(JoinSetting(
+          RelationSetting(4, None), RelationSetting(8, None), isSortMergeJoin = false))
+      }
+    }
+  }
+
+  test("bucket coalescing shouldn't be applied when the number of buckets is the same") {
+    withSQLConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key -> "true") {
+      run(JoinSetting(RelationSetting(8, None), RelationSetting(8, None), isSortMergeJoin = true))
+    }
+  }
+
+  test("number of buckets is not divisible by the other number of buckets") {
+    withSQLConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key -> "true") {
+      run(JoinSetting(RelationSetting(3, None), RelationSetting(8, None), isSortMergeJoin = true))
+    }
+  }
+
+  test("the ratio of the number of buckets is greater than max allowed") {
+    withSQLConf(
+      SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key -> "true",
+      SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO.key -> "2") {
+      run(JoinSetting(RelationSetting(4, None), RelationSetting(16, None), isSortMergeJoin = true))
+    }
+  }
+
+  test("join keys should match with output partitioning") {
+    withSQLConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key -> "true") {
+      val lCols = Seq(
+        AttributeReference("l1", IntegerType)(),
+        AttributeReference("l2", IntegerType)())
+      val rCols = Seq(
+        AttributeReference("r1", IntegerType)(),
+        AttributeReference("r2", IntegerType)())
+
+      val lRel = RelationSetting(lCols, 4, None)
+      val rRel = RelationSetting(rCols, 8, None)
+
+      // The following should not be coalesced because the join keys do not match the output
+      // partitioning (one expression is missing).
+      run(JoinSetting(
+        leftKeys = Seq(lCols.head),
+        rightKeys = Seq(rCols.head),
+        leftRelation = lRel,
+        rightRelation = rRel,
+        isSortMergeJoin = true))
+
+      // The following should not be coalesced because the join keys do not match the output
+      // partitioning (there are more expressions than partitioning columns).
+      run(JoinSetting(
+        leftKeys = lCols :+ AttributeReference("l3", IntegerType)(),
+        rightKeys = rCols :+ AttributeReference("r3", IntegerType)(),
+        leftRelation = lRel,
+        rightRelation = rRel,
+        isSortMergeJoin = true))
+
+      // The following will be coalesced since ordering should not matter because it will be
+      // adjusted in `EnsureRequirements`.
+      run(JoinSetting(
+        leftKeys = lCols.reverse,
+        rightKeys = rCols.reverse,
+        leftRelation = lRel,
+        rightRelation = RelationSetting(rCols, 8, Some(4)),
+        isSortMergeJoin = true))
+    }
+  }
+
+  test("FileSourceScanExec's metadata should be updated with coalesced info") {
+    val scan = newFileSourceScanExec(RelationSetting(8, None))
+    val value = scan.metadata("SelectedBucketsCount")
+    assert(value === "8 out of 8")
+
+    val scanWithCoalescing = scan.copy(optionalNumCoalescedBuckets = Some(4))
+    val valueWithCoalescing = scanWithCoalescing.metadata("SelectedBucketsCount")
+    assert(valueWithCoalescing == "8 out of 8 (Coalesced to 4)")
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index 1d8303b9e7750..b6767eb3132ea 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -332,7 +332,8 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
       bucketSpec: Option[BucketSpec],
       numPartitions: Int = 10,
       expectedShuffle: Boolean = true,
-      expectedSort: Boolean = true)
+      expectedSort: Boolean = true,
+      expectedNumOutputPartitions: Option[Int] = None)
 
   /**
    * A helper method to test the bucket read functionality using join. It will save `df1` and `df2`
@@ -345,10 +346,18 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
       bucketedTableTestSpecRight: BucketedTableTestSpec,
       joinType: String = "inner",
       joinCondition: (DataFrame, DataFrame) => Column): Unit = {
-    val BucketedTableTestSpec(bucketSpecLeft, numPartitionsLeft, shuffleLeft, sortLeft) =
-      bucketedTableTestSpecLeft
-    val BucketedTableTestSpec(bucketSpecRight, numPartitionsRight, shuffleRight, sortRight) =
-      bucketedTableTestSpecRight
+    val BucketedTableTestSpec(
+      bucketSpecLeft,
+      numPartitionsLeft,
+      shuffleLeft,
+      sortLeft,
+      numOutputPartitionsLeft) = bucketedTableTestSpecLeft
+    val BucketedTableTestSpec(
+      bucketSpecRight,
+      numPartitionsRight,
+      shuffleRight,
+      sortRight,
+      numOutputPartitionsRight) = bucketedTableTestSpecRight
 
     withTable("bucketed_table1", "bucketed_table2") {
       def withBucket(
@@ -413,6 +422,16 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
         assert(
           joinOperator.right.find(_.isInstanceOf[SortExec]).isDefined == sortRight,
           s"expected sort in the right child to be $sortRight but found\n${joinOperator.right}")
+
+        // check the output partitioning
+        if (numOutputPartitionsLeft.isDefined) {
+          assert(joinOperator.left.outputPartitioning.numPartitions ===
+            numOutputPartitionsLeft.get)
+        }
+        if (numOutputPartitionsRight.isDefined) {
+          assert(joinOperator.right.outputPartitioning.numPartitions ===
+            numOutputPartitionsRight.get)
+        }
       }
     }
   }
@@ -855,4 +874,112 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
       }
     }
   }
+
+  test("bucket coalescing eliminates shuffle") {
+    withSQLConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key -> "true") {
+      // The side with testSpec1 will be coalesced to have 4 output partitions.
+      // Currently, a sort will be introduced for the side that is coalesced.
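+      // (A coalesced partition merges the files of several buckets, so the per-bucket sort
+      // order can no longer be assumed; see the TODO for SPARK-24528 in FileSourceScanExec.)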
+      val testSpec1 = BucketedTableTestSpec(
+        Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j"))),
+        numPartitions = 1,
+        expectedShuffle = false,
+        expectedSort = true,
+        expectedNumOutputPartitions = Some(4))
+      val testSpec2 = BucketedTableTestSpec(
+        Some(BucketSpec(4, Seq("i", "j"), Seq("i", "j"))),
+        numPartitions = 1,
+        expectedShuffle = false,
+        expectedSort = false,
+        expectedNumOutputPartitions = Some(4))
+
+      Seq((testSpec1, testSpec2), (testSpec2, testSpec1)).foreach { specs =>
+        testBucketing(
+          bucketedTableTestSpecLeft = specs._1,
+          bucketedTableTestSpecRight = specs._2,
+          joinCondition = joinCondition(Seq("i", "j")))
+      }
+    }
+  }
+
+  test("bucket coalescing is not satisfied") {
+    def run(testSpec1: BucketedTableTestSpec, testSpec2: BucketedTableTestSpec): Unit = {
+      Seq((testSpec1, testSpec2), (testSpec2, testSpec1)).foreach { specs =>
+        testBucketing(
+          bucketedTableTestSpecLeft = specs._1,
+          bucketedTableTestSpecRight = specs._2,
+          joinCondition = joinCondition(Seq("i", "j")))
+      }
+    }
+
+    withSQLConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key -> "false") {
+      // Bucket coalescing is disabled by the config.
+      run(
+        BucketedTableTestSpec(
+          Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j"))), expectedShuffle = false),
+        BucketedTableTestSpec(
+          Some(BucketSpec(4, Seq("i", "j"), Seq("i", "j"))), expectedShuffle = true))
+    }
+
+    withSQLConf(
+      SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key -> "true",
+      SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO.key -> "2") {
+      // Bucket coalescing is not applied because the ratio of the number of buckets (3)
+      // is greater than the max allowed (2).
+      run(
+        BucketedTableTestSpec(
+          Some(BucketSpec(12, Seq("i", "j"), Seq("i", "j"))), expectedShuffle = false),
+        BucketedTableTestSpec(
+          Some(BucketSpec(4, Seq("i", "j"), Seq("i", "j"))), expectedShuffle = true))
+    }
+
+    withSQLConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key -> "true") {
+      run(
+        // Bucket coalescing is not applied because the bigger number of buckets (8) is not
+        // divisible by the smaller number of buckets (7).
+        BucketedTableTestSpec(
+          Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j"))), expectedShuffle = false),
+        BucketedTableTestSpec(
+          Some(BucketSpec(7, Seq("i", "j"), Seq("i", "j"))), expectedShuffle = true))
+    }
+  }
+
+  test("bucket coalescing is applied when join expressions match with partitioning expressions") {
+    withTable("t1", "t2") {
+      df1.write.format("parquet").bucketBy(8, "i", "j").saveAsTable("t1")
+      df2.write.format("parquet").bucketBy(4, "i", "j").saveAsTable("t2")
+
+      withSQLConf(
+        SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0",
+        SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key -> "true") {
+        def verify(
+            query: String,
+            expectedNumShuffles: Int,
+            expectedCoalescedNumBuckets: Option[Int]): Unit = {
+          val plan = sql(query).queryExecution.executedPlan
+          val shuffles = plan.collect { case s: ShuffleExchangeExec => s }
+          assert(shuffles.length == expectedNumShuffles)
+
+          val scans = plan.collect {
+            case f: FileSourceScanExec if f.optionalNumCoalescedBuckets.isDefined => f
+          }
+          if (expectedCoalescedNumBuckets.isDefined) {
+            assert(scans.length == 1)
+            assert(scans(0).optionalNumCoalescedBuckets == expectedCoalescedNumBuckets)
+          } else {
+            assert(scans.isEmpty)
+          }
+        }
+
+        // Coalescing is applied since the join expressions match the bucket columns.
+        verify("SELECT * FROM t1 JOIN t2 ON t1.i = t2.i AND t1.j = t2.j", 0, Some(4))
+        // Coalescing is applied when columns are aliased.
+        verify(
+          "SELECT * FROM t1 JOIN (SELECT i AS x, j AS y FROM t2) ON t1.i = x AND t1.j = y",
+          0,
+          Some(4))
+        // Coalescing is not applied when join expressions do not match with bucket columns.
+        verify("SELECT * FROM t1 JOIN t2 ON t1.i = t2.i", 2, None)
+      }
+    }
+  }
 }
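
For anyone who wants to exercise the whole path, a minimal spark-shell sketch against a build with this patch applied (the table names and data are illustrative, not part of the patch):

```scala
import spark.implicits._

spark.conf.set("spark.sql.bucketing.coalesceBucketsInSortMergeJoin.enabled", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1") // force a sort-merge join

Seq(1, 2, 3).toDF("i").write.bucketBy(8, "i").saveAsTable("t8")
Seq(2, 3, 4).toDF("i").write.bucketBy(4, "i").saveAsTable("t4")

// Expected per the tests above: no shuffle on either side, and the t8 scan reporting
// "SelectedBucketsCount: 8 out of 8 (Coalesced to 4)".
spark.sql("SELECT * FROM t8 JOIN t4 ON t8.i = t4.i").explain()
```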