Skip to content

Commit

Permalink
tmp
Browse files Browse the repository at this point in the history
  • Loading branch information
jackwener committed May 13, 2022
1 parent 0c829b2 commit ab3d258
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 3 deletions.
63 changes: 60 additions & 3 deletions datafusion/core/src/optimizer/filter_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,64 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
}),
)
}
LogicalPlan::SubqueryAlias(SubqueryAlias { .. }) => push_down(&state, plan),
LogicalPlan::SubqueryAlias(SubqueryAlias { alias, input, .. }) => {
// println!("before: {:?}", schema);
// println!("input: {:?}", &*input);
// println!("state: {:?}", &state);

// let t = match &**input {
// LogicalPlan::TableScan(TableScan { table_name, .. }) => {
// Some(table_name.clone())
// }
// _ => None,
// };

let new_filters: Result<Vec<(Expr, HashSet<Column>)>> = state
.filters
.iter()
.map(|pair| {
let mut replace: HashMap<&Column, Column> = HashMap::new();
let new_columns: HashSet<Column> = pair
.1
.iter()
.map(|c| match &c.relation {
Some(q) if q == alias => {
let column = Column {
relation: None,
name: c.name.clone(),
};
replace.insert(c, column.clone());
column
}
_ => c.clone(),
})
.collect();
let replace_map: HashMap<&Column, &Column> =
replace.iter().map(|(k, v)| (*k, v)).collect();
let new_expr = replace_col(pair.0.clone(), &replace_map)?;
Ok((new_expr, new_columns))
})
.collect();

let state = State {
filters: new_filters?,
};

println!("state: {:?}", &state);

let plan = push_down(&state, plan)?;
// let copy = plan.clone();
// println!("after SubqueryAlias: {:?}", copy);

// match copy {
// LogicalPlan::SubqueryAlias(SubqueryAlias { input, schema, .. }) => {
// println!("before: {:?}", schema);
// println!("input: {:?}", &*input);
// }
// _ => {}
// };
Ok(plan)
}
_ => {
// all other plans are _not_ filter-commutable
let used_columns = plan
Expand Down Expand Up @@ -905,9 +962,9 @@ mod tests {
let expected = "\
SubqueryAlias: t\
\n Union\
\n Filter: #t.a = Int64(1)\
\n Filter: #a = Int64(1)\
\n TableScan: test projection=None\
\n Filter: #t.a = Int64(1)\
\n Filter: #a = Int64(1)\
\n TableScan: test projection=None";
assert_optimized_plan_eq(&plan, expected);
Ok(())
Expand Down
34 changes: 34 additions & 0 deletions datafusion/core/src/optimizer/projection_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use crate::optimizer::utils;
use crate::sql::utils::find_sort_exprs;
use arrow::datatypes::{Field, Schema};
use arrow::error::Result as ArrowResult;
use datafusion_expr::logical_plan::Filter;
use std::{
collections::{BTreeSet, HashSet},
sync::Arc,
Expand Down Expand Up @@ -428,6 +429,7 @@ fn optimize_plan(
}))
}
LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => {
println!("input: {:?}", *input);
match input.as_ref() {
LogicalPlan::TableScan(TableScan { table_name, .. }) => {
let new_required_columns = new_required_columns
Expand Down Expand Up @@ -474,6 +476,38 @@ fn optimize_plan(

utils::from_plan(plan, &expr, &new_inputs)
}
LogicalPlan::Filter(Filter { input, .. }) => {
// let new_inputs = vec![optimize_plan(
// _optimizer,
// input,
// &new_required_columns,
// has_projection,
// _execution_props,
// )?];
// let expr = vec![];

// utils::from_plan(plan, &expr, &new_inputs)
let expr = plan.expressions();
// collect all required columns by this plan
utils::exprlist_to_columns(&expr, &mut new_required_columns)?;

// apply the optimization to all inputs of the plan
let inputs = plan.inputs();
let new_inputs = inputs
.iter()
.map(|input_plan| {
optimize_plan(
_optimizer,
input_plan,
&new_required_columns,
has_projection,
_execution_props,
)
})
.collect::<Result<Vec<_>>>()?;

utils::from_plan(plan, &expr, &new_inputs)
}
_ => Err(DataFusionError::Plan(
"SubqueryAlias should only wrap TableScan".to_string(),
)),
Expand Down
3 changes: 3 additions & 0 deletions datafusion/core/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -838,6 +838,9 @@ impl DefaultPhysicalPlanner {
LogicalPlan::Union(..) => {
self.create_initial_plan(input, session_state).await
}
LogicalPlan::Filter(..) => {
self.create_initial_plan(input, session_state).await
}
_ => Err(DataFusionError::Plan("SubqueryAlias should only wrap TableScan".to_string()))
}
}
Expand Down
4 changes: 4 additions & 0 deletions datafusion/core/tests/sql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -548,13 +548,17 @@ async fn execute_to_batches(ctx: &SessionContext, sql: &str) -> Vec<RecordBatch>
.unwrap();
let optimized_logical_schema = plan.schema();

println!("plan {:?}", plan);

let msg = format!("Creating physical plan for '{}': {:?}", sql, plan);
let plan = ctx
.create_physical_plan(&plan)
.await
.map_err(|e| format!("{:?} at {}", e, msg))
.unwrap();

println!("plan {:?}", plan);

let msg = format!("Executing physical plan for '{}': {:?}", sql, plan);
let task_ctx = ctx.task_ctx();
let results = collect(plan, task_ctx)
Expand Down

0 comments on commit ab3d258

Please sign in to comment.