Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix hash join for nested types #11232

Merged
merged 7 commits into from
Jul 5, 2024
Merged

Conversation

eejbyfeldt
Copy link
Contributor

@eejbyfeldt eejbyfeldt commented Jul 2, 2024

Which issue does this PR close?

Does some follow up to #10749 and it PR #11117 and follow up #11149.

Closes #11233

Rationale for this change

In #11117 hash join was extended to handled nested types. The fixed contained a small issues around handling of null_equals_null where it only used the new compare function when null_equals_null was true. But it also used the Operator::Eq which does not consider nulls equal.

What changes are included in this PR?

This address the 2 described issues. Makes hash join also support nested types when null_equals_null is false and use the correct Operator when it is true.

Are these changes tested?

New test cases.

Are there any user-facing changes?

Yes, it fixes bugs in unreleased features.

@eejbyfeldt eejbyfeldt marked this pull request as ready for review July 2, 2024 21:09
Copy link
Member

@viirya viirya left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The fix looks good. It'd be better if we can add some e2e tests in sqllogictest.

@github-actions github-actions bot added the sqllogictest SQL Logic Tests (.slt) label Jul 3, 2024
@eejbyfeldt
Copy link
Contributor Author

The fix looks good. It'd be better if we can add some e2e tests in sqllogictest.

@viirya Good suggestion. I tried added some sqllogictest, they work. But as far as I can tell they are not using HashJoinExec (even if the exact same query joining on only field is). Is there some way to force a HashJoin or help me understand why hashjoin is not used?

@viirya
Copy link
Member

viirya commented Jul 3, 2024

@viirya Good suggestion. I tried added some sqllogictest, they work. But as far as I can tell they are not using HashJoinExec (even if the exact same query joining on only field is). Is there some way to force a HashJoin or help me understand why hashjoin is not used?

Hmm, what join operator it is using? I think HashJoin is used by default.

@eejbyfeldt
Copy link
Contributor Author

@viirya Good suggestion. I tried added some sqllogictest, they work. But as far as I can tell they are not using HashJoinExec (even if the exact same query joining on only field is). Is there some way to force a HashJoin or help me understand why hashjoin is not used?

Hmm, what join operator it is using? I think HashJoin is used by default.

Just using = and an inner join. Here are the same joins in datafusion-cli. The one using struct uses NestedLoopJoin while the one using the id direcly uses the HashJoin

> CREATE TABLE join_t3(s3 struct<id INT>)
  AS VALUES
  (NULL),
  (struct(1)),
  (struct(2));

0 row(s) fetched. 
Elapsed 0.003 seconds.

> CREATE TABLE join_t4(s4 struct<id INT>)
  AS VALUES
  (NULL),
  (struct(2)),
  (struct(3));

0 row(s) fetched. 
Elapsed 0.002 seconds.

> explain analyze select join_t3.s3, join_t4.s4
from join_t3
inner join join_t4 on join_t3.s3 = join_t4.s4;
+-------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type         | plan                                                                                                                                                                                                                                       |
+-------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Plan with Metrics | NestedLoopJoinExec: join_type=Inner, filter=s3@0 = s4@1, metrics=[output_rows=1, output_batches=1, build_input_batches=1, input_rows=3, build_input_rows=3, input_batches=1, build_mem_used=266, join_time=270.817µs, build_time=34.164µs] |
|                   |   MemoryExec: partitions=1, partition_sizes=[1], metrics=[]                                                                                                                                                                                |
|                   |   MemoryExec: partitions=1, partition_sizes=[1], metrics=[]                                                                                                                                                                                |
|                   |                                                                                                                                                                                                                                            |
+-------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row(s) fetched. 
Elapsed 0.002 seconds.

