
[SPARK-32282][SQL] Improve EnsureRequirements.reorderJoinKeys to handle more scenarios such as PartitioningCollection #29074

Closed · 13 commits

Conversation

imback82 (Contributor):

What changes were proposed in this pull request?

This PR proposes to improve EnsureRequirements.reorderJoinKeys to handle the following scenarios:

  1. If the keys cannot be reordered to match the left-side HashPartitioning, consider the right-side HashPartitioning.
  2. Handle PartitioningCollection, which may contain HashPartitioning.

Why are the changes needed?

  1. For scenario 1, the current behavior matches either the left-side HashPartitioning or the right-side HashPartitioning, but not both; if both sides are HashPartitioning, only the left side is tried.
    The following example therefore does not consider the right-side HashPartitioning:
val df1 = (0 until 10).map(i => (i % 5, i % 13)).toDF("i1", "j1")
val df2 = (0 until 10).map(i => (i % 7, i % 11)).toDF("i2", "j2")
df1.write.format("parquet").bucketBy(4, "i1", "j1").saveAsTable("t1")
df2.write.format("parquet").bucketBy(4, "i2", "j2").saveAsTable("t2")
val t1 = spark.table("t1")
val t2 = spark.table("t2")
val join = t1.join(t2, t1("i1") === t2("j2") && t1("i1") === t2("i2"))
join.explain

