Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[minior fix]: adjust the projection statistics #7428

Merged
merged 3 commits into from
Aug 31, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 37 additions & 31 deletions datafusion/sqllogictest/test_files/joins.slt
Original file line number Diff line number Diff line change
Expand Up @@ -1500,15 +1500,16 @@ Projection: join_t1.t1_id, join_t2.t2_id, join_t1.t1_name
physical_plan
ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id, t1_name@1 as t1_name]
--ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t2_id@3 as t2_id]
----CoalesceBatchesExec: target_batch_size=4096
------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(join_t1.t1_id + UInt32(12)@2, join_t2.t2_id + UInt32(1)@1)]
--------CoalescePartitionsExec
----ProjectionExec: expr=[t1_id@2 as t1_id, t1_name@3 as t1_name, join_t1.t1_id + UInt32(12)@4 as join_t1.t1_id + UInt32(12), t2_id@0 as t2_id, join_t2.t2_id + UInt32(1)@1 as join_t2.t2_id + UInt32(1)]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

after this pr, the t1 and t2 has the statistics in the projection exe.

The table of t2 project the t2_id, the table of t1 project the t1_id and t1_name, hence the cost of t1 is greater than t2.

Collect the left in the join, use the t2 as the building table.

------CoalesceBatchesExec: target_batch_size=4096
--------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(join_t2.t2_id + UInt32(1)@1, join_t1.t1_id + UInt32(12)@2)]
----------CoalescePartitionsExec
------------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 + 1 as join_t2.t2_id + UInt32(1)]
--------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
----------------MemoryExec: partitions=1, partition_sizes=[1]
----------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + 12 as join_t1.t1_id + UInt32(12)]
------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
--------------MemoryExec: partitions=1, partition_sizes=[1]
--------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 + 1 as join_t2.t2_id + UInt32(1)]
----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
------------MemoryExec: partitions=1, partition_sizes=[1]

statement ok
set datafusion.optimizer.repartition_joins = true;
Expand All @@ -1527,18 +1528,19 @@ Projection: join_t1.t1_id, join_t2.t2_id, join_t1.t1_name
physical_plan
ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id, t1_name@1 as t1_name]
--ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t2_id@3 as t2_id]
----CoalesceBatchesExec: target_batch_size=4096
------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(join_t1.t1_id + UInt32(12)@2, join_t2.t2_id + UInt32(1)@1)]
--------CoalesceBatchesExec: target_batch_size=4096
----------RepartitionExec: partitioning=Hash([join_t1.t1_id + UInt32(12)@2], 2), input_partitions=2
------------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + 12 as join_t1.t1_id + UInt32(12)]
--------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
----------------MemoryExec: partitions=1, partition_sizes=[1]
--------CoalesceBatchesExec: target_batch_size=4096
----------RepartitionExec: partitioning=Hash([join_t2.t2_id + UInt32(1)@1], 2), input_partitions=2
------------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 + 1 as join_t2.t2_id + UInt32(1)]
--------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
----------------MemoryExec: partitions=1, partition_sizes=[1]
----ProjectionExec: expr=[t1_id@2 as t1_id, t1_name@3 as t1_name, join_t1.t1_id + UInt32(12)@4 as join_t1.t1_id + UInt32(12), t2_id@0 as t2_id, join_t2.t2_id + UInt32(1)@1 as join_t2.t2_id + UInt32(1)]
------CoalesceBatchesExec: target_batch_size=4096
--------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(join_t2.t2_id + UInt32(1)@1, join_t1.t1_id + UInt32(12)@2)]
----------CoalesceBatchesExec: target_batch_size=4096
------------RepartitionExec: partitioning=Hash([join_t2.t2_id + UInt32(1)@1], 2), input_partitions=2
--------------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 + 1 as join_t2.t2_id + UInt32(1)]
----------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
------------------MemoryExec: partitions=1, partition_sizes=[1]
----------CoalesceBatchesExec: target_batch_size=4096
------------RepartitionExec: partitioning=Hash([join_t1.t1_id + UInt32(12)@2], 2), input_partitions=2
--------------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + 12 as join_t1.t1_id + UInt32(12)]
----------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
------------------MemoryExec: partitions=1, partition_sizes=[1]

