Cleanup the usage of round-robin repartitioning #8794

Merged
merged 13 commits into from
Jan 11, 2024
48 changes: 20 additions & 28 deletions datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -925,7 +925,6 @@ fn add_hash_on_top(
mut input: DistributionContext,
hash_exprs: Vec<Arc<dyn PhysicalExpr>>,
n_target: usize,
-repartition_beneficial_stats: bool,
) -> Result<DistributionContext> {
// Early return if hash repartition is unnecessary
if n_target == 1 {
@@ -950,12 +949,6 @@ fn add_hash_on_top(
// requirements.
// - Usage of order preserving variants is not desirable (per the flag
// `config.optimizer.prefer_existing_sort`).
-if repartition_beneficial_stats {
-// Since hashing benefits from partitioning, add a round-robin repartition
-// before it:
-input = add_roundrobin_on_top(input, n_target)?;
-}

Comment on lines -953 to -958
Member Author

This `add_roundrobin_on_top` call is currently not controlled by `enable_round_robin`.
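
As a rough illustration of the point above, the decision to add a round-robin repartition can be reduced to a single predicate that includes the config flag. The sketch below is a simplified, standalone model and not the DataFusion implementation: the `RoundRobinInputs` struct and the `should_add_roundrobin` function are hypothetical names, while the field names mirror variables that appear in this diff.

```rust
/// Hypothetical, simplified stand-in for the values that `ensure_distribution`
/// consults. Field names mirror the variables in the diff; the struct itself
/// is an assumption made for illustration.
#[derive(Clone, Copy, Debug)]
struct RoundRobinInputs {
    enable_round_robin: bool,
    would_benefit: bool,
    repartition_beneficial_stats: bool,
    child_partition_count: usize,
    target_partitions: usize,
}

/// Returns true only when every condition for adding a round-robin
/// repartition holds, including the user-facing config flag.
fn should_add_roundrobin(inputs: RoundRobinInputs) -> bool {
    inputs.enable_round_robin
        // The operator benefits from partitioning (e.g. a filter):
        && inputs.would_benefit
        && inputs.repartition_beneficial_stats
        // Repartitioning only helps if it increases the partition count:
        && inputs.child_partition_count < inputs.target_partitions
}
```

With `enable_round_robin` set to `false` the predicate is always `false`. The call removed from `add_hash_on_top` only checked `repartition_beneficial_stats`, so it bypassed that flag.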

let partitioning = Partitioning::Hash(hash_exprs, n_target);
let repartition = RepartitionExec::try_new(input.plan.clone(), partitioning)?
.with_preserve_order();
@@ -1208,6 +1201,12 @@ fn ensure_distribution(
true
};

+let add_roundrobin = enable_round_robin
+// Operator benefits from partitioning (e.g. filter):
+&& (would_benefit && repartition_beneficial_stats)
+// Unless partitioning doesn't increase the partition count, it is not beneficial:
+&& child.plan.output_partitioning().partition_count() < target_partitions;

// When `repartition_file_scans` is set, attempt to increase
// parallelism at the source.
if repartition_file_scans && repartition_beneficial_stats {
@@ -1218,33 +1217,26 @@ fn ensure_distribution(
}
}

-if enable_round_robin
-// Operator benefits from partitioning (e.g. filter):
-&& (would_benefit && repartition_beneficial_stats)
-// Unless partitioning doesn't increase the partition count, it is not beneficial:
-&& child.plan.output_partitioning().partition_count() < target_partitions
-{
-// Increase parallelism by adding round-robin repartitioning
-// on top of the operator. Note that we only do this if the
-// partition count is not already equal to the desired partition
-// count.
-child = add_roundrobin_on_top(child, target_partitions)?;
-}

// Satisfy the distribution requirement if it is unmet.
match requirement {
Distribution::SinglePartition => {
child = add_spm_on_top(child);
Member Author

Because of `would_benefit`, the single-partition case already fails the condition for adding round-robin repartitioning. This change makes it explicit that single-partition cases never add round-robin.
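
The restructured `match` can be sketched in isolation as follows. This is a simplified model under stated assumptions: `Requirement` stands in for DataFusion's `Distribution`, the string labels stand in for the operators added by `add_spm_on_top`, `add_roundrobin_on_top`, and `add_hash_on_top`, and the early-return checks inside those helpers are ignored. The `operators_added` function is a hypothetical name.

```rust
/// Simplified stand-in for DataFusion's `Distribution` requirement.
/// (The real `HashPartitioned` variant carries the hash expressions.)
#[derive(Clone, Copy, Debug)]
enum Requirement {
    SinglePartition,
    HashPartitioned,
    Unspecified,
}

/// Lists, bottom-up, the operators placed on top of a child for a given
/// requirement. `add_roundrobin` is the flag computed once before the match.
fn operators_added(requirement: Requirement, add_roundrobin: bool) -> Vec<&'static str> {
    let mut added = Vec::new();
    match requirement {
        // Merging into one partition: round-robin is never added here,
        // regardless of the flag.
        Requirement::SinglePartition => added.push("merge"),
        // Round-robin (if enabled and beneficial) goes below the hash repartition.
        Requirement::HashPartitioned => {
            if add_roundrobin {
                added.push("round_robin");
            }
            added.push("hash");
        }
        // No distribution requirement: only round-robin may be added.
        Requirement::Unspecified => {
            if add_roundrobin {
                added.push("round_robin");
            }
        }
    }
    added
}
```

Keeping the round-robin step inside each arm, rather than in a shared block before the `match`, is what makes the single-partition behavior visible at a glance.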

}
Distribution::HashPartitioned(exprs) => {
-child = add_hash_on_top(
-child,
-exprs.to_vec(),
-target_partitions,
-repartition_beneficial_stats,
-)?;
+if add_roundrobin {
+// Add round-robin repartitioning on top of the operator
+// to increase parallelism.
+child = add_roundrobin_on_top(child, target_partitions)?;
+}
+child = add_hash_on_top(child, exprs.to_vec(), target_partitions)?;
}
+Distribution::UnspecifiedDistribution => {
+if add_roundrobin {
+// Add round-robin repartitioning on top of the operator
+// to increase parallelism.
+child = add_roundrobin_on_top(child, target_partitions)?;
+}
+}
-Distribution::UnspecifiedDistribution => {}
};

// There is an ordering requirement of the operator:
@@ -1908,7 +1900,7 @@ pub(crate) mod tests {
let distribution_context = DistributionContext::new(plan);
let mut config = ConfigOptions::new();
config.execution.target_partitions = target_partitions;
-config.optimizer.enable_round_robin_repartition = false;
+config.optimizer.enable_round_robin_repartition = true;
Member Author

These tests expect round-robin repartitioning to be added, so this config should be enabled.

config.optimizer.repartition_file_scans = false;
config.optimizer.repartition_file_min_size = 1024;
config.optimizer.prefer_existing_sort = prefer_existing_sort;
4 changes: 0 additions & 4 deletions datafusion/physical-plan/src/joins/symmetric_hash_join.rs
@@ -337,10 +337,6 @@ impl ExecutionPlan for SymmetricHashJoinExec {
Ok(children.iter().any(|u| *u))
}

-fn benefits_from_input_partitioning(&self) -> Vec<bool> {
-vec![false, false]
-}
Comment on lines -340 to -342
Member Author

@viirya, Jan 9, 2024

I don't see why this returns `false` for every input of `SymmetricHashJoinExec`. When the mode is `StreamJoinPartitionMode::Partitioned`, it requires its input distributions to be `HashPartitioned`.

I think we don't need to override the default function.

The current test case also confirms that.

Contributor

Round-robin and hash repartitioning together remove the input order of the SHJ. That is why we added this guard, though it was a long time ago. I will send a PR to fix this. If these changes do not affect your tests, could you please keep it as it is and let me delete it?

Member Author

Without this change, the current test fails because round-robin repartitioning is not added.

Member Author

> Round-robin and hash repartitioning together remove the input order of the SHJ.

I don't see that it requires an input ordering (i.e., `required_input_ordering`).

Contributor

It can run without ordered input, but it may run out of memory. I am planning to add an optional input requirement for it, so that we can keep the order if it is present in the plan before optimization. I think you can delete it; I'll prepare the PR after this one is merged.

Member Author

Okay. Sounds good.
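
A minimal sketch of what removing the override means, assuming the trait's default implementation reports a benefit for every input that is not required to be a single partition (this is an assumption about the default, not something shown in this diff). The `Operator` trait and both join structs are hypothetical stand-ins for `ExecutionPlan` and `SymmetricHashJoinExec`.

```rust
/// Simplified stand-in for DataFusion's `Distribution`.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Distribution {
    SinglePartition,
    HashPartitioned,
}

/// Hypothetical, reduced version of the relevant part of `ExecutionPlan`.
trait Operator {
    fn required_input_distribution(&self) -> Vec<Distribution>;

    /// Assumed default: an input benefits from partitioning unless the
    /// operator requires that input to be a single partition.
    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
        self.required_input_distribution()
            .into_iter()
            .map(|dist| dist != Distribution::SinglePartition)
            .collect()
    }
}

/// Models the join after this PR: it relies on the default method.
struct PartitionedJoin;

impl Operator for PartitionedJoin {
    fn required_input_distribution(&self) -> Vec<Distribution> {
        vec![Distribution::HashPartitioned, Distribution::HashPartitioned]
    }
}

/// Models the join before this PR: the override reports no benefit,
/// so the optimizer would not add round-robin repartitioning below it.
struct GuardedJoin;

impl Operator for GuardedJoin {
    fn required_input_distribution(&self) -> Vec<Distribution> {
        vec![Distribution::HashPartitioned, Distribution::HashPartitioned]
    }

    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
        vec![false, false]
    }
}
```

Under this model, `PartitionedJoin` reports `[true, true]` and `GuardedJoin` reports `[false, false]`, which matches the observation in the thread that the test only adds round-robin once the override is removed.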


fn required_input_distribution(&self) -> Vec<Distribution> {
match self.mode {
StreamJoinPartitionMode::Partitioned => {
73 changes: 73 additions & 0 deletions datafusion/sqllogictest/test_files/repartition.slt
@@ -0,0 +1,73 @@
# Licensed to the Apache Software Foundation (ASF) under one
Contributor

I ran this test locally without the code changes in this PR, and it still passes.

Member Author

Hm, I ran the test locally before and it failed. Let me re-verify it.

Member Author

Fixed it. I had added a few lines earlier that broke it.

# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at

# http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

##########
# Tests for repartitioning
##########

# Set 4 partitions for deterministic output plans
statement ok
set datafusion.execution.target_partitions = 4;

statement ok
COPY (VALUES (1, 2), (2, 5), (3, 2), (4, 5), (5, 0)) TO 'test_files/scratch/repartition_scan/parquet_table/2.parquet'
(FORMAT PARQUET, SINGLE_FILE_OUTPUT true);

statement ok
CREATE EXTERNAL TABLE parquet_table(column1 int, column2 int)
STORED AS PARQUET
LOCATION 'test_files/scratch/repartition_scan/parquet_table/';

# enable round robin repartitioning
statement ok
set datafusion.optimizer.enable_round_robin_repartition = true;

query TT
EXPLAIN SELECT column1, SUM(column2) FROM parquet_table GROUP BY column1;
----
logical_plan
Aggregate: groupBy=[[parquet_table.column1]], aggr=[[SUM(CAST(parquet_table.column2 AS Int64))]]
--TableScan: parquet_table projection=[column1, column2]
physical_plan
AggregateExec: mode=FinalPartitioned, gby=[column1@0 as column1], aggr=[SUM(parquet_table.column2)]
--CoalesceBatchesExec: target_batch_size=8192
----RepartitionExec: partitioning=Hash([column1@0], 4), input_partitions=4
------AggregateExec: mode=Partial, gby=[column1@0 as column1], aggr=[SUM(parquet_table.column2)]
--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
----------ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet]]}, projection=[column1, column2]

# disable round robin repartitioning
statement ok
set datafusion.optimizer.enable_round_robin_repartition = false;

query TT
EXPLAIN SELECT column1, SUM(column2) FROM parquet_table GROUP BY column1;
----
logical_plan
Aggregate: groupBy=[[parquet_table.column1]], aggr=[[SUM(CAST(parquet_table.column2 AS Int64))]]
--TableScan: parquet_table projection=[column1, column2]
physical_plan
AggregateExec: mode=FinalPartitioned, gby=[column1@0 as column1], aggr=[SUM(parquet_table.column2)]
--CoalesceBatchesExec: target_batch_size=8192
----RepartitionExec: partitioning=Hash([column1@0], 4), input_partitions=1
------AggregateExec: mode=Partial, gby=[column1@0 as column1], aggr=[SUM(parquet_table.column2)]
Comment on lines +66 to +67
Member Author

We should not see `RoundRobinBatch` partitioning here, because `enable_round_robin_repartition` is disabled.

Member Author

Currently, round-robin repartitioning is still added even when it is disabled. We should make the config behavior consistent.

--------ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet]]}, projection=[column1, column2]


# Cleanup
statement ok
DROP TABLE parquet_table;