Skip to content

Commit

Permalink
reuse datafusion physical planner in ballista building from protobuf (#…
Browse files Browse the repository at this point in the history
…532)

* use logical planner in ballista building

* simplify statement

* fix unit test

* fix per comment
  • Loading branch information
jimexist authored Jun 11, 2021
1 parent 63e3045 commit ad70a1e
Show file tree
Hide file tree
Showing 3 changed files with 153 additions and 149 deletions.
142 changes: 26 additions & 116 deletions ballista/rust/core/src/serde/physical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use datafusion::execution::context::{
ExecutionConfig, ExecutionContextState, ExecutionProps,
};
use datafusion::logical_plan::{DFSchema, Expr};
use datafusion::physical_plan::aggregates::{create_aggregate_expr, AggregateFunction};
use datafusion::physical_plan::aggregates::AggregateFunction;
use datafusion::physical_plan::expressions::col;
use datafusion::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec};
use datafusion::physical_plan::hash_join::PartitionMode;
Expand All @@ -45,7 +45,6 @@ use datafusion::physical_plan::planner::DefaultPhysicalPlanner;
use datafusion::physical_plan::window_functions::{
BuiltInWindowFunction, WindowFunction,
};
use datafusion::physical_plan::windows::create_window_expr;
use datafusion::physical_plan::windows::WindowAggExec;
use datafusion::physical_plan::{
coalesce_batches::CoalesceBatchesExec,
Expand Down Expand Up @@ -205,76 +204,27 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
)
})?
.clone();

let physical_schema: SchemaRef =
SchemaRef::new((&input_schema).try_into()?);

let catalog_list =
Arc::new(MemoryCatalogList::new()) as Arc<dyn CatalogList>;
let ctx_state = ExecutionContextState {
catalog_list,
scalar_functions: Default::default(),
var_provider: Default::default(),
aggregate_functions: Default::default(),
config: ExecutionConfig::new(),
execution_props: ExecutionProps::new(),
};

let ctx_state = ExecutionContextState::new();
let window_agg_expr: Vec<(Expr, String)> = window_agg
.window_expr
.iter()
.zip(window_agg.window_expr_name.iter())
.map(|(expr, name)| expr.try_into().map(|expr| (expr, name.clone())))
.collect::<Result<Vec<_>, _>>()?;

let mut physical_window_expr = vec![];

let df_planner = DefaultPhysicalPlanner::default();

for (expr, name) in &window_agg_expr {
match expr {
Expr::WindowFunction {
fun,
args,
partition_by,
order_by,
window_frame,
..
} => {
let arg = df_planner
.create_physical_expr(
&args[0],
&physical_schema,
&ctx_state,
)
.map_err(|e| {
BallistaError::General(format!("{:?}", e))
})?;
if !partition_by.is_empty() {
return Err(BallistaError::NotImplemented("Window function with partition by is not yet implemented".to_owned()));
}
if !order_by.is_empty() {
return Err(BallistaError::NotImplemented("Window function with order by is not yet implemented".to_owned()));
}
if window_frame.is_some() {
return Err(BallistaError::NotImplemented("Window function with window frame is not yet implemented".to_owned()));
}
let window_expr = create_window_expr(
&fun,
&[arg],
&physical_schema,
name.to_owned(),
)?;
physical_window_expr.push(window_expr);
}
_ => {
return Err(BallistaError::General(
"Invalid expression for WindowAggrExec".to_string(),
));
}
}
}

let physical_window_expr = window_agg_expr
.iter()
.map(|(expr, name)| {
df_planner.create_window_expr_with_name(
expr,
name.to_string(),
&physical_schema,
&ctx_state,
)
})
.collect::<Result<Vec<_>, _>>()?;
Ok(Arc::new(WindowAggExec::try_new(
physical_window_expr,
input,
Expand All @@ -297,7 +247,6 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
AggregateMode::FinalPartitioned
}
};

let group = hash_agg
.group_expr
.iter()
Expand All @@ -306,25 +255,13 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
compile_expr(expr, &input.schema()).map(|e| (e, name.to_string()))
})
.collect::<Result<Vec<_>, _>>()?;

