Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[SPARK-38570][SQL] Incorrect DynamicPartitionPruning caused by Literal
### What changes were proposed in this pull request? The return value of Literal.references is an empty AttributeSet, so Literal is mistaken for a partition column. For example, the sql in the test case will generate such a physical plan when the adaptive is closed: ```text *(4) Project [store_id#5281, date_id#5283, state_province#5292] +- *(4) BroadcastHashJoin [store_id#5281], [store_id#5291], Inner, BuildRight, false :- Union : :- *(1) Project [4 AS store_id#5281, date_id#5283] : : +- *(1) Filter ((isnotnull(date_id#5283) AND (date_id#5283 >= 1300)) AND dynamicpruningexpression(4 IN dynamicpruning#5300)) : : : +- ReusedSubquery SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=#336] : : +- *(1) ColumnarToRow : : +- FileScan parquet default.fact_sk[date_id#5283,store_id#5286] Batched: true, DataFilters: [isnotnull(date_id#5283), (date_id#5283 >= 1300)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [dynamicpruningexpression(4 IN dynamicpruning#5300)], PushedFilters: [IsNotNull(date_id), GreaterThanOrEqual(date_id,1300)], ReadSchema: struct<date_id:int> : : +- SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=#336] : : +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#335] : : +- *(1) Project [store_id#5291, state_province#5292] : : +- *(1) Filter (((isnotnull(country#5293) AND (country#5293 = US)) AND ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5))) AND isnotnull(store_id#5291)) : : +- *(1) ColumnarToRow : : +- FileScan parquet default.dim_store[store_id#5291,state_province#5292,country#5293] Batched: true, DataFilters: [isnotnull(country#5293), (country#5293 = US), ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5)), ..., Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache...., PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US), Or(EqualNullSafe(store_id,4),EqualNullSafe(store_id,5))..., ReadSchema: struct<store_id:int,state_province:string,country:string> : +- *(2) Project [5 AS store_id#5282, date_id#5287] : +- *(2) Filter ((isnotnull(date_id#5287) AND (date_id#5287 <= 1000)) AND dynamicpruningexpression(5 IN dynamicpruning#5300)) : : +- ReusedSubquery SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=#336] : +- *(2) ColumnarToRow : +- FileScan parquet default.fact_stats[date_id#5287,store_id#5290] Batched: true, DataFilters: [isnotnull(date_id#5287), (date_id#5287 <= 1000)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [dynamicpruningexpression(5 IN dynamicpruning#5300)], PushedFilters: [IsNotNull(date_id), LessThanOrEqual(date_id,1000)], ReadSchema: struct<date_id:int> : +- ReusedSubquery SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=#336] +- ReusedExchange [store_id#5291, state_province#5292], BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#335] ``` after this pr: ```text *(4) Project [store_id#5281, date_id#5283, state_province#5292] +- *(4) BroadcastHashJoin [store_id#5281], [store_id#5291], Inner, BuildRight, false :- Union : :- *(1) Project [4 AS store_id#5281, date_id#5283] : : +- *(1) Filter (isnotnull(date_id#5283) AND (date_id#5283 >= 1300)) : : +- *(1) ColumnarToRow : : +- FileScan parquet default.fact_sk[date_id#5283,store_id#5286] Batched: true, DataFilters: [isnotnull(date_id#5283), (date_id#5283 >= 1300)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [], PushedFilters: [IsNotNull(date_id), GreaterThanOrEqual(date_id,1300)], ReadSchema: struct<date_id:int> : +- *(2) Project [5 AS store_id#5282, date_id#5287] : +- *(2) Filter (isnotnull(date_id#5287) AND (date_id#5287 <= 1000)) : +- *(2) ColumnarToRow : +- FileScan parquet default.fact_stats[date_id#5287,store_id#5290] Batched: true, DataFilters: [isnotnull(date_id#5287), (date_id#5287 <= 1000)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [], PushedFilters: [IsNotNull(date_id), LessThanOrEqual(date_id,1000)], ReadSchema: struct<date_id:int> +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#326] +- *(3) Project [store_id#5291, state_province#5292] +- *(3) Filter (((isnotnull(country#5293) AND (country#5293 = US)) AND ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5))) AND isnotnull(store_id#5291)) +- *(3) ColumnarToRow +- FileScan parquet default.dim_store[store_id#5291,state_province#5292,country#5293] Batched: true, DataFilters: [isnotnull(country#5293), (country#5293 = US), ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5)), ..., Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache...., PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US), Or(EqualNullSafe(store_id,4),EqualNullSafe(store_id,5))..., ReadSchema: struct<store_id:int,state_province:string,country:string> ``` ### Why are the changes needed? Execution performance improvement ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added unit test Closes #35878 from mcdull-zhang/literal_dynamic_partition. Lead-authored-by: mcdull-zhang <[email protected]> Co-authored-by: mcdull_zhang <[email protected]> Signed-off-by: Yuming Wang <[email protected]>
- Loading branch information