Filter pushdown into cross join #8626

Merged: 5 commits, Dec 25, 2023

mustafasrepo
Contributor

Which issue does this PR close?

Closes #.

Rationale for this change

Currently, filters are not pushed down into a cross join (only equality predicates are). As a result, we generate the following plan for the query

SELECT *
FROM annotated_data as l, annotated_data as r
WHERE l.a > r.a

FilterExec: filter=a@0 > a@1
--CrossJoinExec: 
----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], has_header=true
----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], has_header=true

However, if we pushed the filter into the CrossJoin (by converting it into an inner join with a filter condition), we could produce the following plan

NestedLoopJoinExec: join_type=Inner, filter=a@0 > a@1
--RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], has_header=true
--CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], has_header=true

which would be more memory efficient, especially for large tables. This PR adds this support.
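
To make the memory argument concrete, here is a minimal sketch in plain Rust. It is a toy model only: the function names are hypothetical, the inputs are slices of integers rather than Arrow record batches, and the real CrossJoinExec and NestedLoopJoinExec operators work on batches, so the counts are illustrative rather than a measurement. It compares how many joined pairs each plan shape has to hold for the predicate l.a > r.a.

```rust
/// Plan shape A: build the full cross product, then filter it
/// (a toy analogue of FilterExec on top of CrossJoinExec).
/// Returns the surviving pairs and the number of pairs held before filtering.
fn cross_join_then_filter(left: &[i64], right: &[i64]) -> (Vec<(i64, i64)>, usize) {
    let mut product = Vec::with_capacity(left.len() * right.len());
    for &l in left {
        for &r in right {
            product.push((l, r));
        }
    }
    let materialized = product.len();
    let kept: Vec<(i64, i64)> = product.into_iter().filter(|(l, r)| l > r).collect();
    (kept, materialized)
}

/// Plan shape B: evaluate the predicate inside the join loop
/// (a toy analogue of NestedLoopJoinExec with a join filter).
/// Only pairs that pass the predicate are ever stored.
fn nested_loop_join_with_filter(left: &[i64], right: &[i64]) -> (Vec<(i64, i64)>, usize) {
    let mut out = Vec::new();
    for &l in left {
        for &r in right {
            if l > r {
                out.push((l, r));
            }
        }
    }
    let materialized = out.len();
    (out, materialized)
}
```

Both functions return the same rows; the second element of the tuple shows that shape A stores every pair of the product while shape B stores only the matching ones.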

What changes are included in this PR?

Are these changes tested?

Yes

Are there any user-facing changes?

@github-actions github-actions bot added optimizer Optimizer rules sqllogictest SQL Logic Tests (.slt) labels Dec 22, 2023
@alamb alamb changed the title from "Filter pushdown through cross join" to "Filter pushdown into cross join" Dec 22, 2023
@alamb
Contributor

alamb commented Dec 22, 2023

I changed the title of this PR because I think it is more accurate to say the filter is pushed "into" the join, since the filter doesn't go below it.

@alamb
Contributor

alamb commented Dec 22, 2023

@jackwener or @liukun4515 by any chance do you have time to review this PR?

Contributor

@ozankabak ozankabak left a comment

I just reviewed this and it LGTM as well 🚀

@@ -47,81 +48,93 @@ impl EliminateCrossJoin {
/// For above queries, the join predicate is available in filters and they are moved to
/// join nodes appropriately
/// This fix helps to improve the performance of TPCH Q19. issue#78

Member

@viirya viirya Dec 23, 2023

The current doc (of PushDownFilter) doesn't have such an example; maybe update it with this change?

return Ok(None);
}
None
}

Member

@viirya viirya Dec 23, 2023

This looks like a missing case of reordering joins to eliminate CrossJoin. Do we have a test case that covers this?

Contributor

Can you expand on this so we also address it in the follow-on PR (in case you see a bug or gap here)?

Contributor Author

I tried to add a simple test case for this feature, but I couldn't write a simple test that reproduces it, even though tests fail without this reordering support. If I can come up with a simple test, I will add it.

Member

Oh, I meant that previously this rule didn't cover the case of reordering joins to eliminate a CrossJoin without a Filter on top of the Join (because it only matched LogicalPlan::Filter(filter)). So if the new matching case LogicalPlan::Join is meant to address that, it means we didn't have a test case covering it previously. Then we may need to add one.

fn convert_cross_join_to_inner_join(cross_join: CrossJoin) -> Result<Join> {
    let CrossJoin { left, right, .. } = cross_join;
    let join_schema = build_join_schema(left.schema(), right.schema(), &JoinType::Inner)?;
    // predicate is given

Member

Is this comment in the wrong place?

@@ -955,6 +965,36 @@ impl PushDownFilter {
}
}

/// Convert cross join to join by pushing down filter predicate to the join condition

Member

Looks like this function only does "Convert cross join to join"?

"pushing down filter predicate" is done by push_down_all_join. Newbies might be confused by this.

Contributor Author

@mustafasrepo mustafasrepo Dec 25, 2023

Exactly, I have updated the comment to reflect this

Comment on lines +986 to +996
fn convert_to_cross_join_if_beneficial(plan: LogicalPlan) -> Result<LogicalPlan> {
    if let LogicalPlan::Join(join) = &plan {
        // Can be converted back to cross join
        if join.on.is_empty() && join.filter.is_none() {
            return LogicalPlanBuilder::from(join.left.as_ref().clone())
                .cross_join(join.right.as_ref().clone())?
                .build();
        }
    }
    Ok(plan)
}

Member

Hmm, after push_down_all_join, if there are predicates that cannot become join conditions, they will be placed in a Filter on top of the join node. In that case, will this function skip converting the underlying join to a cross join?

Contributor Author

To be more precise, the filter can be pushed down below the join completely. In this case we may end up with a join that has an empty equality predicate and an empty filter condition.
As an example,

Filter(l.a>l.b AND r.a>r.b)
--Join (on=[], filter=None)
----LeftTable(a,  b)
----RightTable(a, b)

will be converted to the plan below after push_down_all_join

Join (on=[], filter=None)
--Filter(l.a>l.b)
----LeftTable(a,  b)
--Filter(r.a>r.b)
----RightTable(a, b)

this util ensures that plan above is converted to the plan below

CrossJoin
--Filter(l.a>l.b)
----LeftTable(a,  b)
--Filter(r.a>r.b)
----RightTable(a, b)

However, if the original plan were

Filter(l.a>r.b)
--Join (on=[], filter=None)
----LeftTable(a,  b)
----RightTable(a, b)

after push_down_all_join we will end up with following plan

Join (on=[], filter=Some(l.a>r.b))
--LeftTable(a,  b)
--RightTable(a, b)

In this case, the top-level join cannot be converted to a cross join.
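
The two cases above can be sketched with a toy plan model in plain Rust. Everything here is an assumption made for illustration: `Side`, `Pred`, `Plan`, `wrap`, and `push_into_join` are hypothetical stand-ins and not DataFusion types, `push_into_join` is a simplified imitation of what push_down_all_join does for a join with no equality keys, and only the name convert_to_cross_join_if_beneficial comes from this PR.

```rust
/// Which join input a toy predicate reads its columns from.
#[derive(Debug, Clone, PartialEq)]
enum Side {
    Left,
    Right,
    Both,
}

#[derive(Debug, Clone, PartialEq)]
struct Pred {
    text: &'static str,
    side: Side,
}

/// Toy logical plan. `Join` has no equality keys; an empty `filter`
/// plays the role of `filter=None`.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Table(&'static str),
    Filter { preds: Vec<Pred>, input: Box<Plan> },
    Join { filter: Vec<Pred>, left: Box<Plan>, right: Box<Plan> },
    CrossJoin { left: Box<Plan>, right: Box<Plan> },
}

/// Wrap `input` in a Filter only if there is something to filter on.
fn wrap(preds: Vec<Pred>, input: Plan) -> Plan {
    if preds.is_empty() {
        input
    } else {
        Plan::Filter { preds, input: Box::new(input) }
    }
}

/// Simplified imitation of the push-down step: single-side predicates go
/// below the join, predicates over both sides become the join filter.
fn push_into_join(preds: Vec<Pred>, left: Plan, right: Plan) -> Plan {
    let (mut l, mut r, mut both) = (vec![], vec![], vec![]);
    for p in preds {
        match p.side {
            Side::Left => l.push(p),
            Side::Right => r.push(p),
            Side::Both => both.push(p),
        }
    }
    Plan::Join {
        filter: both,
        left: Box::new(wrap(l, left)),
        right: Box::new(wrap(r, right)),
    }
}

/// A join with no keys and no filter is just a cross join again.
fn convert_to_cross_join_if_beneficial(plan: Plan) -> Plan {
    match plan {
        Plan::Join { filter, left, right } if filter.is_empty() => {
            Plan::CrossJoin { left, right }
        }
        other => other,
    }
}
```

With predicates l.a>l.b and r.a>r.b the result is a CrossJoin over two filtered inputs; with l.a>r.b the predicate stays in the join filter and the plan remains a Join.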

Member

I mean that if, after push_down_all_join, there are predicates remaining in the Filter but the Join has an empty on and an empty filter, then because you match the plan with if let LogicalPlan::Join(join) = &plan, the Join won't be converted to a cross join even though it looks like it should be (based on the logic here).

Contributor Author

As far as I know, after push_down_all_join, predicates remaining in the Filter will be pushed into the join filter. Hence, we are sure that the top operator is a Join.

Member

@viirya viirya Dec 25, 2023

This is how push_down_all_join handles remaining predicates in Filter.

if keep_predicates.is_empty() {
  Ok(plan)
} else {
  // wrap the join on the filter whose predicates must be kept
  match conjunction(keep_predicates) {
    Some(predicate) => Ok(LogicalPlan::Filter(Filter::try_new( // Filter on top of Join
      predicate,
      Arc::new(plan),  // this is Join
    )?)),
    None => Ok(plan),
  }
}

And after it, convert_to_cross_join_if_beneficial is called. Where does the Filter push down? Am I missing it?

let plan = push_down_all_join(
  predicates,
  vec![],
  &join_plan,
  left,
  right,
  vec![],
  true,
)?;
convert_to_cross_join_if_beneficial(plan)?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And the reason the predicate is kept in the Filter in push_down_all_join is that it cannot be pushed into the Join filter or pushed down through the Join. How would it be pushed into or below the join later?
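
The gap described here can be shown with a small toy model in plain Rust. This is a sketch under stated assumptions: `Node` and both functions are hypothetical and not DataFusion code, the kept predicate is a placeholder string, and `convert_through_filter` is only one conceivable way to close the gap, not the follow-on fix.

```rust
/// Toy plan node; `Join` has no equality keys and an optional join filter.
#[derive(Debug, Clone, PartialEq)]
enum Node {
    Table(&'static str),
    /// Filter holding predicates that had to be kept above its input.
    Filter { kept: Vec<&'static str>, input: Box<Node> },
    Join { filter: Option<&'static str>, left: Box<Node>, right: Box<Node> },
    CrossJoin { left: Box<Node>, right: Box<Node> },
}

/// Same shape as the reviewed helper: only a top-level Join is considered,
/// so a Join hidden under a Filter is returned unchanged.
fn convert_top_level_only(plan: Node) -> Node {
    match plan {
        Node::Join { filter: None, left, right } => Node::CrossJoin { left, right },
        other => other,
    }
}

/// Hypothetical variant that also looks through one Filter wrapper
/// before applying the same conversion.
fn convert_through_filter(plan: Node) -> Node {
    match plan {
        Node::Filter { kept, input } => Node::Filter {
            kept,
            input: Box::new(convert_top_level_only(*input)),
        },
        other => convert_top_level_only(other),
    }
}
```

Given a Filter on top of a Join with no keys and no filter, the first function leaves the Join in place, while the second rewrites it to a CrossJoin and keeps the Filter above it.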

Contributor

I think you are right. We will discuss and file a follow-on PR today to fix.

@mustafasrepo mustafasrepo merged commit 3698693 into apache:main Dec 25, 2023
22 checks passed
@mustafasrepo
Contributor Author

@viirya I have opened a PR to address the remaining points.
