[SPARK-31705][SQL] Push predicate through join by rewriting join condition to conjunctive normal form #28575
Conversation
Test build #122828 has finished for PR 28575 at commit
retest this please
Test build #122857 has finished for PR 28575 at commit
IIRC we had some earlier activities (not merged, though) for CNF normalization, e.g., SPARK-6624 and SPARK-17357 (#15558). I'm not sure why they stopped halfway, but supporting it for optimization (as other DBMSs such as PostgreSQL do) sounds nice. That said, I think we should implement it in a general way rather than adding it to each rule as this PR does. cc: @viirya @cloud-fan
I haven't looked at this PR yet, but yes, a general approach would be better.
Test build #122904 has finished for PR 28575 at commit
&& ((("x.a".attr > 3) && ("x.a".attr < 13) && ("y.c".attr <= 5)) | ||
|| (("y.a".attr > 2) && ("y.c".attr < 1))))).analyze | ||
|
||
comparePlans(optimized, correctAnswer) |
This matches PostgreSQL's plan:
postgres=# explain select x.* from x join y on (x.b = y.b and ( (x.a > 3 and x.a < 13 and y.c <= 5) or (y.a > 2 and y.c < 1) ));
QUERY PLAN
-----------------------------------------------------------------------------------------
Merge Join (cost=207.30..402.86 rows=1927 width=16)
Merge Cond: (y.b = x.b)
Join Filter: (((x.a > 3) AND (x.a < 13) AND (y.c <= 5)) OR ((y.a > 2) AND (y.c < 1)))
-> Sort (cost=78.41..80.30 rows=754 width=12)
Sort Key: y.b
-> Seq Scan on y (cost=0.00..42.38 rows=754 width=12)
Filter: ((c <= 5) OR ((a > 2) AND (c < 1)))
-> Sort (cost=128.89..133.52 rows=1850 width=16)
Sort Key: x.b
-> Seq Scan on x (cost=0.00..28.50 rows=1850 width=16)
(10 rows)
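For reference, here is a minimal spark-shell sketch of the same case (the temp views and data below are assumptions for illustration only, not part of the test suite); with this rule the scan of y should pick up a filter equivalent to the (c <= 5) OR (a > 2 AND c < 1) line in the PostgreSQL plan above:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("cnf-pushdown-demo").getOrCreate()

// Hypothetical x/y views with integer columns a, b, c, just to inspect the optimized plan.
spark.range(0, 100).selectExpr("id AS a", "id % 10 AS b", "id % 7 AS c").createOrReplaceTempView("x")
spark.range(0, 100).selectExpr("id AS a", "id % 10 AS b", "id % 7 AS c").createOrReplaceTempView("y")

spark.sql(
  """SELECT x.*
    |FROM x JOIN y ON x.b = y.b AND
    |  ((x.a > 3 AND x.a < 13 AND y.c <= 5) OR (y.a > 2 AND y.c < 1))""".stripMargin
).explain(true)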
&& (("x.a".attr > 3) && ("y.a".attr > 13) || ("x.a".attr > 1) && ("y.a".attr > 11)))) | ||
.analyze | ||
|
||
comparePlans(optimized, correctAnswer) |
This matches PostgreSQL's plan:
postgres=# explain select x.* from x right join y on (x.b = y.b and ( (x.a > 3 and y.a > 13 ) or (x.a > 1 and y.a > 11) ));
QUERY PLAN
---------------------------------------------------------------------------
Merge Left Join (cost=218.07..465.05 rows=1996 width=16)
Merge Cond: (y.b = x.b)
Join Filter: (((x.a > 3) AND (y.a > 13)) OR ((x.a > 1) AND (y.a > 11)))
-> Sort (cost=128.89..133.52 rows=1850 width=8)
Sort Key: y.b
-> Seq Scan on y (cost=0.00..28.50 rows=1850 width=8)
-> Sort (cost=89.18..91.75 rows=1028 width=16)
Sort Key: x.b
-> Seq Scan on x (cost=0.00..37.75 rows=1028 width=16)
Filter: ((a > 3) OR (a > 1))
(10 rows)
&& (("x.a".attr > 3) && ("y.a".attr > 13) || ("x.a".attr > 1) && ("y.a".attr > 11)))) | ||
.analyze | ||
|
||
comparePlans(optimized, correctAnswer) |
This matches PostgreSQL's plan:
postgres=# explain select x.* from x left join y on (x.b = y.b and ( (x.a > 3 and y.a > 13 ) or (x.a > 1 and y.a > 11) ));
QUERY PLAN
---------------------------------------------------------------------------
Merge Left Join (cost=218.07..465.05 rows=1996 width=16)
Merge Cond: (x.b = y.b)
Join Filter: (((x.a > 3) AND (y.a > 13)) OR ((x.a > 1) AND (y.a > 11)))
-> Sort (cost=128.89..133.52 rows=1850 width=16)
Sort Key: x.b
-> Seq Scan on x (cost=0.00..28.50 rows=1850 width=16)
-> Sort (cost=89.18..91.75 rows=1028 width=8)
Sort Key: y.b
-> Seq Scan on y (cost=0.00..37.75 rows=1028 width=8)
Filter: ((a > 13) OR (a > 11))
(10 rows)
&& (("x.a".attr <= 3) || (("x.a".attr >= 2) && ("y.a".attr <= 13))) | ||
&& (("x.a".attr <= 1) || ("y.a".attr <= 11)))) | ||
.analyze | ||
comparePlans(optimized, correctAnswer) |
Here the negated condition NOT((x.a > 3 AND (x.a < 2 OR y.a > 13)) OR (x.a > 1 AND y.a > 11)) is first pushed inward via De Morgan's laws, giving (x.a <= 3 OR (x.a >= 2 AND y.a <= 13)) AND (x.a <= 1 OR y.a <= 11). This matches PostgreSQL's plan:
postgres=# explain select x.* from x join y on ((x.b = y.b) and Not((x.a > 3) and (x.a < 2 or (y.a > 13)) or (x.a > 1) and (y.a > 11)));
QUERY PLAN
-----------------------------------------------------------------------------------------------
Merge Join (cost=218.07..484.71 rows=3874 width=16)
Merge Cond: (x.b = y.b)
Join Filter: (((x.a <= 1) OR (y.a <= 11)) AND ((x.a <= 3) OR ((x.a >= 2) AND (y.a <= 13))))
-> Sort (cost=89.18..91.75 rows=1028 width=16)
Sort Key: x.b
-> Seq Scan on x (cost=0.00..37.75 rows=1028 width=16)
Filter: ((a <= 3) OR (a >= 2))
-> Sort (cost=128.89..133.52 rows=1850 width=8)
Sort Key: y.b
-> Seq Scan on y (cost=0.00..28.50 rows=1850 width=8)
(10 rows)
&& (("x.a".attr > 3) && ("y.a".attr > 13) || ("x.a".attr > 1) && ("y.a".attr > 11)))) | ||
.analyze | ||
|
||
comparePlans(optimized, correctAnswer) |
This matches PostgreSQL's plan:
postgres=# explain select x.* from x join y on (x.b = y.b and ( (x.a > 3 and y.a > 13 ) or (x.a > 1 and y.a > 11) ));
QUERY PLAN
---------------------------------------------------------------------------
Merge Join (cost=178.36..315.60 rows=3593 width=16)
Merge Cond: (x.b = y.b)
Join Filter: (((x.a > 3) AND (y.a > 13)) OR ((x.a > 1) AND (y.a > 11)))
-> Sort (cost=89.18..91.75 rows=1028 width=16)
Sort Key: x.b
-> Seq Scan on x (cost=0.00..37.75 rows=1028 width=16)
Filter: ((a > 3) OR (a > 1))
-> Sort (cost=89.18..91.75 rows=1028 width=8)
Sort Key: y.b
-> Seq Scan on y (cost=0.00..37.75 rows=1028 width=8)
Filter: ((a > 13) OR (a > 11))
(11 rows)
if (depth < SQLConf.get.maxRewritingCNFDepth) {
  condition match {
    case or @ Or(left: And, right: And) =>
      val lhs = splitConjunctivePredicates(left).groupBy(_.references.map(_.qualifier))
We group the conjuncts by qualifier to avoid generating too many predicates. For example, TPC-DS q85 (a minimal sketch of the grouping idea follows the two plans below):
Without grouping by qualifier:
== Physical Plan ==
TakeOrderedAndProject(limit=100, orderBy=[substr(r_reason_desc, 1, 20)#137 ASC NULLS FIRST,aggOrder#142 ASC NULLS FIRST,avg(wr_refunded_cash)#139 ASC NULLS FIRST,avg(wr_fee)#140 ASC NULLS FIRST], output=[substr(r_reason_desc, 1, 20)#137,avg(ws_quantity)#138,avg(wr_refunded_cash)#139,avg(wr_fee)#140])
+- *(9) HashAggregate(keys=[r_reason_desc#124], functions=[avg(cast(ws_quantity#18 as bigint)), avg(UnscaledValue(wr_refunded_cash#54)), avg(UnscaledValue(wr_fee#52))])
+- Exchange hashpartitioning(r_reason_desc#124, 5), true, [id=#351]
+- *(8) HashAggregate(keys=[r_reason_desc#124], functions=[partial_avg(cast(ws_quantity#18 as bigint)), partial_avg(UnscaledValue(wr_refunded_cash#54)), partial_avg(UnscaledValue(wr_fee#52))])
+- *(8) Project [ws_quantity#18, wr_fee#52, wr_refunded_cash#54, r_reason_desc#124]
+- *(8) BroadcastHashJoin [wr_reason_sk#46L], [cast(r_reason_sk#122 as bigint)], Inner, BuildRight
:- *(8) Project [ws_quantity#18, wr_reason_sk#46L, wr_fee#52, wr_refunded_cash#54]
: +- *(8) BroadcastHashJoin [ws_sold_date_sk#0], [d_date_sk#94], Inner, BuildRight
: :- *(8) Project [ws_sold_date_sk#0, ws_quantity#18, wr_reason_sk#46L, wr_fee#52, wr_refunded_cash#54]
: : +- *(8) BroadcastHashJoin [wr_refunded_addr_sk#40L], [cast(ca_address_sk#81 as bigint)], Inner, BuildRight, ((((ca_state#89 IN (IN,OH,NJ) AND (ws_net_profit#33 >= 100.00)) AND (ws_net_profit#33 <= 200.00)) OR ((ca_state#89 IN (WI,CT,KY) AND (ws_net_profit#33 >= 150.00)) AND (ws_net_profit#33 <= 300.00))) OR ((ca_state#89 IN (LA,IA,AR) AND (ws_net_profit#33 >= 50.00)) AND (ws_net_profit#33 <= 250.00)))
: : :- *(8) Project [ws_sold_date_sk#0, ws_quantity#18, ws_net_profit#33, wr_refunded_addr_sk#40L, wr_reason_sk#46L, wr_fee#52, wr_refunded_cash#54]
: : : +- *(8) BroadcastHashJoin [wr_returning_cdemo_sk#42L, cd_marital_status#74, cd_education_status#75], [cast(cd_demo_sk#125 as bigint), cd_marital_status#127, cd_education_status#128], Inner, BuildRight
: : : :- *(8) Project [ws_sold_date_sk#0, ws_quantity#18, ws_net_profit#33, wr_refunded_addr_sk#40L, wr_returning_cdemo_sk#42L, wr_reason_sk#46L, wr_fee#52, wr_refunded_cash#54, cd_marital_status#74, cd_education_status#75]
: : : : +- *(8) BroadcastHashJoin [wr_refunded_cdemo_sk#38L], [cast(cd_demo_sk#72 as bigint)], Inner, BuildRight, ((((((cd_marital_status#74 = M) AND (cd_education_status#75 = Advanced Degree)) AND (ws_sales_price#21 >= 100.00)) AND (ws_sales_price#21 <= 150.00)) OR ((((cd_marital_status#74 = S) AND (cd_education_status#75 = College)) AND (ws_sales_price#21 >= 50.00)) AND (ws_sales_price#21 <= 100.00))) OR ((((cd_marital_status#74 = W) AND (cd_education_status#75 = 2 yr Degree)) AND (ws_sales_price#21 >= 150.00)) AND (ws_sales_price#21 <= 200.00)))
: : : : :- *(8) Project [ws_sold_date_sk#0, ws_quantity#18, ws_sales_price#21, ws_net_profit#33, wr_refunded_cdemo_sk#38L, wr_refunded_addr_sk#40L, wr_returning_cdemo_sk#42L, wr_reason_sk#46L, wr_fee#52, wr_refunded_cash#54]
: : : : : +- *(8) BroadcastHashJoin [ws_web_page_sk#12], [wp_web_page_sk#58], Inner, BuildRight
: : : : : :- *(8) Project [ws_sold_date_sk#0, ws_web_page_sk#12, ws_quantity#18, ws_sales_price#21, ws_net_profit#33, wr_refunded_cdemo_sk#38L, wr_refunded_addr_sk#40L, wr_returning_cdemo_sk#42L, wr_reason_sk#46L, wr_fee#52, wr_refunded_cash#54]
: : : : : : +- *(8) BroadcastHashJoin [cast(ws_item_sk#3 as bigint), cast(ws_order_number#17 as bigint)], [wr_item_sk#36L, wr_order_number#47L], Inner, BuildRight
: : : : : : :- *(8) Project [ws_sold_date_sk#0, ws_item_sk#3, ws_web_page_sk#12, ws_order_number#17, ws_quantity#18, ws_sales_price#21, ws_net_profit#33]
: : : : : : : +- *(8) Filter (((((((((((((((((((isnotnull(ws_item_sk#3) AND isnotnull(ws_order_number#17)) AND isnotnull(ws_web_page_sk#12)) AND isnotnull(ws_sold_date_sk#0)) AND (((ws_sales_price#21 >= 100.00) OR (ws_sales_price#21 >= 50.00)) OR (ws_sales_price#21 >= 150.00))) AND (((ws_sales_price#21 >= 100.00) OR (ws_sales_price#21 <= 100.00)) OR (ws_sales_price#21 >= 150.00))) AND (((ws_sales_price#21 <= 150.00) OR (ws_sales_price#21 >= 50.00)) OR (ws_sales_price#21 >= 150.00))) AND (((ws_sales_price#21 <= 150.00) OR (ws_sales_price#21 <= 100.00)) OR (ws_sales_price#21 >= 150.00))) AND (((ws_sales_price#21 >= 100.00) OR (ws_sales_price#21 >= 50.00)) OR (ws_sales_price#21 <= 200.00))) AND (((ws_sales_price#21 >= 100.00) OR (ws_sales_price#21 <= 100.00)) OR (ws_sales_price#21 <= 200.00))) AND (((ws_sales_price#21 <= 150.00) OR (ws_sales_price#21 >= 50.00)) OR (ws_sales_price#21 <= 200.00))) AND (((ws_sales_price#21 <= 150.00) OR (ws_sales_price#21 <= 100.00)) OR (ws_sales_price#21 <= 200.00))) AND (((ws_net_profit#33 >= 100.00) OR (ws_net_profit#33 >= 150.00)) OR (ws_net_profit#33 >= 50.00))) AND (((ws_net_profit#33 >= 100.00) OR (ws_net_profit#33 <= 300.00)) OR (ws_net_profit#33 >= 50.00))) AND (((ws_net_profit#33 <= 200.00) OR (ws_net_profit#33 >= 150.00)) OR (ws_net_profit#33 >= 50.00))) AND (((ws_net_profit#33 <= 200.00) OR (ws_net_profit#33 <= 300.00)) OR (ws_net_profit#33 >= 50.00))) AND (((ws_net_profit#33 >= 100.00) OR (ws_net_profit#33 >= 150.00)) OR (ws_net_profit#33 <= 250.00))) AND (((ws_net_profit#33 >= 100.00) OR (ws_net_profit#33 <= 300.00)) OR (ws_net_profit#33 <= 250.00))) AND (((ws_net_profit#33 <= 200.00) OR (ws_net_profit#33 >= 150.00)) OR (ws_net_profit#33 <= 250.00))) AND (((ws_net_profit#33 <= 200.00) OR (ws_net_profit#33 <= 300.00)) OR (ws_net_profit#33 <= 250.00)))
: : : : : : : +- *(8) ColumnarToRow
: : : : : : : +- FileScan parquet default.web_sales[ws_sold_date_sk#0,ws_item_sk#3,ws_web_page_sk#12,ws_order_number#17,ws_quantity#18,ws_sales_price#21,ws_net_profit#33] Batched: true, DataFilters: [isnotnull(ws_item_sk#3), isnotnull(ws_order_number#17), isnotnull(ws_web_page_sk#12), isnotnull(..., Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28216/sql/core/spark-warehouse/org.apache.spark..., PartitionFilters: [], PushedFilters: [IsNotNull(ws_item_sk), IsNotNull(ws_order_number), IsNotNull(ws_web_page_sk), IsNotNull(ws_sold_..., ReadSchema: struct<ws_sold_date_sk:int,ws_item_sk:int,ws_web_page_sk:int,ws_order_number:int,ws_quantity:int,...
: : : : : : +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true], input[5, bigint, true])), [id=#291]
: : : : : : +- *(1) Project [wr_item_sk#36L, wr_refunded_cdemo_sk#38L, wr_refunded_addr_sk#40L, wr_returning_cdemo_sk#42L, wr_reason_sk#46L, wr_order_number#47L, wr_fee#52, wr_refunded_cash#54]
: : : : : : +- *(1) Filter (((((isnotnull(wr_item_sk#36L) AND isnotnull(wr_order_number#47L)) AND isnotnull(wr_refunded_cdemo_sk#38L)) AND isnotnull(wr_returning_cdemo_sk#42L)) AND isnotnull(wr_refunded_addr_sk#40L)) AND isnotnull(wr_reason_sk#46L))
: : : : : : +- *(1) ColumnarToRow
: : : : : : +- FileScan parquet default.web_returns[wr_item_sk#36L,wr_refunded_cdemo_sk#38L,wr_refunded_addr_sk#40L,wr_returning_cdemo_sk#42L,wr_reason_sk#46L,wr_order_number#47L,wr_fee#52,wr_refunded_cash#54] Batched: true, DataFilters: [isnotnull(wr_item_sk#36L), isnotnull(wr_order_number#47L), isnotnull(wr_refunded_cdemo_sk#38L), ..., Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28216/sql/core/spark-warehouse/org.apache.spark..., PartitionFilters: [], PushedFilters: [IsNotNull(wr_item_sk), IsNotNull(wr_order_number), IsNotNull(wr_refunded_cdemo_sk), IsNotNull(wr..., ReadSchema: struct<wr_item_sk:bigint,wr_refunded_cdemo_sk:bigint,wr_refunded_addr_sk:bigint,wr_returning_cdem...
: : : : : +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#300]
: : : : : +- *(2) Project [wp_web_page_sk#58]
: : : : : +- *(2) Filter isnotnull(wp_web_page_sk#58)
: : : : : +- *(2) ColumnarToRow
: : : : : +- FileScan parquet default.web_page[wp_web_page_sk#58] Batched: true, DataFilters: [isnotnull(wp_web_page_sk#58)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28216/sql/core/spark-warehouse/org.apache.spark..., PartitionFilters: [], PushedFilters: [IsNotNull(wp_web_page_sk)], ReadSchema: struct<wp_web_page_sk:int>
: : : : +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#309]
: : : : +- *(3) Project [cd_demo_sk#72, cd_marital_status#74, cd_education_status#75]
: : : : +- *(3) Filter ((((((((((isnotnull(cd_demo_sk#72) AND isnotnull(cd_education_status#75)) AND isnotnull(cd_marital_status#74)) AND (((cd_marital_status#74 = M) OR (cd_marital_status#74 = S)) OR (cd_marital_status#74 = W))) AND (((cd_marital_status#74 = M) OR (cd_marital_status#74 = S)) OR (cd_education_status#75 = 2 yr Degree))) AND (((cd_marital_status#74 = M) OR (cd_education_status#75 = College)) OR (cd_marital_status#74 = W))) AND (((cd_marital_status#74 = M) OR (cd_education_status#75 = College)) OR (cd_education_status#75 = 2 yr Degree))) AND (((cd_education_status#75 = Advanced Degree) OR (cd_marital_status#74 = S)) OR (cd_marital_status#74 = W))) AND (((cd_education_status#75 = Advanced Degree) OR (cd_marital_status#74 = S)) OR (cd_education_status#75 = 2 yr Degree))) AND (((cd_education_status#75 = Advanced Degree) OR (cd_education_status#75 = College)) OR (cd_marital_status#74 = W))) AND (((cd_education_status#75 = Advanced Degree) OR (cd_education_status#75 = College)) OR (cd_education_status#75 = 2 yr Degree)))
: : : : +- *(3) ColumnarToRow
: : : : +- FileScan parquet default.customer_demographics[cd_demo_sk#72,cd_marital_status#74,cd_education_status#75] Batched: true, DataFilters: [isnotnull(cd_demo_sk#72), isnotnull(cd_education_status#75), isnotnull(cd_marital_status#74), ((..., Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28216/sql/core/spark-warehouse/org.apache.spark..., PartitionFilters: [], PushedFilters: [IsNotNull(cd_demo_sk), IsNotNull(cd_education_status), IsNotNull(cd_marital_status), Or(Or(Equal..., ReadSchema: struct<cd_demo_sk:int,cd_marital_status:string,cd_education_status:string>
: : : +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint), input[1, string, true], input[2, string, true])), [id=#318]
: : : +- *(4) Project [cd_demo_sk#125, cd_marital_status#127, cd_education_status#128]
: : : +- *(4) Filter ((isnotnull(cd_demo_sk#125) AND isnotnull(cd_education_status#128)) AND isnotnull(cd_marital_status#127))
: : : +- *(4) ColumnarToRow
: : : +- FileScan parquet default.customer_demographics[cd_demo_sk#125,cd_marital_status#127,cd_education_status#128] Batched: true, DataFilters: [isnotnull(cd_demo_sk#125), isnotnull(cd_education_status#128), isnotnull(cd_marital_status#127)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28216/sql/core/spark-warehouse/org.apache.spark..., PartitionFilters: [], PushedFilters: [IsNotNull(cd_demo_sk), IsNotNull(cd_education_status), IsNotNull(cd_marital_status)], ReadSchema: struct<cd_demo_sk:int,cd_marital_status:string,cd_education_status:string>
: : +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#327]
: : +- *(5) Project [ca_address_sk#81, ca_state#89]
: : +- *(5) Filter (((isnotnull(ca_country#91) AND (ca_country#91 = United States)) AND isnotnull(ca_address_sk#81)) AND ((ca_state#89 IN (IN,OH,NJ) OR ca_state#89 IN (WI,CT,KY)) OR ca_state#89 IN (LA,IA,AR)))
: : +- *(5) ColumnarToRow
: : +- FileScan parquet default.customer_address[ca_address_sk#81,ca_state#89,ca_country#91] Batched: true, DataFilters: [isnotnull(ca_country#91), (ca_country#91 = United States), isnotnull(ca_address_sk#81), ((ca_sta..., Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28216/sql/core/spark-warehouse/org.apache.spark..., PartitionFilters: [], PushedFilters: [IsNotNull(ca_country), EqualTo(ca_country,United States), IsNotNull(ca_address_sk), Or(Or(In(ca_..., ReadSchema: struct<ca_address_sk:int,ca_state:string,ca_country:string>
: +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#336]
: +- *(6) Project [d_date_sk#94]
: +- *(6) Filter ((isnotnull(d_year#100) AND (d_year#100 = 2000)) AND isnotnull(d_date_sk#94))
: +- *(6) ColumnarToRow
: +- FileScan parquet default.date_dim[d_date_sk#94,d_year#100] Batched: true, DataFilters: [isnotnull(d_year#100), (d_year#100 = 2000), isnotnull(d_date_sk#94)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28216/sql/core/spark-warehouse/org.apache.spark..., PartitionFilters: [], PushedFilters: [IsNotNull(d_year), EqualTo(d_year,2000), IsNotNull(d_date_sk)], ReadSchema: struct<d_date_sk:int,d_year:int>
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#345]
+- *(7) Project [r_reason_sk#122, r_reason_desc#124]
+- *(7) Filter isnotnull(r_reason_sk#122)
+- *(7) ColumnarToRow
+- FileScan parquet default.reason[r_reason_sk#122,r_reason_desc#124] Batched: true, DataFilters: [isnotnull(r_reason_sk#122)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28216/sql/core/spark-warehouse/org.apache.spark..., PartitionFilters: [], PushedFilters: [IsNotNull(r_reason_sk)], ReadSchema: struct<r_reason_sk:int,r_reason_desc:string>
With grouping by qualifier:
== Physical Plan ==
TakeOrderedAndProject(limit=100, orderBy=[substr(r_reason_desc, 1, 20)#137 ASC NULLS FIRST,aggOrder#142 ASC NULLS FIRST,avg(wr_refunded_cash)#139 ASC NULLS FIRST,avg(wr_fee)#140 ASC NULLS FIRST], output=[substr(r_reason_desc, 1, 20)#137,avg(ws_quantity)#138,avg(wr_refunded_cash)#139,avg(wr_fee)#140])
+- *(9) HashAggregate(keys=[r_reason_desc#124], functions=[avg(cast(ws_quantity#18 as bigint)), avg(UnscaledValue(wr_refunded_cash#54)), avg(UnscaledValue(wr_fee#52))])
+- Exchange hashpartitioning(r_reason_desc#124, 5), true, [id=#351]
+- *(8) HashAggregate(keys=[r_reason_desc#124], functions=[partial_avg(cast(ws_quantity#18 as bigint)), partial_avg(UnscaledValue(wr_refunded_cash#54)), partial_avg(UnscaledValue(wr_fee#52))])
+- *(8) Project [ws_quantity#18, wr_fee#52, wr_refunded_cash#54, r_reason_desc#124]
+- *(8) BroadcastHashJoin [wr_reason_sk#46L], [cast(r_reason_sk#122 as bigint)], Inner, BuildRight
:- *(8) Project [ws_quantity#18, wr_reason_sk#46L, wr_fee#52, wr_refunded_cash#54]
: +- *(8) BroadcastHashJoin [ws_sold_date_sk#0], [d_date_sk#94], Inner, BuildRight
: :- *(8) Project [ws_sold_date_sk#0, ws_quantity#18, wr_reason_sk#46L, wr_fee#52, wr_refunded_cash#54]
: : +- *(8) BroadcastHashJoin [wr_refunded_addr_sk#40L], [cast(ca_address_sk#81 as bigint)], Inner, BuildRight, ((((ca_state#89 IN (IN,OH,NJ) AND (ws_net_profit#33 >= 100.00)) AND (ws_net_profit#33 <= 200.00)) OR ((ca_state#89 IN (WI,CT,KY) AND (ws_net_profit#33 >= 150.00)) AND (ws_net_profit#33 <= 300.00))) OR ((ca_state#89 IN (LA,IA,AR) AND (ws_net_profit#33 >= 50.00)) AND (ws_net_profit#33 <= 250.00)))
: : :- *(8) Project [ws_sold_date_sk#0, ws_quantity#18, ws_net_profit#33, wr_refunded_addr_sk#40L, wr_reason_sk#46L, wr_fee#52, wr_refunded_cash#54]
: : : +- *(8) BroadcastHashJoin [wr_returning_cdemo_sk#42L, cd_marital_status#74, cd_education_status#75], [cast(cd_demo_sk#125 as bigint), cd_marital_status#127, cd_education_status#128], Inner, BuildRight
: : : :- *(8) Project [ws_sold_date_sk#0, ws_quantity#18, ws_net_profit#33, wr_refunded_addr_sk#40L, wr_returning_cdemo_sk#42L, wr_reason_sk#46L, wr_fee#52, wr_refunded_cash#54, cd_marital_status#74, cd_education_status#75]
: : : : +- *(8) BroadcastHashJoin [wr_refunded_cdemo_sk#38L], [cast(cd_demo_sk#72 as bigint)], Inner, BuildRight, ((((((cd_marital_status#74 = M) AND (cd_education_status#75 = Advanced Degree)) AND (ws_sales_price#21 >= 100.00)) AND (ws_sales_price#21 <= 150.00)) OR ((((cd_marital_status#74 = S) AND (cd_education_status#75 = College)) AND (ws_sales_price#21 >= 50.00)) AND (ws_sales_price#21 <= 100.00))) OR ((((cd_marital_status#74 = W) AND (cd_education_status#75 = 2 yr Degree)) AND (ws_sales_price#21 >= 150.00)) AND (ws_sales_price#21 <= 200.00)))
: : : : :- *(8) Project [ws_sold_date_sk#0, ws_quantity#18, ws_sales_price#21, ws_net_profit#33, wr_refunded_cdemo_sk#38L, wr_refunded_addr_sk#40L, wr_returning_cdemo_sk#42L, wr_reason_sk#46L, wr_fee#52, wr_refunded_cash#54]
: : : : : +- *(8) BroadcastHashJoin [ws_web_page_sk#12], [wp_web_page_sk#58], Inner, BuildRight
: : : : : :- *(8) Project [ws_sold_date_sk#0, ws_web_page_sk#12, ws_quantity#18, ws_sales_price#21, ws_net_profit#33, wr_refunded_cdemo_sk#38L, wr_refunded_addr_sk#40L, wr_returning_cdemo_sk#42L, wr_reason_sk#46L, wr_fee#52, wr_refunded_cash#54]
: : : : : : +- *(8) BroadcastHashJoin [cast(ws_item_sk#3 as bigint), cast(ws_order_number#17 as bigint)], [wr_item_sk#36L, wr_order_number#47L], Inner, BuildRight
: : : : : : :- *(8) Project [ws_sold_date_sk#0, ws_item_sk#3, ws_web_page_sk#12, ws_order_number#17, ws_quantity#18, ws_sales_price#21, ws_net_profit#33]
: : : : : : : +- *(8) Filter (((((isnotnull(ws_item_sk#3) AND isnotnull(ws_order_number#17)) AND isnotnull(ws_web_page_sk#12)) AND isnotnull(ws_sold_date_sk#0)) AND ((((ws_sales_price#21 >= 100.00) AND (ws_sales_price#21 <= 150.00)) OR ((ws_sales_price#21 >= 50.00) AND (ws_sales_price#21 <= 100.00))) OR ((ws_sales_price#21 >= 150.00) AND (ws_sales_price#21 <= 200.00)))) AND ((((ws_net_profit#33 >= 100.00) AND (ws_net_profit#33 <= 200.00)) OR ((ws_net_profit#33 >= 150.00) AND (ws_net_profit#33 <= 300.00))) OR ((ws_net_profit#33 >= 50.00) AND (ws_net_profit#33 <= 250.00))))
: : : : : : : +- *(8) ColumnarToRow
: : : : : : : +- FileScan parquet default.web_sales[ws_sold_date_sk#0,ws_item_sk#3,ws_web_page_sk#12,ws_order_number#17,ws_quantity#18,ws_sales_price#21,ws_net_profit#33] Batched: true, DataFilters: [isnotnull(ws_item_sk#3), isnotnull(ws_order_number#17), isnotnull(ws_web_page_sk#12), isnotnull(..., Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28216/sql/core/spark-warehouse/org.apache.spark..., PartitionFilters: [], PushedFilters: [IsNotNull(ws_item_sk), IsNotNull(ws_order_number), IsNotNull(ws_web_page_sk), IsNotNull(ws_sold_..., ReadSchema: struct<ws_sold_date_sk:int,ws_item_sk:int,ws_web_page_sk:int,ws_order_number:int,ws_quantity:int,...
: : : : : : +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true], input[5, bigint, true])), [id=#291]
: : : : : : +- *(1) Project [wr_item_sk#36L, wr_refunded_cdemo_sk#38L, wr_refunded_addr_sk#40L, wr_returning_cdemo_sk#42L, wr_reason_sk#46L, wr_order_number#47L, wr_fee#52, wr_refunded_cash#54]
: : : : : : +- *(1) Filter (((((isnotnull(wr_item_sk#36L) AND isnotnull(wr_order_number#47L)) AND isnotnull(wr_refunded_cdemo_sk#38L)) AND isnotnull(wr_returning_cdemo_sk#42L)) AND isnotnull(wr_refunded_addr_sk#40L)) AND isnotnull(wr_reason_sk#46L))
: : : : : : +- *(1) ColumnarToRow
: : : : : : +- FileScan parquet default.web_returns[wr_item_sk#36L,wr_refunded_cdemo_sk#38L,wr_refunded_addr_sk#40L,wr_returning_cdemo_sk#42L,wr_reason_sk#46L,wr_order_number#47L,wr_fee#52,wr_refunded_cash#54] Batched: true, DataFilters: [isnotnull(wr_item_sk#36L), isnotnull(wr_order_number#47L), isnotnull(wr_refunded_cdemo_sk#38L), ..., Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28216/sql/core/spark-warehouse/org.apache.spark..., PartitionFilters: [], PushedFilters: [IsNotNull(wr_item_sk), IsNotNull(wr_order_number), IsNotNull(wr_refunded_cdemo_sk), IsNotNull(wr..., ReadSchema: struct<wr_item_sk:bigint,wr_refunded_cdemo_sk:bigint,wr_refunded_addr_sk:bigint,wr_returning_cdem...
: : : : : +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#300]
: : : : : +- *(2) Project [wp_web_page_sk#58]
: : : : : +- *(2) Filter isnotnull(wp_web_page_sk#58)
: : : : : +- *(2) ColumnarToRow
: : : : : +- FileScan parquet default.web_page[wp_web_page_sk#58] Batched: true, DataFilters: [isnotnull(wp_web_page_sk#58)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28216/sql/core/spark-warehouse/org.apache.spark..., PartitionFilters: [], PushedFilters: [IsNotNull(wp_web_page_sk)], ReadSchema: struct<wp_web_page_sk:int>
: : : : +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#309]
: : : : +- *(3) Project [cd_demo_sk#72, cd_marital_status#74, cd_education_status#75]
: : : : +- *(3) Filter (((isnotnull(cd_demo_sk#72) AND isnotnull(cd_education_status#75)) AND isnotnull(cd_marital_status#74)) AND ((((cd_marital_status#74 = M) AND (cd_education_status#75 = Advanced Degree)) OR ((cd_marital_status#74 = S) AND (cd_education_status#75 = College))) OR ((cd_marital_status#74 = W) AND (cd_education_status#75 = 2 yr Degree))))
: : : : +- *(3) ColumnarToRow
: : : : +- FileScan parquet default.customer_demographics[cd_demo_sk#72,cd_marital_status#74,cd_education_status#75] Batched: true, DataFilters: [isnotnull(cd_demo_sk#72), isnotnull(cd_education_status#75), isnotnull(cd_marital_status#74), ((..., Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28216/sql/core/spark-warehouse/org.apache.spark..., PartitionFilters: [], PushedFilters: [IsNotNull(cd_demo_sk), IsNotNull(cd_education_status), IsNotNull(cd_marital_status), Or(Or(And(E..., ReadSchema: struct<cd_demo_sk:int,cd_marital_status:string,cd_education_status:string>
: : : +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint), input[1, string, true], input[2, string, true])), [id=#318]
: : : +- *(4) Project [cd_demo_sk#125, cd_marital_status#127, cd_education_status#128]
: : : +- *(4) Filter ((isnotnull(cd_demo_sk#125) AND isnotnull(cd_education_status#128)) AND isnotnull(cd_marital_status#127))
: : : +- *(4) ColumnarToRow
: : : +- FileScan parquet default.customer_demographics[cd_demo_sk#125,cd_marital_status#127,cd_education_status#128] Batched: true, DataFilters: [isnotnull(cd_demo_sk#125), isnotnull(cd_education_status#128), isnotnull(cd_marital_status#127)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28216/sql/core/spark-warehouse/org.apache.spark..., PartitionFilters: [], PushedFilters: [IsNotNull(cd_demo_sk), IsNotNull(cd_education_status), IsNotNull(cd_marital_status)], ReadSchema: struct<cd_demo_sk:int,cd_marital_status:string,cd_education_status:string>
: : +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#327]
: : +- *(5) Project [ca_address_sk#81, ca_state#89]
: : +- *(5) Filter (((isnotnull(ca_country#91) AND (ca_country#91 = United States)) AND isnotnull(ca_address_sk#81)) AND ((ca_state#89 IN (IN,OH,NJ) OR ca_state#89 IN (WI,CT,KY)) OR ca_state#89 IN (LA,IA,AR)))
: : +- *(5) ColumnarToRow
: : +- FileScan parquet default.customer_address[ca_address_sk#81,ca_state#89,ca_country#91] Batched: true, DataFilters: [isnotnull(ca_country#91), (ca_country#91 = United States), isnotnull(ca_address_sk#81), ((ca_sta..., Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28216/sql/core/spark-warehouse/org.apache.spark..., PartitionFilters: [], PushedFilters: [IsNotNull(ca_country), EqualTo(ca_country,United States), IsNotNull(ca_address_sk), Or(Or(In(ca_..., ReadSchema: struct<ca_address_sk:int,ca_state:string,ca_country:string>
: +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#336]
: +- *(6) Project [d_date_sk#94]
: +- *(6) Filter ((isnotnull(d_year#100) AND (d_year#100 = 2000)) AND isnotnull(d_date_sk#94))
: +- *(6) ColumnarToRow
: +- FileScan parquet default.date_dim[d_date_sk#94,d_year#100] Batched: true, DataFilters: [isnotnull(d_year#100), (d_year#100 = 2000), isnotnull(d_date_sk#94)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28216/sql/core/spark-warehouse/org.apache.spark..., PartitionFilters: [], PushedFilters: [IsNotNull(d_year), EqualTo(d_year,2000), IsNotNull(d_date_sk)], ReadSchema: struct<d_date_sk:int,d_year:int>
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#345]
+- *(7) Project [r_reason_sk#122, r_reason_desc#124]
+- *(7) Filter isnotnull(r_reason_sk#122)
+- *(7) ColumnarToRow
+- FileScan parquet default.reason[r_reason_sk#122,r_reason_desc#124] Batched: true, DataFilters: [isnotnull(r_reason_sk#122)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28216/sql/core/spark-warehouse/org.apache.spark..., PartitionFilters: [], PushedFilters: [IsNotNull(r_reason_sk)], ReadSchema: struct<r_reason_sk:int,r_reason_desc:string>
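A minimal sketch of the grouping idea (not the PR's exact code; it only illustrates how conjuncts are merged per table qualifier before the OR is distributed, which keeps the CNF expansion proportional to the number of referenced tables rather than the number of leaf predicates):

import org.apache.spark.sql.catalyst.expressions.{And, Expression, PredicateHelper}

object QualifierGroupingSketch extends PredicateHelper {
  // Split a conjunction into conjuncts, then merge conjuncts that reference the same
  // set of table qualifiers back into a single And, so each side of an Or contributes
  // at most one predicate per referenced table combination.
  def groupedByQualifier(e: Expression): Seq[Expression] =
    splitConjunctivePredicates(e)
      .groupBy(_.references.map(_.qualifier))
      .values
      .map(_.reduceLeft(And))
      .toSeq
}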
Test build #123348 has finished for PR 28575 at commit
 * Rewrites the join condition to conjunctive normal form (CNF) so that we can push
 * down more predicates.
 */
object PushPredicateThroughJoinByCNF extends Rule[LogicalPlan] with PredicateHelper {
Does it apply to all the predicates, e.g. when we push down filters to the data source?
Yes. For example, the scan of the web_sales table in TPC-DS q85.
Before this PR:
+- *(8) Filter (((isnotnull(ws_item_sk#3) AND isnotnull(ws_order_number#17)) AND isnotnull(ws_web_page_sk#12)) AND isnotnull(ws_sold_date_sk#0))
+- *(8) ColumnarToRow
+- FileScan parquet default.web_sales[ws_sold_date_sk#0,ws_item_sk#3,ws_web_page_sk#12,ws_order_number#17,ws_quantity#18,ws_sales_price#21,ws_net_profit#33] Batched: true, DataFilters: [isnotnull(ws_item_sk#3), isnotnull(ws_order_number#17), isnotnull(ws_web_page_sk#12), isnotnull(ws_sold_date_sk#0)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28216/sql/core/spark-warehouse/org.apache.spark.sql.TPCDSQuerySuite/web_sales], PartitionFilters: [], PushedFilters: [IsNotNull(ws_item_sk), IsNotNull(ws_order_number), IsNotNull(ws_web_page_sk), IsNotNull(ws_sold_date_sk)], ReadSchema: struct<ws_sold_date_sk:int,ws_item_sk:int,ws_web_page_sk:int,ws_order_number:int,ws_quantity:int,ws_sales_price:decimal(7,2),ws_net_profit:decimal(7,2)>
After this PR:
+- *(8) Filter (((((isnotnull(ws_item_sk#3) AND isnotnull(ws_order_number#17)) AND isnotnull(ws_web_page_sk#12)) AND isnotnull(ws_sold_date_sk#0)) AND ((((ws_sales_price#21 >= 100.00) AND (ws_sales_price#21 <= 150.00)) OR ((ws_sales_price#21 >= 50.00) AND (ws_sales_price#21 <= 100.00))) OR ((ws_sales_price#21 >= 150.00) AND (ws_sales_price#21 <= 200.00)))) AND ((((ws_net_profit#33 >= 100.00) AND (ws_net_profit#33 <= 200.00)) OR ((ws_net_profit#33 >= 150.00) AND (ws_net_profit#33 <= 300.00))) OR ((ws_net_profit#33 >= 50.00) AND (ws_net_profit#33 <= 250.00))))
+- *(8) ColumnarToRow
+- FileScan parquet default.web_sales[ws_sold_date_sk#0,ws_item_sk#3,ws_web_page_sk#12,ws_order_number#17,ws_quantity#18,ws_sales_price#21,ws_net_profit#33] Batched: true, DataFilters: [isnotnull(ws_item_sk#3), isnotnull(ws_order_number#17), isnotnull(ws_web_page_sk#12), isnotnull(ws_sold_date_sk#0), ((((ws_sales_price#21 >= 100.00) AND (ws_sales_price#21 <= 150.00)) OR ((ws_sales_price#21 >= 50.00) AND (ws_sales_price#21 <= 100.00))) OR ((ws_sales_price#21 >= 150.00) AND (ws_sales_price#21 <= 200.00))), ((((ws_net_profit#33 >= 100.00) AND (ws_net_profit#33 <= 200.00)) OR ((ws_net_profit#33 >= 150.00) AND (ws_net_profit#33 <= 300.00))) OR ((ws_net_profit#33 >= 50.00) AND (ws_net_profit#33 <= 250.00)))], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-28216/sql/core/spark-warehouse/org.apache.spark.sql.TPCDSQuerySuite/web_sales], PartitionFilters: [], PushedFilters: [IsNotNull(ws_item_sk), IsNotNull(ws_order_number), IsNotNull(ws_web_page_sk), IsNotNull(ws_sold_date_sk), Or(Or(And(GreaterThanOrEqual(ws_sales_price,100.00),LessThanOrEqual(ws_sales_price,150.00)),And(GreaterThanOrEqual(ws_sales_price,50.00),LessThanOrEqual(ws_sales_price,100.00))),And(GreaterThanOrEqual(ws_sales_price,150.00),LessThanOrEqual(ws_sales_price,200.00))), Or(Or(And(GreaterThanOrEqual(ws_net_profit,100.00),LessThanOrEqual(ws_net_profit,200.00)),And(GreaterThanOrEqual(ws_net_profit,150.00),LessThanOrEqual(ws_net_profit,300.00))),And(GreaterThanOrEqual(ws_net_profit,50.00),LessThanOrEqual(ws_net_profit,250.00)))], ReadSchema: struct<ws_sold_date_sk:int,ws_item_sk:int,ws_web_page_sk:int,ws_order_number:int,ws_quantity:int,ws_sales_price:decimal(7,2),ws_net_profit:decimal(7,2)>
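As a rough illustration, the disjunctive ws_sales_price range predicate in that PushedFilters entry corresponds to the following hand-written value in the public data source Filter API (a sketch, not generated output):

import org.apache.spark.sql.sources._

// ((ws_sales_price >= 100.00 AND ws_sales_price <= 150.00) OR
//  (ws_sales_price >= 50.00  AND ws_sales_price <= 100.00)) OR
//  (ws_sales_price >= 150.00 AND ws_sales_price <= 200.00)
val salesPricePushed: Filter = Or(
  Or(
    And(GreaterThanOrEqual("ws_sales_price", 100.00), LessThanOrEqual("ws_sales_price", 150.00)),
    And(GreaterThanOrEqual("ws_sales_price", 50.00), LessThanOrEqual("ws_sales_price", 100.00))),
  And(GreaterThanOrEqual("ws_sales_price", 150.00), LessThanOrEqual("ws_sales_price", 200.00)))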
private def maybeWithFilter(joinCondition: Option[Expression], plan: LogicalPlan) = {
  (joinCondition, plan) match {
    // Avoid adding the same filter.
    case (Some(condition), filter: Filter) if condition.semanticEquals(filter.condition) =>
is there an existing optimizer rule to remove duplicated predicates?
Removed it.
case or @ Or(left: And, right: And) =>
  val lhs = splitConjunctivePredicates(left).groupBy(_.references.map(_.qualifier))
  val rhs = splitConjunctivePredicates(right).groupBy(_.references.map(_.qualifier))
  if (lhs.size > 1) {
Shall we pick rhs if it has more conjunctive predicates?
Yes. We will pick it in the case or @ Or(left, right: And) => or case or @ Or(left: And, right: And) => branches.
E.g., for (a && b) || (c && d), the rewriting steps are:
(a && b) || (c && d)
--> (a || (c && d)) && (b || (c && d))
--> (a || c) && (a || d) && (b || c) && (b || d)
We will pick it in case or @ Or(left, right: And) => once a is fixed.
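A self-contained sketch of that distribution step (toy expression classes rather than Catalyst's, and without the qualifier grouping and maxRewritingCNFDepth bound that the PR adds):

sealed trait Expr
case class Leaf(name: String) extends Expr
case class AndE(l: Expr, r: Expr) extends Expr
case class OrE(l: Expr, r: Expr) extends Expr

def toCNF(e: Expr): Expr = e match {
  case OrE(l, r) => (toCNF(l), toCNF(r)) match {
    case (AndE(a, b), cr) => AndE(toCNF(OrE(a, cr)), toCNF(OrE(b, cr)))  // (a && b) || c
    case (cl, AndE(c, d)) => AndE(toCNF(OrE(cl, c)), toCNF(OrE(cl, d)))  // a || (c && d)
    case (cl, cr)         => OrE(cl, cr)
  }
  case AndE(l, r) => AndE(toCNF(l), toCNF(r))
  case leaf       => leaf
}

// toCNF(OrE(AndE(Leaf("a"), Leaf("b")), AndE(Leaf("c"), Leaf("d")))) yields
// ((a || c) && (a || d)) && ((b || c) && (b || d)), i.e. the final step above.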
val lhs = splitConjunctivePredicates(left).groupBy(_.references.map(_.qualifier))
val rhs = splitConjunctivePredicates(right).groupBy(_.references.map(_.qualifier))
if (lhs.size > 1) {
  lhs.values.map(_.reduceLeft(And)).map { c =>
What is the time complexity here? I am concerned about the performance.
In practice the overhead is small: across the TPC-DS suite below, PushPredicateThroughJoinByCNF accounts for roughly 5.3 ms of the ~1.95 s total rule time.
07:35:05.863 WARN org.apache.spark.sql.TPCDSQuerySuite:
=== Metrics of Analyzer/Optimizer Rules ===
Total number of runs: 7249
Total time: 1.949092121 seconds
Rule Effective Time / Total Time Effective Runs / Total Runs
org.apache.spark.sql.catalyst.analysis.ResolveSessionCatalog 151465071 / 249555919 24 / 59
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveAggregateFunctions 138746642 / 168406459 1 / 59
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences 78878999 / 132411189 3 / 59
org.apache.spark.sql.execution.datasources.FindDataSourceTable 95372289 / 99326980 1 / 59
org.apache.spark.sql.catalyst.analysis.DecimalPrecision 56750800 / 66170980 2 / 59
org.apache.spark.sql.execution.datasources.PreprocessTableCreation 0 / 48910600 0 / 28
org.apache.spark.sql.catalyst.analysis.TypeCoercion$ImplicitTypeCasts 20820860 / 44731509 1 / 59
org.apache.spark.sql.catalyst.optimizer.ColumnPruning 12919681 / 44543112 2 / 105
org.apache.spark.sql.catalyst.optimizer.BooleanSimplification 24718048 / 43686766 1 / 55
org.apache.spark.sql.execution.datasources.SchemaPruning 0 / 32795196 0 / 25
org.apache.spark.sql.catalyst.analysis.ResolveCatalogs 0 / 30645089 0 / 59
org.apache.spark.sql.catalyst.analysis.TypeCoercion$FunctionArgumentConversion 17902985 / 27578796 2 / 59
org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractGenerator 0 / 26144312 0 / 59
org.apache.spark.sql.catalyst.optimizer.PushDownPredicates 17354780 / 25365681 5 / 80
org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints 22137478 / 24736530 1 / 25
org.apache.spark.sql.catalyst.optimizer.RemoveRedundantAliases 0 / 24508932 0 / 55
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions 10536555 / 24417169 2 / 59
org.apache.spark.sql.catalyst.optimizer.ReorderJoin 17311087 / 22391786 1 / 55
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveMissingReferences 0 / 21142565 0 / 59
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveBinaryArithmetic 0 / 20704002 0 / 59
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations 15024644 / 20411277 1 / 59
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveAliases 15859386 / 18096012 1 / 59
org.apache.spark.sql.catalyst.optimizer.ReorderAssociativeOperator 0 / 17026758 0 / 55
org.apache.spark.sql.catalyst.analysis.UpdateAttributeNullability 0 / 16870561 0 / 28
org.apache.spark.sql.catalyst.optimizer.PruneFilters 0 / 15220754 0 / 80
org.apache.spark.sql.catalyst.optimizer.CollapseProject 8979563 / 13211028 1 / 80
org.apache.spark.sql.catalyst.optimizer.LikeSimplification 0 / 12837312 0 / 55
org.apache.spark.sql.catalyst.optimizer.ConstantFolding 7099416 / 12652039 1 / 55
org.apache.spark.sql.catalyst.analysis.TimeWindowing 0 / 12566615 0 / 59
org.apache.spark.sql.catalyst.optimizer.ComputeCurrentTime 0 / 12279377 0 / 25
org.apache.spark.sql.catalyst.optimizer.FoldablePropagation 0 / 12158420 0 / 55
org.apache.spark.sql.catalyst.analysis.ResolveTimeZone 7331904 / 11661433 5 / 59
org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown 0 / 11651482 0 / 25
org.apache.spark.sql.catalyst.analysis.Analyzer$HandleNullInputsForUDF 0 / 11312278 0 / 28
org.apache.spark.sql.catalyst.analysis.Analyzer$PullOutNondeterministic 0 / 11111621 0 / 28
org.apache.spark.sql.catalyst.analysis.TypeCoercion$PromoteStrings 0 / 11032240 0 / 59
org.apache.spark.sql.catalyst.optimizer.OptimizeIn 0 / 10792662 0 / 55
org.apache.spark.sql.catalyst.optimizer.SimplifyExtractValueOps 0 / 10435223 0 / 55
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGroupingAnalytics 0 / 10147394 0 / 59
org.apache.spark.sql.catalyst.analysis.Analyzer$LookupFunctions 0 / 9906569 0 / 28
org.apache.spark.sql.catalyst.optimizer.SimplifyBinaryComparison 0 / 9434962 0 / 55
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRandomSeed 0 / 8745520 0 / 59
org.apache.spark.sql.catalyst.optimizer.NullPropagation 0 / 8673323 0 / 55
org.apache.spark.sql.catalyst.analysis.ResolveLambdaVariables 0 / 8618322 0 / 59
org.apache.spark.sql.catalyst.optimizer.DecimalAggregates 1133548 / 8344365 1 / 26
org.apache.spark.sql.catalyst.analysis.TypeCoercion$EltCoercion 0 / 8306604 0 / 59
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveInsertInto 0 / 8272186 0 / 59
org.apache.spark.sql.catalyst.optimizer.EliminateResolvedHint 0 / 8269481 0 / 25
org.apache.spark.sql.catalyst.analysis.TypeCoercion$BooleanEquality 0 / 8164425 0 / 59
org.apache.spark.sql.catalyst.analysis.TypeCoercion$InConversion 0 / 7999784 0 / 59
org.apache.spark.sql.catalyst.analysis.TypeCoercion$WindowFrameCoercion 0 / 7744630 0 / 59
org.apache.spark.sql.catalyst.optimizer.EliminateSorts 0 / 7664352 0 / 25
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveNamespace 0 / 7333421 0 / 59
org.apache.spark.sql.catalyst.optimizer.RewriteCorrelatedScalarSubquery 0 / 7288581 0 / 55
org.apache.spark.sql.catalyst.optimizer.ReassignLambdaVariableID 0 / 7094275 0 / 25
org.apache.spark.sql.catalyst.optimizer.NormalizeFloatingNumbers 0 / 6952061 0 / 25
org.apache.spark.sql.catalyst.analysis.CleanupAliases 1507275 / 6818821 1 / 29
org.apache.spark.sql.catalyst.analysis.TypeCoercion$DateTimeOperations 0 / 6621755 0 / 59
org.apache.spark.sql.catalyst.optimizer.SimplifyConditionals 0 / 6594834 0 / 55
org.apache.spark.sql.execution.dynamicpruning.PartitionPruning 0 / 6435655 0 / 25
org.apache.spark.sql.catalyst.optimizer.SimplifyCasts 1726589 / 6431193 1 / 55
org.apache.spark.sql.catalyst.analysis.TypeCoercion$IfCoercion 0 / 6390305 0 / 59
org.apache.spark.sql.catalyst.analysis.Analyzer$GlobalAggregates 0 / 6364189 0 / 59
org.apache.spark.sql.catalyst.analysis.TypeCoercion$ConcatCoercion 0 / 6344767 0 / 59
org.apache.spark.sql.catalyst.optimizer.PushLeftSemiLeftAntiThroughJoin 0 / 6300823 0 / 55
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveSubquery 0 / 6237735 0 / 59
org.apache.spark.sql.catalyst.optimizer.SimplifyCaseConversionExpressions 0 / 6221240 0 / 55
org.apache.spark.sql.catalyst.analysis.TypeCoercion$StringLiteralCoercion 0 / 6192720 0 / 59
org.apache.spark.sql.catalyst.analysis.CTESubstitution 0 / 6150743 0 / 28
org.apache.spark.sql.catalyst.optimizer.GetCurrentDatabase 0 / 6148608 0 / 25
org.apache.spark.sql.catalyst.analysis.ResolveCreateNamedStruct 0 / 6101190 0 / 59
org.apache.spark.sql.catalyst.optimizer.ReplaceNullWithFalseInPredicate 0 / 6100257 0 / 55
org.apache.spark.sql.catalyst.optimizer.ConstantPropagation 0 / 6087339 0 / 55
org.apache.spark.sql.catalyst.analysis.ResolveHigherOrderFunctions 0 / 6067619 0 / 59
org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions 0 / 6061035 0 / 59
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast 0 / 6007788 0 / 59
org.apache.spark.sql.catalyst.analysis.TypeCoercion$Division 0 / 5925219 0 / 59
org.apache.spark.sql.execution.datasources.PruneFileSourcePartitions 0 / 5824041 0 / 25
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolvePivot 0 / 5559831 0 / 59
org.apache.spark.sql.catalyst.analysis.ResolveHints$ResolveJoinStrategyHints 0 / 5524307 0 / 28
org.apache.spark.sql.catalyst.optimizer.RemoveDispensableExpressions 0 / 5510061 0 / 55
org.apache.spark.sql.execution.datasources.DataSourceAnalysis 5412159 / 5475347 24 / 28
org.apache.spark.sql.catalyst.optimizer.RewritePredicateSubquery 2055027 / 5324950 1 / 25
org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoinByCNF 0 / 5285288 0 / 25
org.apache.spark.sql.catalyst.optimizer.RemoveNoopOperators 0 / 4998930 0 / 105
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveOrdinalInOrderByAndGroupBy 0 / 4914629 0 / 59
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveWindowOrder 0 / 4899787 0 / 59
org.apache.spark.sql.catalyst.analysis.SubstituteUnresolvedOrdinals 0 / 4897924 0 / 28
org.apache.spark.sql.catalyst.optimizer.PushDownLeftSemiAntiJoin 0 / 4835640 0 / 55
org.apache.spark.sql.catalyst.optimizer.PropagateEmptyRelation 0 / 4795908 0 / 50
org.apache.spark.sql.catalyst.optimizer.ObjectSerializerPruning 0 / 4763236 0 / 25
org.apache.spark.sql.catalyst.analysis.ResolveTableValuedFunctions 0 / 4607355 0 / 59
org.apache.spark.sql.execution.analysis.DetectAmbiguousSelfJoin 0 / 4562616 0 / 28
org.apache.spark.sql.catalyst.analysis.TypeCoercion$CaseWhenCoercion 0 / 4517751 0 / 59
org.apache.spark.sql.execution.datasources.FallBackFileSourceV2 0 / 4397375 0 / 59
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveWindowFrame 0 / 4313997 0 / 59
org.apache.spark.sql.catalyst.analysis.TypeCoercion$MapZipWithCoercion 0 / 4309833 0 / 59
org.apache.spark.sql.catalyst.optimizer.RewriteNonCorrelatedExists 0 / 4253433 0 / 25
org.apache.spark.sql.catalyst.analysis.TypeCoercion$StackCoercion 0 / 4208225 0 / 59
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveDeserializer 0 / 4145215 0 / 59
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveAggAliasInGroupBy 0 / 4095335 0 / 59
org.apache.spark.sql.execution.python.ExtractPythonUDFs 0 / 4001274 0 / 25
org.apache.spark.sql.catalyst.optimizer.PullupCorrelatedPredicates 0 / 3920586 0 / 25
org.apache.spark.sql.catalyst.optimizer.CollapseWindow 0 / 3888524 0 / 55
org.apache.spark.sql.catalyst.optimizer.RewriteDistinctAggregates 0 / 3803019 0 / 25
org.apache.spark.sql.catalyst.optimizer.CollapseRepartition 0 / 3755697 0 / 55
org.apache.spark.sql.catalyst.optimizer.CombineUnions 0 / 3704733 0 / 80
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveNewInstance 0 / 3695526 0 / 59
org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases 1421178 / 3635064 1 / 25
org.apache.spark.sql.catalyst.optimizer.EliminateSerialization 0 / 3550180 0 / 55
org.apache.spark.sql.catalyst.optimizer.Optimizer$OptimizeSubqueries 0 / 3536272 0 / 50
org.apache.spark.sql.catalyst.optimizer.CombineLimits 0 / 3504846 0 / 55
org.apache.spark.sql.catalyst.optimizer.TransposeWindow 0 / 3499057 0 / 55
org.apache.spark.sql.catalyst.optimizer.PushProjectionThroughUnion 0 / 3355877 0 / 55
org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin 0 / 3233735 0 / 55
org.apache.spark.sql.catalyst.optimizer.LimitPushDown 0 / 3208991 0 / 55
org.apache.spark.sql.catalyst.optimizer.OptimizeLimitZero 0 / 3190070 0 / 25
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveTables 0 / 3094747 0 / 59
org.apache.spark.sql.catalyst.optimizer.RewriteIntersectAll 0 / 3085769 0 / 25
org.apache.spark.sql.catalyst.analysis.Analyzer$WindowsSubstitution 0 / 2979758 0 / 28
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveAlterTableChanges 0 / 2948089 0 / 28
org.apache.spark.sql.catalyst.optimizer.ExtractPythonUDFFromJoinCondition 0 / 2915414 0 / 25
org.apache.spark.sql.catalyst.analysis.EliminateView 0 / 2805178 0 / 25
org.apache.spark.sql.catalyst.analysis.ResolveHints$ResolveCoalesceHints 0 / 2710440 0 / 28
org.apache.spark.sql.catalyst.optimizer.ReplaceExpressions 0 / 2535170 0 / 25
org.apache.spark.sql.catalyst.optimizer.EliminateMapObjects 0 / 2520470 0 / 25
org.apache.spark.sql.execution.dynamicpruning.CleanupDynamicPruningFilters 0 / 2519862 0 / 25
org.apache.spark.sql.execution.python.ExtractPythonUDFFromAggregate 0 / 2495766 0 / 25
org.apache.spark.sql.execution.python.ExtractGroupingPythonUDFFromAggregate 0 / 2417625 0 / 25
org.apache.spark.sql.catalyst.optimizer.RewriteExceptAll 0 / 2301441 0 / 25
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveSubqueryColumnAliases 0 / 2277596 0 / 59
org.apache.spark.sql.catalyst.optimizer.CombineTypedFilters 0 / 2267944 0 / 25
org.apache.spark.sql.catalyst.optimizer.ReplaceExceptWithFilter 0 / 2252560 0 / 25
org.apache.spark.sql.catalyst.optimizer.CombineFilters 0 / 2236246 0 / 55
org.apache.spark.sql.execution.datasources.ResolveSQLOnFile 0 / 2214162 0 / 59
org.apache.spark.sql.catalyst.analysis.TypeCoercion$WidenSetOperationTypes 0 / 2164273 0 / 59
org.apache.spark.sql.catalyst.optimizer.RemoveLiteralFromGroupExpressions 0 / 2092068 0 / 25
org.apache.spark.sql.catalyst.optimizer.ReplaceIntersectWithSemiJoin 0 / 1952290 0 / 25
org.apache.spark.sql.catalyst.optimizer.ReplaceDeduplicateWithAggregate 0 / 1927408 0 / 25
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGenerate 0 / 1915572 0 / 59
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveNaturalAndUsingJoin 0 / 1910467 0 / 59
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveOutputRelation 0 / 1839082 0 / 59
org.apache.spark.sql.catalyst.optimizer.ReplaceExceptWithAntiJoin 0 / 1651671 0 / 25
org.apache.spark.sql.catalyst.analysis.ResolveInlineTables 0 / 1536486 0 / 59
org.apache.spark.sql.catalyst.optimizer.RemoveRepetitionFromGroupExpressions 0 / 1474769 0 / 25
org.apache.spark.sql.execution.datasources.PreprocessTableInsertion 0 / 1406859 0 / 28
org.apache.spark.sql.catalyst.analysis.EliminateUnions 0 / 1327002 0 / 28
org.apache.spark.sql.catalyst.optimizer.CombineConcats 0 / 1217837 0 / 55
org.apache.spark.sql.catalyst.optimizer.ReplaceDistinctWithAggregate 0 / 1201090 0 / 25
org.apache.spark.sql.catalyst.analysis.ResolveHints$RemoveAllHints 0 / 1199850 0 / 28
org.apache.spark.sql.catalyst.optimizer.EliminateDistinct 0 / 1098290 0 / 25
org.apache.spark.sql.catalyst.analysis.UpdateOuterReferences 0 / 1028033 0 / 28
org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts 0 / 703053 0 / 50
org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughNonJoin 0 / 617000 0 / 25
org.apache.spark.sql.execution.OptimizeMetadataOnlyQuery 0 / 371820 0 / 25
org.apache.spark.sql.catalyst.optimizer.CostBasedJoinReorder 0 / 309745 0 / 25
val rhs = splitConjunctivePredicates(right).groupBy(_.references.map(_.qualifier))
if (lhs.size > 1) {
  lhs.values.map(_.reduceLeft(And)).map { c =>
    toCNF(Or(toCNF(c, depth + 1), toCNF(right, depth + 1)), depth + 1)
I don't really understand this. At least I can't see how it is related to (a && b) || c --> (a || c) && (b || c).
I added the complete steps to the PR description to explain how it works.
Test build #123483 has finished for PR 28575 at commit
As I discussed with @wangyum offline, I am taking this PR over in #28733 for the CNF implementation and config naming. There have been earlier PRs for CNF conversion, such as #10444, #15558, and #28575. The common issue is that a recursive implementation can be slow, or even cause a stack overflow exception. With a non-recursive implementation, the rule should be faster and more robust.
What changes were proposed in this pull request?
This PR adds a new rule that pushes predicates through joins by rewriting the join condition to CNF (conjunctive normal form). The following example shows the steps of this rule:
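As an illustration (a reconstruction based on the test cases above, not necessarily the original worked example):

import org.apache.spark.sql.catalyst.dsl.expressions._

// Join condition from one of the tests above.
val cond = ("x.b".attr === "y.b".attr) &&
  ((("x.a".attr > 3) && ("y.a".attr > 13)) || (("x.a".attr > 1) && ("y.a".attr > 11)))

// CNF rewrite of the disjunction:
//   (x.a > 3 || x.a > 1) && (x.a > 3 || y.a > 11) && (y.a > 13 || x.a > 1) && (y.a > 13 || y.a > 11)
// The x-only conjunct (x.a > 3 || x.a > 1) is then pushed below the join to the scan of x, and the
// y-only conjunct (y.a > 13 || y.a > 11) to the scan of y, matching the PostgreSQL plans above.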
Why are the changes needed?
Improve query performance. PostgreSQL, Impala and Hive support this feature.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Unit test and benchmark test.