Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Always wrapping OnceAsync for the inner table side in NestedLoopJoinExec #5156

Merged
merged 9 commits into from
Feb 12, 2023
21 changes: 20 additions & 1 deletion datafusion/core/src/physical_plan/joins/nested_loop_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,27 @@ use crate::error::Result;
use crate::execution::context::TaskContext;
use crate::physical_plan::coalesce_batches::concat_batches;

/// Data of the left side
/// Data of the inner table side
type JoinLeftData = RecordBatch;

/// NestedLoopJoinExec executes partitions in parallel.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

/// One input will be collected to a single partition, call it inner-table.
/// The other side of the input is treated as outer-table, and the output Partitioning is from it.
/// Giving an output partition number x, the execution will be:
///
/// ```text
/// for outer-table-batch in outer-table-partition-x
/// check-join(outer-table-batch, inner-table-data)
/// ```
///
/// One of the inputs will become inner table, and it is decided by the join type.
/// Following is the relation table:
///
/// ```text
/// JoinType Distribution Inner-table
/// Inner/Left/LeftSemi/LeftAnti (UnspecifiedDistribution, SinglePartition) right
/// Right/RightSemi/RightAnti/Full (SinglePartition, UnspecifiedDistribution) left
/// ```
ygf11 marked this conversation as resolved.
Show resolved Hide resolved
///
#[derive(Debug)]
pub struct NestedLoopJoinExec {
Expand Down
4 changes: 3 additions & 1 deletion datafusion/core/tests/sql/joins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3303,6 +3303,7 @@ async fn two_in_subquery_to_join_with_outer_filter() -> Result<()> {
async fn right_as_inner_table_nested_loop_join() -> Result<()> {
let ctx = create_nested_loop_join_context()?;

// Distribution: left is `UnspecifiedDistribution`, right is `SinglePartition`.
let sql = "SELECT t1.t1_id, t2.t2_id
ygf11 marked this conversation as resolved.
Show resolved Hide resolved
FROM t1 INNER JOIN t2 ON t1.t1_id > t2.t2_id
WHERE t1.t1_id > 10 AND t2.t2_int > 1";
Expand Down Expand Up @@ -3349,9 +3350,10 @@ async fn right_as_inner_table_nested_loop_join() -> Result<()> {
}

#[tokio::test]
async fn left_as_inner_table_nested_loop_join1() -> Result<()> {
async fn left_as_inner_table_nested_loop_join() -> Result<()> {
let ctx = create_nested_loop_join_context()?;

// Distribution: left is `SinglePartition`, right is `UnspecifiedDistribution`.
let sql = "SELECT t1.t1_id,t2.t2_id FROM (select t1_id from t1 where t1.t1_id > 22) as t1
ygf11 marked this conversation as resolved.
Show resolved Hide resolved
RIGHT JOIN (select t2_id from t2 where t2.t2_id > 11) as t2
ON t1.t1_id < t2.t2_id";
Expand Down