Skip to content

Commit

Permalink
feat: add optimizer config param to avoid grouping partitions `prefer…
Browse files Browse the repository at this point in the history
…_existing_union` (apache#10259)

* feat: add a config param to avoid converting union to interleave

* chore: update config for the tests

* chore: update configs.md
  • Loading branch information
NGA-TRAN authored and appletreeisyellow committed Apr 30, 2024
1 parent c616c84 commit 79cf58f
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 7 deletions.
3 changes: 3 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,9 @@ config_namespace! {
/// when an exact selectivity cannot be determined. Valid values are
/// between 0 (no selectivity) and 100 (all rows are selected).
pub default_filter_selectivity: u8, default = 20

/// When set to true, the optimizer will not attempt to convert Union to Interleave
pub prefer_existing_union: bool, default = false
}
}

Expand Down
90 changes: 83 additions & 7 deletions datafusion/core/src/physical_optimizer/enforce_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1192,7 +1192,11 @@ fn ensure_distribution(
.collect::<Result<Vec<_>>>()?;

let children_plans = children.iter().map(|c| c.plan.clone()).collect::<Vec<_>>();
plan = if plan.as_any().is::<UnionExec>() && can_interleave(children_plans.iter()) {

plan = if plan.as_any().is::<UnionExec>()
&& !config.optimizer.prefer_existing_union
&& can_interleave(children_plans.iter())
{
// Add a special case for [`UnionExec`] since we want to "bubble up"
// hash-partitioned data. So instead of
//
Expand Down Expand Up @@ -1731,23 +1735,33 @@ pub(crate) mod tests {
/// * `TARGET_PARTITIONS` (optional) - number of partitions to repartition to
/// * `REPARTITION_FILE_SCANS` (optional) - if true, will repartition file scans
/// * `REPARTITION_FILE_MIN_SIZE` (optional) - minimum file size to repartition
/// * `PREFER_EXISTING_UNION` (optional) - if true, will not attempt to convert Union to Interleave
macro_rules! assert_optimized {
($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr) => {
assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, false, 10, false, 1024);
assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, false, 10, false, 1024, false);
};

($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, $PREFER_EXISTING_SORT: expr) => {
assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, $PREFER_EXISTING_SORT, 10, false, 1024);
assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, $PREFER_EXISTING_SORT, 10, false, 1024, false);
};

($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, $PREFER_EXISTING_SORT: expr, $PREFER_EXISTING_UNION: expr) => {
assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, $PREFER_EXISTING_SORT, 10, false, 1024, $PREFER_EXISTING_UNION);
};

($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, $PREFER_EXISTING_SORT: expr, $TARGET_PARTITIONS: expr, $REPARTITION_FILE_SCANS: expr, $REPARTITION_FILE_MIN_SIZE: expr) => {
assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, $PREFER_EXISTING_SORT, $TARGET_PARTITIONS, $REPARTITION_FILE_SCANS, $REPARTITION_FILE_MIN_SIZE, false);
};

($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, $PREFER_EXISTING_SORT: expr, $TARGET_PARTITIONS: expr, $REPARTITION_FILE_SCANS: expr, $REPARTITION_FILE_MIN_SIZE: expr, $PREFER_EXISTING_UNION: expr) => {
let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s| *s).collect();

let mut config = ConfigOptions::new();
config.execution.target_partitions = $TARGET_PARTITIONS;
config.optimizer.repartition_file_scans = $REPARTITION_FILE_SCANS;
config.optimizer.repartition_file_min_size = $REPARTITION_FILE_MIN_SIZE;
config.optimizer.prefer_existing_sort = $PREFER_EXISTING_SORT;
config.optimizer.prefer_existing_union = $PREFER_EXISTING_UNION;

// NOTE: These tests verify the joint `EnforceDistribution` + `EnforceSorting` cascade
// because they were written prior to the separation of `BasicEnforcement` into
Expand Down Expand Up @@ -3107,7 +3121,67 @@ pub(crate) mod tests {
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
];
assert_optimized!(expected, plan.clone(), true);
assert_optimized!(expected, plan, false);
assert_optimized!(expected, plan.clone(), false);

Ok(())
}

