From c2a1593cd4fb4c8f831a7f40f6e6dd0c53acb695 Mon Sep 17 00:00:00 2001 From: Mustafa akur <106137913+mustafasrepo@users.noreply.github.com> Date: Thu, 15 Dec 2022 15:10:19 +0300 Subject: [PATCH] partition by refactor (#28) * partition by refactor * minor changes * Unnecessary tuple to Range conversion is removed * move transpose under common --- datafusion/common/src/lib.rs | 12 +++ .../remove_unnecessary_sorts.rs | 25 +++--- .../physical_plan/windows/window_agg_exec.rs | 79 +++++++++++++++- datafusion/core/tests/sql/window.rs | 54 +++++++++++ .../physical-expr/src/window/aggregate.rs | 89 +++++++++---------- .../physical-expr/src/window/built_in.rs | 55 +++++------- .../physical-expr/src/window/cume_dist.rs | 25 +++--- .../physical-expr/src/window/lead_lag.rs | 17 ++-- .../src/window/partition_evaluator.rs | 55 +----------- datafusion/physical-expr/src/window/rank.rs | 28 +++--- .../physical-expr/src/window/row_number.rs | 18 ++-- .../physical-expr/src/window/window_expr.rs | 20 +---- .../src/window/window_frame_state.rs | 15 ++-- 13 files changed, 262 insertions(+), 230 deletions(-) diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs index c6812ed24f8a..63683f5af024 100644 --- a/datafusion/common/src/lib.rs +++ b/datafusion/common/src/lib.rs @@ -73,3 +73,15 @@ pub fn reverse_sort_options(options: SortOptions) -> SortOptions { nulls_first: !options.nulls_first, } } + +/// Transposes 2d vector +pub fn transpose(original: Vec>) -> Vec> { + assert!(!original.is_empty()); + let mut transposed = (0..original[0].len()).map(|_| vec![]).collect::>(); + for original_row in original { + for (item, transposed_row) in original_row.into_iter().zip(&mut transposed) { + transposed_row.push(item); + } + } + transposed +} diff --git a/datafusion/core/src/physical_optimizer/remove_unnecessary_sorts.rs b/datafusion/core/src/physical_optimizer/remove_unnecessary_sorts.rs index 322677a30428..ccaf93e0e60d 100644 --- a/datafusion/core/src/physical_optimizer/remove_unnecessary_sorts.rs +++ b/datafusion/core/src/physical_optimizer/remove_unnecessary_sorts.rs @@ -364,16 +364,15 @@ pub fn can_skip_sort( .iter() .filter(|elem| elem.is_partition) .collect::>(); - let (can_skip_partition_bys, should_reverse_partition_bys) = - if partition_by_sections.is_empty() { - (true, false) - } else { - let first_reverse = partition_by_sections[0].reverse; - let can_skip_partition_bys = partition_by_sections - .iter() - .all(|c| c.is_aligned && c.reverse == first_reverse); - (can_skip_partition_bys, first_reverse) - }; + let can_skip_partition_bys = if partition_by_sections.is_empty() { + true + } else { + let first_reverse = partition_by_sections[0].reverse; + let can_skip_partition_bys = partition_by_sections + .iter() + .all(|c| c.is_aligned && c.reverse == first_reverse); + can_skip_partition_bys + }; let order_by_sections = col_infos .iter() .filter(|elem| !elem.is_partition) @@ -387,11 +386,7 @@ pub fn can_skip_sort( .all(|c| c.is_aligned && c.reverse == first_reverse); (can_skip_order_bys, first_reverse) }; - // TODO: We cannot skip partition by keys when sort direction is reversed, - // by propogating partition by sort direction to `WindowAggExec` we can skip - // these columns also. Add support for that (Use direction during partition range calculation). - let can_skip = - can_skip_order_bys && can_skip_partition_bys && !should_reverse_partition_bys; + let can_skip = can_skip_order_bys && can_skip_partition_bys; Ok((can_skip, should_reverse_order_bys)) } diff --git a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs index 914e3e71dbad..837f32ac69b5 100644 --- a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs +++ b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs @@ -28,19 +28,23 @@ use crate::physical_plan::{ ExecutionPlan, Partitioning, PhysicalExpr, RecordBatchStream, SendableRecordBatchStream, Statistics, WindowExpr, }; -use arrow::compute::concat_batches; +use arrow::compute::{ + concat, concat_batches, lexicographical_partition_ranges, SortColumn, +}; use arrow::{ array::ArrayRef, datatypes::{Schema, SchemaRef}, error::{ArrowError, Result as ArrowResult}, record_batch::RecordBatch, }; +use datafusion_common::{transpose, DataFusionError}; use datafusion_physical_expr::rewrite::TreeNodeRewritable; use datafusion_physical_expr::EquivalentClass; use futures::stream::Stream; use futures::{ready, StreamExt}; use log::debug; use std::any::Any; +use std::ops::Range; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; @@ -131,6 +135,25 @@ impl WindowAggExec { pub fn input_schema(&self) -> SchemaRef { self.input_schema.clone() } + + /// Get Partition Columns + pub fn partition_by_sort_keys(&self) -> Result> { + // All window exprs have same partition by hance we just use first one + let partition_by = self.window_expr()[0].partition_by(); + let mut partition_columns = vec![]; + for elem in partition_by { + if let Some(sort_keys) = &self.sort_keys { + for a in sort_keys { + if a.expr.eq(elem) { + partition_columns.push(a.clone()); + break; + } + } + } + } + assert_eq!(partition_by.len(), partition_columns.len()); + Ok(partition_columns) + } } impl ExecutionPlan for WindowAggExec { @@ -253,6 +276,7 @@ impl ExecutionPlan for WindowAggExec { self.window_expr.clone(), input, BaselineMetrics::new(&self.metrics, partition), + self.partition_by_sort_keys()?, )); Ok(stream) } @@ -337,6 +361,7 @@ pub struct WindowAggStream { batches: Vec, finished: bool, window_expr: Vec>, + partition_by_sort_keys: Vec, baseline_metrics: BaselineMetrics, } @@ -347,6 +372,7 @@ impl WindowAggStream { window_expr: Vec>, input: SendableRecordBatchStream, baseline_metrics: BaselineMetrics, + partition_by_sort_keys: Vec, ) -> Self { Self { schema, @@ -355,6 +381,7 @@ impl WindowAggStream { finished: false, window_expr, baseline_metrics, + partition_by_sort_keys, } } @@ -369,8 +396,27 @@ impl WindowAggStream { let batch = concat_batches(&self.input.schema(), &self.batches)?; // calculate window cols - let mut columns = compute_window_aggregates(&self.window_expr, &batch) - .map_err(|e| ArrowError::ExternalError(Box::new(e)))?; + let partition_columns = self.partition_columns(&batch)?; + let partition_points = + self.evaluate_partition_points(batch.num_rows(), &partition_columns)?; + + let mut partition_results = vec![]; + for partition_point in partition_points { + let length = partition_point.end - partition_point.start; + partition_results.push( + compute_window_aggregates( + &self.window_expr, + &batch.slice(partition_point.start, length), + ) + .map_err(|e| ArrowError::ExternalError(Box::new(e)))?, + ) + } + let mut columns = transpose(partition_results) + .iter() + .map(|elems| concat(&elems.iter().map(|x| x.as_ref()).collect::>())) + .collect::>() + .into_iter() + .collect::>>()?; // combine with the original cols // note the setup of window aggregates is that they newly calculated window @@ -378,6 +424,33 @@ impl WindowAggStream { columns.extend_from_slice(batch.columns()); RecordBatch::try_new(self.schema.clone(), columns) } + + /// Get Partition Columns + pub fn partition_columns(&self, batch: &RecordBatch) -> Result> { + self.partition_by_sort_keys + .iter() + .map(|elem| elem.evaluate_to_sort_column(batch)) + .collect::>>() + } + + /// evaluate the partition points given the sort columns; if the sort columns are + /// empty then the result will be a single element vec of the whole column rows. + fn evaluate_partition_points( + &self, + num_rows: usize, + partition_columns: &[SortColumn], + ) -> Result>> { + if partition_columns.is_empty() { + Ok(vec![Range { + start: 0, + end: num_rows, + }]) + } else { + Ok(lexicographical_partition_ranges(partition_columns) + .map_err(DataFusionError::ArrowError)? + .collect::>()) + } + } } impl Stream for WindowAggStream { diff --git a/datafusion/core/tests/sql/window.rs b/datafusion/core/tests/sql/window.rs index 085628051e17..a5bd6a3b97c8 100644 --- a/datafusion/core/tests/sql/window.rs +++ b/datafusion/core/tests/sql/window.rs @@ -2177,3 +2177,57 @@ async fn test_remove_unnecessary_sort_in_sub_query() -> Result<()> { Ok(()) } + +#[tokio::test] +async fn test_window_agg_sort_orderby_reversed_partitionby_reversed_plan() -> Result<()> { + let config = SessionConfig::new().with_repartition_windows(false); + let ctx = SessionContext::with_config(config); + register_aggregate_csv(&ctx).await?; + let sql = "SELECT c3, + SUM(c9) OVER(ORDER BY c3 DESC, c9 DESC, c2 ASC) as sum1, + SUM(c9) OVER(PARTITION BY c3 ORDER BY c9 DESC ) as sum2 + FROM aggregate_test_100 + LIMIT 5"; + + let msg = format!("Creating logical plan for '{}'", sql); + let plan = ctx.create_logical_plan(sql).expect(&msg); + let state = ctx.state(); + let logical_plan = state.optimize(&plan)?; + let physical_plan = state.create_physical_plan(&logical_plan).await?; + let formatted = displayable(physical_plan.as_ref()).indent().to_string(); + // Only 1 SortExec was added + let expected = { + vec![ + "ProjectionExec: expr=[c3@3 as c3, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c3] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@0 as sum2]", + " GlobalLimitExec: skip=0, fetch=5", + " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]", + " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int8(NULL)), end_bound: CurrentRow }]", + " SortExec: [c3@1 DESC,c9@2 DESC,c2@0 ASC NULLS LAST]", + ] + }; + + let actual: Vec<&str> = formatted.trim().lines().collect(); + let actual_len = actual.len(); + let actual_trim_last = &actual[..actual_len - 1]; + assert_eq!( + expected, actual_trim_last, + "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n", + expected, actual + ); + + let actual = execute_to_batches(&ctx, sql).await; + let expected = vec![ + "+-----+-------------+------------+", + "| c3 | sum1 | sum2 |", + "+-----+-------------+------------+", + "| 125 | 3625286410 | 3625286410 |", + "| 123 | 7192027599 | 3566741189 |", + "| 123 | 9784358155 | 6159071745 |", + "| 122 | 13845993262 | 4061635107 |", + "| 120 | 16676974334 | 2830981072 |", + "+-----+-------------+------------+", + ]; + assert_batches_eq!(expected, &actual); + + Ok(()) +} diff --git a/datafusion/physical-expr/src/window/aggregate.rs b/datafusion/physical-expr/src/window/aggregate.rs index 1f0c286efc85..1268559fe568 100644 --- a/datafusion/physical-expr/src/window/aggregate.rs +++ b/datafusion/physical-expr/src/window/aggregate.rs @@ -19,6 +19,7 @@ use std::any::Any; use std::iter::IntoIterator; +use std::ops::Range; use std::sync::Arc; use arrow::array::Array; @@ -90,58 +91,50 @@ impl WindowExpr for AggregateWindowExpr { } fn evaluate(&self, batch: &RecordBatch) -> Result { - let partition_columns = self.partition_columns(batch)?; - let partition_points = - self.evaluate_partition_points(batch.num_rows(), &partition_columns)?; let sort_options: Vec = self.order_by.iter().map(|o| o.options).collect(); let mut row_wise_results: Vec = vec![]; - for partition_range in &partition_points { - let mut accumulator = self.aggregate.create_accumulator()?; - let length = partition_range.end - partition_range.start; - let (values, order_bys) = - self.get_values_orderbys(&batch.slice(partition_range.start, length))?; - - let mut window_frame_ctx = WindowFrameContext::new(&self.window_frame); - let mut last_range: (usize, usize) = (0, 0); - - // We iterate on each row to perform a running calculation. - // First, cur_range is calculated, then it is compared with last_range. - for i in 0..length { - let cur_range = window_frame_ctx.calculate_range( - &order_bys, - &sort_options, - length, - i, - )?; - let value = if cur_range.0 == cur_range.1 { - // We produce None if the window is empty. - ScalarValue::try_from(self.aggregate.field()?.data_type())? - } else { - // Accumulate any new rows that have entered the window: - let update_bound = cur_range.1 - last_range.1; - if update_bound > 0 { - let update: Vec = values - .iter() - .map(|v| v.slice(last_range.1, update_bound)) - .collect(); - accumulator.update_batch(&update)? - } - // Remove rows that have now left the window: - let retract_bound = cur_range.0 - last_range.0; - if retract_bound > 0 { - let retract: Vec = values - .iter() - .map(|v| v.slice(last_range.0, retract_bound)) - .collect(); - accumulator.retract_batch(&retract)? - } - accumulator.evaluate()? - }; - row_wise_results.push(value); - last_range = cur_range; - } + + let mut accumulator = self.aggregate.create_accumulator()?; + let length = batch.num_rows(); + let (values, order_bys) = self.get_values_orderbys(batch)?; + + let mut window_frame_ctx = WindowFrameContext::new(&self.window_frame); + let mut last_range = Range { start: 0, end: 0 }; + + // We iterate on each row to perform a running calculation. + // First, cur_range is calculated, then it is compared with last_range. + for i in 0..length { + let cur_range = + window_frame_ctx.calculate_range(&order_bys, &sort_options, length, i)?; + let value = if cur_range.end == cur_range.start { + // We produce None if the window is empty. + ScalarValue::try_from(self.aggregate.field()?.data_type())? + } else { + // Accumulate any new rows that have entered the window: + let update_bound = cur_range.end - last_range.end; + if update_bound > 0 { + let update: Vec = values + .iter() + .map(|v| v.slice(last_range.end, update_bound)) + .collect(); + accumulator.update_batch(&update)? + } + // Remove rows that have now left the window: + let retract_bound = cur_range.start - last_range.start; + if retract_bound > 0 { + let retract: Vec = values + .iter() + .map(|v| v.slice(last_range.start, retract_bound)) + .collect(); + accumulator.retract_batch(&retract)? + } + accumulator.evaluate()? + }; + row_wise_results.push(value); + last_range = cur_range; } + ScalarValue::iter_to_array(row_wise_results.into_iter()) } diff --git a/datafusion/physical-expr/src/window/built_in.rs b/datafusion/physical-expr/src/window/built_in.rs index 8e16fc7e7a62..0b1e5ee8f19c 100644 --- a/datafusion/physical-expr/src/window/built_in.rs +++ b/datafusion/physical-expr/src/window/built_in.rs @@ -22,14 +22,13 @@ use super::BuiltInWindowFunctionExpr; use super::WindowExpr; use crate::window::window_expr::reverse_order_bys; use crate::{expressions::PhysicalSortExpr, PhysicalExpr}; -use arrow::compute::{concat, SortOptions}; +use arrow::compute::SortOptions; use arrow::record_batch::RecordBatch; use arrow::{array::ArrayRef, datatypes::Field}; -use datafusion_common::DataFusionError; use datafusion_common::Result; +use datafusion_common::ScalarValue; use datafusion_expr::WindowFrame; use std::any::Any; -use std::ops::Range; use std::sync::Arc; /// A window expr that takes the form of a built in window function @@ -92,47 +91,35 @@ impl WindowExpr for BuiltInWindowExpr { fn evaluate(&self, batch: &RecordBatch) -> Result { let evaluator = self.expr.create_evaluator()?; let num_rows = batch.num_rows(); - let partition_columns = self.partition_columns(batch)?; - let partition_points = - self.evaluate_partition_points(num_rows, &partition_columns)?; - - let results = if evaluator.uses_window_frame() { + if evaluator.uses_window_frame() { let sort_options: Vec = self.order_by.iter().map(|o| o.options).collect(); let mut row_wise_results = vec![]; - for partition_range in &partition_points { - let length = partition_range.end - partition_range.start; - let (values, order_bys) = self - .get_values_orderbys(&batch.slice(partition_range.start, length))?; - let mut window_frame_ctx = WindowFrameContext::new(&self.window_frame); - // We iterate on each row to calculate window frame range and and window function result - for idx in 0..length { - let range = window_frame_ctx.calculate_range( - &order_bys, - &sort_options, - num_rows, - idx, - )?; - let range = Range { - start: range.0, - end: range.1, - }; - let value = evaluator.evaluate_inside_range(&values, range)?; - row_wise_results.push(value.to_array()); - } + + let length = batch.num_rows(); + let (values, order_bys) = self.get_values_orderbys(batch)?; + let mut window_frame_ctx = WindowFrameContext::new(&self.window_frame); + // We iterate on each row to calculate window frame range and and window function result + for idx in 0..length { + let range = window_frame_ctx.calculate_range( + &order_bys, + &sort_options, + num_rows, + idx, + )?; + let value = evaluator.evaluate_inside_range(&values, range)?; + row_wise_results.push(value); } - row_wise_results + ScalarValue::iter_to_array(row_wise_results.into_iter()) } else if evaluator.include_rank() { let columns = self.sort_columns(batch)?; let sort_partition_points = self.evaluate_partition_points(num_rows, &columns)?; - evaluator.evaluate_with_rank(partition_points, sort_partition_points)? + evaluator.evaluate_with_rank(num_rows, &sort_partition_points) } else { let (values, _) = self.get_values_orderbys(batch)?; - evaluator.evaluate(&values, partition_points)? - }; - let results = results.iter().map(|i| i.as_ref()).collect::>(); - concat(&results).map_err(DataFusionError::ArrowError) + evaluator.evaluate(&values, num_rows) + } } fn get_window_frame(&self) -> &Arc { diff --git a/datafusion/physical-expr/src/window/cume_dist.rs b/datafusion/physical-expr/src/window/cume_dist.rs index 4202058a3c5a..45fe51178afc 100644 --- a/datafusion/physical-expr/src/window/cume_dist.rs +++ b/datafusion/physical-expr/src/window/cume_dist.rs @@ -73,19 +73,19 @@ impl PartitionEvaluator for CumeDistEvaluator { true } - fn evaluate_partition_with_rank( + fn evaluate_with_rank( &self, - partition: Range, + num_rows: usize, ranks_in_partition: &[Range], ) -> Result { - let scaler = (partition.end - partition.start) as f64; + let scalar = num_rows as f64; let result = Float64Array::from_iter_values( ranks_in_partition .iter() .scan(0_u64, |acc, range| { let len = range.end - range.start; *acc += len as u64; - let value: f64 = (*acc as f64) / scaler; + let value: f64 = (*acc as f64) / scalar; let result = iter::repeat(value).take(len); Some(result) }) @@ -102,15 +102,14 @@ mod tests { fn test_i32_result( expr: &CumeDist, - partition: Range, + num_rows: usize, ranks: Vec>, expected: Vec, ) -> Result<()> { let result = expr .create_evaluator()? - .evaluate_with_rank(vec![partition], ranks)?; - assert_eq!(1, result.len()); - let result = as_float64_array(&result[0])?; + .evaluate_with_rank(num_rows, &ranks)?; + let result = as_float64_array(&result)?; let result = result.values(); assert_eq!(expected, result); Ok(()) @@ -121,19 +120,19 @@ mod tests { let r = cume_dist("arr".into()); let expected = vec![0.0; 0]; - test_i32_result(&r, 0..0, vec![], expected)?; + test_i32_result(&r, 0, vec![], expected)?; let expected = vec![1.0; 1]; - test_i32_result(&r, 0..1, vec![0..1], expected)?; + test_i32_result(&r, 1, vec![0..1], expected)?; let expected = vec![1.0; 2]; - test_i32_result(&r, 0..2, vec![0..2], expected)?; + test_i32_result(&r, 2, vec![0..2], expected)?; let expected = vec![0.5, 0.5, 1.0, 1.0]; - test_i32_result(&r, 0..4, vec![0..2, 2..4], expected)?; + test_i32_result(&r, 4, vec![0..2, 2..4], expected)?; let expected = vec![0.25, 0.5, 0.75, 1.0]; - test_i32_result(&r, 0..4, vec![0..1, 1..2, 2..3, 3..4], expected)?; + test_i32_result(&r, 4, vec![0..1, 1..2, 2..3, 3..4], expected)?; Ok(()) } diff --git a/datafusion/physical-expr/src/window/lead_lag.rs b/datafusion/physical-expr/src/window/lead_lag.rs index 97076963f60d..f4c176262ae4 100644 --- a/datafusion/physical-expr/src/window/lead_lag.rs +++ b/datafusion/physical-expr/src/window/lead_lag.rs @@ -28,7 +28,6 @@ use datafusion_common::ScalarValue; use datafusion_common::{DataFusionError, Result}; use std::any::Any; use std::ops::Neg; -use std::ops::Range; use std::sync::Arc; /// window shift expression @@ -178,15 +177,10 @@ fn shift_with_default_value( } impl PartitionEvaluator for WindowShiftEvaluator { - fn evaluate_partition( - &self, - values: &[ArrayRef], - partition: Range, - ) -> Result { + fn evaluate(&self, values: &[ArrayRef], _num_rows: usize) -> Result { // LEAD, LAG window functions take single column, values will have size 1 let value = &values[0]; - let value = value.slice(partition.start, partition.end - partition.start); - shift_with_default_value(&value, self.shift_offset, self.default_value.as_ref()) + shift_with_default_value(value, self.shift_offset, self.default_value.as_ref()) } } @@ -205,9 +199,10 @@ mod tests { let schema = Schema::new(vec![Field::new("arr", DataType::Int32, false)]); let batch = RecordBatch::try_new(Arc::new(schema), values.clone())?; let values = expr.evaluate_args(&batch)?; - let result = expr.create_evaluator()?.evaluate(&values, vec![0..8])?; - assert_eq!(1, result.len()); - let result = as_int32_array(&result[0])?; + let result = expr + .create_evaluator()? + .evaluate(&values, batch.num_rows())?; + let result = as_int32_array(&result)?; assert_eq!(expected, *result); Ok(()) } diff --git a/datafusion/physical-expr/src/window/partition_evaluator.rs b/datafusion/physical-expr/src/window/partition_evaluator.rs index 1608758d61b3..86500441df5b 100644 --- a/datafusion/physical-expr/src/window/partition_evaluator.rs +++ b/datafusion/physical-expr/src/window/partition_evaluator.rs @@ -22,23 +22,6 @@ use datafusion_common::Result; use datafusion_common::{DataFusionError, ScalarValue}; use std::ops::Range; -/// Given a partition range, and the full list of sort partition points, given that the sort -/// partition points are sorted using [partition columns..., order columns...], the split -/// boundaries would align (what's sorted on [partition columns...] would definitely be sorted -/// on finer columns), so this will use binary search to find ranges that are within the -/// partition range and return the valid slice. -pub(crate) fn find_ranges_in_range<'a>( - partition_range: &Range, - sort_partition_points: &'a [Range], -) -> &'a [Range] { - let start_idx = sort_partition_points - .partition_point(|sort_range| sort_range.start < partition_range.start); - let end_idx = start_idx - + sort_partition_points[start_idx..] - .partition_point(|sort_range| sort_range.end <= partition_range.end); - &sort_partition_points[start_idx..end_idx] -} - /// Partition evaluator pub trait PartitionEvaluator { /// Whether the evaluator should be evaluated with rank @@ -50,49 +33,17 @@ pub trait PartitionEvaluator { false } - /// evaluate the partition evaluator against the partitions - fn evaluate( - &self, - values: &[ArrayRef], - partition_points: Vec>, - ) -> Result> { - partition_points - .into_iter() - .map(|partition| self.evaluate_partition(values, partition)) - .collect() - } - - /// evaluate the partition evaluator against the partitions with rank information - fn evaluate_with_rank( - &self, - partition_points: Vec>, - sort_partition_points: Vec>, - ) -> Result> { - partition_points - .into_iter() - .map(|partition| { - let ranks_in_partition = - find_ranges_in_range(&partition, &sort_partition_points); - self.evaluate_partition_with_rank(partition, ranks_in_partition) - }) - .collect() - } - /// evaluate the partition evaluator against the partition - fn evaluate_partition( - &self, - _values: &[ArrayRef], - _partition: Range, - ) -> Result { + fn evaluate(&self, _values: &[ArrayRef], _num_rows: usize) -> Result { Err(DataFusionError::NotImplemented( "evaluate_partition is not implemented by default".into(), )) } /// evaluate the partition evaluator against the partition but with rank - fn evaluate_partition_with_rank( + fn evaluate_with_rank( &self, - _partition: Range, + _num_rows: usize, _ranks_in_partition: &[Range], ) -> Result { Err(DataFusionError::NotImplemented( diff --git a/datafusion/physical-expr/src/window/rank.rs b/datafusion/physical-expr/src/window/rank.rs index 8ed0319a10b0..87e01528de5a 100644 --- a/datafusion/physical-expr/src/window/rank.rs +++ b/datafusion/physical-expr/src/window/rank.rs @@ -114,9 +114,9 @@ impl PartitionEvaluator for RankEvaluator { true } - fn evaluate_partition_with_rank( + fn evaluate_with_rank( &self, - partition: Range, + num_rows: usize, ranks_in_partition: &[Range], ) -> Result { // see https://www.postgresql.org/docs/current/functions-window.html @@ -132,7 +132,7 @@ impl PartitionEvaluator for RankEvaluator { )), RankType::Percent => { // Returns the relative rank of the current row, that is (rank - 1) / (total partition rows - 1). The value thus ranges from 0 to 1 inclusive. - let denominator = (partition.end - partition.start) as f64; + let denominator = num_rows as f64; Arc::new(Float64Array::from_iter_values( ranks_in_partition .iter() @@ -177,15 +177,14 @@ mod tests { fn test_f64_result( expr: &Rank, - range: Range, + num_rows: usize, ranks: Vec>, expected: Vec, ) -> Result<()> { let result = expr .create_evaluator()? - .evaluate_with_rank(vec![range], ranks)?; - assert_eq!(1, result.len()); - let result = as_float64_array(&result[0])?; + .evaluate_with_rank(num_rows, &ranks)?; + let result = as_float64_array(&result)?; let result = result.values(); assert_eq!(expected, result); Ok(()) @@ -196,11 +195,8 @@ mod tests { ranks: Vec>, expected: Vec, ) -> Result<()> { - let result = expr - .create_evaluator()? - .evaluate_with_rank(vec![0..8], ranks)?; - assert_eq!(1, result.len()); - let result = as_uint64_array(&result[0])?; + let result = expr.create_evaluator()?.evaluate_with_rank(8, &ranks)?; + let result = as_uint64_array(&result)?; let result = result.values(); assert_eq!(expected, result); Ok(()) @@ -228,19 +224,19 @@ mod tests { // empty case let expected = vec![0.0; 0]; - test_f64_result(&r, 0..0, vec![0..0; 0], expected)?; + test_f64_result(&r, 0, vec![0..0; 0], expected)?; // singleton case let expected = vec![0.0]; - test_f64_result(&r, 0..1, vec![0..1], expected)?; + test_f64_result(&r, 1, vec![0..1], expected)?; // uniform case let expected = vec![0.0; 7]; - test_f64_result(&r, 0..7, vec![0..7], expected)?; + test_f64_result(&r, 7, vec![0..7], expected)?; // non-trivial case let expected = vec![0.0, 0.0, 0.0, 0.5, 0.5, 0.5, 0.5]; - test_f64_result(&r, 0..7, vec![0..3, 3..7], expected)?; + test_f64_result(&r, 7, vec![0..3, 3..7], expected)?; Ok(()) } diff --git a/datafusion/physical-expr/src/window/row_number.rs b/datafusion/physical-expr/src/window/row_number.rs index f70d9ea379dd..b27ac29d2764 100644 --- a/datafusion/physical-expr/src/window/row_number.rs +++ b/datafusion/physical-expr/src/window/row_number.rs @@ -24,7 +24,6 @@ use arrow::array::{ArrayRef, UInt64Array}; use arrow::datatypes::{DataType, Field}; use datafusion_common::Result; use std::any::Any; -use std::ops::Range; use std::sync::Arc; /// row_number expression @@ -69,12 +68,7 @@ impl BuiltInWindowFunctionExpr for RowNumber { pub(crate) struct NumRowsEvaluator {} impl PartitionEvaluator for NumRowsEvaluator { - fn evaluate_partition( - &self, - _values: &[ArrayRef], - partition: Range, - ) -> Result { - let num_rows = partition.end - partition.start; + fn evaluate(&self, _values: &[ArrayRef], num_rows: usize) -> Result { Ok(Arc::new(UInt64Array::from_iter_values( 1..(num_rows as u64) + 1, ))) @@ -99,9 +93,8 @@ mod tests { let values = row_number.evaluate_args(&batch)?; let result = row_number .create_evaluator()? - .evaluate(&values, vec![0..8])?; - assert_eq!(1, result.len()); - let result = as_uint64_array(&result[0])?; + .evaluate(&values, batch.num_rows())?; + let result = as_uint64_array(&result)?; let result = result.values(); assert_eq!(vec![1, 2, 3, 4, 5, 6, 7, 8], result); Ok(()) @@ -118,9 +111,8 @@ mod tests { let values = row_number.evaluate_args(&batch)?; let result = row_number .create_evaluator()? - .evaluate(&values, vec![0..8])?; - assert_eq!(1, result.len()); - let result = as_uint64_array(&result[0])?; + .evaluate(&values, batch.num_rows())?; + let result = as_uint64_array(&result)?; let result = result.values(); assert_eq!(vec![1, 2, 3, 4, 5, 6, 7, 8], result); Ok(()) diff --git a/datafusion/physical-expr/src/window/window_expr.rs b/datafusion/physical-expr/src/window/window_expr.rs index b1af6cf49a0a..bc35dd49b50d 100644 --- a/datafusion/physical-expr/src/window/window_expr.rs +++ b/datafusion/physical-expr/src/window/window_expr.rs @@ -17,7 +17,7 @@ use crate::{PhysicalExpr, PhysicalSortExpr}; use arrow::compute::kernels::partition::lexicographical_partition_ranges; -use arrow::compute::kernels::sort::{SortColumn, SortOptions}; +use arrow::compute::kernels::sort::SortColumn; use arrow::record_batch::RecordBatch; use arrow::{array::ArrayRef, datatypes::Field}; use datafusion_common::{reverse_sort_options, DataFusionError, Result}; @@ -86,20 +86,6 @@ pub trait WindowExpr: Send + Sync + Debug { /// expressions that's from the window function's order by clause, empty if absent fn order_by(&self) -> &[PhysicalSortExpr]; - /// get partition columns that can be used for partitioning, empty if absent - fn partition_columns(&self, batch: &RecordBatch) -> Result> { - self.partition_by() - .iter() - .map(|expr| { - PhysicalSortExpr { - expr: expr.clone(), - options: SortOptions::default(), - } - .evaluate_to_sort_column(batch) - }) - .collect() - } - /// get order by columns, empty if absent fn order_by_columns(&self, batch: &RecordBatch) -> Result> { self.order_by() @@ -110,10 +96,8 @@ pub trait WindowExpr: Send + Sync + Debug { /// get sort columns that can be used for peer evaluation, empty if absent fn sort_columns(&self, batch: &RecordBatch) -> Result> { - let mut sort_columns = self.partition_columns(batch)?; let order_by_columns = self.order_by_columns(batch)?; - sort_columns.extend(order_by_columns); - Ok(sort_columns) + Ok(order_by_columns) } /// Get values columns(argument of Window Function) diff --git a/datafusion/physical-expr/src/window/window_frame_state.rs b/datafusion/physical-expr/src/window/window_frame_state.rs index 307ea91440df..b49bd3a22a78 100644 --- a/datafusion/physical-expr/src/window/window_frame_state.rs +++ b/datafusion/physical-expr/src/window/window_frame_state.rs @@ -26,6 +26,7 @@ use datafusion_expr::{WindowFrame, WindowFrameBound, WindowFrameUnits}; use std::cmp::min; use std::collections::VecDeque; use std::fmt::Debug; +use std::ops::Range; use std::sync::Arc; /// This object stores the window frame state for use in incremental calculations. @@ -68,7 +69,7 @@ impl<'a> WindowFrameContext<'a> { sort_options: &[SortOptions], length: usize, idx: usize, - ) -> Result<(usize, usize)> { + ) -> Result> { match *self { WindowFrameContext::Rows(window_frame) => { Self::calculate_range_rows(window_frame, length, idx) @@ -99,7 +100,7 @@ impl<'a> WindowFrameContext<'a> { window_frame: &Arc, length: usize, idx: usize, - ) -> Result<(usize, usize)> { + ) -> Result> { let start = match window_frame.start_bound { // UNBOUNDED PRECEDING WindowFrameBound::Preceding(ScalarValue::UInt64(None)) => 0, @@ -152,7 +153,7 @@ impl<'a> WindowFrameContext<'a> { return Err(DataFusionError::Internal("Rows should be Uint".to_string())) } }; - Ok((start, end)) + Ok(Range { start, end }) } } @@ -171,7 +172,7 @@ impl WindowFrameStateRange { sort_options: &[SortOptions], length: usize, idx: usize, - ) -> Result<(usize, usize)> { + ) -> Result> { let start = match window_frame.start_bound { WindowFrameBound::Preceding(ref n) => { if n.is_null() { @@ -240,7 +241,7 @@ impl WindowFrameStateRange { } } }; - Ok((start, end)) + Ok(Range { start, end }) } /// This function does the heavy lifting when finding range boundaries. It is meant to be @@ -333,7 +334,7 @@ impl WindowFrameStateGroups { range_columns: &[ArrayRef], length: usize, idx: usize, - ) -> Result<(usize, usize)> { + ) -> Result> { if range_columns.is_empty() { return Err(DataFusionError::Execution( "GROUPS mode requires an ORDER BY clause".to_string(), @@ -399,7 +400,7 @@ impl WindowFrameStateGroups { )) } }; - Ok((start, end)) + Ok(Range { start, end }) } /// This function does the heavy lifting when finding group boundaries. It is meant to be