> explain analyze select join_t3.s3, join_t4.s4
from join_t3
inner join join_t4 on join_t3.s3.id = join_t4.s4.id;
+-------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type         | plan                                                                                                                                                                                                                                                                                                       |
+-------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Plan with Metrics | CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=1, elapsed_compute=13.975µs]                                                                                                                                                                                                             |
|                   |   HashJoinExec: mode=Partitioned, join_type=Inner, on=[(join_t3.s3[id]@1, join_t4.s4[id]@1)], projection=[s3@0, s4@2], metrics=[output_rows=1, output_batches=3, build_input_batches=3, input_rows=3, build_input_rows=3, input_batches=3, build_mem_used=2146, join_time=141.757µs, build_time=647.423µs] |
|                   |     CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=3, elapsed_compute=68.41µs]                                                                                                                                                                                                          |
|                   |       RepartitionExec: partitioning=Hash([join_t3.s3[id]@1], 16), input_partitions=1, metrics=[fetch_time=50.816µs, repart_time=84.96µs, send_time=16.675µs]                                                                                                                                               |
|                   |         ProjectionExec: expr=[s3@0 as s3, get_field(s3@0, id) as join_t3.s3[id]], metrics=[output_rows=3, elapsed_compute=28.443µs]                                                                                                                                                                        |
|                   |           MemoryExec: partitions=1, partition_sizes=[1], metrics=[]                                                                                                                                                                                                                                        |
|                   |     CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=3, elapsed_compute=129.626µs]                                                                                                                                                                                                        |
|                   |       RepartitionExec: partitioning=Hash([join_t4.s4[id]@1], 16), input_partitions=1, metrics=[fetch_time=9.288µs, repart_time=36.86µs, send_time=11.054µs]                                                                                                                                                |
|                   |         ProjectionExec: expr=[s4@0 as s4, get_field(s4@0, id) as join_t4.s4[id]], metrics=[output_rows=3, elapsed_compute=4.569µs]                                                                                                                                                                         |
|                   |           MemoryExec: partitions=1, partition_sizes=[1], metrics=[]                                                                                                                                                                                                                                        |
|                   |                                                                                                                                                                                                                                                                                                            |
+-------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row(s) fetched. 
Elapsed 0.003 seconds.

@eejbyfeldt
Copy link
Contributor Author

Reading the explain more closely I see that the reason for it not using HashJoin seems to be that it fails to extract an equi-join condition when it is joining on structs.

@eejbyfeldt
Copy link
Contributor Author

Seems like we need to add struct here: https://github.com/eejbyfeldt/datafusion/blob/2b851c5bbdaef7d1d78d91833d2a7e8d94809bc8/datafusion/expr/src/utils.rs#L830-L833 for hash join to be used.

@github-actions github-actions bot added the logical-expr Logical plan and expressions label Jul 3, 2024
Comment on lines 1354 to 1360
# Join on struct
query ??
select join_t3.s3, join_t4.s4
from join_t3
inner join join_t4 on join_t3.s3 = join_t4.s4
----
{id: 2} {id: 2}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe also add an explain query to make sure it is using HashJoin?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added for this query. The one using IS NOT DISTINCT FROM is still not using hash join. But that does not seem to be related to the struct, since it the same if just joining on a int.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, so the query using IS NOT DISTINCT FROM is actually not testing HashJoin path.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sadly not. If we made some other improvements like #11272 it would.

But for not I just changed the test case to use EXCEPT instead and test that path. (The test case fails on master without the fixes in the branches)

Copy link
Contributor

@comphead comphead left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

great work @eejbyfeldt

@comphead comphead merged commit 7df000a into apache:main Jul 5, 2024
23 checks passed
comphead pushed a commit to comphead/arrow-datafusion that referenced this pull request Jul 8, 2024
* Fixes to 10749 and generalization

* Add e2e tests for joins on struct

* PR comments

* Add Struct to can_hash method

* Add explain query as well

* Use EXCEPT to trigger failure

* Update datafusion/sqllogictest/test_files/joins.slt

Co-authored-by: Liang-Chi Hsieh <[email protected]>

---------

Co-authored-by: Liang-Chi Hsieh <[email protected]>
findepi pushed a commit to findepi/datafusion that referenced this pull request Jul 16, 2024
* Fixes to 10749 and generalization

* Add e2e tests for joins on struct

* PR comments

* Add Struct to can_hash method

* Add explain query as well

* Use EXCEPT to trigger failure

* Update datafusion/sqllogictest/test_files/joins.slt

Co-authored-by: Liang-Chi Hsieh <[email protected]>

---------

Co-authored-by: Liang-Chi Hsieh <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
logical-expr Logical plan and expressions sqllogictest SQL Logic Tests (.slt)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

HashJoin for nested types give wrong results
3 participants