Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Always wrapping OnceAsync for the inner table side in NestedLoopJoinExec #5156

Merged
merged 9 commits into from
Feb 12, 2023

Conversation

ygf11
Copy link
Contributor

@ygf11 ygf11 commented Feb 2, 2023

Which issue does this PR close?

Closes #5022.

Rationale for this change

For the nested loop join, the algorithm will be like:

  for out-row in outer-table
    for inner-row in inner-table
       check-join

We can see, the inner table will be visited many time.
In the current implemetion of NestedLoopJoin, the relationship between inner/outer table and left/right table is not fixed, it is decided by the required distribution:

  • For (UnspecifiedDistribution, SinglePartition), right is the inner table side.
  • For (SinglePartition, UnspecifiedDistribution), left table is the inner table side.

If the table is the inner side, we should wrap OnceAsync for it. There are two reasons:

  • It is better to cache the result of a physical plan that has to be executed many times.
  • Some physical plan can not be executed many time, like RepartitionExec.

The NestedLoopJoinExec always only load/cache(OnceAsync) the left table now, it is not correct(right table may be the inner table side), some queries(#5022) will panic for this reason. we should always load/cache the inner table data.

What changes are included in this PR?

  1. Always load/cache(OnceAsync) the inner table data of nested loop join.
  2. Use the separated logic to handle difference distributions(build-left and build-right).
  3. Remove redundant usage of bitmap, Left/LeftAnti/LeftSemi joins do not need it now.

Are these changes tested?

Yes.

Are there any user-facing changes?

let left_indices = get_anti_u64_indices(count_left_batch, &left_indices);
// the right_indices will not be used later for the `left anti` join
(left_indices, right_indices)
}
Copy link
Contributor Author

@ygf11 ygf11 Feb 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left/LeftSemi/LeftAnti joins do not need the bitmap because the right table is single partition.

.map(|left_row_index| {
build_join_indices(left_row_index, right_batch, left_batch, filter)
})
.collect::<Result<Vec<(UInt64Array, UInt32Array)>>>();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The type of join_indices is always (UInt64Array, UInt32Array) now no matter which side is single partition.

We can do better here:

  • Always use UInt64Array for the single partition side, and the other side is UInt32Array.

@ygf11 ygf11 changed the title Always wrapping OnceFut for the inner table side in NestedLoopJoinExec Always wrapping OnceAsync for the inner table side in NestedLoopJoinExec Feb 2, 2023
load_specified_partition_of_input(0, self.left.clone(), context.clone())
});
let outer_table = self.right.execute(partition, context)?;
(outer_table, inner_table)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The fix is here, always load/cache the build-side(inner table) data.

@alamb alamb requested a review from liukun4515 February 5, 2023 11:04
@alamb
Copy link
Contributor

alamb commented Feb 5, 2023

@liukun4515 as the author of NestedLookJoinExec do you have time to review this PR?

also cc @mingmwang and @jackwener as reviewers

@jackwener jackwener self-requested a review February 5, 2023 12:58
@liukun4515
Copy link
Contributor

@liukun4515 as the author of NestedLookJoinExec do you have time to review this PR?

also cc @mingmwang and @jackwener as reviewers

I will take a look this PR today

@@ -304,6 +280,14 @@ async fn load_left_specified_partition(
Ok(merged_batch)
}

// BuildLeft means the left relation is the single patrition side.
// For full join, both side are single partition, so it is BuildLeft and BuildRight, treat it as BuildLeft.
pub fn left_is_build_side(join_type: JoinType) -> bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if I‘m right, this part is the most important insight for your fix.
If the left can be the single partition, we will iter the right table/partition
If the right is the single partition, we will iter the left table/partition

Copy link
Contributor Author

@ygf11 ygf11 Feb 8, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the iter way is start from outer-table now. I think:

  • It is more suitable for the meaning of nested loop join.
  • We can optimize the data type of join_indices(not do in this pr). The indices type of inner-table should be UInt64Array, and UInt32Array for the outer-table. Currently left table is UInt64Array, and right table is UInt32Array.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • We can optimize the data type of join_indices(not do in this pr). The indices type of inner-table should be UInt64Array, and UInt32Array for the outer-table. Currently left table is UInt64Array, and right table is UInt32Array.

