-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Multiple optimizer passes #3880
Changes from all commits
c8d9b29
f8e4772
762b559
801def1
8417e7e
ee4cd70
800afc3
beb70d2
c1f5c7b
b41c277
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,7 +3,7 @@ Projection: SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS re | |
Projection: lineitem.l_extendedprice, lineitem.l_discount | ||
Filter: part.p_brand = Utf8("Brand#12") AND part.p_container IN ([Utf8("SM CASE"), Utf8("SM BOX"), Utf8("SM PACK"), Utf8("SM PKG")]) AND lineitem.l_quantity >= Decimal128(Some(100),15,2) AND lineitem.l_quantity <= Decimal128(Some(1100),15,2) AND part.p_size <= Int32(5) OR part.p_brand = Utf8("Brand#23") AND part.p_container IN ([Utf8("MED BAG"), Utf8("MED BOX"), Utf8("MED PKG"), Utf8("MED PACK")]) AND lineitem.l_quantity >= Decimal128(Some(1000),15,2) AND lineitem.l_quantity <= Decimal128(Some(2000),15,2) AND part.p_size <= Int32(10) OR part.p_brand = Utf8("Brand#34") AND part.p_container IN ([Utf8("LG CASE"), Utf8("LG BOX"), Utf8("LG PACK"), Utf8("LG PKG")]) AND lineitem.l_quantity >= Decimal128(Some(2000),15,2) AND lineitem.l_quantity <= Decimal128(Some(3000),15,2) AND part.p_size <= Int32(15) | ||
Inner Join: lineitem.l_partkey = part.p_partkey | ||
Filter: lineitem.l_shipmode IN ([Utf8("AIR"), Utf8("AIR REG")]) AND lineitem.l_shipinstruct = Utf8("DELIVER IN PERSON") AND lineitem.l_quantity >= Decimal128(Some(100),15,2) AND lineitem.l_quantity <= Decimal128(Some(1100),15,2) OR lineitem.l_quantity >= Decimal128(Some(1000),15,2) AND lineitem.l_quantity <= Decimal128(Some(2000),15,2) OR lineitem.l_quantity >= Decimal128(Some(2000),15,2) AND lineitem.l_quantity <= Decimal128(Some(3000),15,2) | ||
Filter: lineitem.l_quantity >= Decimal128(Some(100),15,2) AND lineitem.l_quantity <= Decimal128(Some(1100),15,2) OR lineitem.l_quantity >= Decimal128(Some(1000),15,2) AND lineitem.l_quantity <= Decimal128(Some(2000),15,2) OR lineitem.l_quantity >= Decimal128(Some(2000),15,2) AND lineitem.l_quantity <= Decimal128(Some(3000),15,2) AND lineitem.l_shipmode IN ([Utf8("AIR"), Utf8("AIR REG")]) AND lineitem.l_shipinstruct = Utf8("DELIVER IN PERSON") | ||
TableScan: lineitem projection=[l_partkey, l_quantity, l_extendedprice, l_discount, l_shipinstruct, l_shipmode] | ||
Filter: part.p_size >= Int32(1) AND part.p_brand = Utf8("Brand#12") AND part.p_container IN ([Utf8("SM CASE"), Utf8("SM BOX"), Utf8("SM PACK"), Utf8("SM PKG")]) AND part.p_size <= Int32(5) OR part.p_brand = Utf8("Brand#23") AND part.p_container IN ([Utf8("MED BAG"), Utf8("MED BOX"), Utf8("MED PKG"), Utf8("MED PACK")]) AND part.p_size <= Int32(10) OR part.p_brand = Utf8("Brand#34") AND part.p_container IN ([Utf8("LG CASE"), Utf8("LG BOX"), Utf8("LG PACK"), Utf8("LG PKG")]) AND part.p_size <= Int32(15) | ||
TableScan: part projection=[p_partkey, p_brand, p_size, p_container] | ||
Filter: part.p_brand = Utf8("Brand#12") AND part.p_container IN ([Utf8("SM CASE"), Utf8("SM BOX"), Utf8("SM PACK"), Utf8("SM PKG")]) AND part.p_size <= Int32(5) OR part.p_brand = Utf8("Brand#23") AND part.p_container IN ([Utf8("MED BAG"), Utf8("MED BOX"), Utf8("MED PKG"), Utf8("MED PACK")]) AND part.p_size <= Int32(10) OR part.p_brand = Utf8("Brand#34") AND part.p_container IN ([Utf8("LG CASE"), Utf8("LG BOX"), Utf8("LG PACK"), Utf8("LG PKG")]) AND part.p_size <= Int32(15) AND part.p_size >= Int32(1) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The order changed here |
||
TableScan: part projection=[p_partkey, p_brand, p_size, p_container] |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -394,9 +394,9 @@ order by cntrycode;"#; | |
Projection: AVG(customer.c_acctbal) AS __value, alias=__sq_1 | ||
Aggregate: groupBy=[[]], aggr=[[AVG(customer.c_acctbal)]] | ||
Filter: customer.c_acctbal > Decimal128(Some(0),15,2) AND substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"), Utf8("17")]) | ||
TableScan: customer projection=[c_phone, c_acctbal], partial_filters=[CAST(customer.c_acctbal AS Decimal128(30, 15)) > Decimal128(Some(0),30,15), substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"), Utf8("17")])]"# | ||
TableScan: customer projection=[c_phone, c_acctbal], partial_filters=[CAST(customer.c_acctbal AS Decimal128(30, 15)) > Decimal128(Some(0),30,15), substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"), Utf8("17")]), customer.c_acctbal > Decimal128(Some(0),15,2)]"# | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Example of an optimization that we previously missed |
||
.to_string(); | ||
assert_eq!(actual, expected); | ||
assert_eq!(expected, actual); | ||
|
||
// assert data | ||
let results = execute_to_batches(&ctx, sql).await; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -40,6 +40,7 @@ use datafusion_common::{DataFusionError, Result}; | |
use datafusion_expr::logical_plan::LogicalPlan; | ||
use log::{debug, trace, warn}; | ||
use std::sync::Arc; | ||
use std::time::Instant; | ||
|
||
/// `OptimizerRule` transforms one ['LogicalPlan'] into another which | ||
/// computes the same results, but in a potentially more efficient | ||
|
@@ -71,6 +72,8 @@ pub struct OptimizerConfig { | |
skip_failing_rules: bool, | ||
/// Specify whether to enable the filter_null_keys rule | ||
filter_null_keys: bool, | ||
/// Maximum number of times to run optimizer against a plan | ||
max_passes: u8, | ||
} | ||
|
||
impl OptimizerConfig { | ||
|
@@ -81,6 +84,7 @@ impl OptimizerConfig { | |
next_id: 0, // useful for generating things like unique subquery aliases | ||
skip_failing_rules: true, | ||
filter_null_keys: true, | ||
max_passes: 3, | ||
} | ||
} | ||
|
||
|
@@ -107,6 +111,12 @@ impl OptimizerConfig { | |
self | ||
} | ||
|
||
/// Specify how many times to attempt to optimize the plan | ||
pub fn with_max_passes(mut self, v: u8) -> Self { | ||
self.max_passes = v; | ||
self | ||
} | ||
|
||
/// Generate the next ID needed | ||
pub fn next_id(&mut self) -> usize { | ||
self.next_id += 1; | ||
|
@@ -189,38 +199,57 @@ impl Optimizer { | |
where | ||
F: FnMut(&LogicalPlan, &dyn OptimizerRule), | ||
{ | ||
let start_time = Instant::now(); | ||
let mut plan_str = format!("{}", plan.display_indent()); | ||
let mut new_plan = plan.clone(); | ||
log_plan("Optimizer input", plan); | ||
let mut i = 0; | ||
while i < optimizer_config.max_passes { | ||
log_plan(&format!("Optimizer input (pass {})", i), &new_plan); | ||
|
||
for rule in &self.rules { | ||
let result = rule.optimize(&new_plan, optimizer_config); | ||
match result { | ||
Ok(plan) => { | ||
new_plan = plan; | ||
observer(&new_plan, rule.as_ref()); | ||
log_plan(rule.name(), &new_plan); | ||
} | ||
Err(ref e) => { | ||
if optimizer_config.skip_failing_rules { | ||
// Note to future readers: if you see this warning it signals a | ||
// bug in the DataFusion optimizer. Please consider filing a ticket | ||
// https://github.com/apache/arrow-datafusion | ||
warn!( | ||
for rule in &self.rules { | ||
let result = rule.optimize(&new_plan, optimizer_config); | ||
match result { | ||
Ok(plan) => { | ||
new_plan = plan; | ||
observer(&new_plan, rule.as_ref()); | ||
log_plan(rule.name(), &new_plan); | ||
} | ||
Err(ref e) => { | ||
if optimizer_config.skip_failing_rules { | ||
// Note to future readers: if you see this warning it signals a | ||
// bug in the DataFusion optimizer. Please consider filing a ticket | ||
// https://github.com/apache/arrow-datafusion | ||
warn!( | ||
"Skipping optimizer rule '{}' due to unexpected error: {}", | ||
rule.name(), | ||
e | ||
); | ||
} else { | ||
return Err(DataFusionError::Internal(format!( | ||
"Optimizer rule '{}' failed due to unexpected error: {}", | ||
rule.name(), | ||
e | ||
))); | ||
} else { | ||
return Err(DataFusionError::Internal(format!( | ||
"Optimizer rule '{}' failed due to unexpected error: {}", | ||
rule.name(), | ||
e | ||
))); | ||
} | ||
} | ||
} | ||
} | ||
log_plan(&format!("Optimized plan (pass {})", i), &new_plan); | ||
|
||
// TODO this is an expensive way to see if the optimizer did anything and | ||
// it would be better to change the OptimizerRule trait to return an Option | ||
// instead | ||
let new_plan_str = format!("{}", new_plan.display_indent()); | ||
if plan_str == new_plan_str { | ||
// plan did not change, so no need to continue trying to optimize | ||
debug!("optimizer pass {} did not make changes", i); | ||
break; | ||
} | ||
Comment on lines
+239
to
+247
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. +1! This is probably a follow-up item; but instead of having the optimizer decide on this (by returning an option), it might also make sense to compute a unique plan id (bottom up) so that we can also use this to detect optimization cycles. A very basic example is (assuming each letter is a unique plan id) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's a great idea. Thanks @isidentical. I will write up an issue. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I filed #3892 to track this |
||
plan_str = new_plan_str; | ||
i += 1; | ||
} | ||
log_plan("Optimized plan", &new_plan); | ||
log_plan("Final optimized plan", &new_plan); | ||
debug!("Optimizer took {} ms", start_time.elapsed().as_millis()); | ||
Ok(new_plan) | ||
} | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -135,7 +135,7 @@ fn optimize_plan( | |
_optimizer_config: &OptimizerConfig, | ||
) -> Result<LogicalPlan> { | ||
let mut new_required_columns = required_columns.clone(); | ||
match plan { | ||
let new_plan = match plan { | ||
LogicalPlan::Projection(Projection { | ||
input, | ||
expr, | ||
|
@@ -509,7 +509,25 @@ fn optimize_plan( | |
|
||
from_plan(plan, &expr, &new_inputs) | ||
} | ||
} | ||
}; | ||
|
||
// when this rule is applied multiple times it will insert duplicate nested projections, | ||
// so we catch this here | ||
let with_dupe_projection_removed = match new_plan? { | ||
LogicalPlan::Projection(p) => match p.input.as_ref() { | ||
LogicalPlan::Projection(p2) if projection_equal(&p, p2) => { | ||
LogicalPlan::Projection(p2.clone()) | ||
} | ||
_ => LogicalPlan::Projection(p), | ||
}, | ||
other => other, | ||
}; | ||
Comment on lines
+516
to
+524
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the fix for #3881 |
||
|
||
Ok(with_dupe_projection_removed) | ||
} | ||
|
||
fn projection_equal(p: &Projection, p2: &Projection) -> bool { | ||
p.expr.len() == p2.expr.len() && p.expr.iter().zip(&p2.expr).all(|(l, r)| l == r) | ||
} | ||
|
||
#[cfg(test)] | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The order changed here