-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Basic support for IN
and NOT IN
Subqueries by rewriting them to SEMI
/ ANTI
Join
#2421
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @korowa this is looking good. This actually adds support for benchmark queries 16 and 18, which previously were not supported.
If you want to, you could enable these queries in the benchmark test at benchmarks/src/bin/tpch.rs
by adding these tests. This could also be done as a separate PR.
#[tokio::test]
async fn run_q16() -> Result<()> {
run_query(16).await
}
#[tokio::test]
async fn run_q18() -> Result<()> {
run_query(18).await
}
})?; | ||
|
||
if !subqueries_in_regular.is_empty() { | ||
return Err(DataFusionError::NotImplemented( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we just revert to the original query here rather than fail?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All fixed. My idea was to give user some explanation about why query is "incorrect" by throwing errors.
Now I see that if DF is able to produce this kind of logical plan, then it's valid (at least for some purposes maybe), even if we don't have physical implementation for some of its parts yet.
let right_key = right_schema.field(0).qualified_column(); | ||
let left_key = match *expr.clone() { | ||
Expr::Column(col) => col, | ||
_ => return Err(DataFusionError::NotImplemented( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same comment here. Can we just abort the optimization attempt rather than fail.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also fixed
Ok(()) | ||
} | ||
_ => Err(DataFusionError::Plan( | ||
"Unknown expression while rewriting subquery to joins" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same comment here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also fixed
)), | ||
}; | ||
|
||
let join_type = match negated { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we use if/else here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, done
}; | ||
|
||
let schema = build_join_schema( | ||
new_input.schema(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Better to use optimized_input.schema()
and avoid creating a mut
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Switched to try_fold, no mutable variables required now
null_equals_null: false, | ||
}); | ||
|
||
Ok(()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and return result here to use below
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now folding
IN
and NOT IN
Subqueries by rewriting them to SEMI
/ ANTI
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @korowa. Really nice work. 🏅
This is very cool -- both good code as well as well tested.
@@ -1074,6 +1074,16 @@ mod tests { | |||
run_query(14).await | |||
} | |||
|
|||
#[tokio::test] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🎉
@@ -305,7 +267,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> { | |||
LogicalPlan::Analyze { .. } => push_down(&state, plan), | |||
LogicalPlan::Filter(Filter { input, predicate }) => { | |||
let mut predicates = vec![]; | |||
split_members(predicate, &mut predicates); | |||
utils::split_conjunction(predicate, &mut predicates); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
@@ -556,6 +556,41 @@ pub fn rewrite_expression(expr: &Expr, expressions: &[Expr]) -> Result<Expr> { | |||
} | |||
} | |||
|
|||
/// converts "A AND B AND C" => [A, B, C] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 for moving these functions into utils
)), | ||
}; | ||
|
||
let join_type = if *negated { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
.build()?; | ||
|
||
let expected = "Projection: #test.b [b:UInt32]\ | ||
\n Anti Join: #test.c = #test.c [a:UInt32, b:UInt32, c:UInt32]\ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can't tell from this plan if the predicate is correct (because #test
is used as the relation name in both the inner and outer query.
It might make these tests more readable if the relation name in the subquery was something different (like test_sq
) so that this join predicate appears as #test.c = #sq_ test.c
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done - now table names in tests are different, so it should by much easier to read
.build()?; | ||
|
||
let expected = "Projection: #test.b [b:UInt32]\ | ||
\n Semi Join: #test.b = #test.a [a:UInt32, b:UInt32, c:UInt32]\ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
assert_optimized_plan_eq(&plan, expected); | ||
Ok(()) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would also be helpful to coverage of the negative cases (aka cases that can't be rewritten like x IN (select ...) OR y = 5
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added cases for unsupported filter expression and for filters input being rewritten while filter remains untouched (checks that falling back to original query doesn't affect its recursive call results)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is really nice work @korowa -- thank you so much.
.project(vec![col("test.b")])? | ||
.build()?; | ||
|
||
let expected = "Projection: #test.b [b:UInt32]\ | ||
\n Semi Join: #test.c = #test.c [a:UInt32, b:UInt32, c:UInt32]\ | ||
\n Semi Join: #test.c = #sq.c [a:UInt32, b:UInt32, c:UInt32]\ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
much easier to read -- thank you
} | ||
|
||
/// Test for filter input modification in case filter not supported | ||
/// Outer filter expression not modified while inner converted to join |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
Thanks for reviews! I hope I'll follow up with PR(s?) for currently unsupported cases soon. |
IN
and NOT IN
Subqueries by rewriting them to SEMI
/ ANTI
IN
and NOT IN
Subqueries by rewriting them to SEMI
/ ANTI
Join
Which issue does this PR close?
Partially #488.
Rationale for this change
Naive implementation of optimizer rule for replacing InSubquery with join, which allows to execute queries with IN (subquery) in case of proper WHERE condition.
What changes are included in this PR?
SubqueryFilterToJoin rule is able to replace Filter input in logical plan with Sem/AntiJoin in case InSubquery is a part of logical conjunction - this precondition allows to pushdown IN predicate before other predicates in Filter.
Cases when IN (subquery) cannot be pushed to Filters input due to its result being required for predicate evaluation, are handled by returning NotImplemented error for now.
Are there any user-facing changes?
Queries with IN (subquery) predicates start executing for described above filter combinations