Hash join has the same issue.

Before this PR, the NLJ and hash join share some methods in the utils, such as need_produce_result_in_final.
But this PR breaks this, I want to refactor them, Do you have any ideas about that?

In the hash-join, we always build left table and cache the left table(single partition or multi partition), and the right data will be visited iteratively with the INT32 ARRAY as the index.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Current the indices type are fixed in the following functions, it is not enough, so we should make them more generic first.

apply_join_filter_to_indices
build_batch_from_indices
adjust_indices_by_join_type
get_semi_indices
get_anti_indices

Then we can choose array corresponding the distribution(left and right), like:

  • (Single, Unspecified) -> (UInt64Array, UInt32Array)
  • (Unspecified, Single) -> (UInt32Array, UInt64Array)
  • (UInt32Array, UInt32Array) --- PartitionMode::Partitioned for HashJoin
  • (UInt64Array, UInt64Array) --- Full join in NestedLoopJoin

@@ -352,24 +337,22 @@ fn build_join_indices(
}

impl NestedLoopJoinStream {
fn poll_next_impl(
// For Inner/Left/LeftSemi/LeftAnti joins, left is the single partition side.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we should add full join type.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The meaning of the comment is opposite 🤣.

When left is collected to a single partition, the join type should be Right/RightSemi/RightAnti/Full.

Fixed it.

Copy link
Contributor

@liukun4515 liukun4515 Feb 8, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The meaning of the comment is opposite 🤣.

yes, I also ignore this and just remember the missing type of Full.

Thanks for your reminder and point out the mistake of the comments.

👍

When left is collected to a single partition, the join type should be Right/RightSemi/RightAnti/Full.

Fixed it.

/// right
right: SendableRecordBatchStream,
/// There is nothing to process anymore and left side is processed in case of left/left semi/left anti/full join
// the outer table data of the nested loop join
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add more comments to indicate which side belonged to the outer table for the different join type.

Copy link
Contributor Author

@ygf11 ygf11 Feb 8, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the comments to the description of NestedLoopJoinExec.

type JoinLeftData = RecordBatch;

/// NestedLoopJoinExec executes partitions in parallel.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Copy link
Contributor

@liukun4515 liukun4515 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM
@ygf11 thanks for your time to discuss this issue and fix it.

@liukun4515
Copy link
Contributor

I want to left this pr for about one day, waiting for the opinions from other reviewer.

cc @jackwener

Copy link
Member

@jackwener jackwener left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, Thanks @ygf11.
I just leave some trivial review.

@@ -917,6 +917,23 @@ pub(crate) fn get_anti_indices(
.collect::<UInt32Array>()
}

/// Get unmatched and deduplicated indices
pub(crate) fn get_anti_u64_indices(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we merge get_anti_u64_indices/get_semi_u64_indices into one function?
Because they are almost the same except (!bitmap.get_bit(idx))

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we can merge these two.

Further more, get_semi_indices and get_semi_u64_indices(anti) can also be merged to one function with generic.

I think it is relative to comment #5156 (comment), and @liukun4515 plans to refactor them.

@jackwener jackwener merged commit 4eb1a57 into apache:master Feb 12, 2023
@jackwener
Copy link
Member

Thanks @ygf11 @liukun4515

@ursabot
Copy link

ursabot commented Feb 12, 2023

Benchmark runs are scheduled for baseline = 0c0ed1c and contender = 4eb1a57. 4eb1a57 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ec2-t3-xlarge-us-east-2] ec2-t3-xlarge-us-east-2
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on test-mac-arm] test-mac-arm
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ursa-i9-9960x] ursa-i9-9960x
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ursa-thinkcentre-m75q] ursa-thinkcentre-m75q
Buildkite builds:
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
test-mac-arm: Supported benchmark langs: C++, Python, R
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java

@ygf11 ygf11 deleted the nlj-fix branch February 14, 2023 01:14
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Core DataFusion crate
Projects
None yet
Development

Successfully merging this pull request may close these issues.

NestedLoopJoin will panic when right child contains RepartitionExec
5 participants