Add optimizer pass to reduce left/right/full joins to inner join if possible #2750

Merged
merged 9 commits into from
Jun 27, 2022

Contributor

@HuSen8891 HuSen8891 commented Jun 19, 2022

Which issue does this PR close?

Closes #2757

Rationale for this change

Try to reduce left/right/full joins to inner joins.

For a query such as: select ... from a left join b on ... where b.xx = 100;
if b.xx is null, then b.xx = 100 evaluates to NULL, which the filter treats as false, so the null-padded rows are filtered out.
Therefore, there is no need to produce null rows for output, and we can use
an inner join instead of a left join.

In general, right joins and full joins can also be reduced to inner joins by the same reasoning.
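The equivalence described above can be checked on toy data. The following is a minimal sketch in plain Rust, not DataFusion code: the row types, `left_join`, `inner_join`, `xx_is_100` and `filtered_results` are all hypothetical names introduced for illustration, and `None` stands in for the NULL-padded side of an outer join.

```rust
// Toy rows (illustrative only): table a has (id), table b has (id, xx).
type RowA = i32;
type RowB = (i32, i32);
// A joined row: a.id plus an optional b row. `None` models NULL padding.
type Joined = (RowA, Option<RowB>);

// a LEFT JOIN b ON a.id = b.id
fn left_join(a: &[RowA], b: &[RowB]) -> Vec<Joined> {
    let mut out = Vec::new();
    for &ra in a {
        let matches: Vec<RowB> = b.iter().copied().filter(|rb| rb.0 == ra).collect();
        if matches.is_empty() {
            // Unmatched left row: pad the right side with NULLs.
            out.push((ra, None));
        } else {
            out.extend(matches.into_iter().map(|rb| (ra, Some(rb))));
        }
    }
    out
}

// a INNER JOIN b ON a.id = b.id
fn inner_join(a: &[RowA], b: &[RowB]) -> Vec<Joined> {
    let mut out = Vec::new();
    for &ra in a {
        for &rb in b.iter().filter(|rb| rb.0 == ra) {
            out.push((ra, Some(rb)));
        }
    }
    out
}

// WHERE b.xx = 100. A NULL b.xx compares to NULL, which a filter does not keep.
fn xx_is_100(row: &Joined) -> bool {
    matches!(row.1, Some((_, 100)))
}

// Applies the same filter on top of both join types.
fn filtered_results(a: &[RowA], b: &[RowB]) -> (Vec<Joined>, Vec<Joined>) {
    let via_left = left_join(a, b).into_iter().filter(xx_is_100).collect();
    let via_inner = inner_join(a, b).into_iter().filter(xx_is_100).collect();
    (via_left, via_inner)
}
```

With a = [1, 2, 3] and b = [(1, 100), (2, 7)], the left join produces a NULL-padded row for id 3, but the filter removes it, so both results are the same.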

What changes are included in this PR?

Add reduce_outer_plan in src/planner.rs, which tries to reduce left/right/full joins to inner joins.

@github-actions github-actions bot added core Core DataFusion crate sql SQL Planner labels Jun 19, 2022
@codecov-commenter

codecov-commenter commented Jun 19, 2022

Codecov Report

Merging #2750 (9f13704) into master (533e2b4) will increase coverage by 0.08%.
The diff coverage is 99.66%.

@@            Coverage Diff             @@
##           master    #2750      +/-   ##
==========================================
+ Coverage   85.11%   85.20%   +0.08%     
==========================================
  Files         273      274       +1     
  Lines       48240    48543     +303     
==========================================
+ Hits        41060    41361     +301     
- Misses       7180     7182       +2     
Impacted Files                                  Coverage Δ
datafusion/optimizer/src/reduce_outer_join.rs   99.39% <99.39%> (ø)
datafusion/core/src/execution/context.rs        78.56% <100.00%> (+0.02%) ⬆️
datafusion/core/tests/sql/joins.rs              99.31% <100.00%> (+0.20%) ⬆️
datafusion/expr/src/logical_plan/plan.rs        74.20% <0.00%> (-0.20%) ⬇️

Δ = absolute <relative> (impact), ø = not affected, ? = missing data

Member

@waynexia waynexia left a comment

I read the code and tests and they look good to me. Thanks @AssHero

I left some style comments and a question.

/// Recursively traverses expr, if expr returns false when
/// any inputs are null, treats columns of both sides as nonnullable columns.
///
/// For and/or expr, extracts from all sub exprs and merges the columns.
Member

I guess here is

Suggested change
- /// For and/or expr, extracts from all sub exprs and merges the columns.
+ /// For and expr, extracts from all sub exprs and merges the columns.

Contributor Author

> I guess here is

For And and Or exprs, we need to extract columns from all sub exprs.
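One way to see why both And and Or need to look at all sub exprs, but combine them differently, is the sketch below. It is a simplification under stated assumptions, not the PR's code: it works at the table level rather than the column level, and the `Expr` enum, `direct_tables`, `null_rejected_tables` and `col_gt` are hypothetical names that do not exist in DataFusion. For And, a table is null-rejected if either side rejects it (union); for Or, only if both sides reject it (intersection).

```rust
use std::collections::BTreeSet;

/// Toy expression tree. This is not DataFusion's `Expr`.
#[derive(Debug, Clone)]
enum Expr {
    /// Column reference as (table, column).
    Col(&'static str, &'static str),
    Lit(i64),
    /// Any comparison such as `=`, `<`, `>`: NULL if either input is NULL.
    Cmp(Box<Expr>, Box<Expr>),
    IsNull(Box<Expr>),
    IsNotNull(Box<Expr>),
    And(Box<Expr>, Box<Expr>),
    Or(Box<Expr>, Box<Expr>),
}

/// Table of a bare column operand; empty for anything else.
fn direct_tables(e: &Expr) -> BTreeSet<&'static str> {
    match e {
        Expr::Col(t, _) => BTreeSet::from([*t]),
        _ => BTreeSet::new(),
    }
}

/// Tables `t` such that the predicate cannot be TRUE when every column of `t` is NULL.
fn null_rejected_tables(e: &Expr) -> BTreeSet<&'static str> {
    match e {
        // A comparison is NULL as soon as one operand is NULL.
        Expr::Cmp(l, r) => &direct_tables(l) | &direct_tables(r),
        // `col IS NOT NULL` is FALSE for a NULL column.
        Expr::IsNotNull(inner) => direct_tables(inner),
        // AND is TRUE only if both sides are TRUE: union of rejected tables.
        Expr::And(l, r) => &null_rejected_tables(l) | &null_rejected_tables(r),
        // OR is TRUE if either side is TRUE: both sides must reject the table.
        Expr::Or(l, r) => &null_rejected_tables(l) & &null_rejected_tables(r),
        // IS NULL, literals, bare columns: conservatively reject nothing.
        _ => BTreeSet::new(),
    }
}

/// Convenience constructor for `table.col > v` (comparison operator is irrelevant here).
fn col_gt(table: &'static str, col: &'static str, v: i64) -> Expr {
    Expr::Cmp(Box::new(Expr::Col(table, col)), Box::new(Expr::Lit(v)))
}
```

Under this reading, `b.c1 > 1 or b.c2 > 2` still rejects NULL rows of b, while `a.c1 > 1 or b.c2 > 2` rejects nothing, because a row with NULLs on the b side can pass through the a.c1 branch.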

fn reduce_outer_join(
    plan: &LogicalPlan,
    nonnullable_cols: &Vec<Column>,
) -> Result<Option<LogicalPlan>> {
Member

How about changing this function to modify the plan in place, like

fn reduce_outer_join(mut plan: LogicalPlan, ...)  -> Result<LogicalPlan>;

This might simplify the match.

Contributor Author

> How about changing this function to modify the plan in place, like
>
> fn reduce_outer_join(mut plan: LogicalPlan, ...)  -> Result<LogicalPlan>;
>
> This might simplify the match.

An in-place update is better; I'll look into it later.

@@ -784,6 +784,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {

let filter_expr = self.sql_to_rex(predicate_expr, &join_schema, ctes)?;

// reduce outer joins
let plans = reduce_outer_joins(plans, &filter_expr)?;
Member

I'm not sure whether this should be done here or somewhere else, such as in the optimization phase. The other parts are great ❤️ cc @alamb

Contributor Author

@HuSen8891 HuSen8891 Jun 21, 2022

> I'm not sure whether this should be done here or somewhere else, such as in the optimization phase. The other parts are great ❤️ cc @alamb

Here we have both the joins and the quals from the WHERE clause, and together these make the reduction possible. Please share your suggestions, thanks!

extract_nonnullable_columns(right, columns, false)
}
Operator::And => {
if !top_level {
Member

Will something like Not(And(a > 10, b < 10)) hit this check and get ignored?

Contributor Author

> Will something like Not(And(a > 10, b < 10)) hit this check and get ignored?

Yes. When the And expr is not at the top level, we can handle it the same way as an Or expr; that would be better.
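The reason an And under a Not behaves like an Or can be checked with SQL's three-valued logic. The sketch below is illustrative only: `Tri`, `and3`, `or3`, `not3`, `gt`, `lt`, `not_and` and `or_of_nots` are hypothetical helpers, with `None` standing for NULL. It evaluates NOT (a > 10 AND b < 10) next to its De Morgan form (NOT a > 10) OR (NOT b < 10).

```rust
/// SQL boolean: Some(true), Some(false), or None for NULL.
type Tri = Option<bool>;

/// Three-valued AND: FALSE dominates, otherwise NULL is contagious.
fn and3(a: Tri, b: Tri) -> Tri {
    match (a, b) {
        (Some(false), _) | (_, Some(false)) => Some(false),
        (Some(true), Some(true)) => Some(true),
        _ => None,
    }
}

/// Three-valued OR: TRUE dominates, otherwise NULL is contagious.
fn or3(a: Tri, b: Tri) -> Tri {
    match (a, b) {
        (Some(true), _) | (_, Some(true)) => Some(true),
        (Some(false), Some(false)) => Some(false),
        _ => None,
    }
}

/// Three-valued NOT: NOT NULL is NULL.
fn not3(a: Tri) -> Tri {
    a.map(|v| !v)
}

/// `x > bound` and `x < bound`, where a NULL input yields NULL.
fn gt(x: Option<i64>, bound: i64) -> Tri {
    x.map(|v| v > bound)
}
fn lt(x: Option<i64>, bound: i64) -> Tri {
    x.map(|v| v < bound)
}

/// NOT (a > 10 AND b < 10)
fn not_and(a: Option<i64>, b: Option<i64>) -> Tri {
    not3(and3(gt(a, 10), lt(b, 10)))
}

/// (NOT a > 10) OR (NOT b < 10), the De Morgan form of `not_and`.
fn or_of_nots(a: Option<i64>, b: Option<i64>) -> Tri {
    or3(not3(gt(a, 10)), not3(lt(b, 10)))
}
```

For a = NULL and b = 20, the inner And is FALSE (because b < 10 is FALSE), so the Not is TRUE and the row is kept even though a is NULL. Treating the nested And as a top-level And would therefore wrongly mark a as non-nullable.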

@andygrove
Member

@AssHero Since this is a significant feature, could you file an issue for it so that it gets included in the change logs?

let results = execute_to_batches(&ctx, sql).await;
assert_batches_sorted_eq!(expected, &results);

// could not reduce, use left join
Member

Can we split the tests out into separate test methods?

@@ -784,6 +784,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {

let filter_expr = self.sql_to_rex(predicate_expr, &join_schema, ctes)?;

// reduce outer joins
let plans = reduce_outer_joins(plans, &filter_expr)?;
Member

This seems like it should be implemented as an optimization rule rather than in the SQL query planner. That way we would get the same optimizations regardless of whether the query was created from SQL or from the DataFrame API.

@HuSen8891
Contributor Author

> @AssHero Since this is a significant feature, could you file an issue for it so that it gets included in the change logs?

Yes, I'll file an issue.

@HuSen8891
Contributor Author

Split the test cases into separate test methods. Implementing this as an optimization rule is in progress.

@alamb
Contributor

alamb commented Jun 22, 2022

Thank you @AssHero -- I will put this on my list to review carefully tomorrow. This optimization in general is quite tricky, so I want to make sure I give it my full attention.

@github-actions github-actions bot added the optimizer Optimizer rules label Jun 23, 2022
@HuSen8891
Contributor Author

HuSen8891 commented Jun 23, 2022

Implemented reduce outer join as an optimization rule. Further improvements are in progress.
The tests for this rule will be added later.

Contributor

@alamb alamb left a comment

Thank you @AssHero -- this is looking cool. I haven't had a chance to follow it all yet but I have some suggestions.

👍

/// For IS NOT NULL/NOT expr, always returns false for NULL input.
/// extracts columns from these exprs.
/// For all other exprs, fall through
fn extract_nonnullable_columns(
Contributor

What do you think about using Expr::nullable instead? That seems very similar / the same as what you are doing with this function (trying to determine if an expression can be nullable / non nullable)?

https://github.com/apache/arrow-datafusion/blob/d985c0a3b6d96b02028f3abb6edb361ea72cac14/datafusion/expr/src/expr_schema.rs#L148-L211

If Expr::nullable doesn't work correctly I think we should update that

Contributor Author

extract_nonnullable_columns collects all columns from exprs that return false when their inputs are null. We then check whether these columns come from the side of the outer join that is padded with nulls in the output. If so, the null-padded rows cannot satisfy the filter, so they are filtered out and the outer join can be reduced to an inner join. So I think the purpose of extract_nonnullable_columns is not the same as that of Expr::nullable.
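Once it is known which join sides the filter null-rejects, choosing the reduced join type is a small decision table. The sketch below is a hedged illustration, not the merged rule: the local `JoinType` enum and `reduce_join_type` are hypothetical, and the two flags correspond only loosely to the left_nonnullable / right_nonnullable booleans that appear in the diff.

```rust
/// Local stand-in for a join type enum (not DataFusion's `JoinType`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum JoinType {
    Inner,
    Left,
    Right,
    Full,
}

/// Picks the cheapest join type that gives the same rows after the filter.
/// `left_rejects_null` means: the filter drops rows whose left-side columns are all NULL.
fn reduce_join_type(
    join_type: JoinType,
    left_rejects_null: bool,
    right_rejects_null: bool,
) -> JoinType {
    match join_type {
        // LEFT pads the right side with NULLs; if those rows are dropped, INNER suffices.
        JoinType::Left if right_rejects_null => JoinType::Inner,
        // RIGHT pads the left side with NULLs.
        JoinType::Right if left_rejects_null => JoinType::Inner,
        // FULL pads both sides, so each flag removes one kind of padded row.
        JoinType::Full => match (left_rejects_null, right_rejects_null) {
            (true, true) => JoinType::Inner,
            // Rows with a NULL-padded left side are dropped: what remains is a LEFT join.
            (true, false) => JoinType::Left,
            // Rows with a NULL-padded right side are dropped: what remains is a RIGHT join.
            (false, true) => JoinType::Right,
            (false, false) => JoinType::Full,
        },
        // Nothing to reduce.
        other => other,
    }
}
```

Note that a filter on the preserved side does not help: a LEFT join whose filter only rejects NULLs on the left side stays a LEFT join.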

@@ -1229,6 +1230,7 @@ impl SessionState {
if config.config_options.get_bool(OPT_FILTER_NULL_JOIN_KEYS) {
rules.push(Arc::new(FilterNullJoinKeys::default()));
}
rules.push(Arc::new(ReduceOuterJoin::new()));
Contributor

this pass should probably be applied after FilterPushdown, so that as many predicates as possible are available

Contributor Author

@HuSen8891 HuSen8891 Jun 24, 2022

After FilterPushdown, filters such as (c1 < 100) are pushed down to the individual tables. At that point we would need to check from the bottom up, which requires more information to reduce the outer joins.

For the query: select * from (select * from tt1 left join tt2 on c1 = c3) a right join tt3 on a.c2 = tt3.c5 where a.c4 < 100;
after FilterPushdown, the logical plan is:

| Projection: #a.c1, #a.c2, #a.c3, #a.c4, #tt3.c5, #tt3.c6
| -Inner Join: #a.c2 = #tt3.c5
| ---Projection: #a.c1, #a.c2, #a.c3, #a.c4, alias=a
| ----Projection: #tt1.c1, #tt1.c2, #tt2.c3, #tt2.c4, alias=a
| -----Inner Join: #tt1.c1 = #tt2.c3
| ------TableScan: tt1 projection=Some([c1, c2])
| ------Filter: #tt2.c4 < Int64(100)
| -------TableScan: tt2 projection=Some([c3, c4])
| ---TableScan: tt3 projection=Some([c5, c6])

We would need to walk down to the TableScan, collect the filter, and then go back up to the parent plans to reduce the outer joins. I think this would be more complicated to implement.

Please let me know if I am missing something.

{
let mut left_nonnullable = false;
let mut right_nonnullable = false;
for col in nonnullable_cols.iter_mut() {
Contributor

why iter_mut and not iter?

Contributor Author

Let me fix this! Thanks!

let left_plan = reduce_outer_join(
_optimizer,
&join.left,
nonnullable_cols,
Contributor

I wonder if it is a problem to use the same nonnullable_cols here, since the call to reduce_outer_join could add new columns if a filter is encountered somewhere down the left side (e.g. in a subquery). Maybe a clone() should be passed here (or the length remembered and the vector truncated before calling reduce_outer_join on the right side).

Contributor Author

I noticed this problem after pushing the code; it now uses clone() here.
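The sharing problem discussed here can be reproduced on a toy plan tree. This is a minimal sketch under assumptions, not the PR's traversal: `Plan`, `Seen`, `visit` and `columns_seen` are hypothetical names, and a `Filter` node simply contributes a list of column names. With `share = true` the same vector is handed to both join children, so columns found under the left child leak into the right one; with `share = false` each child receives its own clone.

```rust
/// Toy logical plan (illustrative, not DataFusion's `LogicalPlan`).
#[derive(Debug)]
enum Plan {
    Scan(&'static str),
    /// A filter that contributes null-rejected column names to its subtree.
    Filter { cols: Vec<&'static str>, input: Box<Plan> },
    Join { left: Box<Plan>, right: Box<Plan> },
}

/// For each scan: its name and the columns visible when the traversal reached it.
type Seen = Vec<(&'static str, Vec<&'static str>)>;

fn visit(plan: &Plan, cols: &mut Vec<&'static str>, seen: &mut Seen, share: bool) {
    match plan {
        Plan::Scan(name) => seen.push((*name, cols.clone())),
        Plan::Filter { cols: extra, input } => {
            // Columns from this filter apply to everything below it.
            cols.extend(extra.iter().copied());
            visit(input, cols, seen, share);
        }
        Plan::Join { left, right } => {
            if share {
                // Problematic variant: columns added under `left` leak into `right`.
                visit(left, cols, seen, share);
                visit(right, cols, seen, share);
            } else {
                // Each child gets its own copy, so siblings cannot affect each other.
                visit(left, &mut cols.clone(), seen, share);
                visit(right, &mut cols.clone(), seen, share);
            }
        }
    }
}

/// Runs the traversal from an empty column list.
fn columns_seen(plan: &Plan, share: bool) -> Seen {
    let mut seen = Seen::new();
    visit(plan, &mut Vec::new(), &mut seen, share);
    seen
}
```

The alternative mentioned in the review, remembering the vector length and truncating before visiting the right child, would avoid the allocation of a clone at the cost of slightly more bookkeeping.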

null_equals_null: join.null_equals_null,
}))
}
LogicalPlan::Projection(Projection {
Contributor

I think Sorts can also rewrite column names?

Contributor Author

@HuSen8891 HuSen8891 Jun 24, 2022

This is for a query such as:
select * from (select * from tt1 left join tt2 on c1 = c3) a right join tt3 on a.c2 = tt3.c5 where a.c4 < 100;

a.c4 can be used to reduce the left join to an inner join, but we need to know that a.c4 corresponds to tt2.c4.

I need to think about the case with Sort.

assert_batches_sorted_eq!(expected, &results);

Ok(())
}
Contributor

Some negative tests might be good -- for example, checking that

    let sql = "select * from t1 left join t2 on t1.t1_id = t2.t2_id where t2.t2_id IS NULL";

isn't rewritten to an inner join.

Contributor Author

I'll add more test cases to the tests for this rule, including this one.

@HuSen8891 HuSen8891 requested a review from alamb June 27, 2022 12:07
Contributor

@alamb alamb left a comment

Thank you again @AssHero -- I think the test coverage in this PR is quite 👍 and we can iterate on it on master as we go forward.

@alamb
Contributor

alamb commented Jun 27, 2022

There was a logical conflict in this PR with #2789

I took the liberty of fixing the conflicts in d0f1f83 and 9f13704 and pushing a fix to this branch

@alamb alamb changed the title try to reduce left/right/full join to inner join Add optimizer pass to reduce left/right/full joins to inner join Jun 27, 2022
@alamb alamb changed the title Add optimizer pass to reduce left/right/full joins to inner join Add optimizer pass to reduce left/right/full joins to inner join if possible Jun 27, 2022
@alamb alamb merged commit c8de85e into apache:master Jun 27, 2022
@HuSen8891 HuSen8891 deleted the reduce_outer_join branch June 28, 2022 01:46
Labels
core Core DataFusion crate optimizer Optimizer rules sql SQL Planner
Development

Successfully merging this pull request may close these issues.

Reduce outer joins
5 participants