diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 8539ca0874dd..9ba66dd6098c 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -571,6 +571,9 @@ config_namespace! { /// when an exact selectivity cannot be determined. Valid values are /// between 0 (no selectivity) and 100 (all rows are selected). pub default_filter_selectivity: u8, default = 20 + + /// When set to true, the optimizer will not attempt to convert Union to Interleave + pub prefer_existing_union: bool, default = false } } diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index eacc842c342d..7ea4164963cd 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -1192,7 +1192,11 @@ fn ensure_distribution( .collect::>>()?; let children_plans = children.iter().map(|c| c.plan.clone()).collect::>(); - plan = if plan.as_any().is::() && can_interleave(children_plans.iter()) { + + plan = if plan.as_any().is::() + && !config.optimizer.prefer_existing_union + && can_interleave(children_plans.iter()) + { // Add a special case for [`UnionExec`] since we want to "bubble up" // hash-partitioned data. So instead of // @@ -1731,16 +1735,25 @@ pub(crate) mod tests { /// * `TARGET_PARTITIONS` (optional) - number of partitions to repartition to /// * `REPARTITION_FILE_SCANS` (optional) - if true, will repartition file scans /// * `REPARTITION_FILE_MIN_SIZE` (optional) - minimum file size to repartition + /// * `PREFER_EXISTING_UNION` (optional) - if true, will not attempt to convert Union to Interleave macro_rules! assert_optimized { ($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr) => { - assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, false, 10, false, 1024); + assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, false, 10, false, 1024, false); }; ($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, $PREFER_EXISTING_SORT: expr) => { - assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, $PREFER_EXISTING_SORT, 10, false, 1024); + assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, $PREFER_EXISTING_SORT, 10, false, 1024, false); + }; + + ($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, $PREFER_EXISTING_SORT: expr, $PREFER_EXISTING_UNION: expr) => { + assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, $PREFER_EXISTING_SORT, 10, false, 1024, $PREFER_EXISTING_UNION); }; ($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, $PREFER_EXISTING_SORT: expr, $TARGET_PARTITIONS: expr, $REPARTITION_FILE_SCANS: expr, $REPARTITION_FILE_MIN_SIZE: expr) => { + assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, $PREFER_EXISTING_SORT, $TARGET_PARTITIONS, $REPARTITION_FILE_SCANS, $REPARTITION_FILE_MIN_SIZE, false); + }; + + ($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, $PREFER_EXISTING_SORT: expr, $TARGET_PARTITIONS: expr, $REPARTITION_FILE_SCANS: expr, $REPARTITION_FILE_MIN_SIZE: expr, $PREFER_EXISTING_UNION: expr) => { let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s| *s).collect(); let mut config = ConfigOptions::new(); @@ -1748,6 +1761,7 @@ pub(crate) mod tests { config.optimizer.repartition_file_scans = $REPARTITION_FILE_SCANS; config.optimizer.repartition_file_min_size = $REPARTITION_FILE_MIN_SIZE; config.optimizer.prefer_existing_sort = $PREFER_EXISTING_SORT; + config.optimizer.prefer_existing_union = $PREFER_EXISTING_UNION; // NOTE: These tests verify the joint `EnforceDistribution` + `EnforceSorting` cascade // because they were written prior to the separation of `BasicEnforcement` into @@ -3107,7 +3121,67 @@ pub(crate) mod tests { "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ]; assert_optimized!(expected, plan.clone(), true); - assert_optimized!(expected, plan, false); + assert_optimized!(expected, plan.clone(), false); + + Ok(()) + } + + #[test] + fn union_not_to_interleave() -> Result<()> { + // group by (a as a1) + let left = aggregate_exec_with_alias( + parquet_exec(), + vec![("a".to_string(), "a1".to_string())], + ); + // group by (a as a2) + let right = aggregate_exec_with_alias( + parquet_exec(), + vec![("a".to_string(), "a1".to_string())], + ); + + // Union + let plan = Arc::new(UnionExec::new(vec![left, right])); + + // final agg + let plan = + aggregate_exec_with_alias(plan, vec![("a1".to_string(), "a2".to_string())]); + + // Only two RepartitionExecs added, no final RepartitionExec required + let expected = &[ + "AggregateExec: mode=FinalPartitioned, gby=[a2@0 as a2], aggr=[]", + "RepartitionExec: partitioning=Hash([a2@0], 10), input_partitions=20", + "AggregateExec: mode=Partial, gby=[a1@0 as a2], aggr=[]", + "UnionExec", + "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]", + "RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10", + "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", + "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]", + "RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10", + "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", + ]; + // no sort in the plan but since we need it as a parameter, make it default false + let prefer_existing_sort = false; + let first_enforce_distribution = true; + let prefer_existing_union = true; + + assert_optimized!( + expected, + plan.clone(), + first_enforce_distribution, + prefer_existing_sort, + prefer_existing_union + ); + assert_optimized!( + expected, + plan, + !first_enforce_distribution, + prefer_existing_sort, + prefer_existing_union + ); Ok(()) } @@ -3661,7 +3735,8 @@ pub(crate) mod tests { true, target_partitions, true, - repartition_size + repartition_size, + false ); let expected = [ @@ -3678,7 +3753,8 @@ pub(crate) mod tests { true, target_partitions, true, - repartition_size + repartition_size, + false ); Ok(()) @@ -3741,7 +3817,7 @@ pub(crate) mod tests { )), vec![("a".to_string(), "a".to_string())], ); - assert_optimized!(expected, plan, true, false, 2, true, 10); + assert_optimized!(expected, plan, true, false, 2, true, 10, false); } Ok(()) } diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 8f4b1a3816a3..c64279ede02c 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -216,6 +216,7 @@ datafusion.optimizer.hash_join_single_partition_threshold 1048576 datafusion.optimizer.hash_join_single_partition_threshold_rows 131072 datafusion.optimizer.max_passes 3 datafusion.optimizer.prefer_existing_sort false +datafusion.optimizer.prefer_existing_union false datafusion.optimizer.prefer_hash_join true datafusion.optimizer.repartition_aggregations true datafusion.optimizer.repartition_file_min_size 10485760 @@ -294,6 +295,7 @@ datafusion.optimizer.hash_join_single_partition_threshold 1048576 The maximum es datafusion.optimizer.hash_join_single_partition_threshold_rows 131072 The maximum estimated size in rows for one input side of a HashJoin will be collected into a single partition datafusion.optimizer.max_passes 3 Number of times that the optimizer will attempt to optimize the plan datafusion.optimizer.prefer_existing_sort false When true, DataFusion will opportunistically remove sorts when the data is already sorted, (i.e. setting `preserve_order` to true on `RepartitionExec` and using `SortPreservingMergeExec`) When false, DataFusion will maximize plan parallelism using `RepartitionExec` even if this requires subsequently resorting data using a `SortExec`. +datafusion.optimizer.prefer_existing_union false When set to true, the optimizer will not attempt to convert Union to Interleave datafusion.optimizer.prefer_hash_join true When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficiently than SortMergeJoin but consumes more memory datafusion.optimizer.repartition_aggregations true Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel using the provided `target_partitions` level datafusion.optimizer.repartition_file_min_size 10485760 Minimum total files size in bytes to perform file scan repartitioning. diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 3ee3778177c4..a90f03306268 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -103,6 +103,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.optimizer.hash_join_single_partition_threshold | 1048576 | The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition | | datafusion.optimizer.hash_join_single_partition_threshold_rows | 131072 | The maximum estimated size in rows for one input side of a HashJoin will be collected into a single partition | | datafusion.optimizer.default_filter_selectivity | 20 | The default filter selectivity used by Filter Statistics when an exact selectivity cannot be determined. Valid values are between 0 (no selectivity) and 100 (all rows are selected). | +| datafusion.optimizer.prefer_existing_union | false | When set to true, the optimizer will not attempt to convert Union to Interleave | | datafusion.explain.logical_plan_only | false | When set to true, the explain statement will only print logical plans | | datafusion.explain.physical_plan_only | false | When set to true, the explain statement will only print physical plans | | datafusion.explain.show_statistics | false | When set to true, the explain statement will print operator statistics for physical plans |