diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs index 715725a34d56..73b71722f96e 100644 --- a/datafusion/common/src/scalar.rs +++ b/datafusion/common/src/scalar.rs @@ -46,7 +46,8 @@ use arrow::{ DECIMAL128_MAX_PRECISION, }, }; -use arrow_array::{timezone::Tz, ArrowNativeTypeOp}; +use arrow_array::timezone::Tz; +use arrow_array::ArrowNativeTypeOp; use chrono::{Datelike, Duration, NaiveDate, NaiveDateTime}; // Constants we use throughout this file: @@ -1779,6 +1780,25 @@ macro_rules! eq_array_primitive { } impl ScalarValue { + /// Create a [`ScalarValue`] with the provided value and datatype + /// + /// # Panics + /// + /// Panics if d is not compatible with T + pub fn new_primitive( + a: Option, + d: &DataType, + ) -> Self { + match a { + None => d.try_into().unwrap(), + Some(v) => { + let array = PrimitiveArray::::new(vec![v].into(), None) + .with_data_type(d.clone()); + Self::try_from_array(&array, 0).unwrap() + } + } + } + /// Create a decimal Scalar from value/precision and scale. pub fn try_new_decimal128(value: i128, precision: u8, scale: i8) -> Result { // make sure the precision and scale is valid @@ -2089,11 +2109,13 @@ impl ScalarValue { impl_op!(self, rhs, &) } + #[deprecated(note = "Use arrow kernels or specialization (#6842)")] pub fn bitor>(&self, other: T) -> Result { let rhs = other.borrow(); impl_op!(self, rhs, |) } + #[deprecated(note = "Use arrow kernels or specialization (#6842)")] pub fn bitxor>(&self, other: T) -> Result { let rhs = other.borrow(); impl_op!(self, rhs, ^) @@ -4059,6 +4081,7 @@ mod tests { use arrow::datatypes::ArrowPrimitiveType; use arrow::util::pretty::pretty_format_columns; use arrow_array::ArrowNumericType; + use chrono::NaiveDate; use rand::Rng; use crate::cast::{as_string_array, as_uint32_array, as_uint64_array}; diff --git a/datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs b/datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs index 55ca02a14709..93b911c939d6 100644 --- a/datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs +++ b/datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs @@ -19,24 +19,12 @@ use ahash::RandomState; use std::any::Any; -use std::convert::TryFrom; use std::sync::Arc; use crate::{AggregateExpr, GroupsAccumulator, PhysicalExpr}; -use arrow::datatypes::{ - DataType, Int16Type, Int32Type, Int64Type, Int8Type, UInt16Type, UInt32Type, - UInt64Type, UInt8Type, -}; -use arrow::{ - array::{ - ArrayRef, Int16Array, Int32Array, Int64Array, Int8Array, UInt16Array, - UInt32Array, UInt64Array, UInt8Array, - }, - datatypes::Field, -}; -use datafusion_common::{ - downcast_value, internal_err, not_impl_err, DataFusionError, Result, ScalarValue, -}; +use arrow::datatypes::DataType; +use arrow::{array::ArrayRef, datatypes::Field}; +use datafusion_common::{not_impl_err, DataFusionError, Result, ScalarValue}; use datafusion_expr::Accumulator; use std::collections::HashSet; @@ -45,81 +33,9 @@ use crate::aggregate::utils::down_cast_any_ref; use crate::expressions::format_state_name; use arrow::array::Array; use arrow::compute::{bit_and, bit_or, bit_xor}; - -/// Creates a [`PrimitiveGroupsAccumulator`] with the specified -/// [`ArrowPrimitiveType`] that initailizes each accumulator to $START -/// and applies `$FN` to each element -/// -/// [`ArrowPrimitiveType`]: arrow::datatypes::ArrowPrimitiveType -macro_rules! instantiate_accumulator { - ($SELF:expr, $START:expr, $PRIMTYPE:ident, $FN:expr) => {{ - Ok(Box::new( - PrimitiveGroupsAccumulator::<$PRIMTYPE, _>::new(&$SELF.data_type, $FN) - .with_starting_value($START), - )) - }}; -} - -// returns the new value after bit_and/bit_or/bit_xor with the new values, taking nullability into account -macro_rules! typed_bit_and_or_xor_batch { - ($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, $OP:ident) => {{ - let array = downcast_value!($VALUES, $ARRAYTYPE); - let delta = $OP(array); - Ok(ScalarValue::$SCALAR(delta)) - }}; -} - -// bit_and/bit_or/bit_xor the array and returns a ScalarValue of its corresponding type. -macro_rules! bit_and_or_xor_batch { - ($VALUES:expr, $OP:ident) => {{ - match $VALUES.data_type() { - DataType::Int64 => { - typed_bit_and_or_xor_batch!($VALUES, Int64Array, Int64, $OP) - } - DataType::Int32 => { - typed_bit_and_or_xor_batch!($VALUES, Int32Array, Int32, $OP) - } - DataType::Int16 => { - typed_bit_and_or_xor_batch!($VALUES, Int16Array, Int16, $OP) - } - DataType::Int8 => { - typed_bit_and_or_xor_batch!($VALUES, Int8Array, Int8, $OP) - } - DataType::UInt64 => { - typed_bit_and_or_xor_batch!($VALUES, UInt64Array, UInt64, $OP) - } - DataType::UInt32 => { - typed_bit_and_or_xor_batch!($VALUES, UInt32Array, UInt32, $OP) - } - DataType::UInt16 => { - typed_bit_and_or_xor_batch!($VALUES, UInt16Array, UInt16, $OP) - } - DataType::UInt8 => { - typed_bit_and_or_xor_batch!($VALUES, UInt8Array, UInt8, $OP) - } - e => { - return internal_err!( - "Bit and/Bit or/Bit xor is not expected to receive the type {e:?}" - ); - } - } - }}; -} - -/// dynamically-typed bit_and(array) -> ScalarValue -fn bit_and_batch(values: &ArrayRef) -> Result { - bit_and_or_xor_batch!(values, bit_and) -} - -/// dynamically-typed bit_or(array) -> ScalarValue -fn bit_or_batch(values: &ArrayRef) -> Result { - bit_and_or_xor_batch!(values, bit_or) -} - -/// dynamically-typed bit_xor(array) -> ScalarValue -fn bit_xor_batch(values: &ArrayRef) -> Result { - bit_and_or_xor_batch!(values, bit_xor) -} +use arrow_array::cast::AsArray; +use arrow_array::{downcast_integer, ArrowNumericType}; +use arrow_buffer::ArrowNativeType; /// BIT_AND aggregate expression #[derive(Debug, Clone)] @@ -161,7 +77,19 @@ impl AggregateExpr for BitAnd { } fn create_accumulator(&self) -> Result> { - Ok(Box::new(BitAndAccumulator::try_new(&self.data_type)?)) + macro_rules! helper { + ($t:ty) => { + Ok(Box::>::default()) + }; + } + downcast_integer! { + &self.data_type => (helper), + _ => Err(DataFusionError::NotImplemented(format!( + "BitAndAccumulator not supported for {} with {}", + self.name(), + self.data_type + ))), + } } fn state_fields(&self) -> Result> { @@ -186,39 +114,22 @@ impl AggregateExpr for BitAnd { fn create_groups_accumulator(&self) -> Result> { use std::ops::BitAndAssign; - // Note the default value for BitAnd should be all set - // (e.g. `0b11...111`) use MAX / -1 here to get appropriate - // bit pattern for each type - match self.data_type { - DataType::Int8 => { - instantiate_accumulator!(self, -1, Int8Type, |x, y| x.bitand_assign(y)) - } - DataType::Int16 => { - instantiate_accumulator!(self, -1, Int16Type, |x, y| x.bitand_assign(y)) - } - DataType::Int32 => { - instantiate_accumulator!(self, -1, Int32Type, |x, y| x.bitand_assign(y)) - } - DataType::Int64 => { - instantiate_accumulator!(self, -1, Int64Type, |x, y| x.bitand_assign(y)) - } - DataType::UInt8 => { - instantiate_accumulator!(self, u8::MAX, UInt8Type, |x, y| x - .bitand_assign(y)) - } - DataType::UInt16 => { - instantiate_accumulator!(self, u16::MAX, UInt16Type, |x, y| x - .bitand_assign(y)) - } - DataType::UInt32 => { - instantiate_accumulator!(self, u32::MAX, UInt32Type, |x, y| x - .bitand_assign(y)) - } - DataType::UInt64 => { - instantiate_accumulator!(self, u64::MAX, UInt64Type, |x, y| x - .bitand_assign(y)) - } + // Note the default value for BitAnd should be all set, i.e. `!0` + macro_rules! helper { + ($t:ty, $dt:expr) => { + Ok(Box::new( + PrimitiveGroupsAccumulator::<$t, _>::new($dt, |x, y| { + x.bitand_assign(y) + }) + .with_starting_value(!0), + )) + }; + } + + let data_type = &self.data_type; + downcast_integer! { + data_type => (helper, data_type), _ => not_impl_err!( "GroupsAccumulator not supported for {} with {}", self.name(), @@ -246,25 +157,31 @@ impl PartialEq for BitAnd { } } -#[derive(Debug)] -struct BitAndAccumulator { - bit_and: ScalarValue, +struct BitAndAccumulator { + value: Option, +} + +impl std::fmt::Debug for BitAndAccumulator { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "BitAndAccumulator({})", T::DATA_TYPE) + } } -impl BitAndAccumulator { - /// new bit_and accumulator - pub fn try_new(data_type: &DataType) -> Result { - Ok(Self { - bit_and: ScalarValue::try_from(data_type)?, - }) +impl Default for BitAndAccumulator { + fn default() -> Self { + Self { value: None } } } -impl Accumulator for BitAndAccumulator { +impl Accumulator for BitAndAccumulator +where + T::Native: std::ops::BitAnd, +{ fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - let values = &values[0]; - let delta = &bit_and_batch(values)?; - self.bit_and = self.bit_and.bitand(delta)?; + if let Some(x) = bit_and(values[0].as_primitive::()) { + let v = self.value.get_or_insert(x); + *v = *v & x; + } Ok(()) } @@ -273,16 +190,15 @@ impl Accumulator for BitAndAccumulator { } fn state(&self) -> Result> { - Ok(vec![self.bit_and.clone()]) + Ok(vec![self.evaluate()?]) } fn evaluate(&self) -> Result { - Ok(self.bit_and.clone()) + Ok(ScalarValue::new_primitive::(self.value, &T::DATA_TYPE)) } fn size(&self) -> usize { - std::mem::size_of_val(self) - std::mem::size_of_val(&self.bit_and) - + self.bit_and.size() + std::mem::size_of_val(self) } } @@ -326,7 +242,19 @@ impl AggregateExpr for BitOr { } fn create_accumulator(&self) -> Result> { - Ok(Box::new(BitOrAccumulator::try_new(&self.data_type)?)) + macro_rules! helper { + ($t:ty) => { + Ok(Box::>::default()) + }; + } + downcast_integer! { + &self.data_type => (helper), + _ => Err(DataFusionError::NotImplemented(format!( + "BitOrAccumulator not supported for {} with {}", + self.name(), + self.data_type + ))), + } } fn state_fields(&self) -> Result> { @@ -351,32 +279,18 @@ impl AggregateExpr for BitOr { fn create_groups_accumulator(&self) -> Result> { use std::ops::BitOrAssign; - match self.data_type { - DataType::Int8 => { - instantiate_accumulator!(self, 0, Int8Type, |x, y| x.bitor_assign(y)) - } - DataType::Int16 => { - instantiate_accumulator!(self, 0, Int16Type, |x, y| x.bitor_assign(y)) - } - DataType::Int32 => { - instantiate_accumulator!(self, 0, Int32Type, |x, y| x.bitor_assign(y)) - } - DataType::Int64 => { - instantiate_accumulator!(self, 0, Int64Type, |x, y| x.bitor_assign(y)) - } - DataType::UInt8 => { - instantiate_accumulator!(self, 0, UInt8Type, |x, y| x.bitor_assign(y)) - } - DataType::UInt16 => { - instantiate_accumulator!(self, 0, UInt16Type, |x, y| x.bitor_assign(y)) - } - DataType::UInt32 => { - instantiate_accumulator!(self, 0, UInt32Type, |x, y| x.bitor_assign(y)) - } - DataType::UInt64 => { - instantiate_accumulator!(self, 0, UInt64Type, |x, y| x.bitor_assign(y)) - } + macro_rules! helper { + ($t:ty, $dt:expr) => { + Ok(Box::new(PrimitiveGroupsAccumulator::<$t, _>::new( + $dt, + |x, y| x.bitor_assign(y), + ))) + }; + } + let data_type = &self.data_type; + downcast_integer! { + data_type => (helper, data_type), _ => not_impl_err!( "GroupsAccumulator not supported for {} with {}", self.name(), @@ -404,29 +318,35 @@ impl PartialEq for BitOr { } } -#[derive(Debug)] -struct BitOrAccumulator { - bit_or: ScalarValue, +struct BitOrAccumulator { + value: Option, +} + +impl std::fmt::Debug for BitOrAccumulator { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "BitOrAccumulator({})", T::DATA_TYPE) + } } -impl BitOrAccumulator { - /// new bit_or accumulator - pub fn try_new(data_type: &DataType) -> Result { - Ok(Self { - bit_or: ScalarValue::try_from(data_type)?, - }) +impl Default for BitOrAccumulator { + fn default() -> Self { + Self { value: None } } } -impl Accumulator for BitOrAccumulator { +impl Accumulator for BitOrAccumulator +where + T::Native: std::ops::BitOr, +{ fn state(&self) -> Result> { - Ok(vec![self.bit_or.clone()]) + Ok(vec![self.evaluate()?]) } fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - let values = &values[0]; - let delta = &bit_or_batch(values)?; - self.bit_or = self.bit_or.bitor(delta)?; + if let Some(x) = bit_or(values[0].as_primitive::()) { + let v = self.value.get_or_insert(T::Native::usize_as(0)); + *v = *v | x; + } Ok(()) } @@ -435,12 +355,11 @@ impl Accumulator for BitOrAccumulator { } fn evaluate(&self) -> Result { - Ok(self.bit_or.clone()) + Ok(ScalarValue::new_primitive::(self.value, &T::DATA_TYPE)) } fn size(&self) -> usize { - std::mem::size_of_val(self) - std::mem::size_of_val(&self.bit_or) - + self.bit_or.size() + std::mem::size_of_val(self) } } @@ -484,7 +403,19 @@ impl AggregateExpr for BitXor { } fn create_accumulator(&self) -> Result> { - Ok(Box::new(BitXorAccumulator::try_new(&self.data_type)?)) + macro_rules! helper { + ($t:ty) => { + Ok(Box::>::default()) + }; + } + downcast_integer! { + &self.data_type => (helper), + _ => Err(DataFusionError::NotImplemented(format!( + "BitXor not supported for {} with {}", + self.name(), + self.data_type + ))), + } } fn state_fields(&self) -> Result> { @@ -509,32 +440,18 @@ impl AggregateExpr for BitXor { fn create_groups_accumulator(&self) -> Result> { use std::ops::BitXorAssign; - match self.data_type { - DataType::Int8 => { - instantiate_accumulator!(self, 0, Int8Type, |x, y| x.bitxor_assign(y)) - } - DataType::Int16 => { - instantiate_accumulator!(self, 0, Int16Type, |x, y| x.bitxor_assign(y)) - } - DataType::Int32 => { - instantiate_accumulator!(self, 0, Int32Type, |x, y| x.bitxor_assign(y)) - } - DataType::Int64 => { - instantiate_accumulator!(self, 0, Int64Type, |x, y| x.bitxor_assign(y)) - } - DataType::UInt8 => { - instantiate_accumulator!(self, 0, UInt8Type, |x, y| x.bitxor_assign(y)) - } - DataType::UInt16 => { - instantiate_accumulator!(self, 0, UInt16Type, |x, y| x.bitxor_assign(y)) - } - DataType::UInt32 => { - instantiate_accumulator!(self, 0, UInt32Type, |x, y| x.bitxor_assign(y)) - } - DataType::UInt64 => { - instantiate_accumulator!(self, 0, UInt64Type, |x, y| x.bitxor_assign(y)) - } + macro_rules! helper { + ($t:ty, $dt:expr) => { + Ok(Box::new(PrimitiveGroupsAccumulator::<$t, _>::new( + $dt, + |x, y| x.bitxor_assign(y), + ))) + }; + } + let data_type = &self.data_type; + downcast_integer! { + data_type => (helper, data_type), _ => not_impl_err!( "GroupsAccumulator not supported for {} with {}", self.name(), @@ -562,29 +479,35 @@ impl PartialEq for BitXor { } } -#[derive(Debug)] -struct BitXorAccumulator { - bit_xor: ScalarValue, +struct BitXorAccumulator { + value: Option, +} + +impl std::fmt::Debug for BitXorAccumulator { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "BitXorAccumulator({})", T::DATA_TYPE) + } } -impl BitXorAccumulator { - /// new bit_xor accumulator - pub fn try_new(data_type: &DataType) -> Result { - Ok(Self { - bit_xor: ScalarValue::try_from(data_type)?, - }) +impl Default for BitXorAccumulator { + fn default() -> Self { + Self { value: None } } } -impl Accumulator for BitXorAccumulator { +impl Accumulator for BitXorAccumulator +where + T::Native: std::ops::BitXor, +{ fn state(&self) -> Result> { - Ok(vec![self.bit_xor.clone()]) + Ok(vec![self.evaluate()?]) } fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - let values = &values[0]; - let delta = &bit_xor_batch(values)?; - self.bit_xor = self.bit_xor.bitxor(delta)?; + if let Some(x) = bit_xor(values[0].as_primitive::()) { + let v = self.value.get_or_insert(T::Native::usize_as(0)); + *v = *v ^ x; + } Ok(()) } @@ -593,12 +516,11 @@ impl Accumulator for BitXorAccumulator { } fn evaluate(&self) -> Result { - Ok(self.bit_xor.clone()) + Ok(ScalarValue::new_primitive::(self.value, &T::DATA_TYPE)) } fn size(&self) -> usize { - std::mem::size_of_val(self) - std::mem::size_of_val(&self.bit_xor) - + self.bit_xor.size() + std::mem::size_of_val(self) } } @@ -642,9 +564,19 @@ impl AggregateExpr for DistinctBitXor { } fn create_accumulator(&self) -> Result> { - Ok(Box::new(DistinctBitXorAccumulator::try_new( - &self.data_type, - )?)) + macro_rules! helper { + ($t:ty) => { + Ok(Box::>::default()) + }; + } + downcast_integer! { + &self.data_type => (helper), + _ => Err(DataFusionError::NotImplemented(format!( + "DistinctBitXorAccumulator not supported for {} with {}", + self.name(), + self.data_type + ))), + } } fn state_fields(&self) -> Result> { @@ -679,34 +611,39 @@ impl PartialEq for DistinctBitXor { } } -#[derive(Debug)] -struct DistinctBitXorAccumulator { - hash_values: HashSet, - data_type: DataType, +struct DistinctBitXorAccumulator { + values: HashSet, } -impl DistinctBitXorAccumulator { - pub fn try_new(data_type: &DataType) -> Result { - Ok(Self { - hash_values: HashSet::default(), - data_type: data_type.clone(), - }) +impl std::fmt::Debug for DistinctBitXorAccumulator { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "DistinctBitXorAccumulator({})", T::DATA_TYPE) } } -impl Accumulator for DistinctBitXorAccumulator { +impl Default for DistinctBitXorAccumulator { + fn default() -> Self { + Self { + values: HashSet::default(), + } + } +} + +impl Accumulator for DistinctBitXorAccumulator +where + T::Native: std::ops::BitXor + std::hash::Hash + Eq, +{ fn state(&self) -> Result> { // 1. Stores aggregate state in `ScalarValue::List` // 2. Constructs `ScalarValue::List` state from distinct numeric stored in hash set let state_out = { - let mut distinct_values = Vec::new(); - self.hash_values + let values = self + .values .iter() - .for_each(|distinct_value| distinct_values.push(distinct_value.clone())); - vec![ScalarValue::new_list( - Some(distinct_values), - self.data_type.clone(), - )] + .map(|x| ScalarValue::new_primitive::(Some(*x), &T::DATA_TYPE)) + .collect(); + + vec![ScalarValue::new_list(Some(values), T::DATA_TYPE)] }; Ok(state_out) } @@ -716,14 +653,18 @@ impl Accumulator for DistinctBitXorAccumulator { return Ok(()); } - let arr = &values[0]; - (0..values[0].len()).try_for_each(|index| { - if !arr.is_null(index) { - let v = ScalarValue::try_from_array(arr, index)?; - self.hash_values.insert(v); + let array = values[0].as_primitive::(); + match array.nulls().filter(|x| x.null_count() > 0) { + Some(n) => { + for idx in n.valid_indices() { + self.values.insert(array.value(idx)); + } } - Ok(()) - }) + None => array.values().iter().for_each(|x| { + self.values.insert(*x); + }), + } + Ok(()) } fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { @@ -731,36 +672,24 @@ impl Accumulator for DistinctBitXorAccumulator { return Ok(()); } - let arr = &states[0]; - (0..arr.len()).try_for_each(|index| { - let scalar = ScalarValue::try_from_array(arr, index)?; - - if let ScalarValue::List(Some(scalar), _) = scalar { - scalar.iter().for_each(|scalar| { - if !ScalarValue::is_null(scalar) { - self.hash_values.insert(scalar.clone()); - } - }); - } else { - return internal_err!("Unexpected accumulator state"); - } - Ok(()) - }) + for x in states[0].as_list::().iter().flatten() { + self.update_batch(&[x])? + } + Ok(()) } fn evaluate(&self) -> Result { - let mut bit_xor_value = ScalarValue::try_from(&self.data_type)?; - for distinct_value in self.hash_values.iter() { - bit_xor_value = bit_xor_value.bitxor(distinct_value)?; + let mut acc = T::Native::usize_as(0); + for distinct_value in self.values.iter() { + acc = acc ^ *distinct_value; } - Ok(bit_xor_value) + let v = (!self.values.is_empty()).then_some(acc); + Ok(ScalarValue::new_primitive::(v, &T::DATA_TYPE)) } fn size(&self) -> usize { - std::mem::size_of_val(self) + ScalarValue::size_of_hashset(&self.hash_values) - - std::mem::size_of_val(&self.hash_values) - + self.data_type.size() - - std::mem::size_of_val(&self.data_type) + std::mem::size_of_val(self) + + self.values.capacity() * std::mem::size_of::() } } @@ -770,6 +699,7 @@ mod tests { use crate::expressions::col; use crate::expressions::tests::aggregate; use crate::generic_test_op; + use arrow::array::*; use arrow::datatypes::*; use arrow::record_batch::RecordBatch; use datafusion_common::Result;