-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Cleanup the usage of round-robin repartitioning #8794
Conversation
if repartition_beneficial_stats { | ||
// Since hashing benefits from partitioning, add a round-robin repartition | ||
// before it: | ||
input = add_roundrobin_on_top(input, n_target)?; | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This add_roundrobin_on_top
currently is not controlled by enable_round_robin
.
----RepartitionExec: partitioning=Hash([column1@0], 4), input_partitions=1 | ||
------AggregateExec: mode=Partial, gby=[column1@0 as column1], aggr=[SUM(parquet_table.column2)] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should not see RoundRobinBatch
partitioning as enable_round_robin_repartition
is disabled.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently roundrobin partitioning is still added even it is disabled. We should make the config behavior consistent.
// count. | ||
child = add_roundrobin_on_top(child, target_partitions)?; | ||
} | ||
|
||
// Satisfy the distribution requirement if it is unmet. | ||
match requirement { | ||
Distribution::SinglePartition => { | ||
child = add_spm_on_top(child); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Although due to would_benefit
, single partition won't satisfy the condition to add roundrobin partitioning. This change makes it explicitly clear that single partition cases don't add roundrobin.
fn benefits_from_input_partitioning(&self) -> Vec<bool> { | ||
vec![false, false] | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see why this sets all false for SymmetricHashJoinExec
. When the mode is StreamJoinPartitionMode::Partitioned
, it requires input distributions to be HashPartitioned
s.
I think that we don't need to override default function.
Current test case also proves that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Roundbound and hashpartition together remove the input order of the SHJ. This is why we added this guard, but it was long ago. I will send a fixing PR for this purpose. If these changes do not affect your tests, can you please keep it as it is and let me delete it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I don't have this change, current test will fail because roundrobin will not be added.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Roundbound and hashpartition together remove the input order of the SHJ.
I don't see it requires input ordering (i.e., required_input_ordering
).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It can run without order but may run out of memory. I am planning to add an optional input requirement for it, thus we can keep the order if it is present in the plan before optimization. I think you can delete it, then I'll prepare the PR after this merged.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay. Sounds good.
@@ -1908,7 +1900,7 @@ pub(crate) mod tests { | |||
let distribution_context = DistributionContext::new(plan); | |||
let mut config = ConfigOptions::new(); | |||
config.execution.target_partitions = target_partitions; | |||
config.optimizer.enable_round_robin_repartition = false; | |||
config.optimizer.enable_round_robin_repartition = true; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These tests expect roundrobin partitioning is added, so this config should be enabled.
@@ -61,7 +61,7 @@ Filter: parquet_table.column1 != Int32(42) | |||
physical_plan | |||
CoalesceBatchesExec: target_batch_size=8192 | |||
--FilterExec: column1@0 != 42 | |||
----ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..101], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:101..202], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:202..303], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:303..403]]}, projection=[column1], predicate=column1@0 != 42, pruning_predicate=column1_min@0 != 42 OR 42 != column1_max@1 | |||
----ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..180], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:180..360], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:360..540], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:540..717]]}, projection=[column1], predicate=column1@0 != 42, pruning_predicate=column1_min@0 != 42 OR 42 != column1_max@1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the reason this test changed is that the repartition.slt test is writing files into the wrong temp space (see note above)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, good catch, I didn't notice that.
Co-authored-by: Andrew Lamb <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
@@ -0,0 +1,76 @@ | |||
# Licensed to the Apache Software Foundation (ASF) under one |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I ran this test locally without the code changes in this PR and this test still passes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm, I ran the test locally before and it failed. Let me re-verify it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed it. I added a few lines before which broken it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This PR makes sense to me and all tests pass (and there is even added coverage). Thus I think it is a nice improvement.
Thank you @viirya @Dandandan @metesynnada
Co-authored-by: Andrew Lamb <[email protected]>
Thanks @alamb @Dandandan @metesynnada |
Which issue does this PR close?
Closes #.
Rationale for this change
Currently the logic of adding roundrobin partitioning is somehow confusing. This patch tries to clean it up.
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?