Skip to content

Commit

Permalink
Support join filter for SortMergeJoin
Browse files Browse the repository at this point in the history
  • Loading branch information
viirya committed Jan 31, 2024
1 parent 851bc7d commit 6dbfed6
Show file tree
Hide file tree
Showing 8 changed files with 205 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@ fn adjust_input_keys_ordering(
left,
right,
on,
filter,
join_type,
sort_options,
null_equals_null,
Expand All @@ -356,6 +357,7 @@ fn adjust_input_keys_ordering(
left.clone(),
right.clone(),
new_conditions.0,
filter.clone(),
*join_type,
new_conditions.1,
*null_equals_null,
Expand Down Expand Up @@ -635,6 +637,7 @@ pub(crate) fn reorder_join_keys_to_inputs(
left,
right,
on,
filter,
join_type,
sort_options,
null_equals_null,
Expand Down Expand Up @@ -664,6 +667,7 @@ pub(crate) fn reorder_join_keys_to_inputs(
left.clone(),
right.clone(),
new_join_on,
filter.clone(),
*join_type,
new_sort_options,
*null_equals_null,
Expand Down Expand Up @@ -1642,6 +1646,7 @@ pub(crate) mod tests {
left,
right,
join_on.clone(),
None,
*join_type,
vec![SortOptions::default(); join_on.len()],
false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -736,6 +736,7 @@ fn try_swapping_with_sort_merge_join(
Arc::new(new_left),
Arc::new(new_right),
new_on,
sm_join.filter.clone(),
sm_join.join_type,
sm_join.sort_options.clone(),
sm_join.null_equals_null,
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/physical_optimizer/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ pub fn sort_merge_join_exec(
left,
right,
join_on.clone(),
None,
*join_type,
vec![SortOptions::default(); join_on.len()],
false,
Expand Down
26 changes: 12 additions & 14 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1111,6 +1111,7 @@ impl DefaultPhysicalPlanner {
};

let prefer_hash_join = session_state.config_options().optimizer.prefer_hash_join;

if join_on.is_empty() {
// there is no equal join condition, use the nested loop join
// TODO optimize the plan, and use the config of `target_partitions` and `repartition_joins`
Expand All @@ -1126,20 +1127,17 @@ impl DefaultPhysicalPlanner {
{
// Use SortMergeJoin if hash join is not preferred
// Sort-Merge join support currently is experimental
if join_filter.is_some() {
// TODO SortMergeJoinExec need to support join filter
not_impl_err!("SortMergeJoinExec does not support join_filter now.")
} else {
let join_on_len = join_on.len();
Ok(Arc::new(SortMergeJoinExec::try_new(
physical_left,
physical_right,
join_on,
*join_type,
vec![SortOptions::default(); join_on_len],
null_equals_null,
)?))
}

let join_on_len = join_on.len();
Ok(Arc::new(SortMergeJoinExec::try_new(
physical_left,
physical_right,
join_on,
join_filter,
*join_type,
vec![SortOptions::default(); join_on_len],
null_equals_null,
)?))
} else if session_state.config().target_partitions() > 1
&& session_state.config().repartition_joins()
&& prefer_hash_join {
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/tests/fuzz_cases/join_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ async fn run_join_test(
left,
right,
on_columns.clone(),
None,
join_type,
vec![SortOptions::default(), SortOptions::default()],
false,
Expand Down
Loading

0 comments on commit 6dbfed6

Please sign in to comment.