# [SPARK-32330][SQL] Preserve shuffled hash join build side partitioning
### What changes were proposed in this pull request?

Currently `ShuffledHashJoin.outputPartitioning` inherits from `HashJoin.outputPartitioning`, which only preserves the stream-side partitioning (`HashJoin.scala`):

```scala
override def outputPartitioning: Partitioning = streamedPlan.outputPartitioning
```

This loses the build-side partitioning information (only the streamed side's partitioning is reported) and causes an extra shuffle if there is another join or group-by keyed on the build side after this join.

Example:

```scala
withSQLConf(
    SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "50",
    SQLConf.SHUFFLE_PARTITIONS.key -> "2",
    SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
  val df1 = spark.range(10).select($"id".as("k1"))
  val df2 = spark.range(30).select($"id".as("k2"))
  Seq("inner", "cross").foreach(joinType => {
    val plan = df1.join(df2, $"k1" === $"k2", joinType).groupBy($"k1").count()
      .queryExecution.executedPlan
    assert(plan.collect { case _: ShuffledHashJoinExec => true }.size === 1)
    // No extra shuffle before aggregate
    assert(plan.collect { case _: ShuffleExchangeExec => true }.size === 2)
  })
}
```

Current physical plan (with an extra shuffle on `k1` before the aggregate):

```
*(4) HashAggregate(keys=[k1#220L], functions=[count(1)], output=[k1#220L, count#235L])
+- Exchange hashpartitioning(k1#220L, 2), true, [id=#117]
   +- *(3) HashAggregate(keys=[k1#220L], functions=[partial_count(1)], output=[k1#220L, count#239L])
      +- *(3) Project [k1#220L]
         +- ShuffledHashJoin [k1#220L], [k2#224L], Inner, BuildLeft
            :- Exchange hashpartitioning(k1#220L, 2), true, [id=#109]
            :  +- *(1) Project [id#218L AS k1#220L]
            :     +- *(1) Range (0, 10, step=1, splits=2)
            +- Exchange hashpartitioning(k2#224L, 2), true, [id=#111]
               +- *(2) Project [id#222L AS k2#224L]
                  +- *(2) Range (0, 30, step=1, splits=2)
```

Ideal physical plan (no shuffle on `k1` before the aggregate):

```
*(3) HashAggregate(keys=[k1#220L], functions=[count(1)], output=[k1#220L, count#235L])
+- *(3) HashAggregate(keys=[k1#220L], functions=[partial_count(1)], output=[k1#220L, count#239L])
   +- *(3) Project [k1#220L]
      +- ShuffledHashJoin [k1#220L], [k2#224L], Inner, BuildLeft
         :- Exchange hashpartitioning(k1#220L, 2), true, [id=#107]
         :  +- *(1) Project [id#218L AS k1#220L]
         :     +- *(1) Range (0, 10, step=1, splits=2)
         +- Exchange hashpartitioning(k2#224L, 2), true, [id=#109]
            +- *(2) Project [id#222L AS k2#224L]
               +- *(2) Range (0, 30, step=1, splits=2)
```

This can be fixed by overriding the `outputPartitioning` method in `ShuffledHashJoinExec`, similar to what `SortMergeJoinExec` already does. In addition, this PR fixes a typo in an error message in `HashJoin`, as that code path is shared between broadcast hash join and shuffled hash join.
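
For intuition on why this removes the extra `Exchange`: for inner joins the override reports a `PartitioningCollection` of both children's partitionings, so the planner's `EnsureRequirements` rule can see that the join output is already clustered on either side's keys. Below is a minimal illustrative sketch against Spark's physical-plan classes — not part of the patch; the `k1`/`k2` attributes and the partition count of 2 are made up for the example:

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, HashPartitioning, PartitioningCollection}
import org.apache.spark.sql.types.LongType

// Stand-ins for the join keys of the two shuffled children.
val k1 = AttributeReference("k1", LongType)()
val k2 = AttributeReference("k2", LongType)()

// What the fixed inner join now reports: both children's hash partitionings.
val joinOutput = PartitioningCollection(Seq(
  HashPartitioning(Seq(k1), 2),
  HashPartitioning(Seq(k2), 2)))

// A following aggregate on k1 (or another join on k2) needs no new Exchange.
assert(joinOutput.satisfies(ClusteredDistribution(Seq(k1))))
assert(joinOutput.satisfies(ClusteredDistribution(Seq(k2))))
```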

### Why are the changes needed?

To avoid an extra shuffle for queries with multiple joins, or a join followed by a group-by, saving CPU and IO.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added unit test in `JoinSuite`.

Closes #29130 from c21/shj.

Authored-by: Cheng Su <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
c21 authored and cloud-fan committed Jul 20, 2020
1 parent e0ecb66 · commit fe07521
Showing 5 changed files with 66 additions and 19 deletions.
#### HashJoin.scala

```diff
@@ -215,7 +215,7 @@ trait HashJoin extends BaseJoinExec {
         existenceJoin(streamedIter, hashed)
       case x =>
         throw new IllegalArgumentException(
-          s"BroadcastHashJoin should not take $x as the JoinType")
+          s"HashJoin should not take $x as the JoinType")
     }
 
     val resultProj = createResultProjection
```
#### ShuffledHashJoinExec.scala

```diff
@@ -40,15 +40,14 @@ case class ShuffledHashJoinExec(
     condition: Option[Expression],
     left: SparkPlan,
     right: SparkPlan)
-  extends HashJoin {
+  extends HashJoin with ShuffledJoin {
 
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
     "buildDataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size of build side"),
     "buildTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to build hash map"))
 
-  override def requiredChildDistribution: Seq[Distribution] =
-    HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil
+  override def outputPartitioning: Partitioning = super[ShuffledJoin].outputPartitioning
 
   private def buildHashedRelation(iter: Iterator[InternalRow]): HashedRelation = {
    val buildDataSize = longMetric("buildDataSize")
```
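
One subtlety in the diff above: both `HashJoin` (which reports only the streamed side) and the new `ShuffledJoin` trait define `outputPartitioning`, so Scala requires the explicit `super[ShuffledJoin]` qualifier to pick one. A toy model — not Spark code, all names invented — showing the mechanics:

```scala
// Two parent traits override the same base member; mixing both into a class
// without an explicit override is a compile error ("inherits conflicting members").
trait BaseJoinDemo { def outputPartitioning: String = "unknown" }
trait HashJoinDemo extends BaseJoinDemo {
  override def outputPartitioning: String = "streamed side only"
}
trait ShuffledJoinDemo extends BaseJoinDemo {
  override def outputPartitioning: String = "both sides"
}
class ShuffledHashJoinDemo extends HashJoinDemo with ShuffledJoinDemo {
  // The qualified super explicitly selects ShuffledJoinDemo's implementation.
  override def outputPartitioning: String = super[ShuffledJoinDemo].outputPartitioning
}

object SuperQualifierDemo extends App {
  println(new ShuffledHashJoinDemo().outputPartitioning) // prints "both sides"
}
```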
#### ShuffledJoin.scala (new file)

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.joins

import org.apache.spark.sql.catalyst.plans.{FullOuter, InnerLike, LeftExistence, LeftOuter, RightOuter}
import org.apache.spark.sql.catalyst.plans.physical.{Distribution, HashClusteredDistribution, Partitioning, PartitioningCollection, UnknownPartitioning}

/**
 * Holds common logic for join operators by shuffling two child relations
 * using the join keys.
 */
trait ShuffledJoin extends BaseJoinExec {
  override def requiredChildDistribution: Seq[Distribution] = {
    HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil
  }

  override def outputPartitioning: Partitioning = joinType match {
    case _: InnerLike =>
      PartitioningCollection(Seq(left.outputPartitioning, right.outputPartitioning))
    case LeftOuter => left.outputPartitioning
    case RightOuter => right.outputPartitioning
    case FullOuter => UnknownPartitioning(left.outputPartitioning.numPartitions)
    case LeftExistence(_) => left.outputPartitioning
    case x =>
      throw new IllegalArgumentException(
        s"ShuffledJoin should not take $x as the JoinType")
  }
}
```
#### SortMergeJoinExec.scala

```diff
@@ -41,7 +41,7 @@ case class SortMergeJoinExec(
     condition: Option[Expression],
     left: SparkPlan,
     right: SparkPlan,
-    isSkewJoin: Boolean = false) extends BaseJoinExec with CodegenSupport {
+    isSkewJoin: Boolean = false) extends ShuffledJoin with CodegenSupport {
 
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
@@ -72,26 +72,13 @@ case class SortMergeJoinExec(
     }
   }
 
-  override def outputPartitioning: Partitioning = joinType match {
-    case _: InnerLike =>
-      PartitioningCollection(Seq(left.outputPartitioning, right.outputPartitioning))
-    // For left and right outer joins, the output is partitioned by the streamed input's join keys.
-    case LeftOuter => left.outputPartitioning
-    case RightOuter => right.outputPartitioning
-    case FullOuter => UnknownPartitioning(left.outputPartitioning.numPartitions)
-    case LeftExistence(_) => left.outputPartitioning
-    case x =>
-      throw new IllegalArgumentException(
-        s"${getClass.getSimpleName} should not take $x as the JoinType")
-  }
-
   override def requiredChildDistribution: Seq[Distribution] = {
     if (isSkewJoin) {
       // We re-arrange the shuffle partitions to deal with skew join, and the new children
       // partitioning doesn't satisfy `HashClusteredDistribution`.
       UnspecifiedDistribution :: UnspecifiedDistribution :: Nil
     } else {
-      HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil
+      super.requiredChildDistribution
     }
   }
```
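
Note that `SortMergeJoinExec`'s behavior is unchanged by this diff: its `outputPartitioning` logic and its non-skew `requiredChildDistribution` simply move into the shared `ShuffledJoin` trait, which `ShuffledHashJoinExec` now reuses.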
#### sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala (18 additions, 0 deletions)

```diff
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.{Ascending, GenericRow, SortOrder}
 import org.apache.spark.sql.catalyst.plans.logical.Filter
 import org.apache.spark.sql.execution.{BinaryExecNode, FilterExec, SortExec, SparkPlan}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.execution.python.BatchEvalPythonExec
 import org.apache.spark.sql.internal.SQLConf
@@ -1086,4 +1087,21 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlanHelper {
       assert(df2.join(df1, "id").collect().isEmpty)
     }
   }
+
+  test("SPARK-32330: Preserve shuffled hash join build side partitioning") {
+    withSQLConf(
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "50",
+      SQLConf.SHUFFLE_PARTITIONS.key -> "2",
+      SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
+      val df1 = spark.range(10).select($"id".as("k1"))
+      val df2 = spark.range(30).select($"id".as("k2"))
+      Seq("inner", "cross").foreach(joinType => {
+        val plan = df1.join(df2, $"k1" === $"k2", joinType).groupBy($"k1").count()
+          .queryExecution.executedPlan
+        assert(plan.collect { case _: ShuffledHashJoinExec => true }.size === 1)
+        // No extra shuffle before aggregate
+        assert(plan.collect { case _: ShuffleExchangeExec => true }.size === 2)
+      })
+    }
+  }
 }
```
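
To run just the new test locally, an invocation along the lines of `build/sbt "sql/testOnly *JoinSuite -- -z SPARK-32330"` should work — the exact command is an assumption based on Spark's usual ScalaTest workflow, not something stated in this commit.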