# Left side expr key inner join

Expand Down Expand Up @@ -1619,10 +1621,13 @@ Projection: join_t1.t1_id, join_t2.t2_id, join_t1.t1_name
physical_plan
ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id, t1_name@1 as t1_name]
--ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t2_id@2 as t2_id]
----CoalesceBatchesExec: target_batch_size=4096
------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(t1_id@0, join_t2.t2_id - UInt32(11)@1)]
--------MemoryExec: partitions=1, partition_sizes=[1]
--------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 - 11 as join_t2.t2_id - UInt32(11)]
----ProjectionExec: expr=[t1_id@2 as t1_id, t1_name@3 as t1_name, t2_id@0 as t2_id, join_t2.t2_id - UInt32(11)@1 as join_t2.t2_id - UInt32(11)]
------CoalesceBatchesExec: target_batch_size=4096
--------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(join_t2.t2_id - UInt32(11)@1, t1_id@0)]
----------CoalescePartitionsExec
------------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 - 11 as join_t2.t2_id - UInt32(11)]
--------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
----------------MemoryExec: partitions=1, partition_sizes=[1]
----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
------------MemoryExec: partitions=1, partition_sizes=[1]

Expand All @@ -1644,15 +1649,16 @@ Projection: join_t1.t1_id, join_t2.t2_id, join_t1.t1_name
physical_plan
ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id, t1_name@1 as t1_name]
--ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t2_id@2 as t2_id]
----CoalesceBatchesExec: target_batch_size=4096
------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(t1_id@0, join_t2.t2_id - UInt32(11)@1)]
--------CoalesceBatchesExec: target_batch_size=4096
----------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2
------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
--------------MemoryExec: partitions=1, partition_sizes=[1]
--------CoalesceBatchesExec: target_batch_size=4096
----------RepartitionExec: partitioning=Hash([join_t2.t2_id - UInt32(11)@1], 2), input_partitions=2
------------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 - 11 as join_t2.t2_id - UInt32(11)]
----ProjectionExec: expr=[t1_id@2 as t1_id, t1_name@3 as t1_name, t2_id@0 as t2_id, join_t2.t2_id - UInt32(11)@1 as join_t2.t2_id - UInt32(11)]
------CoalesceBatchesExec: target_batch_size=4096
--------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(join_t2.t2_id - UInt32(11)@1, t1_id@0)]
----------CoalesceBatchesExec: target_batch_size=4096
------------RepartitionExec: partitioning=Hash([join_t2.t2_id - UInt32(11)@1], 2), input_partitions=2
--------------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 - 11 as join_t2.t2_id - UInt32(11)]
----------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
------------------MemoryExec: partitions=1, partition_sizes=[1]
----------CoalesceBatchesExec: target_batch_size=4096
------------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2
--------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
----------------MemoryExec: partitions=1, partition_sizes=[1]