/// Checks that with `prefer_existing_union` set to true the optimized plan
/// keeps the `UnionExec` over two hash-partitioned aggregates, and that the
/// final aggregate is fed by its own hash `RepartitionExec` on top of the union.
#[test]
fn union_not_to_interleave() -> Result<()> {
// group by (a as a1)
let left = aggregate_exec_with_alias(
parquet_exec(),
vec![("a".to_string(), "a1".to_string())],
);
// group by (a as a1) -- deliberately the same alias as the left input
let right = aggregate_exec_with_alias(
parquet_exec(),
vec![("a".to_string(), "a1".to_string())],
);

// Union
let plan = Arc::new(UnionExec::new(vec![left, right]));

// final agg: group by (a1 as a2)
let plan =
aggregate_exec_with_alias(plan, vec![("a1".to_string(), "a2".to_string())]);

// `UnionExec` is kept as-is, so in addition to the RepartitionExecs under
// each input, a final hash RepartitionExec over the union's 20 partitions
// is expected below the top aggregate.
let expected = &[
"AggregateExec: mode=FinalPartitioned, gby=[a2@0 as a2], aggr=[]",
"RepartitionExec: partitioning=Hash([a2@0], 10), input_partitions=20",
"AggregateExec: mode=Partial, gby=[a1@0 as a2], aggr=[]",
"UnionExec",
"AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]",
"RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10",
"AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]",
"RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10",
"AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
];
// There is no sort in the plan, but the five-argument form of
// `assert_optimized!` requires this flag, so pass its default (false).
let prefer_existing_sort = false;
let first_enforce_distribution = true;
let prefer_existing_union = true;

// The same plan is expected for both values of the `$FIRST_ENFORCE_DIST`
// argument (presumably the order of the distribution and sorting
// enforcement passes -- confirm against the macro body).
assert_optimized!(
expected,
plan.clone(),
first_enforce_distribution,
prefer_existing_sort,
prefer_existing_union
);
assert_optimized!(
expected,
plan,
!first_enforce_distribution,
prefer_existing_sort,
prefer_existing_union
);

Ok(())
}
Expand Down Expand Up @@ -3661,7 +3735,8 @@ pub(crate) mod tests {
true,
target_partitions,
true,
repartition_size
repartition_size,
false
);

let expected = [
Expand All @@ -3678,7 +3753,8 @@ pub(crate) mod tests {
true,
target_partitions,
true,
repartition_size
repartition_size,
false
);

Ok(())
Expand Down Expand Up @@ -3741,7 +3817,7 @@ pub(crate) mod tests {
)),
vec![("a".to_string(), "a".to_string())],
);
assert_optimized!(expected, plan, true, false, 2, true, 10);
assert_optimized!(expected, plan, true, false, 2, true, 10, false);
}
Ok(())
}
Expand Down
2 changes: 2 additions & 0 deletions datafusion/sqllogictest/test_files/information_schema.slt
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ datafusion.optimizer.hash_join_single_partition_threshold 1048576
datafusion.optimizer.hash_join_single_partition_threshold_rows 131072
datafusion.optimizer.max_passes 3
datafusion.optimizer.prefer_existing_sort false
datafusion.optimizer.prefer_existing_union false
datafusion.optimizer.prefer_hash_join true
datafusion.optimizer.repartition_aggregations true
datafusion.optimizer.repartition_file_min_size 10485760
Expand Down Expand Up @@ -294,6 +295,7 @@ datafusion.optimizer.hash_join_single_partition_threshold 1048576 The maximum es
datafusion.optimizer.hash_join_single_partition_threshold_rows 131072 The maximum estimated size in rows for one input side of a HashJoin will be collected into a single partition
datafusion.optimizer.max_passes 3 Number of times that the optimizer will attempt to optimize the plan
datafusion.optimizer.prefer_existing_sort false When true, DataFusion will opportunistically remove sorts when the data is already sorted, (i.e. setting `preserve_order` to true on `RepartitionExec` and using `SortPreservingMergeExec`) When false, DataFusion will maximize plan parallelism using `RepartitionExec` even if this requires subsequently resorting data using a `SortExec`.
datafusion.optimizer.prefer_existing_union false When set to true, the optimizer will not attempt to convert Union to Interleave
datafusion.optimizer.prefer_hash_join true When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficiently than SortMergeJoin but consumes more memory
datafusion.optimizer.repartition_aggregations true Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel using the provided `target_partitions` level
datafusion.optimizer.repartition_file_min_size 10485760 Minimum total files size in bytes to perform file scan repartitioning.
Expand Down
1 change: 1 addition & 0 deletions docs/source/user-guide/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus
| datafusion.optimizer.hash_join_single_partition_threshold | 1048576 | The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition |
| datafusion.optimizer.hash_join_single_partition_threshold_rows | 131072 | The maximum estimated size in rows for one input side of a HashJoin will be collected into a single partition |
| datafusion.optimizer.default_filter_selectivity | 20 | The default filter selectivity used by Filter Statistics when an exact selectivity cannot be determined. Valid values are between 0 (no selectivity) and 100 (all rows are selected). |
| datafusion.optimizer.prefer_existing_union | false | When set to true, the optimizer will not attempt to convert Union to Interleave |
| datafusion.explain.logical_plan_only | false | When set to true, the explain statement will only print logical plans |
| datafusion.explain.physical_plan_only | false | When set to true, the explain statement will only print physical plans |
| datafusion.explain.show_statistics | false | When set to true, the explain statement will print operator statistics for physical plans |
Expand Down

0 comments on commit 79cf58f

Please sign in to comment.