diff --git a/datafusion/core/src/optimizer/filter_push_down.rs b/datafusion/core/src/optimizer/filter_push_down.rs
index fc1f090deaec..2b2d216bcc86 100644
--- a/datafusion/core/src/optimizer/filter_push_down.rs
+++ b/datafusion/core/src/optimizer/filter_push_down.rs
@@ -504,7 +504,43 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
                 }),
             )
         }
-        LogicalPlan::SubqueryAlias(SubqueryAlias { .. }) => push_down(&state, plan),
+        LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => {
+            // Columns qualified with this alias are rewritten to their
+            // unqualified form so the filters can be pushed below the alias.
+            let new_filters: Result<Vec<(Expr, HashSet<Column>)>> = state
+                .filters
+                .iter()
+                .map(|(expr, columns)| {
+                    // maps each aliased column to its unqualified replacement
+                    let mut replace: HashMap<&Column, Column> = HashMap::new();
+                    let new_columns: HashSet<Column> = columns
+                        .iter()
+                        .map(|c| match &c.relation {
+                            Some(q) if q == alias => {
+                                let column = Column {
+                                    relation: None,
+                                    name: c.name.clone(),
+                                };
+                                replace.insert(c, column.clone());
+                                column
+                            }
+                            _ => c.clone(),
+                        })
+                        .collect();
+                    // `replace_col` is given references on both sides of the map
+                    let replace_map: HashMap<&Column, &Column> =
+                        replace.iter().map(|(k, v)| (*k, v)).collect();
+                    let new_expr = replace_col(expr.clone(), &replace_map)?;
+                    Ok((new_expr, new_columns))
+                })
+                .collect();
+
+            let state = State {
+                filters: new_filters?,
+            };
+
+            push_down(&state, plan)
+        }
         _ => {
             // all other plans are _not_ filter-commutable
             let used_columns = plan
@@ -905,9 +941,9 @@ mod tests {
         let expected = "\
             SubqueryAlias: t\
             \n  Union\
-            \n    Filter: #t.a = Int64(1)\
+            \n    Filter: #a = Int64(1)\
             \n      TableScan: test projection=None\
-            \n    Filter: #t.a = Int64(1)\
+            \n    Filter: #a = Int64(1)\
             \n      TableScan: test projection=None";
         assert_optimized_plan_eq(&plan, expected);
         Ok(())
diff --git a/datafusion/core/src/optimizer/projection_push_down.rs b/datafusion/core/src/optimizer/projection_push_down.rs
index b32db62c7296..193fbea4ad30 100644
--- a/datafusion/core/src/optimizer/projection_push_down.rs
+++ b/datafusion/core/src/optimizer/projection_push_down.rs
@@ -32,6 +32,7 @@ use crate::optimizer::utils;
 use crate::sql::utils::find_sort_exprs;
 use arrow::datatypes::{Field, Schema};
 use arrow::error::Result as ArrowResult;
+use datafusion_expr::logical_plan::Filter;
 use std::{
     collections::{BTreeSet, HashSet},
     sync::Arc,
@@ -474,6 +475,28 @@ fn optimize_plan(
 
                     utils::from_plan(plan, &expr, &new_inputs)
                 }
+                LogicalPlan::Filter(Filter { .. }) => {
+                    let expr = plan.expressions();
+                    // collect all required columns by this plan
+                    utils::exprlist_to_columns(&expr, &mut new_required_columns)?;
+
+                    // apply the optimization to all inputs of the plan
+                    let inputs = plan.inputs();
+                    let new_inputs = inputs
+                        .iter()
+                        .map(|input_plan| {
+                            optimize_plan(
+                                _optimizer,
+                                input_plan,
+                                &new_required_columns,
+                                has_projection,
+                                _execution_props,
+                            )
+                        })
+                        .collect::<Result<Vec<_>>>()?;
+
+                    utils::from_plan(plan, &expr, &new_inputs)
+                }
                 _ => Err(DataFusionError::Plan(
                     "SubqueryAlias should only wrap TableScan".to_string(),
                 )),
diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index b1b4bbf92751..a1b948b1db64 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -838,6 +838,9 @@ impl DefaultPhysicalPlanner {
                         LogicalPlan::Union(..) => {
                             self.create_initial_plan(input, session_state).await
                         }
+                        LogicalPlan::Filter(..) => {
+                            self.create_initial_plan(input, session_state).await
+                        }
                         _ => Err(DataFusionError::Plan("SubqueryAlias should only wrap TableScan".to_string()))
                     }
                 }