[SPARK-31350][SQL] Coalesce bucketed tables for sort merge join if applicable #28123

Closed
wants to merge 27 commits

Conversation

imback82
Contributor

@imback82 imback82 commented Apr 4, 2020

What changes were proposed in this pull request?

When two bucketed tables with different numbers of buckets are joined, the join can introduce a full shuffle:

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "0")
val df1 = (0 until 20).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
val df2 = (0 until 20).map(i => (i % 7, i % 11, i.toString)).toDF("i", "j", "k")
df1.write.format("parquet").bucketBy(8, "i").saveAsTable("t1")
df2.write.format("parquet").bucketBy(4, "i").saveAsTable("t2")
val t1 = spark.table("t1")
val t2 = spark.table("t2")
val joined = t1.join(t2, t1("i") === t2("i"))
joined.explain

== Physical Plan ==
*(5) SortMergeJoin [i#44], [i#50], Inner
:- *(2) Sort [i#44 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(i#44, 200), true, [id=#105]
:     +- *(1) Project [i#44, j#45, k#46]
:        +- *(1) Filter isnotnull(i#44)
:           +- *(1) ColumnarToRow
:              +- FileScan parquet default.t1[i#44,j#45,k#46] Batched: true, DataFilters: [isnotnull(i#44)], Format: Parquet, Location: InMemoryFileIndex[...], PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int,j:int,k:string>, SelectedBucketsCount: 8 out of 8
+- *(4) Sort [i#50 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(i#50, 200), true, [id=#115]
      +- *(3) Project [i#50, j#51, k#52]
         +- *(3) Filter isnotnull(i#50)
            +- *(3) ColumnarToRow
               +- FileScan parquet default.t2[i#50,j#51,k#52] Batched: true, DataFilters: [isnotnull(i#50)], Format: Parquet, Location: InMemoryFileIndex[...], PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int,j:int,k:string>, SelectedBucketsCount: 4 out of 4

This PR proposes to coalesce buckets, eliminating the full shuffle, when the following conditions are met (a rough sketch of the check follows the list):

  • The join is a sort merge join (which is created only for equi-joins).
  • The join keys match the output partitioning expressions on their respective sides.
  • The larger bucket number is divisible by the smaller bucket number.
  • spark.sql.bucketing.coalesceBucketsInSortMergeJoin.enabled is set to true.
  • The ratio of the number of buckets should be less than the value set in spark.sql.bucketing.coalesceBucketsInSortMergeJoin.maxBucketRatio.
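
As a rough sketch (the helper name and signature below are hypothetical, not the rule's actual code), the numeric part of this check boils down to a divisibility and ratio test:

```
// Hypothetical sketch of the numeric part of the coalescing check; the actual rule
// (CoalesceBucketsInSortMergeJoin) also verifies the join type, the output partitioning
// of both sides, and the spark.sql.bucketing.coalesceBucketsInSortMergeJoin.* configs.
def canCoalesceBuckets(leftBuckets: Int, rightBuckets: Int, maxBucketRatio: Int): Boolean = {
  val large = math.max(leftBuckets, rightBuckets)
  val small = math.min(leftBuckets, rightBuckets)
  // The larger bucket count must be divisible by the smaller one, and the ratio
  // must stay within the configured maximum.
  large % small == 0 && large / small <= maxBucketRatio
}

// Example: 8 buckets vs 4 buckets with an assumed ratio limit of 4 can be coalesced to 4.
assert(canCoalesceBuckets(8, 4, maxBucketRatio = 4))
```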

Why are the changes needed?

Eliminating the full shuffle benefits scenarios where two large tables are joined, especially when the tables are already bucketed but differ in the number of buckets.

Does this PR introduce any user-facing change?

If the bucket coalescing conditions explained above are met, a full shuffle can be eliminated (also note that you will see SelectedBucketsCount: 8 out of 8 (Coalesced to 4) in the physical plan):

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "0")
spark.conf.set("spark.sql.bucketing.coalesceBucketsInSortMergeJoin.enabled", "true")
val df1 = (0 until 20).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
val df2 = (0 until 20).map(i => (i % 7, i % 11, i.toString)).toDF("i", "j", "k")
df1.write.format("parquet").bucketBy(8, "i").saveAsTable("t1")
df2.write.format("parquet").bucketBy(4, "i").saveAsTable("t2")
val t1 = spark.table("t1")
val t2 = spark.table("t2")
val joined = t1.join(t2, t1("i") === t2("i"))
joined.explain

== Physical Plan ==
*(3) SortMergeJoin [i#44], [i#50], Inner
:- *(1) Sort [i#44 ASC NULLS FIRST], false, 0
:  +- *(1) Project [i#44, j#45, k#46]
:     +- *(1) Filter isnotnull(i#44)
:        +- *(1) ColumnarToRow
:           +- FileScan parquet default.t1[i#44,j#45,k#46] Batched: true, DataFilters: [isnotnull(i#44)], Format: Parquet, Location: InMemoryFileIndex[...], PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int,j:int,k:string>, SelectedBucketsCount: 8 out of 8 (Coalesced to 4)
+- *(2) Sort [i#50 ASC NULLS FIRST], false, 0
   +- *(2) Project [i#50, j#51, k#52]
      +- *(2) Filter isnotnull(i#50)
         +- *(2) ColumnarToRow
            +- FileScan parquet default.t2[i#50,j#51,k#52] Batched: true, DataFilters: [isnotnull(i#50)], Format: Parquet, Location: InMemoryFileIndex[...], PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int,j:int,k:string>, SelectedBucketsCount: 4 out of 4

How was this patch tested?

Added unit tests

@imback82 imback82 changed the title [SPARK-31350][SQL] Coalesce bucketed tables for join if applicable [WIP][SPARK-31350][SQL] Coalesce bucketed tables for join if applicable Apr 5, 2020
@imback82
Contributor Author

imback82 commented Apr 5, 2020

@cloud-fan @maropu @gatorsmile Could you check this PR and see what you think about the approach? Thanks in advance!

@SparkQA

SparkQA commented Apr 5, 2020

Test build #120822 has finished for PR 28123 at commit a15af6b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 5, 2020

Test build #120821 has finished for PR 28123 at commit 94720e0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@imback82 imback82 changed the title [WIP][SPARK-31350][SQL] Coalesce bucketed tables for join if applicable [SPARK-31350][SQL] Coalesce bucketed tables for join if applicable Apr 6, 2020
@maropu
Member

maropu commented Apr 9, 2020

Does this optimization always work well? e.g., 100000 buckets vs. 2 buckets.

@imback82
Contributor Author

imback82 commented Apr 9, 2020

Does this optimization always work well? e.g., 100000 buckets vs. 2 buckets.

Not always, so this is enabled if spark.sql.bucketing.coalesce is set to true.

@maropu
Member

maropu commented Apr 10, 2020

Does this optimization always work well? e.g., 100000 buckets vs. 2 buckets.

Not always, so this is enabled if spark.sql.bucketing.coalesce is set to true.

If so, I think we need a threshold config to turn off this optimization if the numbers of joined buckets have a large gap.

@maropu
Member

maropu commented Apr 10, 2020

also cc: @viirya

@SparkQA

SparkQA commented Apr 14, 2020

Test build #121238 has finished for PR 28123 at commit 40b0707.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 18, 2020

Test build #124194 has finished for PR 28123 at commit e231268.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@imback82
Contributor Author

retest this please

isScanOperation(s.left) &&
isScanOperation(s.right) &&
satisfiesOutputPartitioning(s.leftKeys, s.left.outputPartitioning) &&
satisfiesOutputPartitioning(s.rightKeys, s.right.outputPartitioning)
Contributor

not related to this PR but just an idea: we don't need to do bucket scan at all if it can't save shuffles. This can increase parallelism.

Contributor Author

yea good idea.

Member

Is this a follow-up issue?

Contributor Author

I can give it a shot after this PR.

Member

yea, thanks!

Contributor

we don't need to do bucket scan at all if it can't save shuffles. This can increase parallelism.

@cloud-fan IMO there are other benefits to doing a bucket scan even when it can't save a shuffle, e.g. bucket filter pushdown. So we probably need to take that into consideration before disabling bucketing.

@SparkQA

SparkQA commented Jun 18, 2020

Test build #124205 has finished for PR 28123 at commit e231268.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 18, 2020

Test build #124229 has finished for PR 28123 at commit 38c7d6e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 19, 2020

Test build #124258 has finished for PR 28123 at commit 62a04a3.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@imback82
Contributor Author

retest this please

@SparkQA

SparkQA commented Jun 19, 2020

Test build #124271 has finished for PR 28123 at commit 62a04a3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu maropu closed this in 7b86838 Jun 19, 2020
@maropu
Member

maropu commented Jun 19, 2020

Thanks! I appreciate your hard work, @imback82! Merged to master. Also, thanks for the reviews, @cloud-fan @viirya!

@c21
Contributor

c21 commented Jul 11, 2020

Thanks @imback82 for making this change!

Sorry for the late comment; just a few questions:

(1). Is there a reason why we don't cover ShuffledHashJoin as well? (We are seeing in production that people also use ShuffledHashJoin a lot for joining bucketed tables when one side is small.)

(2). Per this PR, the ordering property of coalesced bucket files is not preserved (https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L317); the ordering could be preserved through a sort-merge-style read of all the sorted bucket files. This can help when reading multiple partitions of a bucketed table.

(3). We are seeing in production that coalescing might hurt parallelism if the number of buckets is too small. Another way to avoid the shuffle and sort is to split/divide the table with fewer buckets. E.g., when joining t1 (8 buckets) and t2 (32 buckets), we can keep the number of tasks at 32, and each task reading t1 will apply a run-time filter to keep only its portion of the table (dividing the table with fewer buckets). This has the downside of reading t1 more than once from multiple tasks, but if t1 is not big, it's a good trade-off for more parallelism (and may be better than shuffling t1 directly). A rough sketch of this filter idea is shown below.
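
A minimal, hypothetical sketch of the run-time filter in (3) (the names are made up for illustration and the real bucketing hash is more involved):

```
// Illustrative only: t1 has 8 buckets, t2 has 32, and we run 32 tasks. The task handling
// t2's bucket `taskBucketId` re-reads the matching t1 bucket file and keeps only the rows
// that would fall into that finer-grained bucket, so every t1 row is joined exactly once.
def keepRow(bucketHashOfJoinKey: Int, fineBucketCount: Int, taskBucketId: Int): Boolean = {
  // Re-bucket the row using the larger bucket count (non-negative modulo).
  val fineBucketId = ((bucketHashOfJoinKey % fineBucketCount) + fineBucketCount) % fineBucketCount
  fineBucketId == taskBucketId
}
```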

We have been running the above 3 features for years at Facebook (https://databricks.com/session_eu19/spark-sql-bucketing-at-facebook), and I would like to make or help with the follow-up changes if this sounds reasonable to everyone. cc @imback82, @cloud-fan, @maropu, @viirya, @gatorsmile and @sameeragarwal.

@maropu
Member

maropu commented Jul 12, 2020

Thanks for your interest, @c21

(1). Is there a reason why we don't cover ShuffledHashJoin as well? (We are seeing in production that people also use ShuffledHashJoin a lot for joining bucketed tables when one side is small.)

As you also said in (3), I think that's because there is a concern that coalescing might hurt parallelism. You can see the related discussion in the history: #28123 (comment)
As for (1) and (3), IMO it's worth digging into them for more improvements.

(2). Per this PR, the ordering property of coalesced bucket files is not preserved (https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L317); the ordering could be preserved through a sort-merge-style read of all the sorted bucket files. This can help when reading multiple partitions of a bucketed table.

I think that's the long-standing issue we have. Have you checked the discussion in SPARK-24528? If you're interested in the issue, you can revisit it there.

@imback82
Contributor Author

(3). We are seeing in production that coalescing might hurt parallelism if the number of buckets is too small. Another way to avoid the shuffle and sort is to split/divide the table with fewer buckets. E.g., when joining t1 (8 buckets) and t2 (32 buckets), we can keep the number of tasks at 32, and each task reading t1 will apply a run-time filter to keep only its portion of the table (dividing the table with fewer buckets). This has the downside of reading t1 more than once from multiple tasks, but if t1 is not big, it's a good trade-off for more parallelism (and may be better than shuffling t1 directly).

I had a rough POC a few months back, where each row is filtered based on its bucket id, but I never got a chance to run benchmarks. @c21 Do you have some numbers to share? I am wondering how reading multiple copies impacts the overall runtime.

@c21
Contributor

c21 commented Jul 12, 2020

Thank you @maropu and @imback82

As for (1) and (3), IMO it's worth digging into them for more improvements.

For (1): I created a PR to cover shuffled hash join as well - #29079. Could you help take a look? Thanks.

I had a rough POC a few months back, where each row is filtered based on its bucket id, but I never got a chance to run benchmarks. @c21 Do you have some numbers to share? I am wondering how reading multiple copies impacts the overall runtime.

We have some internal numbers in production, e.g. we are seeing around a 50% CPU improvement for specific queries. Comparing divide (an extra N reads of the input table, where N is the ratio between the two tables' bucket counts) with non-divide (an extra shuffle write and an extra shuffle read): if the bucket ratio between the joined big tables is 2 (e.g. joining 510 buckets with 1024 buckets), dividing can be better than non-dividing, as we only do 1 extra read of the input table but avoid 1 shuffle write and 1 shuffle read (though this also depends on the efficiency of the shuffle service). I think we can add a benchmark in JoinBenchmark for joining tables with a bucket ratio of 2 to showcase this improvement.

Re POC - the overall approach looks good to me. But IMO we should do the coalesce/divide in a physical plan rule, not a logical plan rule. Also, I think we can probably leave the vectorization code path aside for now, as it introduces too much change to handle vectorization when applying the filter. Let me know if you are still on it, or I can help with this feature as well. Thanks.

@imback82
Contributor Author

Thanks @c21!

Re POC - the overall approach looks good to me. But IMO we should do the coalesce/divide in a physical plan rule, not a logical plan rule.

Yes, I agree. (The POC was done before this PR, which also started with a logical plan rule but changed to a physical plan rule per suggestion.)

Also, I think we can probably leave the vectorization code path aside for now, as it introduces too much change to handle vectorization when applying the filter.

Without supporting ColumnarBatch, you cannot enable the wholestage codegen. Is that desirable?

Let me know if you are still on it, or I can help with this feature as well. Thanks.

I will clean up the POC and create a PR with some benchmark numbers.

@c21
Contributor

c21 commented Jul 13, 2020

Here are some numbers when I joined two tables (store_sales from TPC-DS - 100 SF) and did count on it. It's run on 8 executors (8 cores each) and generates about 47GB of shuffle.

| Bucket Size #1 | Bucket Size #2 | Coalesce On (ms) | Coalesce Off (ms) | Gain (%) |
| --- | --- | --- | --- | --- |
| 512 | 256 | 40495 | 48435 | 16.39 |
| 512 | 128 | 42459 | 49597 | 14.39 |
| 512 | 64 | 45760 | 48888 | 6.40 |
| 256 | 128 | 41241 | 49034 | 15.89 |
| 256 | 64 | 42902 | 51063 | 15.98 |
| 128 | 64 | 44131 | 53192 | 17.03 |
There is a modest 15% gain (for ratio up to 4), WDYT?

@imback82 sounds good. I am refactoring to make the coalescing cover shuffled hash join as well in #29079.

I would like to run the same query as you did here and verify the performance numbers. Could you share more details on which TPC-DS query you ran, or the query you used if you modified it for the benchmark? Thanks.

Without supporting ColumnarBatch, you cannot enable the wholestage codegen

Sorry, I might be missing something, but I feel that's not true. From my understanding, wholestage codegen is row-based and cannot work with vectorization yet.

maropu pushed a commit that referenced this pull request Jul 21, 2020
…applicable

### What changes were proposed in this pull request?
Based on a follow-up comment in #28123, we can coalesce buckets for shuffled hash join as well. Note that we only coalesce buckets on the shuffled hash join stream side (i.e. the side not building the hash map), so we don't need to worry about OOM when coalescing multiple buckets in one task while building the hash map.

> If you refactor some codes with changing classes, showing the class hierarchy will help reviewers.

Refactor existing physical plan rule `CoalesceBucketsInSortMergeJoin` to `CoalesceBucketsInJoin`, for covering shuffled hash join as well.
Refactor existing unit test `CoalesceBucketsInSortMergeJoinSuite` to `CoalesceBucketsInJoinSuite`, for covering shuffled hash join as well.

### Why are the changes needed?
Avoiding the shuffle when joining differently bucketed tables is also useful for shuffled hash join. In production, we are seeing users use shuffled hash join to join bucketed tables (setting `spark.sql.join.preferSortMergeJoin`=false to avoid the sort), and this can help avoid the shuffle if the numbers of buckets are not the same.
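
For example, a session that prefers shuffled hash join and enables bucket coalescing might be configured as follows (illustrative settings only):

```
// Prefer shuffled hash join over sort merge join (avoids the sort), and let the planner
// coalesce mismatched bucket counts instead of adding a shuffle.
spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")
spark.conf.set("spark.sql.bucketing.coalesceBucketsInJoin.enabled", "true")
```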

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Added unit tests in `CoalesceBucketsInJoinSuite` for verifying shuffled hash join physical plan.

### Performance number per request from maropu

I was looking at TPC-DS per the suggestion from maropu. But I found that most TPC-DS queries do aggregation and only a few do joins, and none of the input tables are bucketed. So I took the approach of testing a modified version of `TPCDS q93` as follows:

```
SELECT ss_ticket_number, sr_ticket_number
FROM store_sales
JOIN store_returns
ON ss_ticket_number = sr_ticket_number
```

And made `store_sales` and `store_returns` bucketed tables.

Physical query plan without coalesce:

```
ShuffledHashJoin [ss_ticket_number#109L], [sr_ticket_number#120L], Inner, BuildLeft
:- Exchange hashpartitioning(ss_ticket_number#109L, 4), true, [id=#67]
:  +- *(1) Project [ss_ticket_number#109L]
:     +- *(1) Filter isnotnull(ss_ticket_number#109L)
:        +- *(1) ColumnarToRow
:           +- FileScan parquet default.store_sales[ss_ticket_number#109L] Batched: true, DataFilters: [isnotnull(ss_ticket_number#109L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/chengsu/spark/spark-warehouse/store_sales], PartitionFilters: [], PushedFilters: [IsNotNull(ss_ticket_number)], ReadSchema: struct<ss_ticket_number:bigint>, SelectedBucketsCount: 2 out of 2
+- *(2) Project [sr_returned_date_sk#111L, sr_return_time_sk#112L, sr_item_sk#113L, sr_customer_sk#114L, sr_cdemo_sk#115L, sr_hdemo_sk#116L, sr_addr_sk#117L, sr_store_sk#118L, sr_reason_sk#119L, sr_ticket_number#120L, sr_return_quantity#121L, sr_return_amt#122, sr_return_tax#123, sr_return_amt_inc_tax#124, sr_fee#125, sr_return_ship_cost#126, sr_refunded_cash#127, sr_reversed_charge#128, sr_store_credit#129, sr_net_loss#130]
   +- *(2) Filter isnotnull(sr_ticket_number#120L)
      +- *(2) ColumnarToRow
         +- FileScan parquet default.store_returns[sr_returned_date_sk#111L,sr_return_time_sk#112L,sr_item_sk#113L,sr_customer_sk#114L,sr_cdemo_sk#115L,sr_hdemo_sk#116L,sr_addr_sk#117L,sr_store_sk#118L,sr_reason_sk#119L,sr_ticket_number#120L,sr_return_quantity#121L,sr_return_amt#122,sr_return_tax#123,sr_return_amt_inc_tax#124,sr_fee#125,sr_return_ship_cost#126,sr_refunded_cash#127,sr_reversed_charge#128,sr_store_credit#129,sr_net_loss#130] Batched: true, DataFilters: [isnotnull(sr_ticket_number#120L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/chengsu/spark/spark-warehouse/store_returns], PartitionFilters: [], PushedFilters: [IsNotNull(sr_ticket_number)], ReadSchema: struct<sr_returned_date_sk:bigint,sr_return_time_sk:bigint,sr_item_sk:bigint,sr_customer_sk:bigin..., SelectedBucketsCount: 4 out of 4
```

Physical query plan with coalesce:

```
ShuffledHashJoin [ss_ticket_number#109L], [sr_ticket_number#120L], Inner, BuildLeft
:- *(1) Project [ss_ticket_number#109L]
:  +- *(1) Filter isnotnull(ss_ticket_number#109L)
:     +- *(1) ColumnarToRow
:        +- FileScan parquet default.store_sales[ss_ticket_number#109L] Batched: true, DataFilters: [isnotnull(ss_ticket_number#109L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/chengsu/spark/spark-warehouse/store_sales], PartitionFilters: [], PushedFilters: [IsNotNull(ss_ticket_number)], ReadSchema: struct<ss_ticket_number:bigint>, SelectedBucketsCount: 2 out of 2
+- *(2) Project [sr_returned_date_sk#111L, sr_return_time_sk#112L, sr_item_sk#113L, sr_customer_sk#114L, sr_cdemo_sk#115L, sr_hdemo_sk#116L, sr_addr_sk#117L, sr_store_sk#118L, sr_reason_sk#119L, sr_ticket_number#120L, sr_return_quantity#121L, sr_return_amt#122, sr_return_tax#123, sr_return_amt_inc_tax#124, sr_fee#125, sr_return_ship_cost#126, sr_refunded_cash#127, sr_reversed_charge#128, sr_store_credit#129, sr_net_loss#130]
   +- *(2) Filter isnotnull(sr_ticket_number#120L)
      +- *(2) ColumnarToRow
         +- FileScan parquet default.store_returns[sr_returned_date_sk#111L,sr_return_time_sk#112L,sr_item_sk#113L,sr_customer_sk#114L,sr_cdemo_sk#115L,sr_hdemo_sk#116L,sr_addr_sk#117L,sr_store_sk#118L,sr_reason_sk#119L,sr_ticket_number#120L,sr_return_quantity#121L,sr_return_amt#122,sr_return_tax#123,sr_return_amt_inc_tax#124,sr_fee#125,sr_return_ship_cost#126,sr_refunded_cash#127,sr_reversed_charge#128,sr_store_credit#129,sr_net_loss#130] Batched: true, DataFilters: [isnotnull(sr_ticket_number#120L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/chengsu/spark/spark-warehouse/store_returns], PartitionFilters: [], PushedFilters: [IsNotNull(sr_ticket_number)], ReadSchema: struct<sr_returned_date_sk:bigint,sr_return_time_sk:bigint,sr_item_sk:bigint,sr_customer_sk:bigin..., SelectedBucketsCount: 4 out of 4 (Coalesced to 2)
```

Run-time improvement of about 50% in wall-clock time:

```
Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Mac OS X 10.15.4
Intel(R) Core(TM) i9-9980HK CPU  2.40GHz
shuffle hash join:                        Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
shuffle hash join coalesce bucket off              1541           1664         106          1.9         535.1       1.0X
shuffle hash join coalesce bucket on               1060           1169          81          2.7         368.1       1.5X
```

Closes #29079 from c21/split-bucket.

Authored-by: Cheng Su <[email protected]>
Signed-off-by: Takeshi Yamamuro <[email protected]>
cloud-fan pushed a commit that referenced this pull request Apr 14, 2023
### What changes were proposed in this pull request?

Add `CoalesceBucketsInJoin` to AQE `preprocessingRules`.

### Why are the changes needed?

The bucket join optimization `CoalesceBucketsInJoin` was previously added in #28123.

But when using AQE, `CoalesceBucketsInJoin` cannot match because the top of the Spark plan is `AdaptiveSparkPlan`.

The code:
```
  val spark = SparkSession.builder()
    .appName("BucketJoin")
    .master("local[*]")
    .config("spark.sql.adaptive.enabled", true)
    .config("spark.driver.memory", "4")
    .config("spark.sql.autoBroadcastJoinThreshold", "-1")
    .config("spark.sql.bucketing.coalesceBucketsInJoin.enabled", true)
    .enableHiveSupport()
    .getOrCreate()

    val df1 = (0 until 20).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
    val df2 = (0 until 20).map(i => (i % 7, i % 11, i.toString)).toDF("i", "j", "k")
    df1.write.format("parquet").bucketBy(4, "i").saveAsTable("t1")
    df2.write.format("parquet").bucketBy(2, "i").saveAsTable("t2")
    val t1 = spark.table("t1")
    val t2 = spark.table("t2")
    val joined = t1.join(t2, t1("i") === t2("i"))
    joined.explain()
```

Before the PR:
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [i#50], [i#56], Inner
   :- Sort [i#50 ASC NULLS FIRST], false, 0
   :  +- Filter isnotnull(i#50)
   :     +- FileScan parquet spark_catalog.default.t1[i#50,j#51,k#52] Batched: true, Bucketed: true, DataFilters: [isnotnull(i#50)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/shezhiming/gh/zzzzming_new/spark/spark-warehouse/t1], PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int,j:int,k:string>, SelectedBucketsCount: 4 out of 4
   +- Sort [i#56 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(i#56, 4), ENSURE_REQUIREMENTS, [plan_id=78]
         +- Filter isnotnull(i#56)
            +- FileScan parquet spark_catalog.default.t2[i#56,j#57,k#58] Batched: true, Bucketed: false (disabled by query planner), DataFilters: [isnotnull(i#56)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/shezhiming/gh/zzzzming_new/spark/spark-warehouse/t2], PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int,j:int,k:string>

```

After the PR:
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [i#50], [i#56], Inner
   :- Sort [i#50 ASC NULLS FIRST], false, 0
   :  +- Filter isnotnull(i#50)
   :     +- FileScan parquet spark_catalog.default.t1[i#50,j#51,k#52] Batched: true, Bucketed: true, DataFilters: [isnotnull(i#50)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/shezhiming/gh/zzzzming_new/spark/spark-warehouse/t1], PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int,j:int,k:string>, SelectedBucketsCount: 4 out of 4 (Coalesced to 2)
   +- Sort [i#56 ASC NULLS FIRST], false, 0
      +- Filter isnotnull(i#56)
         +- FileScan parquet spark_catalog.default.t2[i#56,j#57,k#58] Batched: true, Bucketed: true, DataFilters: [isnotnull(i#56)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/shezhiming/gh/zzzzming_new/spark/spark-warehouse/t2], PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int,j:int,k:string>, SelectedBucketsCount: 2 out of 2

```

Additional Notes:

We don't add CoalesceBucketsInJoin to `AdaptiveSparkPlanExec#queryStageOptimizerRules` because queryStageOptimizerRules are not applied to the initial plan; they are applied in the createQueryStages() method. And createQueryStages() works bottom-up, which causes the exchange that should be eliminated to be wrapped in a ShuffleQueryStage first, making it unrecognizable to CoalesceBucketsInJoin.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

add UT

Closes #40688 from zzzzming95/SPARK-43021.

Authored-by: zzzzming95 <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>