From 8d4663ba60e4370a953b62a302221c46eca39e5c Mon Sep 17 00:00:00 2001
From: Sam Hughes
Date: Wed, 27 Nov 2024 13:38:53 -0800
Subject: [PATCH] perf: Use primitive accumulator for min and max, and backport CountGroupsAccumulator

Note that we avoid the upstream DF bugs on min/max with groups accumulators
on float32 and float64, where, in the max case, max({-infinity}) returns
`f32::MIN` or `f64::MIN`, respectively, which are *not* negative infinity.
---
 .../src/physical_plan/expressions/count.rs    | 221 ++++++++++++++-
 .../src/physical_plan/expressions/min_max.rs  | 251 +++++++++++++++++-
 .../src/physical_plan/expressions/mod.rs      |  51 +++-
 3 files changed, 500 insertions(+), 23 deletions(-)

diff --git a/datafusion/src/physical_plan/expressions/count.rs b/datafusion/src/physical_plan/expressions/count.rs
index 0ea4353da766..b123865cc182 100644
--- a/datafusion/src/physical_plan/expressions/count.rs
+++ b/datafusion/src/physical_plan/expressions/count.rs
@@ -18,15 +18,18 @@
 //! Defines physical expressions that can evaluated at runtime during query execution
 
 use std::any::Any;
+use std::mem::size_of;
 use std::sync::Arc;
 
-use crate::error::Result;
-use crate::physical_plan::groups_accumulator::GroupsAccumulator;
-use crate::physical_plan::groups_accumulator_flat_adapter::GroupsAccumulatorFlatAdapter;
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::groups_accumulator::{EmitTo, GroupsAccumulator};
+use crate::physical_plan::null_state::accumulate_indices;
 use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr};
 use crate::scalar::ScalarValue;
+use arrow::array::{Array, ArrayData, BooleanArray, PrimitiveArray};
+use arrow::buffer::Buffer;
 use arrow::compute;
-use arrow::datatypes::DataType;
+use arrow::datatypes::{DataType, UInt64Type};
 use arrow::{
     array::{ArrayRef, UInt64Array},
     datatypes::Field,
@@ -99,11 +102,7 @@ impl AggregateExpr for Count {
     fn create_groups_accumulator(
         &self,
     ) -> arrow::error::Result<Option<Box<dyn GroupsAccumulator>>> {
-        Ok(Some(Box::new(GroupsAccumulatorFlatAdapter::<
-            CountAccumulator,
-        >::new(move || {
-            Ok(CountAccumulator::new())
-        }))))
+        Ok(Some(Box::new(CountGroupsAccumulator::new())))
     }
 
     fn name(&self) -> &str {
@@ -170,6 +169,210 @@ impl Accumulator for CountAccumulator {
     }
 }
 
+/// An accumulator to compute the counts of [`PrimitiveArray`].
+/// Stores values as native types, and does overflow checking
+///
+/// Unlike most other accumulators, COUNT never produces NULLs. If no
+/// non-null values are seen in any group the output is 0. Thus, this
+/// accumulator has no additional null or seen filter tracking.
+#[derive(Debug)]
+struct CountGroupsAccumulator {
+    /// Count per group.
+    ///
+    /// Note that in upstream this is a `Vec<i64>`, and the count output and intermediate data type is
+    /// `DataType::Int64`. But here we are still using UInt64.
+    counts: Vec<u64>,
+}
+
+impl CountGroupsAccumulator {
+    pub fn new() -> Self {
+        Self { counts: vec![] }
+    }
+}
+
+impl GroupsAccumulator for CountGroupsAccumulator {
+    fn update_batch(
+        &mut self,
+        values: &[ArrayRef],
+        group_indices: &[usize],
+        opt_filter: Option<&BooleanArray>,
+        total_num_groups: usize,
+    ) -> Result<()> {
+        assert_eq!(values.len(), 1, "single argument to update_batch");
+        let values = &values[0];
+
+        // Add one to each group's counter for each non null, non
+        // filtered value
+        self.counts.resize(total_num_groups, 0);
+        accumulate_indices(
+            group_indices,
+            values
+                .data_ref()
+                .null_bitmap()
+                .as_ref()
+                .map(|bitmap| (bitmap, values.offset(), values.len())),
+            opt_filter,
+            |group_index| {
+                self.counts[group_index] += 1;
+            },
+        );
+
+        Ok(())
+    }
+
+    fn merge_batch(
+        &mut self,
+        values: &[ArrayRef],
+        group_indices: &[usize],
+        opt_filter: Option<&BooleanArray>,
+        total_num_groups: usize,
+    ) -> Result<()> {
+        assert_eq!(values.len(), 1, "one argument to merge_batch");
+        // first batch is counts, second is partial sums
+        let partial_counts = match values[0]
+            .as_any()
+            .downcast_ref::<PrimitiveArray<UInt64Type>>()
+        {
+            Some(x) => x,
+            None => {
+                panic!("values[0] is of unexpected type {:?}, expecting UInt64Type for intermediate count batch", values[0].data_type());
+            }
+        };
+
+        // intermediate counts are always created as non null
+        assert_eq!(partial_counts.null_count(), 0);
+        let partial_counts = partial_counts.values();
+
+        // Adds the counts with the partial counts
+        self.counts.resize(total_num_groups, 0);
+        match opt_filter {
+            Some(filter) => filter
+                .iter()
+                .zip(group_indices.iter())
+                .zip(partial_counts.iter())
+                .for_each(|((filter_value, &group_index), partial_count)| {
+                    if let Some(true) = filter_value {
+                        self.counts[group_index] += partial_count;
+                    }
+                }),
+            None => group_indices.iter().zip(partial_counts.iter()).for_each(
+                |(&group_index, partial_count)| {
+                    self.counts[group_index] += partial_count;
+                },
+            ),
+        }
+
+        Ok(())
+    }
+
+    fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
+        let counts = emit_to.take_needed(&mut self.counts);
+
+        // Count is always non null (null inputs just don't contribute to the overall values)
+
+        // TODO: This copies. Ideally, don't. Note: Avoiding this memcpy had minimal effect in PrimitiveGroupsAccumulator
+        let buffers = vec![Buffer::from_slice_ref(&counts)];
+
+        let data = ArrayData::new(
+            DataType::UInt64,
+            counts.len(),
+            None,
+            None,
+            0, /* offset */
+            buffers,
+            vec![],
+        );
+        Ok(Arc::new(PrimitiveArray::<UInt64Type>::from(data)))
+    }
+
+    // return arrays for counts
+    fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
+        let counts = emit_to.take_needed(&mut self.counts);
+        // Backporting note: UInt64Array::from actually does copy here in old DF.
+        let counts: PrimitiveArray<UInt64Type> = UInt64Array::from(counts); // zero copy, no nulls
+        Ok(vec![Arc::new(counts) as ArrayRef])
+    }
+
+    /// Converts an input batch directly to a state batch
+    ///
+    /// The state of `COUNT` is always a single Int64Array (backporting note: it is a UInt64Array):
+    /// * `1` (for non-null, non filtered values)
+    /// * `0` (for null values)
+    fn convert_to_state(
+        &self,
+        _values: &[ArrayRef],
+        _opt_filter: Option<&BooleanArray>,
+    ) -> Result<Vec<ArrayRef>> {
+        // convert_to_state only gets used in upstream datafusion, and we set
+        // supports_convert_to_state to false. Because values.data_ref().offset() and the null
+        // bitmap have differences that require care to backport, we comment this out instead.
+        return Err(DataFusionError::NotImplemented(
+            "Input batch conversion to state not implemented".to_owned(),
+        ));
+        /*
+        let values = &values[0];
+
+        let state_array = match (values.logical_nulls(), opt_filter) {
+            (None, None) => {
+                // In case there is no nulls in input and no filter, returning array of 1
+                Arc::new(Int64Array::from_value(1, values.len()))
+            }
+            (Some(nulls), None) => {
+                // If there are any nulls in input values -- casting `nulls` (true for values, false for nulls)
+                // of input array to Int64
+                let nulls = BooleanArray::new(nulls.into_inner(), None);
+                compute::cast(&nulls, &DataType::Int64)?
+            }
+            (None, Some(filter)) => {
+                // If there is only filter
+                // - applying filter null mask to filter values by bitand filter values and nulls buffers
+                //   (using buffers guarantees absence of nulls in result)
+                // - casting result of bitand to Int64 array
+                let (filter_values, filter_nulls) = filter.clone().into_parts();
+
+                let state_buf = match filter_nulls {
+                    Some(filter_nulls) => &filter_values & filter_nulls.inner(),
+                    None => filter_values,
+                };
+
+                let boolean_state = BooleanArray::new(state_buf, None);
+                compute::cast(&boolean_state, &DataType::Int64)?
+            }
+            (Some(nulls), Some(filter)) => {
+                // For both input nulls and filter
+                // - applying filter null mask to filter values by bitand filter values and nulls buffers
+                //   (using buffers guarantees absence of nulls in result)
+                // - applying values null mask to filter buffer by another bitand on filter result and
+                //   nulls from input values
+                // - casting result to Int64 array
+                let (filter_values, filter_nulls) = filter.clone().into_parts();
+
+                let filter_buf = match filter_nulls {
+                    Some(filter_nulls) => &filter_values & filter_nulls.inner(),
+                    None => filter_values,
+                };
+                let state_buf = &filter_buf & nulls.inner();
+
+                let boolean_state = BooleanArray::new(state_buf, None);
+                compute::cast(&boolean_state, &DataType::Int64)?
+            }
+        };
+
+        Ok(vec![state_array])
+        */
+    }
+
+    fn supports_convert_to_state(&self) -> bool {
+        // Is set to true in upstream (as it's implemented above in upstream). But convert_to_state
+        // is not used in this branch anyway.
+        false
+    }
+
+    fn size(&self) -> usize {
+        self.counts.capacity() * size_of::<u64>()
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
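Illustrative sketch (not part of the diff): a minimal test-style driver for the backported CountGroupsAccumulator, written as if it lived inside count.rs's `mod tests` (the struct is private, so it can only be exercised from there via `use super::*`). The test name, the input values, and the group layout are made-up assumptions; the trait calls (`update_batch`, `evaluate(EmitTo::All)`) and types are the ones introduced in the hunk above.

    #[test]
    fn count_groups_accumulator_sketch() -> Result<()> {
        // Three input rows spread over two groups; the NULL row must not be counted.
        let values: ArrayRef = Arc::new(UInt64Array::from(vec![Some(1), None, Some(3)]));
        let group_indices = vec![0usize, 0, 1];

        let mut acc = CountGroupsAccumulator::new();
        acc.update_batch(&[values], &group_indices, None, 2)?;

        // COUNT never produces NULLs; per-group counts come back as a UInt64Array.
        let out = acc.evaluate(EmitTo::All)?;
        let out = out.as_any().downcast_ref::<UInt64Array>().unwrap();
        assert_eq!(out.value(0), 1);
        assert_eq!(out.value(1), 1);
        Ok(())
    }
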
diff --git a/datafusion/src/physical_plan/expressions/min_max.rs b/datafusion/src/physical_plan/expressions/min_max.rs
index 9d5bb3f9db4a..cf9932670a75 100644
--- a/datafusion/src/physical_plan/expressions/min_max.rs
+++ b/datafusion/src/physical_plan/expressions/min_max.rs
@@ -17,6 +17,7 @@
 
 //! Defines physical expressions that can evaluated at runtime during query execution
 
+use core::{f32, f64};
 use std::any::Any;
 use std::convert::TryFrom;
 use std::sync::Arc;
@@ -24,10 +25,11 @@ use std::sync::Arc;
 use crate::error::{DataFusionError, Result};
 use crate::physical_plan::groups_accumulator::GroupsAccumulator;
 use crate::physical_plan::groups_accumulator_flat_adapter::GroupsAccumulatorFlatAdapter;
+use crate::physical_plan::groups_accumulator_prim_op::PrimitiveGroupsAccumulator;
 use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr};
 use crate::scalar::ScalarValue;
 use arrow::compute;
-use arrow::datatypes::{DataType, TimeUnit};
+use arrow::datatypes::{ArrowPrimitiveType, DataType, TimeUnit};
 use arrow::{
     array::{
         ArrayRef, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array,
@@ -108,12 +110,97 @@ impl AggregateExpr for Max {
     fn create_groups_accumulator(
         &self,
     ) -> arrow::error::Result<Option<Box<dyn GroupsAccumulator>>> {
-        let data_type = self.data_type.clone();
-        Ok(Some(Box::new(
-            GroupsAccumulatorFlatAdapter::<MaxAccumulator>::new(move || {
-                MaxAccumulator::try_new(&data_type)
-            }),
-        )))
+        macro_rules! make_max_accumulator {
+            ($T:ty) => {
+                make_max_accumulator!($T, <$T as ArrowPrimitiveType>::Native::MIN)
+            };
+            ($T:ty, $STARTING_VALUE:expr) => {
+                Box::new(
+                    PrimitiveGroupsAccumulator::<$T, $T, _, _>::new(
+                        &<$T as ArrowPrimitiveType>::DATA_TYPE,
+                        |x: &mut <$T as ArrowPrimitiveType>::Native,
+                         y: <$T as ArrowPrimitiveType>::Native| {
+                            *x = (*x).max(y);
+                        },
+                        |x: &mut <$T as ArrowPrimitiveType>::Native,
+                         y: <$T as ArrowPrimitiveType>::Native| {
+                            *x = (*x).max(y);
+                        },
+                    )
+                    .with_starting_value($STARTING_VALUE),
+                )
+            };
+        }
+        let acc: Box<dyn GroupsAccumulator> = match &self.data_type {
+            DataType::Float64 => {
+                make_max_accumulator!(arrow::datatypes::Float64Type, f64::NEG_INFINITY)
+            }
+            DataType::Float32 => {
+                make_max_accumulator!(arrow::datatypes::Float32Type, f32::NEG_INFINITY)
+            }
+            DataType::Int64 => make_max_accumulator!(arrow::datatypes::Int64Type),
+            DataType::Int96 => make_max_accumulator!(arrow::datatypes::Int96Type),
+            DataType::Int64Decimal(0) => {
+                make_max_accumulator!(arrow::datatypes::Int64Decimal0Type)
+            }
+            DataType::Int64Decimal(1) => {
+                make_max_accumulator!(arrow::datatypes::Int64Decimal1Type)
+            }
+            DataType::Int64Decimal(2) => {
+                make_max_accumulator!(arrow::datatypes::Int64Decimal2Type)
+            }
+            DataType::Int64Decimal(3) => {
+                make_max_accumulator!(arrow::datatypes::Int64Decimal3Type)
+            }
+            DataType::Int64Decimal(4) => {
+                make_max_accumulator!(arrow::datatypes::Int64Decimal4Type)
+            }
+            DataType::Int64Decimal(5) => {
+                make_max_accumulator!(arrow::datatypes::Int64Decimal5Type)
+            }
+            DataType::Int64Decimal(10) => {
+                make_max_accumulator!(arrow::datatypes::Int64Decimal10Type)
+            }
+            DataType::Int96Decimal(0) => {
+                make_max_accumulator!(arrow::datatypes::Int96Decimal0Type)
+            }
+            DataType::Int96Decimal(1) => {
+                make_max_accumulator!(arrow::datatypes::Int96Decimal1Type)
+            }
+            DataType::Int96Decimal(2) => {
+                make_max_accumulator!(arrow::datatypes::Int96Decimal2Type)
+            }
+            DataType::Int96Decimal(3) => {
+                make_max_accumulator!(arrow::datatypes::Int96Decimal3Type)
+            }
+            DataType::Int96Decimal(4) => {
+                make_max_accumulator!(arrow::datatypes::Int96Decimal4Type)
+            }
+            DataType::Int96Decimal(5) => {
+                make_max_accumulator!(arrow::datatypes::Int96Decimal5Type)
+            }
+            DataType::Int96Decimal(10) => {
+                make_max_accumulator!(arrow::datatypes::Int96Decimal10Type)
+            }
+            DataType::Int32 => make_max_accumulator!(arrow::datatypes::Int32Type),
+            DataType::Int16 => make_max_accumulator!(arrow::datatypes::Int16Type),
+            DataType::Int8 => make_max_accumulator!(arrow::datatypes::Int8Type),
+            DataType::UInt64 => make_max_accumulator!(arrow::datatypes::UInt64Type),
+            DataType::UInt32 => make_max_accumulator!(arrow::datatypes::UInt32Type),
+            DataType::UInt16 => make_max_accumulator!(arrow::datatypes::UInt16Type),
+            DataType::UInt8 => make_max_accumulator!(arrow::datatypes::UInt8Type),
+            _ => {
+                // Not all types (strings) can use primitive accumulators. And strings use
+                // max_string as the $OP in typed_min_max_batch.
+
+                // Timestamps presently take this branch.
+                let data_type = self.data_type.clone();
+                Box::new(GroupsAccumulatorFlatAdapter::<MaxAccumulator>::new(
+                    move || MaxAccumulator::try_new(&data_type),
+                ))
+            }
+        };
+        Ok(Some(acc))
     }
 
     fn name(&self) -> &str {
@@ -547,12 +634,98 @@ impl AggregateExpr for Min {
     fn create_groups_accumulator(
         &self,
     ) -> arrow::error::Result<Option<Box<dyn GroupsAccumulator>>> {
-        let data_type = self.data_type.clone();
-        Ok(Some(Box::new(
-            GroupsAccumulatorFlatAdapter::<MinAccumulator>::new(move || {
-                MinAccumulator::try_new(&data_type)
-            }),
-        )))
+        macro_rules! make_min_accumulator {
+            ($T:ty) => {
+                make_min_accumulator!($T, <$T as ArrowPrimitiveType>::Native::MAX)
+            };
+            ($T:ty, $STARTING_VALUE:expr) => {
+                Box::new(
+                    PrimitiveGroupsAccumulator::<$T, $T, _, _>::new(
+                        &<$T as ArrowPrimitiveType>::DATA_TYPE,
+                        |x: &mut <$T as ArrowPrimitiveType>::Native,
+                         y: <$T as ArrowPrimitiveType>::Native| {
+                            *x = (*x).min(y);
+                        },
+                        |x: &mut <$T as ArrowPrimitiveType>::Native,
+                         y: <$T as ArrowPrimitiveType>::Native| {
+                            *x = (*x).min(y);
+                        },
+                    )
+                    .with_starting_value($STARTING_VALUE),
+                )
+            };
+        }
+
+        let acc: Box<dyn GroupsAccumulator> = match &self.data_type {
+            DataType::Float64 => {
+                make_min_accumulator!(arrow::datatypes::Float64Type, f64::INFINITY)
+            }
+            DataType::Float32 => {
+                make_min_accumulator!(arrow::datatypes::Float32Type, f32::INFINITY)
+            }
+            DataType::Int64 => make_min_accumulator!(arrow::datatypes::Int64Type),
+            DataType::Int96 => make_min_accumulator!(arrow::datatypes::Int96Type),
+            DataType::Int64Decimal(0) => {
+                make_min_accumulator!(arrow::datatypes::Int64Decimal0Type)
+            }
+            DataType::Int64Decimal(1) => {
+                make_min_accumulator!(arrow::datatypes::Int64Decimal1Type)
+            }
+            DataType::Int64Decimal(2) => {
+                make_min_accumulator!(arrow::datatypes::Int64Decimal2Type)
+            }
+            DataType::Int64Decimal(3) => {
+                make_min_accumulator!(arrow::datatypes::Int64Decimal3Type)
+            }
+            DataType::Int64Decimal(4) => {
+                make_min_accumulator!(arrow::datatypes::Int64Decimal4Type)
+            }
+            DataType::Int64Decimal(5) => {
+                make_min_accumulator!(arrow::datatypes::Int64Decimal5Type)
+            }
+            DataType::Int64Decimal(10) => {
+                make_min_accumulator!(arrow::datatypes::Int64Decimal10Type)
+            }
+            DataType::Int96Decimal(0) => {
+                make_min_accumulator!(arrow::datatypes::Int96Decimal0Type)
+            }
+            DataType::Int96Decimal(1) => {
+                make_min_accumulator!(arrow::datatypes::Int96Decimal1Type)
+            }
+            DataType::Int96Decimal(2) => {
+                make_min_accumulator!(arrow::datatypes::Int96Decimal2Type)
+            }
+            DataType::Int96Decimal(3) => {
+                make_min_accumulator!(arrow::datatypes::Int96Decimal3Type)
+            }
+            DataType::Int96Decimal(4) => {
+                make_min_accumulator!(arrow::datatypes::Int96Decimal4Type)
+            }
+            DataType::Int96Decimal(5) => {
+                make_min_accumulator!(arrow::datatypes::Int96Decimal5Type)
+            }
+            DataType::Int96Decimal(10) => {
+                make_min_accumulator!(arrow::datatypes::Int96Decimal10Type)
+            }
+            DataType::Int32 => make_min_accumulator!(arrow::datatypes::Int32Type),
+            DataType::Int16 => make_min_accumulator!(arrow::datatypes::Int16Type),
+            DataType::Int8 => make_min_accumulator!(arrow::datatypes::Int8Type),
+            DataType::UInt64 => make_min_accumulator!(arrow::datatypes::UInt64Type),
+            DataType::UInt32 => make_min_accumulator!(arrow::datatypes::UInt32Type),
+            DataType::UInt16 => make_min_accumulator!(arrow::datatypes::UInt16Type),
+            DataType::UInt8 => make_min_accumulator!(arrow::datatypes::UInt8Type),
+            _ => {
+                // Not all types (strings) can use primitive accumulators. And strings use
+                // min_string as the $OP in typed_min_max_batch.
+
+                // Timestamps presently take this branch.
+                let data_type = self.data_type.clone();
+                Box::new(GroupsAccumulatorFlatAdapter::<MinAccumulator>::new(
+                    move || MinAccumulator::try_new(&data_type),
+                ))
+            }
+        };
+        Ok(Some(acc))
     }
 
     fn name(&self) -> &str {
@@ -612,9 +785,13 @@ impl Accumulator for MinAccumulator {
 
 #[cfg(test)]
 mod tests {
+    use core::f64;
+
     use super::*;
+    use crate::generic_grouped_test_op;
     use crate::physical_plan::expressions::col;
     use crate::physical_plan::expressions::tests::aggregate;
+    use crate::physical_plan::expressions::tests::grouped_aggregate;
     use crate::{error::Result, generic_test_op};
     use arrow::datatypes::*;
     use arrow::record_batch::RecordBatch;
@@ -816,6 +993,30 @@ mod tests {
         )
     }
 
+    #[test]
+    fn max_f64_infinity() -> Result<()> {
+        let a: ArrayRef = Arc::new(Float64Array::from(vec![f64::NEG_INFINITY]));
+        generic_test_op!(
+            a,
+            DataType::Float64,
+            Max,
+            ScalarValue::from(f64::NEG_INFINITY),
+            DataType::Float64
+        )
+    }
+
+    #[test]
+    fn max_f64_infinity_grouped() -> Result<()> {
+        let a: ArrayRef = Arc::new(Float64Array::from(vec![f64::NEG_INFINITY]));
+        generic_grouped_test_op!(
+            a,
+            DataType::Float64,
+            Max,
+            ScalarValue::from(f64::NEG_INFINITY),
+            DataType::Float64
+        )
+    }
+
     #[test]
     fn min_f64() -> Result<()> {
         let a: ArrayRef =
             Arc::new(Float64Array::from(vec![3.0_f64, 1.0_f64, 2.0_f64, 4.0_f64, 3.0_f64]));
         generic_test_op!(
             a,
             DataType::Float64,
             Min,
             ScalarValue::from(1.0_f64),
             DataType::Float64
         )
     }
+
+    #[test]
+    fn min_f64_infinity() -> Result<()> {
+        let a: ArrayRef = Arc::new(Float64Array::from(vec![f64::INFINITY]));
+        generic_test_op!(
+            a,
+            DataType::Float64,
+            Min,
+            ScalarValue::from(f64::INFINITY),
+            DataType::Float64
+        )
+    }
+
+    #[test]
+    fn min_f64_infinity_grouped() -> Result<()> {
+        let a: ArrayRef = Arc::new(Float64Array::from(vec![f64::INFINITY]));
+        generic_grouped_test_op!(
+            a,
+            DataType::Float64,
+            Min,
+            ScalarValue::from(f64::INFINITY),
+            DataType::Float64
+        )
+    }
 }
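Illustrative sketch (not part of the diff): roughly what one arm of the make_max_accumulator! macro above expands to for Float64; the Min version is identical with `min` and `f64::INFINITY`. The constructor and `with_starting_value` signatures are taken from the call sites in this patch, not from upstream DataFusion, and the wrapper function name is made up.

    use arrow::datatypes::{ArrowPrimitiveType, Float64Type};

    use crate::physical_plan::groups_accumulator::GroupsAccumulator;
    use crate::physical_plan::groups_accumulator_prim_op::PrimitiveGroupsAccumulator;

    fn float64_max_groups_accumulator() -> Box<dyn GroupsAccumulator> {
        Box::new(
            PrimitiveGroupsAccumulator::<Float64Type, Float64Type, _, _>::new(
                &<Float64Type as ArrowPrimitiveType>::DATA_TYPE,
                // update op: fold one input value into the group's running max
                |x: &mut f64, y: f64| *x = (*x).max(y),
                // merge op: fold a partial max produced by another partition
                |x: &mut f64, y: f64| *x = (*x).max(y),
            )
            // Seed every group with -inf so max({-inf}) stays -inf instead of
            // collapsing to f64::MIN (the upstream bug noted in the commit message).
            .with_starting_value(f64::NEG_INFINITY),
        )
    }
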
diff --git a/datafusion/src/physical_plan/expressions/mod.rs b/datafusion/src/physical_plan/expressions/mod.rs
index bd3dab65b05d..6e718f91676e 100644
--- a/datafusion/src/physical_plan/expressions/mod.rs
+++ b/datafusion/src/physical_plan/expressions/mod.rs
@@ -121,7 +121,14 @@ impl PhysicalSortExpr {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::{error::Result, physical_plan::AggregateExpr, scalar::ScalarValue};
+    use crate::{
+        error::Result,
+        physical_plan::{
+            groups_accumulator::{EmitTo, GroupsAccumulator},
+            AggregateExpr,
+        },
+        scalar::ScalarValue,
+    };
 
     /// macro to perform an aggregation and verify the result.
     #[macro_export]
@@ -159,4 +166,46 @@ mod tests {
         accum.update_batch(&values)?;
         accum.evaluate()
     }
+
+    /// macro to perform a grouped aggregation and verify the result.
+    #[macro_export]
+    macro_rules! generic_grouped_test_op {
+        ($ARRAY:expr, $DATATYPE:expr, $OP:ident, $EXPECTED:expr, $EXPECTED_DATATYPE:expr) => {{
+            let schema = Schema::new(vec![Field::new("a", $DATATYPE, false)]);
+
+            let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![$ARRAY])?;
+
+            let agg = Arc::new(<$OP>::new(
+                col("a", &schema)?,
+                "bla".to_string(),
+                $EXPECTED_DATATYPE,
+            ));
+            let actual = grouped_aggregate(&batch, agg)?;
+            let expected = ScalarValue::from($EXPECTED);
+
+            assert_eq!(expected, actual);
+
+            Ok(())
+        }};
+    }
+
+    pub fn grouped_aggregate(
+        batch: &RecordBatch,
+        agg: Arc<dyn AggregateExpr>,
+    ) -> Result<ScalarValue> {
+        let accum = agg.create_groups_accumulator()?;
+        let mut accum: Box<dyn GroupsAccumulator> =
+            accum.ok_or(DataFusionError::Internal(
+                "create_groups_accumulator not supported".to_owned(),
+            ))?;
+        let expr = agg.expressions();
+        let values = expr
+            .iter()
+            .map(|e| e.evaluate(batch))
+            .map(|r| r.map(|v| v.into_array(batch.num_rows())))
+            .collect::<Result<Vec<_>>>()?;
+        accum.update_batch(&values, &vec![0; values[0].len()], None, 1)?;
+        let results = accum.evaluate(EmitTo::All)?;
+        ScalarValue::try_from_array(&results, 0)
+    }
 }
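Illustrative sketch (not part of the diff): the new *_infinity tests above pin down the float semantics this patch cares about. As a plain-Rust picture of the upstream bug being avoided (no DataFusion types involved, values made up, and the fold-with-seed framing is only an analogy for the accumulator's starting value): seeding a running max with the type's MIN value loses negative infinity, while seeding with NEG_INFINITY, as the Float32/Float64 arms do, preserves it.

    fn max_seed_demo() {
        let group = [f64::NEG_INFINITY];

        // Upstream-style seed: max({-inf}) collapses to f64::MIN.
        let wrong = group.iter().copied().fold(f64::MIN, f64::max);
        assert_eq!(wrong, f64::MIN);

        // Seed used by this patch: the true maximum, -inf, survives.
        let right = group.iter().copied().fold(f64::NEG_INFINITY, f64::max);
        assert_eq!(right, f64::NEG_INFINITY);
    }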