let logical_agg_expr: Vec<(Expr, String)> = hash_agg
.aggr_expr
.iter()
.zip(hash_agg.aggr_expr_name.iter())
.map(|(expr, name)| expr.try_into().map(|expr| (expr, name.clone())))
.collect::<Result<Vec<_>, _>>()?;

let catalog_list =
Arc::new(MemoryCatalogList::new()) as Arc<dyn CatalogList>;
let ctx_state = ExecutionContextState {
catalog_list,
scalar_functions: Default::default(),
var_provider: Default::default(),
aggregate_functions: Default::default(),
config: ExecutionConfig::new(),
execution_props: ExecutionProps::new(),
};

let ctx_state = ExecutionContextState::new();
let input_schema = hash_agg
.input_schema
.as_ref()
Expand All @@ -336,37 +273,18 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
.clone();
let physical_schema: SchemaRef =
SchemaRef::new((&input_schema).try_into()?);

let mut physical_aggr_expr = vec![];

let df_planner = DefaultPhysicalPlanner::default();
for (expr, name) in &logical_agg_expr {
match expr {
Expr::AggregateFunction { fun, args, .. } => {
let arg = df_planner
.create_physical_expr(
&args[0],
&physical_schema,
&ctx_state,
)
.map_err(|e| {
BallistaError::General(format!("{:?}", e))
})?;
physical_aggr_expr.push(create_aggregate_expr(
&fun,
false,
&[arg],
&physical_schema,
name.to_string(),
)?);
}
_ => {
return Err(BallistaError::General(
"Invalid expression for HashAggregateExec".to_string(),
))
}
}
}
let physical_aggr_expr = logical_agg_expr
.iter()
.map(|(expr, name)| {
df_planner.create_aggregate_expr_with_name(
expr,
name.to_string(),
&physical_schema,
&ctx_state,
)
})
.collect::<Result<Vec<_>, _>>()?;
Ok(Arc::new(HashAggregateExec::try_new(
agg_mode,
group,
Expand Down Expand Up @@ -484,15 +402,7 @@ fn compile_expr(
schema: &Schema,
) -> Result<Arc<dyn PhysicalExpr>, BallistaError> {
let df_planner = DefaultPhysicalPlanner::default();
let catalog_list = Arc::new(MemoryCatalogList::new()) as Arc<dyn CatalogList>;
let state = ExecutionContextState {
catalog_list,
scalar_functions: HashMap::new(),
var_provider: HashMap::new(),
aggregate_functions: HashMap::new(),
config: ExecutionConfig::new(),
execution_props: ExecutionProps::new(),
};
let state = ExecutionContextState::new();
let expr: Expr = expr.try_into()?;
df_planner
.create_physical_expr(&expr, schema, &state)
Expand Down
116 changes: 95 additions & 21 deletions datafusion/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -731,34 +731,82 @@ impl DefaultPhysicalPlanner {
}
}

/// Create a window expression from a logical expression
pub fn create_window_expr(
/// Create a window expression with a name from a logical expression
pub fn create_window_expr_with_name(
&self,
e: &Expr,
logical_input_schema: &DFSchema,
name: String,
physical_input_schema: &Schema,
ctx_state: &ExecutionContextState,
) -> Result<Arc<dyn WindowExpr>> {
// unpack aliased logical expressions, e.g. "sum(col) over () as total"
let (name, e) = match e {
Expr::Alias(sub_expr, alias) => (alias.clone(), sub_expr.as_ref()),
_ => (e.name(logical_input_schema)?, e),
};

match e {
Expr::WindowFunction { fun, args, .. } => {
Expr::WindowFunction {
fun,
args,
partition_by,
order_by,
window_frame,
} => {
let args = args
.iter()
.map(|e| {
self.create_physical_expr(e, physical_input_schema, ctx_state)
})
.collect::<Result<Vec<_>>>()?;
// if !order_by.is_empty() {
// return Err(DataFusionError::NotImplemented(
// "Window function with order by is not yet implemented".to_owned(),
// ));
// }
windows::create_window_expr(fun, &args, physical_input_schema, name)
let partition_by = partition_by
.iter()
.map(|e| {
self.create_physical_expr(e, physical_input_schema, ctx_state)
})
.collect::<Result<Vec<_>>>()?;
let order_by = order_by
.iter()
.map(|e| match e {
Expr::Sort {
expr,
asc,
nulls_first,
} => self.create_physical_sort_expr(
expr,
&physical_input_schema,
SortOptions {
descending: !*asc,
nulls_first: *nulls_first,
},
&ctx_state,
),
_ => Err(DataFusionError::Plan(
"Sort only accepts sort expressions".to_string(),
)),
})
.collect::<Result<Vec<_>>>()?;
if !partition_by.is_empty() {
return Err(DataFusionError::NotImplemented(
"window expression with non-empty partition by clause is not yet supported"
.to_owned(),
));
}
if !order_by.is_empty() {
return Err(DataFusionError::NotImplemented(
"window expression with non-empty order by clause is not yet supported"
.to_owned(),
));
}
if window_frame.is_some() {
return Err(DataFusionError::NotImplemented(
"window expression with window frame definition is not yet supported"
.to_owned(),
));
}
windows::create_window_expr(
fun,
name,
&args,
&partition_by,
&order_by,
*window_frame,
physical_input_schema,
)
}
other => Err(DataFusionError::Internal(format!(
"Invalid window expression '{:?}'",
Expand All @@ -767,20 +815,30 @@ impl DefaultPhysicalPlanner {
}
}

/// Create an aggregate expression from a logical expression
pub fn create_aggregate_expr(
/// Create a window expression from a logical expression or an alias
pub fn create_window_expr(
&self,
e: &Expr,
logical_input_schema: &DFSchema,
physical_input_schema: &Schema,
ctx_state: &ExecutionContextState,
) -> Result<Arc<dyn AggregateExpr>> {
// unpack aliased logical expressions, e.g. "sum(col) as total"
) -> Result<Arc<dyn WindowExpr>> {
// unpack aliased logical expressions, e.g. "sum(col) over () as total"
let (name, e) = match e {
Expr::Alias(sub_expr, alias) => (alias.clone(), sub_expr.as_ref()),
_ => (e.name(logical_input_schema)?, e),
};
self.create_window_expr_with_name(e, name, physical_input_schema, ctx_state)
}

/// Create an aggregate expression with a name from a logical expression
pub fn create_aggregate_expr_with_name(
&self,
e: &Expr,
name: String,
physical_input_schema: &Schema,
ctx_state: &ExecutionContextState,
) -> Result<Arc<dyn AggregateExpr>> {
match e {
Expr::AggregateFunction {
fun,
Expand Down Expand Up @@ -819,7 +877,23 @@ impl DefaultPhysicalPlanner {
}
}

/// Create an aggregate expression from a logical expression
/// Create an aggregate expression from a logical expression or an alias
pub fn create_aggregate_expr(
&self,
e: &Expr,
logical_input_schema: &DFSchema,
physical_input_schema: &Schema,
ctx_state: &ExecutionContextState,
) -> Result<Arc<dyn AggregateExpr>> {
// unpack aliased logical expressions, e.g. "sum(col) as total"
let (name, e) = match e {
Expr::Alias(sub_expr, alias) => (alias.clone(), sub_expr.as_ref()),
_ => (e.name(logical_input_schema)?, e),
};
self.create_aggregate_expr_with_name(e, name, physical_input_schema, ctx_state)
}

/// Create a physical sort expression from a logical expression
pub fn create_physical_sort_expr(
&self,
e: &Expr,
Expand Down
Loading

0 comments on commit ad70a1e

Please sign in to comment.