From fe07521c9efd9ce0913eee0d42b0ffd98b1225ec Mon Sep 17 00:00:00 2001
From: Cheng Su
Date: Mon, 20 Jul 2020 14:38:43 +0000
Subject: [PATCH] [SPARK-32330][SQL] Preserve shuffled hash join build side partitioning

### What changes were proposed in this pull request?

Currently `ShuffledHashJoin.outputPartitioning` inherits from `HashJoin.outputPartitioning`, which preserves only the stream side's partitioning (`HashJoin.scala`):

```
override def outputPartitioning: Partitioning = streamedPlan.outputPartitioning
```

This loses the build side's partitioning information and causes an extra shuffle if there is another join or group-by after this join. Example:

```
withSQLConf(
    SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "50",
    SQLConf.SHUFFLE_PARTITIONS.key -> "2",
    SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
  val df1 = spark.range(10).select($"id".as("k1"))
  val df2 = spark.range(30).select($"id".as("k2"))
  Seq("inner", "cross").foreach(joinType => {
    val plan = df1.join(df2, $"k1" === $"k2", joinType).groupBy($"k1").count()
      .queryExecution.executedPlan
    assert(plan.collect { case _: ShuffledHashJoinExec => true }.size === 1)
    // No extra shuffle before aggregate
    assert(plan.collect { case _: ShuffleExchangeExec => true }.size === 2)
  })
}
```

Current physical plan (with an extra shuffle on `k1` before the aggregate):

```
*(4) HashAggregate(keys=[k1#220L], functions=[count(1)], output=[k1#220L, count#235L])
+- Exchange hashpartitioning(k1#220L, 2), true, [id=#117]
   +- *(3) HashAggregate(keys=[k1#220L], functions=[partial_count(1)], output=[k1#220L, count#239L])
      +- *(3) Project [k1#220L]
         +- ShuffledHashJoin [k1#220L], [k2#224L], Inner, BuildLeft
            :- Exchange hashpartitioning(k1#220L, 2), true, [id=#109]
            :  +- *(1) Project [id#218L AS k1#220L]
            :     +- *(1) Range (0, 10, step=1, splits=2)
            +- Exchange hashpartitioning(k2#224L, 2), true, [id=#111]
               +- *(2) Project [id#222L AS k2#224L]
                  +- *(2) Range (0, 30, step=1, splits=2)
```

Ideal physical plan (no shuffle on `k1` before the aggregate):

```
*(3) HashAggregate(keys=[k1#220L], functions=[count(1)], output=[k1#220L, count#235L])
+- *(3) HashAggregate(keys=[k1#220L], functions=[partial_count(1)], output=[k1#220L, count#239L])
   +- *(3) Project [k1#220L]
      +- ShuffledHashJoin [k1#220L], [k2#224L], Inner, BuildLeft
         :- Exchange hashpartitioning(k1#220L, 2), true, [id=#107]
         :  +- *(1) Project [id#218L AS k1#220L]
         :     +- *(1) Range (0, 10, step=1, splits=2)
         +- Exchange hashpartitioning(k2#224L, 2), true, [id=#109]
            +- *(2) Project [id#222L AS k2#224L]
               +- *(2) Range (0, 30, step=1, splits=2)
```

This is fixed by overriding the `outputPartitioning` method in `ShuffledHashJoinExec`, similar to `SortMergeJoinExec`. In addition, this PR fixes a typo in an error message in `HashJoin`, as that code path is shared between broadcast hash join and shuffled hash join.
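
In short, the common logic is factored into a new `ShuffledJoin` trait that both `ShuffledHashJoinExec` and `SortMergeJoinExec` mix in. The sketch below is condensed from the diff in this patch, with explanatory comments added: for inner joins the output reports a `PartitioningCollection` over both children, so a downstream operator keyed on either side's join keys sees its required distribution as already satisfied.

```
trait ShuffledJoin extends BaseJoinExec {
  // Both children must be hash-partitioned by their respective join keys.
  override def requiredChildDistribution: Seq[Distribution] =
    HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil

  override def outputPartitioning: Partitioning = joinType match {
    case _: InnerLike =>
      // Every output row carries matching keys from both sides, so either
      // child's partitioning is a valid description of the output.
      PartitioningCollection(Seq(left.outputPartitioning, right.outputPartitioning))
    // For left and right outer joins, the output is partitioned by the
    // streamed input's join keys.
    case LeftOuter => left.outputPartitioning
    case RightOuter => right.outputPartitioning
    // Full outer join keeps unmatched rows from both sides, so neither
    // child's partitioning holds for the output.
    case FullOuter => UnknownPartitioning(left.outputPartitioning.numPartitions)
    case LeftExistence(_) => left.outputPartitioning
    case x =>
      throw new IllegalArgumentException(
        s"ShuffledJoin should not take $x as the JoinType")
  }
}
```

Note that `ShuffledHashJoinExec` has to select this implementation explicitly, via `super[ShuffledJoin].outputPartitioning`, because it also inherits the stream-side-only `outputPartitioning` from `HashJoin`.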

### Why are the changes needed?

To avoid an extra shuffle for queries with multiple joins or a join followed by a group-by, saving CPU and IO.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added a unit test in `JoinSuite`.

Closes #29130 from c21/shj.

Authored-by: Cheng Su
Signed-off-by: Wenchen Fan
---
 .../spark/sql/execution/joins/HashJoin.scala  |  2 +-
 .../joins/ShuffledHashJoinExec.scala          |  5 +--
 .../sql/execution/joins/ShuffledJoin.scala    | 43 +++++++++++++++++++
 .../execution/joins/SortMergeJoinExec.scala   | 17 +-------
 .../org/apache/spark/sql/JoinSuite.scala      | 18 ++++++++
 5 files changed, 66 insertions(+), 19 deletions(-)
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledJoin.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
index 7c3c53b0fa54c..8d9ba54f6568d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
@@ -215,7 +215,7 @@ trait HashJoin extends BaseJoinExec {
         existenceJoin(streamedIter, hashed)
       case x =>
         throw new IllegalArgumentException(
-          s"BroadcastHashJoin should not take $x as the JoinType")
+          s"HashJoin should not take $x as the JoinType")
     }
 
     val resultProj = createResultProjection
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
index 1120850fdddaf..3b398dd7120c2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
@@ -40,15 +40,14 @@ case class ShuffledHashJoinExec(
     condition: Option[Expression],
     left: SparkPlan,
     right: SparkPlan)
-  extends HashJoin {
+  extends HashJoin with ShuffledJoin {
 
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
     "buildDataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size of build side"),
     "buildTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to build hash map"))
 
-  override def requiredChildDistribution: Seq[Distribution] =
-    HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil
+  override def outputPartitioning: Partitioning = super[ShuffledJoin].outputPartitioning
 
   private def buildHashedRelation(iter: Iterator[InternalRow]): HashedRelation = {
     val buildDataSize = longMetric("buildDataSize")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledJoin.scala
new file mode 100644
index 0000000000000..7035ddc35be9c
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledJoin.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.joins
+
+import org.apache.spark.sql.catalyst.plans.{FullOuter, InnerLike, LeftExistence, LeftOuter, RightOuter}
+import org.apache.spark.sql.catalyst.plans.physical.{Distribution, HashClusteredDistribution, Partitioning, PartitioningCollection, UnknownPartitioning}
+
+/**
+ * Holds common logic for join operators by shuffling two child relations
+ * using the join keys.
+ */
+trait ShuffledJoin extends BaseJoinExec {
+  override def requiredChildDistribution: Seq[Distribution] = {
+    HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil
+  }
+
+  override def outputPartitioning: Partitioning = joinType match {
+    case _: InnerLike =>
+      PartitioningCollection(Seq(left.outputPartitioning, right.outputPartitioning))
+    case LeftOuter => left.outputPartitioning
+    case RightOuter => right.outputPartitioning
+    case FullOuter => UnknownPartitioning(left.outputPartitioning.numPartitions)
+    case LeftExistence(_) => left.outputPartitioning
+    case x =>
+      throw new IllegalArgumentException(
+        s"ShuffledJoin should not take $x as the JoinType")
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
index 2c57956de5bca..b9f6684447dd8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
@@ -41,7 +41,7 @@ case class SortMergeJoinExec(
     condition: Option[Expression],
     left: SparkPlan,
     right: SparkPlan,
-    isSkewJoin: Boolean = false) extends BaseJoinExec with CodegenSupport {
+    isSkewJoin: Boolean = false) extends ShuffledJoin with CodegenSupport {
 
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
@@ -72,26 +72,13 @@ case class SortMergeJoinExec(
     }
   }
 
-  override def outputPartitioning: Partitioning = joinType match {
-    case _: InnerLike =>
-      PartitioningCollection(Seq(left.outputPartitioning, right.outputPartitioning))
-    // For left and right outer joins, the output is partitioned by the streamed input's join keys.
-    case LeftOuter => left.outputPartitioning
-    case RightOuter => right.outputPartitioning
-    case FullOuter => UnknownPartitioning(left.outputPartitioning.numPartitions)
-    case LeftExistence(_) => left.outputPartitioning
-    case x =>
-      throw new IllegalArgumentException(
-        s"${getClass.getSimpleName} should not take $x as the JoinType")
-  }
-
   override def requiredChildDistribution: Seq[Distribution] = {
     if (isSkewJoin) {
       // We re-arrange the shuffle partitions to deal with skew join, and the new children
       // partitioning doesn't satisfy `HashClusteredDistribution`.
       UnspecifiedDistribution :: UnspecifiedDistribution :: Nil
     } else {
-      HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil
+      super.requiredChildDistribution
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index f24da6df67ca0..b4f626270cfc9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.{Ascending, GenericRow, SortOrd
 import org.apache.spark.sql.catalyst.plans.logical.Filter
 import org.apache.spark.sql.execution.{BinaryExecNode, FilterExec, SortExec, SparkPlan}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.execution.python.BatchEvalPythonExec
 import org.apache.spark.sql.internal.SQLConf
@@ -1086,4 +1087,21 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
       assert(df2.join(df1, "id").collect().isEmpty)
     }
   }
+
+  test("SPARK-32330: Preserve shuffled hash join build side partitioning") {
+    withSQLConf(
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "50",
+      SQLConf.SHUFFLE_PARTITIONS.key -> "2",
+      SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
+      val df1 = spark.range(10).select($"id".as("k1"))
+      val df2 = spark.range(30).select($"id".as("k2"))
+      Seq("inner", "cross").foreach(joinType => {
+        val plan = df1.join(df2, $"k1" === $"k2", joinType).groupBy($"k1").count()
+          .queryExecution.executedPlan
+        assert(plan.collect { case _: ShuffledHashJoinExec => true }.size === 1)
+        // No extra shuffle before aggregate
+        assert(plan.collect { case _: ShuffleExchangeExec => true }.size === 2)
+      })
+    }
+  }
 }