diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs index 2d47b3e314723..2c3dd4c5ca554 100644 --- a/datafusion/common/src/scalar.rs +++ b/datafusion/common/src/scalar.rs @@ -744,7 +744,7 @@ macro_rules! eq_array_primitive { } impl ScalarValue { - /// Create a [`ScalarValue`] with the provided value and datatype + /// Create a [`Result`] with the provided value and datatype /// /// # Panics /// @@ -752,13 +752,13 @@ impl ScalarValue { pub fn new_primitive( a: Option, d: &DataType, - ) -> Self { + ) -> Result { match a { - None => d.try_into().unwrap(), + None => d.try_into(), Some(v) => { let array = PrimitiveArray::::new(vec![v].into(), None) .with_data_type(d.clone()); - Self::try_from_array(&array, 0).unwrap() + Self::try_from_array(&array, 0) } } } diff --git a/datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs b/datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs index d7934e79c366d..4d6381c3e80ad 100644 --- a/datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs +++ b/datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs @@ -195,7 +195,7 @@ where } fn evaluate(&self) -> Result { - Ok(ScalarValue::new_primitive::(self.value, &T::DATA_TYPE)) + ScalarValue::new_primitive::(self.value, &T::DATA_TYPE) } fn size(&self) -> usize { @@ -356,7 +356,7 @@ where } fn evaluate(&self) -> Result { - Ok(ScalarValue::new_primitive::(self.value, &T::DATA_TYPE)) + ScalarValue::new_primitive::(self.value, &T::DATA_TYPE) } fn size(&self) -> usize { @@ -517,7 +517,7 @@ where } fn evaluate(&self) -> Result { - Ok(ScalarValue::new_primitive::(self.value, &T::DATA_TYPE)) + ScalarValue::new_primitive::(self.value, &T::DATA_TYPE) } fn size(&self) -> usize { @@ -638,13 +638,13 @@ where // 1. Stores aggregate state in `ScalarValue::List` // 2. Constructs `ScalarValue::List` state from distinct numeric stored in hash set let state_out = { - let values: Vec = self + let values = self .values .iter() .map(|x| ScalarValue::new_primitive::(Some(*x), &T::DATA_TYPE)) - .collect(); + .collect::>>(); - let arr = ScalarValue::new_list(&values, &T::DATA_TYPE); + let arr = ScalarValue::new_list(&values?, &T::DATA_TYPE); vec![ScalarValue::List(arr)] }; Ok(state_out) @@ -685,7 +685,7 @@ where acc = acc ^ *distinct_value; } let v = (!self.values.is_empty()).then_some(acc); - Ok(ScalarValue::new_primitive::(v, &T::DATA_TYPE)) + ScalarValue::new_primitive::(v, &T::DATA_TYPE) } fn size(&self) -> usize { diff --git a/datafusion/physical-expr/src/aggregate/median.rs b/datafusion/physical-expr/src/aggregate/median.rs index 477dcadceee73..088dff824e110 100644 --- a/datafusion/physical-expr/src/aggregate/median.rs +++ b/datafusion/physical-expr/src/aggregate/median.rs @@ -146,13 +146,13 @@ impl std::fmt::Debug for MedianAccumulator { impl Accumulator for MedianAccumulator { fn state(&self) -> Result> { - let all_values: Vec = self + let all_values = self .all_values .iter() .map(|x| ScalarValue::new_primitive::(Some(*x), &self.data_type)) - .collect(); + .collect::>>(); - let arr = ScalarValue::new_list(&all_values, &self.data_type); + let arr = ScalarValue::new_list(&all_values?, &self.data_type); Ok(vec![ScalarValue::List(arr)]) } @@ -188,7 +188,7 @@ impl Accumulator for MedianAccumulator { let (_, median, _) = d.select_nth_unstable_by(len / 2, cmp); Some(*median) }; - Ok(ScalarValue::new_primitive::(median, &self.data_type)) + ScalarValue::new_primitive::(median, &self.data_type) } fn size(&self) -> usize { diff --git a/datafusion/physical-expr/src/aggregate/sum.rs b/datafusion/physical-expr/src/aggregate/sum.rs index 5cc8e933324e3..d6c23d0dfafde 100644 --- a/datafusion/physical-expr/src/aggregate/sum.rs +++ b/datafusion/physical-expr/src/aggregate/sum.rs @@ -205,7 +205,7 @@ impl Accumulator for SumAccumulator { } fn evaluate(&self) -> Result { - Ok(ScalarValue::new_primitive::(self.sum, &self.data_type)) + ScalarValue::new_primitive::(self.sum, &self.data_type) } fn size(&self) -> usize { @@ -265,7 +265,7 @@ impl Accumulator for SlidingSumAccumulator { fn evaluate(&self) -> Result { let v = (self.count != 0).then_some(self.sum); - Ok(ScalarValue::new_primitive::(v, &self.data_type)) + ScalarValue::new_primitive::(v, &self.data_type) } fn size(&self) -> usize { diff --git a/datafusion/physical-expr/src/aggregate/sum_distinct.rs b/datafusion/physical-expr/src/aggregate/sum_distinct.rs index 742e24b99e71e..d81473b2a8061 100644 --- a/datafusion/physical-expr/src/aggregate/sum_distinct.rs +++ b/datafusion/physical-expr/src/aggregate/sum_distinct.rs @@ -159,15 +159,16 @@ impl Accumulator for DistinctSumAccumulator { // 1. Stores aggregate state in `ScalarValue::List` // 2. Constructs `ScalarValue::List` state from distinct numeric stored in hash set let state_out = { - let mut distinct_values = Vec::new(); - self.values.iter().for_each(|distinct_value| { - distinct_values.push(ScalarValue::new_primitive::( - Some(distinct_value.0), - &self.data_type, - )) - }); + let distinct_values = self + .values + .iter() + .map(|value| { + ScalarValue::new_primitive::(Some(value.0), &self.data_type) + }) + .collect::>>(); + vec![ScalarValue::List(ScalarValue::new_list( - &distinct_values, + &distinct_values?, &self.data_type, ))] }; @@ -206,7 +207,7 @@ impl Accumulator for DistinctSumAccumulator { acc = acc.add_wrapping(distinct_value.0) } let v = (!self.values.is_empty()).then_some(acc); - Ok(ScalarValue::new_primitive::(v, &self.data_type)) + ScalarValue::new_primitive::(v, &self.data_type) } fn size(&self) -> usize {