Skip to content

Commit

Permalink
[Bug Fix]: Deem hash repartition unnecessary when input and output has 1 partition (apache#10095)
Browse files Browse the repository at this point in the history

* Add input partition number check

* Minor changes
  • Loading branch information
mustafasrepo authored and Omega359 committed Apr 16, 2024
1 parent bbf1a26 commit 80a9442
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -875,7 +875,8 @@ fn add_hash_on_top(
n_target: usize,
) -> Result<DistributionContext> {
// Early return if hash repartition is unnecessary
- if n_target == 1 {
+ // `RepartitionExec: partitioning=Hash([...], 1), input_partitions=1` is unnecessary.
+ if n_target == 1 && input.plan.output_partitioning().partition_count() == 1 {
return Ok(input);
}

Expand Down
101 changes: 101 additions & 0 deletions datafusion/sqllogictest/test_files/joins.slt
Original file line number Diff line number Diff line change
Expand Up @@ -3587,3 +3587,104 @@ SELECT 1 FROM join_partitioned_table JOIN (SELECT c1 AS id1 FROM join_partitione
1
1
1


statement ok
set datafusion.explain.logical_plan_only = false;

query TT
EXPLAIN SELECT * FROM (
SELECT 1 as c, 2 as d
UNION ALL
SELECT 1 as c, 3 AS d
) as a FULL JOIN (SELECT 1 as e, 3 AS f) AS rhs ON a.c=rhs.e;
----
logical_plan
01)Projection: a.c, a.d, rhs.e, rhs.f
02)--Full Join: a.c = rhs.e
03)----SubqueryAlias: a
04)------Union
05)--------Projection: Int64(1) AS c, Int64(2) AS d
06)----------EmptyRelation
07)--------Projection: Int64(1) AS c, Int64(3) AS d
08)----------EmptyRelation
09)----SubqueryAlias: rhs
10)------Projection: Int64(1) AS e, Int64(3) AS f
11)--------EmptyRelation
physical_plan
01)ProjectionExec: expr=[c@2 as c, d@3 as d, e@0 as e, f@1 as f]
02)--CoalesceBatchesExec: target_batch_size=2
03)----HashJoinExec: mode=Partitioned, join_type=Full, on=[(e@0, c@0)]
04)------CoalesceBatchesExec: target_batch_size=2
05)--------RepartitionExec: partitioning=Hash([e@0], 2), input_partitions=1
06)----------ProjectionExec: expr=[1 as e, 3 as f]
07)------------PlaceholderRowExec
08)------CoalesceBatchesExec: target_batch_size=2
09)--------RepartitionExec: partitioning=Hash([c@0], 2), input_partitions=2
10)----------UnionExec
11)------------ProjectionExec: expr=[1 as c, 2 as d]
12)--------------PlaceholderRowExec
13)------------ProjectionExec: expr=[1 as c, 3 as d]
14)--------------PlaceholderRowExec

query IIII
SELECT * FROM (
SELECT 1 as c, 2 as d
UNION ALL
SELECT 1 as c, 3 AS d
) as a FULL JOIN (SELECT 1 as e, 3 AS f) AS rhs ON a.c=rhs.e;
----
1 2 1 3
1 3 1 3

statement ok
set datafusion.execution.target_partitions = 1;

query TT
EXPLAIN SELECT * FROM (
SELECT 1 as c, 2 as d
UNION ALL
SELECT 1 as c, 3 AS d
) as a FULL JOIN (SELECT 1 as e, 3 AS f) AS rhs ON a.c=rhs.e;
----
logical_plan
01)Projection: a.c, a.d, rhs.e, rhs.f
02)--Full Join: a.c = rhs.e
03)----SubqueryAlias: a
04)------Union
05)--------Projection: Int64(1) AS c, Int64(2) AS d
06)----------EmptyRelation
07)--------Projection: Int64(1) AS c, Int64(3) AS d
08)----------EmptyRelation
09)----SubqueryAlias: rhs
10)------Projection: Int64(1) AS e, Int64(3) AS f
11)--------EmptyRelation
physical_plan
01)ProjectionExec: expr=[c@2 as c, d@3 as d, e@0 as e, f@1 as f]
02)--CoalesceBatchesExec: target_batch_size=2
03)----HashJoinExec: mode=Partitioned, join_type=Full, on=[(e@0, c@0)]
04)------ProjectionExec: expr=[1 as e, 3 as f]
05)--------PlaceholderRowExec
06)------CoalesceBatchesExec: target_batch_size=2
07)--------RepartitionExec: partitioning=Hash([c@0], 1), input_partitions=2
08)----------UnionExec
09)------------ProjectionExec: expr=[1 as c, 2 as d]
10)--------------PlaceholderRowExec
11)------------ProjectionExec: expr=[1 as c, 3 as d]
12)--------------PlaceholderRowExec

query IIII
SELECT * FROM (
SELECT 1 as c, 2 as d
UNION ALL
SELECT 1 as c, 3 AS d
) as a FULL JOIN (SELECT 1 as e, 3 AS f) AS rhs ON a.c=rhs.e;
----
1 2 1 3
1 3 1 3

statement ok
set datafusion.explain.logical_plan_only = true;

statement ok
set datafusion.execution.target_partitions = 2;

0 comments on commit 80a9442

Please sign in to comment.