Expand Down
46 changes: 24 additions & 22 deletions datafusion/sqllogictest/test_files/subquery.slt
Original file line number Diff line number Diff line change
Expand Up @@ -177,17 +177,18 @@ Projection: t1.t1_id, __scalar_sq_1.SUM(t2.t2_int) AS t2_sum
----------TableScan: t2 projection=[t2_id, t2_int]
physical_plan
ProjectionExec: expr=[t1_id@0 as t1_id, SUM(t2.t2_int)@1 as t2_sum]
--CoalesceBatchesExec: target_batch_size=8192
----HashJoinExec: mode=Partitioned, join_type=Left, on=[(t1_id@0, t2_id@1)]
------CoalesceBatchesExec: target_batch_size=8192
--------RepartitionExec: partitioning=Hash([t1_id@0], 4), input_partitions=4
----------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
------ProjectionExec: expr=[SUM(t2.t2_int)@1 as SUM(t2.t2_int), t2_id@0 as t2_id]
--------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id], aggr=[SUM(t2.t2_int)]
----------CoalesceBatchesExec: target_batch_size=8192
------------RepartitionExec: partitioning=Hash([t2_id@0], 4), input_partitions=4
--------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id], aggr=[SUM(t2.t2_int)]
----------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
--ProjectionExec: expr=[t1_id@2 as t1_id, SUM(t2.t2_int)@0 as SUM(t2.t2_int), t2_id@1 as t2_id]
----CoalesceBatchesExec: target_batch_size=8192
------HashJoinExec: mode=Partitioned, join_type=Right, on=[(t2_id@1, t1_id@0)]
--------ProjectionExec: expr=[SUM(t2.t2_int)@1 as SUM(t2.t2_int), t2_id@0 as t2_id]
----------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id], aggr=[SUM(t2.t2_int)]
------------CoalesceBatchesExec: target_batch_size=8192
--------------RepartitionExec: partitioning=Hash([t2_id@0], 4), input_partitions=4
----------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id], aggr=[SUM(t2.t2_int)]
------------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
--------CoalesceBatchesExec: target_batch_size=8192
----------RepartitionExec: partitioning=Hash([t1_id@0], 4), input_partitions=4
------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]

query II rowsort
SELECT t1_id, (SELECT sum(t2_int) FROM t2 WHERE t2.t2_id = t1.t1_id) as t2_sum from t1
Expand All @@ -211,17 +212,18 @@ Projection: t1.t1_id, __scalar_sq_1.SUM(t2.t2_int * Float64(1)) + Int64(1) AS t2
----------TableScan: t2 projection=[t2_id, t2_int]
physical_plan
ProjectionExec: expr=[t1_id@0 as t1_id, SUM(t2.t2_int * Float64(1)) + Int64(1)@1 as t2_sum]
--CoalesceBatchesExec: target_batch_size=8192
----HashJoinExec: mode=Partitioned, join_type=Left, on=[(t1_id@0, t2_id@1)]
------CoalesceBatchesExec: target_batch_size=8192
--------RepartitionExec: partitioning=Hash([t1_id@0], 4), input_partitions=4
----------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
------ProjectionExec: expr=[SUM(t2.t2_int * Float64(1))@1 + 1 as SUM(t2.t2_int * Float64(1)) + Int64(1), t2_id@0 as t2_id]
--------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id], aggr=[SUM(t2.t2_int * Float64(1))]
----------CoalesceBatchesExec: target_batch_size=8192
------------RepartitionExec: partitioning=Hash([t2_id@0], 4), input_partitions=4
--------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id], aggr=[SUM(t2.t2_int * Float64(1))]
----------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
--ProjectionExec: expr=[t1_id@2 as t1_id, SUM(t2.t2_int * Float64(1)) + Int64(1)@0 as SUM(t2.t2_int * Float64(1)) + Int64(1), t2_id@1 as t2_id]
----CoalesceBatchesExec: target_batch_size=8192
------HashJoinExec: mode=Partitioned, join_type=Right, on=[(t2_id@1, t1_id@0)]
--------ProjectionExec: expr=[SUM(t2.t2_int * Float64(1))@1 + 1 as SUM(t2.t2_int * Float64(1)) + Int64(1), t2_id@0 as t2_id]
----------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id], aggr=[SUM(t2.t2_int * Float64(1))]
------------CoalesceBatchesExec: target_batch_size=8192
--------------RepartitionExec: partitioning=Hash([t2_id@0], 4), input_partitions=4
----------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id], aggr=[SUM(t2.t2_int * Float64(1))]
------------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
--------CoalesceBatchesExec: target_batch_size=8192
----------RepartitionExec: partitioning=Hash([t1_id@0], 4), input_partitions=4
------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]

query IR rowsort
SELECT t1_id, (SELECT sum(t2_int * 1.0) + 1 FROM t2 WHERE t2.t2_id = t1.t1_id) as t2_sum from t1
Expand Down
Loading