From 4d5a0bd14254046b973794ff41d9bf793a072a3b Mon Sep 17 00:00:00 2001 From: Ilya Biryukov Date: Fri, 27 Aug 2021 14:06:53 +0300 Subject: [PATCH] Support aggregation in rolling window queries The idea is to aggregate inside each matching partition and dimension. `ROLLING_WINDOW` clause now has an optional `GROUP BY DIMENSION ` argument. Corresponding expression is used both as a grouping key for non-rolling aggregates and a "join" key to match to the rolling window output dimension. --- datafusion/Cargo.toml | 2 +- datafusion/src/cube_ext/rolling.rs | 156 ++++++++++++++++++++++--- datafusion/src/logical_plan/builder.rs | 32 ++++- datafusion/src/sql/planner.rs | 129 +++++++++++--------- 4 files changed, 246 insertions(+), 73 deletions(-) diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml index a621930e3c3c..d124988ff5cd 100644 --- a/datafusion/Cargo.toml +++ b/datafusion/Cargo.toml @@ -48,7 +48,7 @@ ahash = "0.7" hashbrown = "0.11" arrow = { git = "https://github.com/cube-js/arrow-rs.git", branch = "cube", features = ["prettyprint"] } parquet = { git = "https://github.com/cube-js/arrow-rs.git", branch = "cube", features = ["arrow"] } -sqlparser = { git = "https://github.com/cube-js/sqlparser-rs.git", rev = "6008dfab082a3455c54b023be878d92ec9acef43" } +sqlparser = { git = "https://github.com/cube-js/sqlparser-rs.git", rev = "2fcd06f7354e8c85f170b49a08fc018749289a40" } paste = "^1.0" num_cpus = "1.13.0" chrono = "0.4" diff --git a/datafusion/src/cube_ext/rolling.rs b/datafusion/src/cube_ext/rolling.rs index 13e2b0173150..f64f5c93af28 100644 --- a/datafusion/src/cube_ext/rolling.rs +++ b/datafusion/src/cube_ext/rolling.rs @@ -26,23 +26,28 @@ use crate::logical_plan::{ }; use crate::physical_plan::coalesce_batches::concat_batches; use crate::physical_plan::expressions::PhysicalSortExpr; -use crate::physical_plan::hash_aggregate::{append_value, create_builder}; +use crate::physical_plan::group_scalar::GroupByScalar; +use crate::physical_plan::hash_aggregate::{ + append_value, create_accumulators, create_builder, create_group_by_value, +}; use crate::physical_plan::planner::ExtensionPlanner; use crate::physical_plan::sort::SortExec; use crate::physical_plan::{ collect, AggregateExpr, ColumnarValue, Distribution, ExecutionPlan, Partitioning, - PhysicalPlanner, SendableRecordBatchStream, + PhysicalExpr, PhysicalPlanner, SendableRecordBatchStream, }; use crate::scalar::ScalarValue; -use arrow::array::{make_array, BooleanBuilder, MutableArrayData}; +use arrow::array::{make_array, ArrayRef, BooleanBuilder, MutableArrayData, UInt64Array}; use arrow::compute::filter; use arrow::datatypes::{DataType, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use async_trait::async_trait; use chrono::{TimeZone, Utc}; +use hashbrown::HashMap; use itertools::Itertools; use std::any::Any; use std::cmp::{max, Ordering}; +use std::convert::TryFrom; use std::sync::Arc; #[derive(Debug)] @@ -55,6 +60,8 @@ pub struct RollingWindowAggregate { pub every: Expr, pub partition_by: Vec, pub rolling_aggs: Vec, + pub group_by_dimension: Option, + pub aggs: Vec, } impl UserDefinedLogicalNode for RollingWindowAggregate { @@ -79,6 +86,10 @@ impl UserDefinedLogicalNode for RollingWindowAggregate { ]; e.extend(self.partition_by.iter().map(|c| Expr::Column(c.clone()))); e.extend_from_slice(self.rolling_aggs.as_slice()); + e.extend_from_slice(self.aggs.as_slice()); + if let Some(d) = &self.group_by_dimension { + e.push(d.clone()); + } e } @@ -96,7 +107,13 @@ impl UserDefinedLogicalNode for RollingWindowAggregate { inputs: &[LogicalPlan], ) -> Arc { assert_eq!(inputs.len(), 1); - assert!(4 + self.partition_by.len() <= exprs.len()); + assert_eq!( + exprs.len(), + 4 + self.partition_by.len() + + self.rolling_aggs.len() + + self.aggs.len() + + self.group_by_dimension.as_ref().map(|_| 1).unwrap_or(0) + ); let input = inputs[0].clone(); let dimension = match &exprs[0] { Expr::Column(c) => c.clone(), @@ -105,14 +122,30 @@ impl UserDefinedLogicalNode for RollingWindowAggregate { let from = exprs[1].clone(); let to = exprs[2].clone(); let every = exprs[3].clone(); - let partition_by = exprs[4..4 + self.partition_by.len()] + let exprs = &exprs[4..]; + let partition_by = exprs[..self.partition_by.len()] .iter() .map(|c| match c { Expr::Column(c) => c.clone(), o => panic!("Expected column for partition_by, got {:?}", o), }) .collect_vec(); - let rolling_aggs = exprs[4 + self.partition_by.len()..].to_vec(); + let exprs = &exprs[self.partition_by.len()..]; + + let rolling_aggs = exprs[..self.rolling_aggs.len()].to_vec(); + let exprs = &exprs[self.rolling_aggs.len()..]; + + let aggs = exprs[..self.aggs.len()].to_vec(); + let exprs = &exprs[self.aggs.len()..]; + + let group_by_dimension = if self.group_by_dimension.is_some() { + debug_assert_eq!(exprs.len(), 1); + Some(exprs[0].clone()) + } else { + debug_assert_eq!(exprs.len(), 0); + None + }; + Arc::new(RollingWindowAggregate { schema: self.schema.clone(), input, @@ -122,6 +155,8 @@ impl UserDefinedLogicalNode for RollingWindowAggregate { every, partition_by, rolling_aggs, + group_by_dimension, + aggs, }) } } @@ -211,6 +246,21 @@ impl ExtensionPlanner for Planner { }) .collect::, _>>()?; + let group_by_dimension = node + .group_by_dimension + .as_ref() + .map(|d| { + planner.create_physical_expr(d, input_dfschema, &input_schema, ctx_state) + }) + .transpose()?; + let aggs = node + .aggs + .iter() + .map(|a| { + planner.create_aggregate_expr(a, input_dfschema, &input_schema, ctx_state) + }) + .collect::>()?; + // TODO: filter inputs by date. // Do preliminary sorting. let mut sort_key = Vec::with_capacity(input_schema.fields().len()); @@ -229,6 +279,7 @@ impl ExtensionPlanner for Planner { }); let sort = Arc::new(SortExec::try_new(sort_key, input.clone())?); + let schema = node.schema.to_schema_ref(); Ok(Some(Arc::new(RollingWindowAggExec { @@ -237,6 +288,8 @@ impl ExtensionPlanner for Planner { group_key, rolling_aggs, dimension, + group_by_dimension, + aggs, from, to, every, @@ -297,6 +350,8 @@ pub struct RollingWindowAggExec { pub group_key: Vec, pub rolling_aggs: Vec, pub dimension: crate::physical_plan::expressions::Column, + pub group_by_dimension: Option>, + pub aggs: Vec>, pub from: ScalarValue, pub to: ScalarValue, pub every: ScalarValue, @@ -335,6 +390,8 @@ impl ExecutionPlan for RollingWindowAggExec { group_key: self.group_key.clone(), rolling_aggs: self.rolling_aggs.clone(), dimension: self.dimension.clone(), + group_by_dimension: self.group_by_dimension.clone(), + aggs: self.aggs.clone(), from: self.from.clone(), to: self.to.clone(), every: self.every.clone(), @@ -357,6 +414,7 @@ impl ExecutionPlan for RollingWindowAggExec { .iter() .map(|c| input.columns()[c.index()].clone()) .collect_vec(); + let other_cols = input .columns() .iter() @@ -374,15 +432,7 @@ impl ExecutionPlan for RollingWindowAggExec { let agg_inputs = self .rolling_aggs .iter() - .map(|r| { - r.agg - .expressions() - .iter() - .map(|e| -> Result<_, DataFusionError> { - Ok(e.evaluate(&input)?.into_array(num_rows)) - }) - .collect::, _>>() - }) + .map(|r| compute_agg_inputs(r.agg.as_ref(), &input)) .collect::, _>>()?; let mut accumulators = self .rolling_aggs @@ -396,6 +446,19 @@ impl ExecutionPlan for RollingWindowAggExec { dimension = arrow::compute::cast(&dimension, &dim_iter_type)?; } + let extra_aggs_dimension = self + .group_by_dimension + .as_ref() + .map(|d| -> Result<_, DataFusionError> { + Ok(d.evaluate(&input)?.into_array(num_rows)) + }) + .transpose()?; + let extra_aggs_inputs = self + .aggs + .iter() + .map(|a| compute_agg_inputs(a.as_ref(), &input)) + .collect::, _>>()?; + let mut out_dim = create_builder(&self.from); let mut out_keys = key_cols .iter() @@ -404,6 +467,12 @@ impl ExecutionPlan for RollingWindowAggExec { let mut out_aggs = Vec::with_capacity(self.rolling_aggs.len()); // This filter must be applied prior to returning the values. let mut out_aggs_keep = BooleanBuilder::new(0); + let extra_agg_nulls = self + .aggs + .iter() + .map(|a| ScalarValue::try_from(a.field()?.data_type())) + .collect::, _>>()?; + let mut out_extra_aggs = extra_agg_nulls.iter().map(create_builder).collect_vec(); let mut out_other = other_cols .iter() .map(|c| MutableArrayData::new(vec![c.data()], true, 0)) @@ -491,6 +560,32 @@ impl ExecutionPlan for RollingWindowAggExec { } } + // Compute non-rolling aggregates for the group. + let mut dim_to_extra_aggs = HashMap::new(); + if let Some(key) = &extra_aggs_dimension { + let mut key_to_rows = HashMap::new(); + for i in group_start..group_end { + let key = create_group_by_value(key, i)?; + key_to_rows.entry(key).or_insert(Vec::new()).push(i as u64); + } + + for (k, rows) in key_to_rows { + let mut accumulators = create_accumulators(&self.aggs)?; + let rows = UInt64Array::from(rows); + let mut values = Vec::with_capacity(accumulators.len()); + for i in 0..accumulators.len() { + let accum_inputs = extra_aggs_inputs[i] + .iter() + .map(|a| arrow::compute::take(a.as_ref(), &rows, None)) + .collect::, _>>()?; + accumulators[i].update_batch(&accum_inputs)?; + values.push(accumulators[i].evaluate()?); + } + + dim_to_extra_aggs.insert(k, values); + } + } + // Add keys, dimension and non-aggregate columns to the output. let mut d = self.from.clone(); let mut d_iter = 0; @@ -509,6 +604,19 @@ impl ExecutionPlan for RollingWindowAggExec { for i in 0..key_cols.len() { out_keys[i].extend(0, group_start, group_start + 1) } + // Add aggregates. + match dim_to_extra_aggs.get(&GroupByScalar::try_from(&d)?) { + Some(aggs) => { + for i in 0..out_extra_aggs.len() { + append_value(out_extra_aggs[i].as_mut(), &aggs[i])? + } + } + None => { + for i in 0..out_extra_aggs.len() { + append_value(out_extra_aggs[i].as_mut(), &extra_agg_nulls[i])? + } + } + } // Find the matching row to add other columns. while matching_row_lower_bound < group_end && cmp_same_types( @@ -590,10 +698,16 @@ impl ExecutionPlan for RollingWindowAggExec { for o in out_other { r.push(make_array(o.freeze())); } + let out_aggs_keep = out_aggs_keep.finish(); for mut a in out_aggs { r.push(filter(a.finish().as_ref(), &out_aggs_keep)?); } + + for mut a in out_extra_aggs { + r.push(a.finish()) + } + let r = RecordBatch::try_new(self.schema(), r)?; Ok(Box::pin(StreamWithSchema::wrap( self.schema(), @@ -621,6 +735,18 @@ fn add_dim(l: &ScalarValue, r: &ScalarValue) -> ScalarValue { } } +fn compute_agg_inputs( + a: &dyn AggregateExpr, + input: &RecordBatch, +) -> Result, DataFusionError> { + a.expressions() + .iter() + .map(|e| -> Result<_, DataFusionError> { + Ok(e.evaluate(input)?.into_array(input.num_rows())) + }) + .collect() +} + fn meets_lower_bound( value: &ScalarValue, current: &ScalarValue, diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs index f9c54041e774..83188f1cfabc 100644 --- a/datafusion/src/logical_plan/builder.rs +++ b/datafusion/src/logical_plan/builder.rs @@ -441,14 +441,20 @@ impl LogicalPlanBuilder { every: Expr, rolling_aggs: Vec, mut partition_by: Vec, + group_by_dimension: Option, + aggs: Vec, ) -> Result { - // TODO: it's confusing we're looking at post-aggregation schema here. let dimension = dimension.normalize(&self.plan)?; for c in &mut partition_by { *c = std::mem::replace(c, Column::from_name("")).normalize(&self.plan)?; } let rolling_aggs = normalize_cols(rolling_aggs, &self.plan)?; + let group_by_dimension = group_by_dimension + .map(|d| normalize_col(d, &self.plan)) + .transpose()?; + let aggs = normalize_cols(aggs, &self.plan)?; + if !find_columns(&from).is_empty() { return Err(DataFusionError::Plan( "FROM inside ROLLING_WINDOW cannot reference columns".to_string(), @@ -482,8 +488,23 @@ impl LogicalPlanBuilder { } } + if let Some(d) = &group_by_dimension { + let group_dim_t = d.get_type(&schema)?; + let dim_t = schema.field_from_column(&dimension)?.data_type(); + if &group_dim_t != dim_t { + return Err(DataFusionError::Plan(format!( + "GROUP BY DIMENSION and DIMENSION have different types: {} and {}", + group_dim_t, dim_t + ))); + } + } + // TODO: take other fields from input into account. - validate_unique_names("Rolling window", &rolling_aggs, self.plan.schema())?; + validate_unique_names( + "Rolling window", + rolling_aggs.iter().chain(aggs.iter()), + self.plan.schema(), + )?; // Compute schema. let schema = build_rolling_aggregate_schema( @@ -492,6 +513,7 @@ impl LogicalPlanBuilder { from_type, &partition_by, &rolling_aggs, + &aggs, )?; let p = LogicalPlan::Extension { node: Arc::new(RollingWindowAggregate { @@ -503,6 +525,8 @@ impl LogicalPlanBuilder { every, partition_by, rolling_aggs, + group_by_dimension, + aggs, }), }; Ok(LogicalPlanBuilder::from(p)) @@ -535,6 +559,7 @@ fn build_rolling_aggregate_schema( dimension_type: DataType, partition_by: &[Column], rolling_aggs: &[Expr], + aggs: &[Expr], ) -> Result { let mut fields = Vec::with_capacity(input_schema.fields().len() + rolling_aggs.len()); @@ -571,6 +596,9 @@ fn build_rolling_aggregate_schema( // Followed by the rolling window aggregation results. fields.extend(exprlist_to_fields(rolling_aggs.iter(), input_schema)?); + // Followed by the extra aggregation results. + fields.extend(exprlist_to_fields(aggs.iter(), input_schema)?); + Ok(Arc::new(DFSchema::new(fields)?)) } diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs index dbc817528f18..ce0d41d2827f 100644 --- a/datafusion/src/sql/planner.rs +++ b/datafusion/src/sql/planner.rs @@ -68,6 +68,7 @@ use super::{ use crate::cube_ext::alias::LogicalAlias; use crate::cube_ext::join::contains_table_scan; use crate::sql::utils::find_rolling_aggregate_exprs; +use itertools::Itertools; /// The ContextProvider trait allows the query planner to obtain meta-data about tables and /// functions referenced in SQL statements @@ -686,65 +687,16 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { }) .collect::>>()?; - let (plan, select_exprs_post_aggr, having_expr_post_aggr_opt) = if !group_by_exprs - .is_empty() - || !aggr_exprs.is_empty() - { - self.aggregate( - plan, - &select_exprs, - &having_expr_opt, - group_by_exprs, - aggr_exprs, - )? - } else { - if let Some(having_expr) = &having_expr_opt { - let available_columns = select_exprs - .iter() - .map(|expr| expr_as_column_expr(expr, &plan)) - .collect::>>()?; - - // Ensure the HAVING expression is using only columns - // provided by the SELECT. - if !can_columns_satisfy_exprs(&available_columns, &[having_expr.clone()])? - { - return Err(DataFusionError::Plan( - "Having references column(s) not provided by the select" - .to_owned(), - )); - } - } - - (plan, select_exprs, having_expr_opt) - }; - - let plan = if let Some(having_expr_post_aggr) = having_expr_post_aggr_opt { - LogicalPlanBuilder::from(plan) - .filter(having_expr_post_aggr)? - .build()? - } else { - plan - }; - - // window function - let window_func_exprs = find_window_exprs(&select_exprs_post_aggr); - - let plan = if window_func_exprs.is_empty() { - plan - } else { - self.window(plan, window_func_exprs)? - }; - // CubeStore extension: rolling window - let rolling_aggs = find_rolling_aggregate_exprs(&select_exprs_post_aggr); - let (plan, select_exprs_post_aggr) = match &select.rolling_window { + let rolling_aggs = find_rolling_aggregate_exprs(&select_exprs); + let (plan, select_exprs, aggr_exprs) = match &select.rolling_window { None => { if !rolling_aggs.is_empty() { return Err(DataFusionError::Plan( "Rolling window aggregates without ROLLING_WINDOW".to_string(), )); } - (plan, select_exprs_post_aggr) + (plan, select_exprs, aggr_exprs) } Some(rolling_window) => { if !select.group_by.is_empty() { @@ -786,9 +738,25 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { let from = self.sql_to_rex(&rolling_window.from, &schema)?; let to = self.sql_to_rex(&rolling_window.to, &schema)?; let every = self.sql_to_rex(&rolling_window.every, &schema)?; - let select_exprs_post_aggr = select_exprs_post_aggr + + let group_by_dimension = rolling_window + .group_by_dimension + .as_ref() + .map(|d| self.sql_to_rex(d, &schema)) + .transpose()?; + if group_by_dimension.is_some() && aggr_exprs.is_empty() { + return Err(DataFusionError::Plan("GROUP BY DIMENSION without aggregate functions inside ROLLING_WINDOW".to_string())); + } else if !aggr_exprs.is_empty() && group_by_dimension.is_none() { + return Err(DataFusionError::Plan("Use of aggregate functions in ROLLING_WINDOW requires GROUP BY DIMENSION ".to_string())); + } + let all_aggs = rolling_aggs .iter() - .map(|expr| rebase_expr(expr, &rolling_aggs, &plan)) + .cloned() + .chain(aggr_exprs.iter().cloned()) + .collect_vec(); + let select_exprs = select_exprs + .iter() + .map(|expr| rebase_expr(expr, &all_aggs, &plan)) .collect::>>()?; let plan = LogicalPlanBuilder::from(plan) .rolling_window_aggregate( @@ -798,10 +766,61 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { every, rolling_aggs, partition_by, + group_by_dimension, + aggr_exprs, )? .build()?; - (plan, select_exprs_post_aggr) + (plan, select_exprs, /*aggr_exprs*/ Vec::new()) + } + }; + + let (plan, select_exprs_post_aggr, having_expr_post_aggr_opt) = if !group_by_exprs + .is_empty() + || !aggr_exprs.is_empty() + { + self.aggregate( + plan, + &select_exprs, + &having_expr_opt, + group_by_exprs, + aggr_exprs, + )? + } else { + if let Some(having_expr) = &having_expr_opt { + let available_columns = select_exprs + .iter() + .map(|expr| expr_as_column_expr(expr, &plan)) + .collect::>>()?; + + // Ensure the HAVING expression is using only columns + // provided by the SELECT. + if !can_columns_satisfy_exprs(&available_columns, &[having_expr.clone()])? + { + return Err(DataFusionError::Plan( + "Having references column(s) not provided by the select" + .to_owned(), + )); + } } + + (plan, select_exprs, having_expr_opt) + }; + + let plan = if let Some(having_expr_post_aggr) = having_expr_post_aggr_opt { + LogicalPlanBuilder::from(plan) + .filter(having_expr_post_aggr)? + .build()? + } else { + plan + }; + + // window function + let window_func_exprs = find_window_exprs(&select_exprs_post_aggr); + + let plan = if window_func_exprs.is_empty() { + plan + } else { + self.window(plan, window_func_exprs)? }; let plan = if select.distinct {