[SPARK-32330][SQL] Preserve shuffled hash join build side partitioning #29130

Closed

Changes from 1 commit
@@ -206,7 +206,7 @@ trait HashJoin extends BaseJoinExec {
         existenceJoin(streamedIter, hashed)
       case x =>
         throw new IllegalArgumentException(
-          s"BroadcastHashJoin should not take $x as the JoinType")
+          s"HashJoin should not take $x as the JoinType")
     }
 
     val resultProj = createResultProjection
@@ -47,6 +47,18 @@ case class ShuffledHashJoinExec(
"buildDataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size of build side"),
"buildTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to build hash map"))

override def outputPartitioning: Partitioning = joinType match {
cloud-fan (Contributor):
This is exactly the same as SMJ. Shall we create a common trait ShuffleJoin to put it?

c21 (Contributor, author), Jul 17, 2020:
@cloud-fan - there's an extra case in sort merge join to handle full outer join. I am thinking of handling all the other join types in a parent trait ShuffleJoin, and overriding outputPartitioning in SortMergeJoinExec to handle the extra FullOuter. What do you think?

For me it's a bit odd that ShuffleJoin would not handle FullOuter, since a shuffled full outer join is one kind of shuffle join. But if ShuffleJoin does handle FullOuter, it seems equally odd that ShuffledHashJoinExec extends it.

Wondering what you think. The change itself is easy. Thanks.
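
A compact, self-contained sketch of the layering under discussion; the simplified JoinType and Partitioning types below are illustrative stand-ins for Spark's real classes, not actual Spark code:

```scala
// Parent trait covers the join types both exec nodes share; only the
// sort-merge node adds the FullOuter arm.
sealed trait JoinType
case object Inner extends JoinType
case object LeftOuter extends JoinType
case object FullOuter extends JoinType

sealed trait Partitioning { def numPartitions: Int }
case class HashPartitioning(keys: Seq[String], numPartitions: Int) extends Partitioning
case class UnknownPartitioning(numPartitions: Int) extends Partitioning

trait ShuffleJoin {
  def joinType: JoinType
  def streamedPartitioning: Partitioning

  def outputPartitioning: Partitioning = joinType match {
    case Inner | LeftOuter => streamedPartitioning
    case x => throw new IllegalArgumentException(s"ShuffleJoin should not take $x")
  }
}

// Only sort merge join supports FullOuter, so only it overrides that arm,
// delegating everything else back to the parent trait.
class SortMergeJoin(
    val joinType: JoinType,
    val streamedPartitioning: Partitioning) extends ShuffleJoin {
  override def outputPartitioning: Partitioning = joinType match {
    case FullOuter => UnknownPartitioning(streamedPartitioning.numPartitions)
    case _ => super.outputPartitioning
  }
}
```

The awkwardness c21 points out is visible in the shape: a shuffled full outer join is still a shuffle join, yet its arm can only live in the subclass.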

cloud-fan (Contributor):
Why does shuffle hash join not support FullOuter?

c21 (Contributor, author):
> Why does shuffle hash join not support FullOuter?

@cloud-fan sorry if I'm missing anything, but isn't this true today? In the current Spark hash join implementation, the stream side looks up keys in the build side's hash map, so it can handle non-matching keys from the stream side (when there's no match in the build side hash map). But it cannot handle non-matching keys from the build side, as no information from the stream side is persisted.

An interesting follow-up could be to support full outer join in shuffled hash join: while looking up stream-side keys in the build-side HashedRelation, mark each matched row inside the HashedRelation, and after all stream-side rows have been read, output the build-side rows that never matched, based on the modified HashedRelation.
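
A self-contained sketch of that follow-up idea, with plain Scala collections standing in for HashedRelation and generic tuples standing in for rows (all names here are illustrative):

```scala
import scala.collection.mutable

// Full outer hash join on plain collections: probe the build-side map with
// the stream, flag every build row that matches, then emit the rest.
def fullOuterHashJoin[K, L, R](
    stream: Iterator[(K, L)],
    build: IndexedSeq[(K, R)]): Iterator[(K, Option[L], Option[R])] = {
  val matched = Array.fill(build.size)(false)
  val byKey = mutable.Map.empty[K, mutable.ArrayBuffer[Int]]
  for (((k, _), i) <- build.iterator.zipWithIndex)
    byKey.getOrElseUpdate(k, mutable.ArrayBuffer.empty) += i

  // Stream rows with no build-side match come out with None on the build side.
  val probed = stream.flatMap { case (k, l) =>
    byKey.get(k) match {
      case Some(idxs) =>
        idxs.foreach(i => matched(i) = true)
        idxs.iterator.map(i => (k, Some(l), Some(build(i)._2)))
      case None => Iterator((k, Some(l), None))
    }
  }

  // Lazy: the flags are read only after the stream has been fully consumed.
  val unmatchedBuild = build.iterator.zipWithIndex.collect {
    case ((k, r), i) if !matched(i) => (k, None, Some(r))
  }
  probed ++ unmatchedBuild
}
```

For example, `fullOuterHashJoin(Iterator(1 -> "a"), Vector(1 -> "x", 2 -> "y")).toList` returns the matched pair plus `(2, None, Some("y"))` for the build row that never matched.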

cloud-fan (Contributor):
Can we simply move SortMergeJoinExec.outputPartitioning to the parent trait? It works for ShuffledHashJoinExec as well, as the planner guarantees ShuffledHashJoinExec.joinType won't be FullOuter.
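
Concretely, the shared trait would then carry the whole match, including the FullOuter arm that only sort merge join can reach. A rough sketch, shaped after the arms already visible in this diff; the FullOuter arm is an assumption based on c21's description of SortMergeJoinExec, not a quote of the committed code:

```scala
// Sketch: SMJ's method moved wholesale into the shared trait. Safe for
// ShuffledHashJoinExec because the planner never hands it a FullOuter join.
override def outputPartitioning: Partitioning = joinType match {
  case _: InnerLike =>
    PartitioningCollection(Seq(left.outputPartitioning, right.outputPartitioning))
  case LeftOuter => left.outputPartitioning
  case RightOuter => right.outputPartitioning
  case FullOuter => UnknownPartitioning(left.outputPartitioning.numPartitions)
  case LeftExistence(_) => left.outputPartitioning
  case x =>
    throw new IllegalArgumentException(
      s"${getClass.getSimpleName} should not take $x as the JoinType")
}
```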

c21 (Contributor, author):
@cloud-fan I agree. Updated.

+    case _: InnerLike =>
+      PartitioningCollection(Seq(left.outputPartitioning, right.outputPartitioning))
+    // For left and right outer joins, the output is partitioned by the streamed input's join keys.
+    case LeftOuter => left.outputPartitioning
+    case RightOuter => right.outputPartitioning
+    case LeftExistence(_) => left.outputPartitioning
+    case x =>
+      throw new IllegalArgumentException(
+        s"${getClass.getSimpleName} should not take $x as the JoinType")
+  }
+
   override def requiredChildDistribution: Seq[Distribution] =
     HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil
18 changes: 18 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.{Ascending, GenericRow, SortOrd
 import org.apache.spark.sql.catalyst.plans.logical.Filter
 import org.apache.spark.sql.execution.{BinaryExecNode, FilterExec, SortExec, SparkPlan}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.execution.python.BatchEvalPythonExec
 import org.apache.spark.sql.internal.SQLConf
@@ -1086,4 +1087,21 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
       assert(df2.join(df1, "id").collect().isEmpty)
     }
   }
+
+  test("SPARK-32330: Preserve shuffled hash join build side partitioning") {
+    withSQLConf(
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "50",
cloud-fan (Contributor):
nit: set it to "-1" to make the intention (turning off broadcast join) clear?

c21 (Contributor, author):
…

cloud-fan (Contributor):
Ah OK. Thanks!

+      SQLConf.SHUFFLE_PARTITIONS.key -> "2",
+      SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
+      val df1 = spark.range(10).select($"id".as("k1"))
+      val df2 = spark.range(30).select($"id".as("k2"))
+      Seq("inner", "cross").foreach(joinType => {
+        val plan = df1.join(df2, $"k1" === $"k2", joinType).groupBy($"k1").count()
+          .queryExecution.executedPlan
+        assert(plan.collect { case _: ShuffledHashJoinExec => true }.size === 1)
+        // No extra shuffle before aggregate
+        assert(plan.collect { case _: ShuffleExchangeExec => true }.size === 2)
+      })
+    }
+  }
 }