== Physical Plan ==
*(5) SortMergeJoin [i1#26, i1#26], [j2#31, i2#30], Inner
:- *(2) Sort [i1#26 ASC NULLS FIRST, i1#26 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(i1#26, i1#26, 4), true, [id=#69]
:     +- *(1) Project [i1#26, j1#27]
:        +- *(1) Filter isnotnull(i1#26)
:           +- *(1) ColumnarToRow
:              +- FileScan parquet default.t1[i1#26,j1#27] Batched: true, DataFilters: [isnotnull(i1#26)], Format: Parquet, Location: InMemoryFileIndex[..., PartitionFilters: [], PushedFilters: [IsNotNull(i1)], ReadSchema: struct<i1:int,j1:int>, SelectedBucketsCount: 4 out of 4
+- *(4) Sort [j2#31 ASC NULLS FIRST, i2#30 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(j2#31, i2#30, 4), true, [id=#79]       <===== This can be removed
      +- *(3) Project [i2#30, j2#31]
         +- *(3) Filter (((j2#31 = i2#30) AND isnotnull(j2#31)) AND isnotnull(i2#30))
            +- *(3) ColumnarToRow
               +- FileScan parquet default.t2[i2#30,j2#31] Batched: true, DataFilters: [(j2#31 = i2#30), isnotnull(j2#31), isnotnull(i2#30)], Format: Parquet, Location: InMemoryFileIndex[..., PartitionFilters: [], PushedFilters: [IsNotNull(j2), IsNotNull(i2)], ReadSchema: struct<i2:int,j2:int>, SelectedBucketsCount: 4 out of 4

  2. For scenario 2, the current behavior does not handle PartitioningCollection:
val df1 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i1", "j1")
val df2 = (0 until 100).map(i => (i % 7, i % 11)).toDF("i2", "j2")
val df3 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i3", "j3")
val join = df1.join(df2, df1("i1") === df2("i2") && df1("j1") === df2("j2")) // PartitioningCollection
val join2 = join.join(df3, join("j1") === df3("j3") && join("i1") === df3("i3"))
join2.explain

== Physical Plan ==
*(9) SortMergeJoin [j1#8, i1#7], [j3#30, i3#29], Inner
:- *(6) Sort [j1#8 ASC NULLS FIRST, i1#7 ASC NULLS FIRST], false, 0       <===== This can be removed
:  +- Exchange hashpartitioning(j1#8, i1#7, 5), true, [id=#58]             <===== This can be removed
:     +- *(5) SortMergeJoin [i1#7, j1#8], [i2#18, j2#19], Inner
:        :- *(2) Sort [i1#7 ASC NULLS FIRST, j1#8 ASC NULLS FIRST], false, 0
:        :  +- Exchange hashpartitioning(i1#7, j1#8, 5), true, [id=#45]
:        :     +- *(1) Project [_1#2 AS i1#7, _2#3 AS j1#8]
:        :        +- *(1) LocalTableScan [_1#2, _2#3]
:        +- *(4) Sort [i2#18 ASC NULLS FIRST, j2#19 ASC NULLS FIRST], false, 0
:           +- Exchange hashpartitioning(i2#18, j2#19, 5), true, [id=#51]
:              +- *(3) Project [_1#13 AS i2#18, _2#14 AS j2#19]
:                 +- *(3) LocalTableScan [_1#13, _2#14]
+- *(8) Sort [j3#30 ASC NULLS FIRST, i3#29 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(j3#30, i3#29, 5), true, [id=#64]
      +- *(7) Project [_1#24 AS i3#29, _2#25 AS j3#30]
         +- *(7) LocalTableScan [_1#24, _2#25]

Does this PR introduce any user-facing change?

Yes. In the examples above, the shuffle/sort nodes marked with "This can be removed" are now removed:

  1. Scenario 1:
== Physical Plan ==
*(4) SortMergeJoin [i1#26, i1#26], [i2#30, j2#31], Inner
:- *(2) Sort [i1#26 ASC NULLS FIRST, i1#26 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(i1#26, i1#26, 4), true, [id=#67]
:     +- *(1) Project [i1#26, j1#27]
:        +- *(1) Filter isnotnull(i1#26)
:           +- *(1) ColumnarToRow
:              +- FileScan parquet default.t1[i1#26,j1#27] Batched: true, DataFilters: [isnotnull(i1#26)], Format: Parquet, Location: InMemoryFileIndex[..., PartitionFilters: [], PushedFilters: [IsNotNull(i1)], ReadSchema: struct<i1:int,j1:int>, SelectedBucketsCount: 4 out of 4
+- *(3) Sort [i2#30 ASC NULLS FIRST, j2#31 ASC NULLS FIRST], false, 0
   +- *(3) Project [i2#30, j2#31]
      +- *(3) Filter (((j2#31 = i2#30) AND isnotnull(j2#31)) AND isnotnull(i2#30))
         +- *(3) ColumnarToRow
            +- FileScan parquet default.t2[i2#30,j2#31] Batched: true, DataFilters: [(j2#31 = i2#30), isnotnull(j2#31), isnotnull(i2#30)], Format: Parquet, Location: InMemoryFileIndex[..., PartitionFilters: [], PushedFilters: [IsNotNull(j2), IsNotNull(i2)], ReadSchema: struct<i2:int,j2:int>, SelectedBucketsCount: 4 out of 4
  2. Scenario 2:
== Physical Plan ==
*(8) SortMergeJoin [i1#7, j1#8], [i3#29, j3#30], Inner
:- *(5) SortMergeJoin [i1#7, j1#8], [i2#18, j2#19], Inner
:  :- *(2) Sort [i1#7 ASC NULLS FIRST, j1#8 ASC NULLS FIRST], false, 0
:  :  +- Exchange hashpartitioning(i1#7, j1#8, 5), true, [id=#43]
:  :     +- *(1) Project [_1#2 AS i1#7, _2#3 AS j1#8]
:  :        +- *(1) LocalTableScan [_1#2, _2#3]
:  +- *(4) Sort [i2#18 ASC NULLS FIRST, j2#19 ASC NULLS FIRST], false, 0
:     +- Exchange hashpartitioning(i2#18, j2#19, 5), true, [id=#49]
:        +- *(3) Project [_1#13 AS i2#18, _2#14 AS j2#19]
:           +- *(3) LocalTableScan [_1#13, _2#14]
+- *(7) Sort [i3#29 ASC NULLS FIRST, j3#30 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(i3#29, j3#30, 5), true, [id=#58]
      +- *(6) Project [_1#24 AS i3#29, _2#25 AS j3#30]
         +- *(6) LocalTableScan [_1#24, _2#25]

How was this patch tested?

Added tests.

imback82 changed the title from "[SPARK-xxx][SQL] Improve EnsureRequirements.reorderJoinKeys to handle more scenarios such as PartitioningCollection" to "[SPARK-32282][SQL] Improve EnsureRequirements.reorderJoinKeys to handle more scenarios such as PartitioningCollection" on Jul 12, 2020
            leftKeys, rightKeys, UnknownPartitioning(0), rightPartitioning))
      case (_, HashPartitioning(rightExpressions, _)) =>
        reorder(leftKeys.toIndexedSeq, rightKeys.toIndexedSeq, rightExpressions, rightKeys)
          .orElse(reorderJoinKeysRecursively(
imback82 (Contributor, Author) commented on the diff above:

This can also be implemented by looking at the left partitioning first and then moving to the right partitioning:

    (leftPartitioning, rightPartitioning) match {
      case (HashPartitioning(leftExpressions, _), _) =>
        reorder(leftKeys.toIndexedSeq, rightKeys.toIndexedSeq, leftExpressions, leftKeys)
          .orElse(reorderJoinKeysRecursively(
            leftKeys, rightKeys, UnknownPartitioning(0), rightPartitioning))
      case (PartitioningCollection(partitionings), _) =>
        partitionings.foreach { p =>
          reorderJoinKeysRecursively(leftKeys, rightKeys, p, rightPartitioning).map { k =>
            return Some(k)
          }
        }
        reorderJoinKeysRecursively(leftKeys, rightKeys, UnknownPartitioning(0), rightPartitioning)
      case (_, HashPartitioning(rightExpressions, _)) =>
        reorder(leftKeys.toIndexedSeq, rightKeys.toIndexedSeq, rightExpressions, rightKeys)
      case (_, PartitioningCollection(partitionings)) =>
        partitionings.foreach { p =>
          reorderJoinKeysRecursively(leftKeys, rightKeys, leftPartitioning, p).map { k =>
            return Some(k)
          }
        }
        None
      case _ =>
        None
    }

However, I chose this way so that the behavior remains the same: if leftPartitioning is a PartitioningCollection and rightPartitioning is a HashPartitioning, the right-side HashPartitioning is matched first, which is the existing behavior.


SparkQA commented Jul 12, 2020

Test build #125704 has finished for PR 29074 at commit 99493e4.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

imback82 (Contributor, Author) commented:

retest this please


SparkQA commented Jul 12, 2020

Test build #125722 has finished for PR 29074 at commit 99493e4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

imback82 (Contributor, Author) commented:

cc: @maropu @cloud-fan

maropu (Member) commented Jul 13, 2020

also cc: @viirya


SparkQA commented Jul 15, 2020

Test build #125890 has finished for PR 29074 at commit ab237bc.

  • This patch fails to build.
  • This patch does not merge cleanly.
  • This patch adds no public classes.


SparkQA commented Jul 16, 2020

Test build #125917 has finished for PR 29074 at commit 8308649.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

imback82 (Contributor, Author) commented:

Gentle ping @cloud-fan / @maropu / @viirya


SparkQA commented Aug 2, 2020

Test build #126928 has finished for PR 29074 at commit 268326b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Aug 7, 2020

Test build #127164 has finished for PR 29074 at commit e5b078f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class EnsureRequirementsSuite extends SharedSparkSession

maropu (Member) left a comment:

Looks okay except for the existing minor comments. @cloud-fan @viirya


SparkQA commented Aug 7, 2020

Test build #127213 has finished for PR 29074 at commit 89ad6ef.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

maropu (Member) commented Aug 22, 2020

retest this please


SparkQA commented Aug 22, 2020

Test build #127775 has finished for PR 29074 at commit 89ad6ef.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

c21 (Contributor) left a comment:

Thanks @imback82 for adding this. This is important for complex bucketed-related queries to avoid shuffle. LGTM, just a nit.


SparkQA commented Aug 24, 2020

Test build #127850 has finished for PR 29074 at commit 1729c8b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

imback82 (Contributor, Author) commented:

Gentle ping. @cloud-fan, do you think this PR can move forward? Thanks in advance!

maropu (Member) commented Sep 9, 2020

retest this please


SparkQA commented Sep 9, 2020

Test build #128455 has finished for PR 29074 at commit 1729c8b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

maropu (Member) commented Sep 28, 2020

retest this please


SparkQA commented Sep 28, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/33799/


SparkQA commented Sep 28, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/33799/


SparkQA commented Sep 28, 2020

Test build #129183 has finished for PR 29074 at commit 1729c8b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

      case (Some(PartitioningCollection(partitionings)), _) =>
        partitionings.foreach { p =>
          reorderJoinKeysRecursively(leftKeys, rightKeys, Some(p), rightPartitioning).map { k =>
            return Some(k)
Review comment (Contributor):

nit:

partitionings.foldLeft(None) { (res, p) =>
  res.orElse(reorderJoinKeysRecursively...)
}.getOrElse(reorderJoinKeysRecursively(leftKeys, rightKeys, None, rightPartitioning))

imback82 (Contributor, Author) replied:

Thanks, updated.

            ShuffleExchangeExec(HashPartitioning(rightPartitioningExpressions, _), _, _), _), _) =>
        assert(leftKeys !== smjExec1.leftKeys)
        assert(rightKeys !== smjExec1.rightKeys)
        assert(leftKeys === leftPartitionings.head.asInstanceOf[HashPartitioning].expressions)
Review comment (Contributor):

can we simply check leftKeys === Seq(exprA, exprB)?

imback82 (Contributor, Author) replied:

OK. I simplified the checks in this test.

cloud-fan (Contributor) left a comment:

LGTM except some minor comments


SparkQA commented Oct 7, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34089/


SparkQA commented Oct 7, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34089/


SparkQA commented Oct 7, 2020

Test build #129483 has finished for PR 29074 at commit 10b4d5a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

          res.orElse(reorderJoinKeysRecursively(leftKeys, rightKeys, Some(p), rightPartitioning))
        }.orElse(reorderJoinKeysRecursively(leftKeys, rightKeys, None, rightPartitioning))
      case (_, Some(PartitioningCollection(partitionings))) =>
        partitionings.foreach { p =>
Review comment (Contributor):

can you do the same refactor here?

imback82 (Contributor, Author) replied:

Fixed.

      case SortMergeJoinExec(leftKeys, rightKeys, _, _,
          SortExec(_, _, DummySparkPlan(_, _, _: PartitioningCollection, _, _), _),
          SortExec(_, _, ShuffleExchangeExec(_: HashPartitioning, _, _), _), _) =>
        assert(leftKeys !== smjExec1.leftKeys)
Review comment (Contributor):

is this check needed? We already check leftKeys === Seq(exprA, exprB) and it's obvious that leftKeys !== smjExec1.leftKeys

imback82 (Contributor, Author) replied:

Removed, thanks!


SparkQA commented Oct 7, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34128/


SparkQA commented Oct 7, 2020

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34128/


SparkQA commented Oct 7, 2020

Test build #129523 has finished for PR 29074 at commit 3cd6df9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

cloud-fan (Contributor) commented:

thanks, merging to master!

cloud-fan closed this in 1c781a4 on Oct 8, 2020