diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs index 94d32e257f3d9..a2d774678ce79 100644 --- a/datafusion/src/physical_plan/planner.rs +++ b/datafusion/src/physical_plan/planner.rs @@ -22,6 +22,7 @@ use super::{ functions, hash_join::PartitionMode, udaf, union::UnionExec, windows, }; use crate::execution::context::ExecutionContextState; +use crate::logical_plan::window_frames::WindowFrame; use crate::logical_plan::{ DFSchema, Expr, LogicalPlan, Operator, Partitioning as LogicalPartitioning, PlanType, StringifiedPlan, UserDefinedLogicalNode, @@ -740,19 +741,56 @@ impl DefaultPhysicalPlanner { ctx_state: &ExecutionContextState, ) -> Result> { match e { - Expr::WindowFunction { fun, args, .. } => { + Expr::WindowFunction { + fun, + args, + partition_by, + order_by, + window_frame, + } => { let args = args .iter() .map(|e| { self.create_physical_expr(e, physical_input_schema, ctx_state) }) .collect::>>()?; - // if !order_by.is_empty() { - // return Err(DataFusionError::NotImplemented( - // "Window function with order by is not yet implemented".to_owned(), - // )); - // } - windows::create_window_expr(fun, &args, physical_input_schema, name) + let partition_by = partition_by + .iter() + .map(|e| { + self.create_physical_expr(e, physical_input_schema, ctx_state) + }) + .collect::>>()?; + let order_by = order_by + .iter() + .map(|e| match e { + Expr::Sort { + expr, + asc, + nulls_first, + } => self.create_physical_sort_expr( + expr, + &physical_input_schema, + SortOptions { + descending: !*asc, + nulls_first: *nulls_first, + }, + &ctx_state, + ), + _ => Err(DataFusionError::Plan( + "Sort only accepts sort expressions".to_string(), + )), + }) + .collect::>>()?; + let window_frame = window_frame.unwrap_or_else(WindowFrame::default); + windows::create_window_expr( + fun, + name, + &args, + &partition_by, + &order_by, + window_frame, + physical_input_schema, + ) } other => Err(DataFusionError::Internal(format!( "Invalid window expression '{:?}'", diff --git a/datafusion/src/physical_plan/windows.rs b/datafusion/src/physical_plan/windows.rs index 7eb14943facf1..b7dfd37c87120 100644 --- a/datafusion/src/physical_plan/windows.rs +++ b/datafusion/src/physical_plan/windows.rs @@ -18,9 +18,11 @@ //! Execution plan for window functions use crate::error::{DataFusionError, Result}; + +use crate::logical_plan::window_frames::WindowFrame; use crate::physical_plan::{ aggregates, common, - expressions::{Literal, NthValue, RowNumber}, + expressions::{Literal, NthValue, PhysicalSortExpr, RowNumber}, type_coercion::coerce, window_functions::signature_for_built_in, window_functions::BuiltInWindowFunctionExpr, @@ -61,12 +63,15 @@ pub struct WindowAggExec { /// Create a physical expression for window function pub fn create_window_expr( fun: &WindowFunction, + name: String, args: &[Arc], + _partition_by: &[Arc], + _order_by: &[PhysicalSortExpr], + _window_frame: WindowFrame, input_schema: &Schema, - name: String, ) -> Result> { - match fun { - WindowFunction::AggregateFunction(fun) => Ok(Arc::new(AggregateWindowExpr { + Ok(match fun { + WindowFunction::AggregateFunction(fun) => Arc::new(AggregateWindowExpr { aggregate: aggregates::create_aggregate_expr( fun, false, @@ -74,11 +79,11 @@ pub fn create_window_expr( input_schema, name, )?, - })), - WindowFunction::BuiltInWindowFunction(fun) => Ok(Arc::new(BuiltInWindowExpr { + }), + WindowFunction::BuiltInWindowFunction(fun) => Arc::new(BuiltInWindowExpr { window: create_built_in_window_expr(fun, args, input_schema, name)?, - })), - } + }), + }) } fn create_built_in_window_expr( @@ -538,9 +543,12 @@ mod tests { let window_exec = Arc::new(WindowAggExec::try_new( vec![create_window_expr( &WindowFunction::AggregateFunction(AggregateFunction::Count), + "count".to_owned(), &[col("c3")], + &[], + &[], + WindowFrame::default(), schema.as_ref(), - "count".to_owned(), )?], input, schema.clone(), @@ -568,21 +576,30 @@ mod tests { vec![ create_window_expr( &WindowFunction::AggregateFunction(AggregateFunction::Count), + "count".to_owned(), &[col("c3")], + &[], + &[], + WindowFrame::default(), schema.as_ref(), - "count".to_owned(), )?, create_window_expr( &WindowFunction::AggregateFunction(AggregateFunction::Max), + "max".to_owned(), &[col("c3")], + &[], + &[], + WindowFrame::default(), schema.as_ref(), - "max".to_owned(), )?, create_window_expr( &WindowFunction::AggregateFunction(AggregateFunction::Min), + "min".to_owned(), &[col("c3")], + &[], + &[], + WindowFrame::default(), schema.as_ref(), - "min".to_owned(), )?, ], input,