Skip to content

Commit

Permalink
SPARK-30598: Detect equijoins better
Browse files Browse the repository at this point in the history
  • Loading branch information
peter-toth committed Jan 21, 2020
1 parent cfb1706 commit ca782e7
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper {
// Find equi-join predicates that can be evaluated before the join, and thus can be used
// as join keys.
val predicates = condition.map(splitConjunctivePredicates).getOrElse(Nil)
val joinKeys = predicates.flatMap {
val explicitJoinKeys = predicates.flatMap {
case EqualTo(l, r) if l.references.isEmpty || r.references.isEmpty => None
case EqualTo(l, r) if canEvaluate(l, left) && canEvaluate(r, right) => Some((l, r))
case EqualTo(l, r) if canEvaluate(l, right) && canEvaluate(r, left) => Some((r, l))
Expand All @@ -203,6 +203,27 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper {
)
case other => None
}

// Transitive-equality detection: collect predicates of the form `expr = literal` (or
// `literal = expr`) where `expr` is deterministic and can be evaluated against exactly
// one side of the join. Each match is keyed by the literal and tagged with the side the
// expression belongs to: (Some(e), None) = left side, (None, Some(e)) = right side.
val literalEqualities = predicates.collect {
case EqualTo(l, r: Literal) if canEvaluate(l, left) && l.deterministic =>
r -> (Some(l), None)
case EqualTo(l, r: Literal) if canEvaluate(l, right) && l.deterministic =>
r -> (None, Some(l))
case EqualTo(l: Literal, r) if canEvaluate(r, left) && r.deterministic =>
l -> (Some(r), None)
case EqualTo(l: Literal, r) if canEvaluate(r, right) && r.deterministic =>
l -> (None, Some(r))
}.groupBy(_._1).mapValues { v =>
// For each literal, split the tagged pairs into left-side and right-side expression
// lists, dropping the None placeholders from the opposite side.
val (l, r) = v.map(_._2).unzip
(l.flatten, r.flatten)
}

// If `x = lit` holds on the left and `y = lit` holds on the right, then `x = y` is an
// implied equi-join key: emit the cross product of both sides for every shared literal.
val implicitJoinKeys = literalEqualities.values.flatMap {
case (xs, ys) => for { x <- xs; y <- ys } yield (x, y)
}

// Deduplicate, since an implied key may also appear explicitly in the join condition.
// NOTE(review): `.toSet` relies on equals/hashCode of the Catalyst expression pairs —
// semantically equivalent but structurally different expressions would not dedupe; and
// the resulting key order is not the predicate order. Confirm downstream is order-agnostic.
val joinKeys = (explicitJoinKeys.toSet ++ implicitJoinKeys).toSeq

val otherPredicates = predicates.filterNot {
case EqualTo(l, r) if l.references.isEmpty || r.references.isEmpty => false
case Equality(l, r) =>
Expand Down
29 changes: 29 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1082,4 +1082,33 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
assert(df2.join(df1, "id").collect().isEmpty)
}
}

test("Detect equijoins better") {
  val df1 = Seq((1, 1), (2, 2)).toDF("c1", "c2")
  val df2 = Seq((2, 2), (3, 3)).toDF("c1", "c2")

  // Joining with only the explicit literal constraints must produce the same answer AND
  // the same physical plan as joining with the redundant implicit equi-join conditions
  // spelled out, proving the optimizer derives the implied join keys on its own.
  // Factored into a helper so both scenarios share one assertion pattern.
  def assertSamePlanAndAnswer(explicitOnly: Column, withImplicit: Column): Unit = {
    val expanded = df1.join(df2, withImplicit, "FullOuter")
    val derived = df1.join(df2, explicitOnly, "FullOuter")
    checkAnswer(expanded, derived)
    assert(
      expanded.queryExecution.sparkPlan === derived.queryExecution.sparkPlan,
      "Explicit and implicit plans should match.")
  }

  // Single implied key: df1.c1 = 2 and df2.c1 = 2 imply df1.c1 = df2.c1.
  val explicitConstraints = df1("c1") === 2 && df2("c1") === 2
  val implicitConstraints = df1("c1") === df2("c1")
  assertSamePlanAndAnswer(explicitConstraints, explicitConstraints && implicitConstraints)

  // Cross product of implied keys: two columns equal to the same literal on each side
  // imply all four pairwise column equalities.
  val explicitConstraints2 =
    df1("c1") === 2 && df1("c2") === 2 && df2("c1") === 2 && df2("c2") === 2
  val implicitConstraints2 = df1("c1") === df2("c1") && df1("c1") === df2("c2") &&
    df1("c2") === df2("c1") && df1("c2") === df2("c2")
  assertSamePlanAndAnswer(explicitConstraints2, explicitConstraints2 && implicitConstraints2)
}
}

0 comments on commit ca782e7

Please sign in to comment.