From 40b07073949e41fe275a38f7a367b789ed279afb Mon Sep 17 00:00:00 2001
From: Terry Kim
Date: Mon, 13 Apr 2020 16:59:31 -0700
Subject: [PATCH] Address comments

---
 .../spark/sql/catalyst/trees/TreeNode.scala   |   8 --
 .../apache/spark/sql/internal/SQLConf.scala   |  14 ++-
 .../sql/execution/DataSourceScanExec.scala    |  32 ++---
 .../spark/sql/execution/SparkOptimizer.scala  |   4 +-
 .../bucketing/CoalesceBucketInJoin.scala      | 110 ++++++++++++++++++
 .../bucketing/InjectBucketHint.scala          |  78 -------------
 .../datasources/FileSourceStrategy.scala      |   2 +
 .../apache/spark/sql/DataFrameJoinSuite.scala |   2 +-
 .../org/apache/spark/sql/SubquerySuite.scala  |   2 +-
 .../spark/sql/sources/BucketedReadSuite.scala |  18 +--
 10 files changed, 147 insertions(+), 123 deletions(-)
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketInJoin.scala
 delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/InjectBucketHint.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index abe03e8f53fad..c4a106702a515 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -159,14 +159,6 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
     children.foldLeft(Option.empty[BaseType]) { (l, r) => l.orElse(r.find(f)) }
   }
 
-  /**
-   * Tests whether a predicate holds for all nodes.
-   * @param p the predicate function to be applied to each node in the tree.
-   */
-  def forall(p: BaseType => Boolean): Boolean = {
-    p(this) && children.forall(_.forall(p))
-  }
-
   /**
    * Runs the given function on this node and then recursively on [[children]].
    * @param f the function to be applied to each node in the tree.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index f96b83518e52e..d6d05814862b6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2574,8 +2574,8 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
-  val BUCKETING_COALESCE_ENABLED =
-    buildConf("spark.sql.bucketing.coalesce")
+  val COALESCE_BUCKET_IN_JOIN_ENABLED =
+    buildConf("spark.sql.bucketing.coalesceBucketInJoin.enabled")
       .internal()
       .doc("When true, if two bucketed tables with a different number of buckets are joined, " +
         "the side with a bigger number of buckets will be coalesced to have the same number " +
@@ -2585,6 +2585,16 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val COALESCE_BUCKET_IN_JOIN_MAX_NUM_BUCKETS_DIFF =
+    buildConf("spark.sql.bucketing.coalesceBucketInJoin.maxNumBucketsDiff")
+      .doc("The difference in the number of buckets between the two sides being joined must be " +
+        "less than or equal to this value for bucket coalescing to be applied. This " +
+        s"configuration only has an effect when '${COALESCE_BUCKET_IN_JOIN_ENABLED.key}' is set to true.")
+      .version("3.1.0")
+      .intConf
+      .checkValue(_ > 0, "The maximum difference between bucket counts must be positive.")
+      .createWithDefault(256)
+
   /**
    * Holds information about keys that have been deprecated.
    *
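Both configs above are internal and default to false/256. For illustration only, a minimal sketch of how they could be turned on for a session; the session setup is hypothetical, while the config keys and the default of 256 come from the change above:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("bucket-coalescing-demo")
  // Allow the side with more buckets to be coalesced down to the other side's bucket count.
  .config("spark.sql.bucketing.coalesceBucketInJoin.enabled", "true")
  // Only coalesce when the two bucket counts differ by at most this much (256 is the default).
  .config("spark.sql.bucketing.coalesceBucketInJoin.maxNumBucketsDiff", "256")
  .getOrCreate()
```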
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 1b8d94f8b02bf..48aa7c4c3bcfc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
 import org.apache.spark.sql.catalyst.util.truncatedString
-import org.apache.spark.sql.execution.bucketing.InjectBucketHint
+import org.apache.spark.sql.execution.bucketing.CoalesceBucketInJoin
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource}
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
@@ -154,7 +154,8 @@ case class RowDataSourceScanExec(
  * @param output Output attributes of the scan, including data attributes and partition attributes.
  * @param requiredSchema Required schema of the underlying relation, excluding partition columns.
  * @param partitionFilters Predicates to use for partition pruning.
- * @param optionalBucketSet Bucket ids for bucket pruning
+ * @param optionalBucketSet Bucket ids for bucket pruning.
+ * @param optionalCoalescedNumBuckets Coalesced number of buckets.
  * @param dataFilters Filters on non-partition columns.
  * @param tableIdentifier identifier for the table in the metastore.
  */
@@ -164,6 +165,7 @@ case class FileSourceScanExec(
     requiredSchema: StructType,
     partitionFilters: Seq[Expression],
     optionalBucketSet: Option[BitSet],
+    optionalCoalescedNumBuckets: Option[Int],
     dataFilters: Seq[Expression],
     tableIdentifier: Option[TableIdentifier])
   extends DataSourceScanExec {
@@ -268,23 +270,6 @@ case class FileSourceScanExec(
     }
   }
 
-  /**
-   * A bucket can be coalesced if the number of buckets for this relation is divisible
-   * by the number of buckets on the other side of table for join.
-   */
-  private lazy val coalescedNumBuckets: Option[Int] = {
-    val joinHintNumBuckets = relation.options.get(InjectBucketHint.JOIN_HINT_NUM_BUCKETS)
-    if (relation.bucketSpec.isDefined &&
-      SQLConf.get.getConf(SQLConf.BUCKETING_COALESCE_ENABLED) &&
-      joinHintNumBuckets.isDefined &&
-      joinHintNumBuckets.get.toInt < relation.bucketSpec.get.numBuckets &&
-      relation.bucketSpec.get.numBuckets % joinHintNumBuckets.get.toInt == 0) {
-      Some(joinHintNumBuckets.get.toInt)
-    } else {
-      None
-    }
-  }
-
   override lazy val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
     if (bucketedScan) {
       // For bucketed columns:
@@ -307,7 +292,7 @@ case class FileSourceScanExec(
       // above
       val spec = relation.bucketSpec.get
       val bucketColumns = spec.bucketColumnNames.flatMap(n => toAttribute(n))
-      val numPartitions = coalescedNumBuckets.getOrElse(spec.numBuckets)
+      val numPartitions = optionalCoalescedNumBuckets.getOrElse(spec.numBuckets)
       val partitioning = HashPartitioning(bucketColumns, numPartitions)
       val sortColumns =
         spec.sortColumnNames.map(x => toAttribute(x)).takeWhile(x => x.isDefined).map(_.get)
@@ -329,7 +314,7 @@ case class FileSourceScanExec(
         val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= 1)
 
         // TODO Sort order is currently ignored if buckets are coalesced.
-        if (singleFilePartitions && coalescedNumBuckets.isEmpty) {
+        if (singleFilePartitions && optionalCoalescedNumBuckets.isEmpty) {
           // TODO Currently Spark does not support writing columns sorting in descending order
           // so using Ascending order. This can be fixed in future
           sortColumns.map(attribute => SortOrder(attribute, Ascending))
@@ -558,8 +543,8 @@ case class FileSourceScanExec(
       filesGroupedToBuckets
     }
 
-    val filePartitions = if (coalescedNumBuckets.isDefined) {
-      val newNumBuckets = coalescedNumBuckets.get
+    val filePartitions = if (optionalCoalescedNumBuckets.isDefined) {
+      val newNumBuckets = optionalCoalescedNumBuckets.get
       logInfo(s"Coalescing to ${newNumBuckets} buckets")
       val coalescedBuckets = prunedFilesGroupedToBuckets.groupBy(_._1 % newNumBuckets)
       Seq.tabulate(newNumBuckets) { bucketId =>
@@ -625,6 +610,7 @@ case class FileSourceScanExec(
       requiredSchema,
       QueryPlan.normalizePredicates(partitionFilters, output),
       optionalBucketSet,
+      optionalCoalescedNumBuckets,
       QueryPlan.normalizePredicates(dataFilters, output),
       None)
   }
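A self-contained sketch (plain Scala, not the Spark classes above, with made-up file names) of the grouping the bucketed read path performs when `optionalCoalescedNumBuckets` is set: files from original bucket `b` end up in coalesced partition `b % newNumBuckets`.

```scala
object CoalescedBucketsSketch extends App {
  // Original bucket id -> files in that bucket (file names are illustrative only).
  val filesPerBucket: Map[Int, Seq[String]] =
    (0 until 8).map(b => b -> Seq(f"part-$b%05d.parquet")).toMap

  val newNumBuckets = 4

  // Mirrors the change above: group original buckets by (bucketId % newNumBuckets)
  // and merge their files into a single coalesced partition per new bucket id.
  val coalesced: Seq[(Int, Seq[String])] = Seq.tabulate(newNumBuckets) { bucketId =>
    val files = filesPerBucket.collect { case (b, fs) if b % newNumBuckets == bucketId => fs }
    bucketId -> files.flatten.toSeq.sorted
  }

  coalesced.foreach { case (b, fs) => println(s"coalesced bucket $b reads ${fs.mkString(", ")}") }
  // coalesced bucket 0 reads part-00000.parquet, part-00004.parquet
  // coalesced bucket 1 reads part-00001.parquet, part-00005.parquet, and so on.
}
```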
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
index b0b25924c7ce7..f6d9248d97531 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.optimizer._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.connector.catalog.CatalogManager
-import org.apache.spark.sql.execution.bucketing.InjectBucketHint
+import org.apache.spark.sql.execution.bucketing.CoalesceBucketInJoin
 import org.apache.spark.sql.execution.datasources.PruneFileSourcePartitions
 import org.apache.spark.sql.execution.datasources.SchemaPruning
 import org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown
@@ -45,7 +45,7 @@ class SparkOptimizer(
     Batch("PartitionPruning", Once,
       PartitionPruning,
       OptimizeSubqueries) :+
-    Batch("Bucketing", Once, InjectBucketHint) :+
+    Batch("Bucketing", Once, CoalesceBucketInJoin) :+
     Batch("Pushdown Filters from PartitionPruning", fixedPoint,
       PushDownPredicates) :+
     Batch("Cleanup filters that cannot be pushed down", Once,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketInJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketInJoin.scala
new file mode 100644
index 0000000000000..f380dbe746beb
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketInJoin.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.bucketing
+
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.plans.Inner
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule adds a hint to one side of an inner join between two bucketed tables that differ
+ * in the number of buckets, if that side's buckets can be coalesced to match the other side.
+ */
+object CoalesceBucketInJoin extends Rule[LogicalPlan] {
+  val JOIN_HINT_COALESCED_NUM_BUCKETS: String = "JoinHintCoalescedNumBuckets"
+
+  private val sqlConf = SQLConf.get
+
+  private def isPlanEligible(plan: LogicalPlan): Boolean = {
+    def forall(plan: LogicalPlan)(p: LogicalPlan => Boolean): Boolean = {
+      p(plan) && plan.children.forall(forall(_)(p))
+    }
+
+    forall(plan) {
+      case _: Filter | _: Project | _: LogicalRelation => true
+      case _ => false
+    }
+  }
+
+  private def getBucketSpec(plan: LogicalPlan): Option[BucketSpec] = {
+    if (isPlanEligible(plan)) {
+      plan.collectFirst {
+        case _ @ LogicalRelation(r: HadoopFsRelation, _, _, _)
+          if r.bucketSpec.nonEmpty && !r.options.contains(JOIN_HINT_COALESCED_NUM_BUCKETS) =>
+          r.bucketSpec.get
+      }
+    } else {
+      None
+    }
+  }
+
+  private def mayCoalesce(numBuckets1: Int, numBuckets2: Int): Option[Int] = {
+    assert(numBuckets1 != numBuckets2)
+    val (small, large) = (math.min(numBuckets1, numBuckets2), math.max(numBuckets1, numBuckets2))
+    // Buckets can be coalesced only if the bigger number of buckets is divisible by the smaller
+    // one, because the coalesced bucket id is the original bucket id modulo the smaller number.
+    if ((large % small == 0) &&
+      (large - small) <= sqlConf.getConf(SQLConf.COALESCE_BUCKET_IN_JOIN_MAX_NUM_BUCKETS_DIFF)) {
+      Some(small)
+    } else {
+      None
+    }
+  }
+
+  private def addBucketHint(plan: LogicalPlan, hint: (String, String)): LogicalPlan = {
+    plan.transformUp {
+      case l @ LogicalRelation(r: HadoopFsRelation, _, _, _) =>
+        l.copy(relation = r.copy(options = r.options + hint)(r.sparkSession))
+    }
+  }
+
+  def apply(plan: LogicalPlan): LogicalPlan = {
+    if (!sqlConf.getConf(SQLConf.COALESCE_BUCKET_IN_JOIN_ENABLED)) {
+      return plan
+    }
+
+    plan transform {
+      case join: Join if join.joinType == Inner =>
+        val leftBucket = getBucketSpec(join.left)
+        val rightBucket = getBucketSpec(join.right)
+        if (leftBucket.isEmpty || rightBucket.isEmpty) {
+          return plan
+        }
+
+        val leftBucketNumber = leftBucket.get.numBuckets
+        val rightBucketNumber = rightBucket.get.numBuckets
+        if (leftBucketNumber == rightBucketNumber) {
+          return plan
+        }
+
+        mayCoalesce(leftBucketNumber, rightBucketNumber).map { coalescedNumBuckets =>
+          val hint = JOIN_HINT_COALESCED_NUM_BUCKETS -> coalescedNumBuckets.toString
+          if (coalescedNumBuckets != leftBucketNumber) {
+            join.copy(left = addBucketHint(join.left, hint))
+          } else {
+            join.copy(right = addBucketHint(join.right, hint))
+          }
+        }.getOrElse(join)
+
+      case other => other
+    }
+  }
+}
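A minimal, standalone rendering of the coalescing decision in `mayCoalesce` above; the real rule also checks the plan shape and reads the limit from the SQL conf, which is stood in for here by a hardcoded `maxDiff` matching the default of 256:

```scala
// Returns the bucket count to coalesce to, if the pair of bucket counts qualifies.
def mayCoalesce(numBuckets1: Int, numBuckets2: Int, maxDiff: Int = 256): Option[Int] = {
  val small = math.min(numBuckets1, numBuckets2)
  val large = math.max(numBuckets1, numBuckets2)
  // The larger count must be a multiple of the smaller one, because a row's coalesced
  // bucket id is (original bucket id % small); the counts must also not differ too much.
  if (small != large && large % small == 0 && (large - small) <= maxDiff) Some(small) else None
}

assert(mayCoalesce(8, 4) == Some(4))   // divisible and within the limit -> coalesce to 4 buckets
assert(mayCoalesce(8, 3) == None)      // 8 is not divisible by 3 -> keep the shuffle
assert(mayCoalesce(2048, 2) == None)   // divisible, but the difference exceeds 256
```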
+ */ + +package org.apache.spark.sql.execution.bucketing + +import org.apache.spark.sql.catalyst.catalog.BucketSpec +import org.apache.spark.sql.catalyst.plans.Inner +import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.internal.SQLConf + +/** + * This rule injects a hint if one side of two bucketed tables can be coalesced + * when the two bucketed tables are inner-joined and they differ in the number of buckets. + */ +object CoalesceBucketInJoin extends Rule[LogicalPlan] { + val JOIN_HINT_COALESCED_NUM_BUCKETS: String = "JoinHintCoalescedNumBuckets" + + private val sqlConf = SQLConf.get + + private def isPlanEligible(plan: LogicalPlan): Boolean = { + def forall(plan: LogicalPlan)(p: LogicalPlan => Boolean): Boolean = { + p(plan) && plan.children.forall(forall(_)(p)) + } + + forall(plan) { + case _: Filter | _: Project | _: LogicalRelation => true + case _ => false + } + } + + private def getBucketSpec(plan: LogicalPlan): Option[BucketSpec] = { + if (isPlanEligible(plan)) { + plan.collectFirst { + case _ @ LogicalRelation(r: HadoopFsRelation, _, _, _) + if r.bucketSpec.nonEmpty && !r.options.contains(JOIN_HINT_COALESCED_NUM_BUCKETS) => + r.bucketSpec.get + } + } else { + None + } + } + + private def mayCoalesce(numBuckets1: Int, numBuckets2: Int): Option[Int] = { + assert(numBuckets1 != numBuckets2) + val (small, large) = (math.min(numBuckets1, numBuckets2), math.max(numBuckets1, numBuckets2)) + // A bucket can be coalesced only if the bigger number of buckets is divisible by the smaller + // number of buckets because bucket id is calculated by modding the total number of buckets. 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index f45495121a980..37407f5fb9607 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.ScanOperation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
+import org.apache.spark.sql.execution.bucketing.CoalesceBucketInJoin
 import org.apache.spark.util.collection.BitSet
 
 /**
@@ -205,6 +206,7 @@ object FileSourceStrategy extends Strategy with Logging {
         outputSchema,
         partitionKeyFilters.toSeq,
         bucketSet,
+        fsRelation.options.get(CoalesceBucketInJoin.JOIN_HINT_COALESCED_NUM_BUCKETS).map(_.toInt),
         dataFilters,
         table.map(_.identifier))
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index 6b772e53ac184..0b4f43b72366b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -345,7 +345,7 @@ class DataFrameJoinSuite extends QueryTest
         }
         assert(broadcastExchanges.size == 1)
         val tables = broadcastExchanges.head.collect {
-          case FileSourceScanExec(_, _, _, _, _, _, Some(tableIdent)) => tableIdent
+          case FileSourceScanExec(_, _, _, _, _, _, _, Some(tableIdent)) => tableIdent
         }
         assert(tables.size == 1)
         assert(tables.head === TableIdentifier(table1Name, Some(dbName)))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index ff8f94c68c5ee..40d90df3655a0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -1311,7 +1311,7 @@ class SubquerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
       // need to execute the query before we can examine fs.inputRDDs()
       assert(stripAQEPlan(df.queryExecution.executedPlan) match {
         case WholeStageCodegenExec(ColumnarToRowExec(InputAdapter(
-            fs @ FileSourceScanExec(_, _, _, partitionFilters, _, _, _)))) =>
+            fs @ FileSourceScanExec(_, _, _, partitionFilters, _, _, _, _)))) =>
           partitionFilters.exists(ExecSubqueryExpression.hasSubquery) &&
             fs.inputRDDs().forall(
               _.asInstanceOf[FileScanRDD].filePartitions.forall(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index 58106dd59753b..be750ea84d38d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -876,26 +876,28 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
   }
 
   test("bucket coalescing eliminates shuffle") {
-    withSQLConf(SQLConf.BUCKETING_COALESCE_ENABLED.key -> "true") {
-      // Left side will be coalesced to have 4 output partitions.
+    withSQLConf(SQLConf.COALESCE_BUCKET_IN_JOIN_ENABLED.key -> "true") {
+      // The side with bucketedTableTestSpec1 will be coalesced to have 4 output partitions.
       // Currently, sort will be introduced for the side that is coalesced.
-      val bucketedTableTestSpecLeft = BucketedTableTestSpec(
+      val testSpec1 = BucketedTableTestSpec(
         Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j"))),
         numPartitions = 1,
         expectedShuffle = false,
         expectedSort = true,
         expectedNumOutputPartitions = Some(4))
-      val bucketedTableTestSpecRight = BucketedTableTestSpec(
+      val testSpec2 = BucketedTableTestSpec(
         Some(BucketSpec(4, Seq("i", "j"), Seq("i", "j"))),
         numPartitions = 1,
         expectedShuffle = false,
         expectedSort = false,
         expectedNumOutputPartitions = Some(4))
-      testBucketing(
-        bucketedTableTestSpecLeft = bucketedTableTestSpecLeft,
-        bucketedTableTestSpecRight = bucketedTableTestSpecRight,
-        joinCondition = joinCondition(Seq("i", "j")))
+      Seq((testSpec1, testSpec2), (testSpec2, testSpec1)).foreach { specs =>
+        testBucketing(
+          bucketedTableTestSpecLeft = specs._1,
+          bucketedTableTestSpecRight = specs._2,
+          joinCondition = joinCondition(Seq("i", "j")))
+      }
     }
   }
 }
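For reference, a sketch of the end-to-end behaviour the updated test exercises, using hypothetical table names: with the flag on, joining an 8-bucket table with a 4-bucket table on the bucket columns should plan without a shuffle exchange, with the 8-bucket side read as 4 coalesced partitions.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

spark.conf.set("spark.sql.bucketing.coalesceBucketInJoin.enabled", "true")

val df = (0 until 100).map(i => (i % 5, i % 13)).toDF("i", "j")

// Hypothetical table names: same bucket columns, 8 vs 4 buckets.
df.write.format("parquet").bucketBy(8, "i", "j").saveAsTable("t8")
df.write.format("parquet").bucketBy(4, "i", "j").saveAsTable("t4")

val joined = spark.table("t8").join(spark.table("t4"), Seq("i", "j"))
// With coalescing applied, the plan should contain no exchange for this join;
// the t8 side is scanned into 4 coalesced buckets instead of 8.
joined.explain()
```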