From 6de516d5abd8aa8dfcd824e371976fe574106e7c Mon Sep 17 00:00:00 2001 From: Evgeny Maruschenko Date: Mon, 16 Oct 2023 19:51:39 +1000 Subject: [PATCH] Refactor ScalarValue::new_primitive to return Result --- datafusion/common/src/scalar.rs | 6 +++--- .../src/aggregate/bit_and_or_xor.rs | 12 ++++++------ .../physical-expr/src/aggregate/median.rs | 6 +++--- datafusion/physical-expr/src/aggregate/sum.rs | 4 ++-- .../src/aggregate/sum_distinct.rs | 19 ++++++++++--------- 5 files changed, 24 insertions(+), 23 deletions(-) diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs index 242d784edc9d6..4aac042e6254c 100644 --- a/datafusion/common/src/scalar.rs +++ b/datafusion/common/src/scalar.rs @@ -744,13 +744,13 @@ impl ScalarValue { pub fn new_primitive( a: Option, d: &DataType, - ) -> Self { + ) -> Result { match a { - None => d.try_into().unwrap(), + None => d.try_into(), Some(v) => { let array = PrimitiveArray::::new(vec![v].into(), None) .with_data_type(d.clone()); - Self::try_from_array(&array, 0).unwrap() + Self::try_from_array(&array, 0) } } } diff --git a/datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs b/datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs index 93b911c939d67..9f36174c43868 100644 --- a/datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs +++ b/datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs @@ -194,7 +194,7 @@ where } fn evaluate(&self) -> Result { - Ok(ScalarValue::new_primitive::(self.value, &T::DATA_TYPE)) + ScalarValue::new_primitive::(self.value, &T::DATA_TYPE) } fn size(&self) -> usize { @@ -355,7 +355,7 @@ where } fn evaluate(&self) -> Result { - Ok(ScalarValue::new_primitive::(self.value, &T::DATA_TYPE)) + ScalarValue::new_primitive::(self.value, &T::DATA_TYPE) } fn size(&self) -> usize { @@ -516,7 +516,7 @@ where } fn evaluate(&self) -> Result { - Ok(ScalarValue::new_primitive::(self.value, &T::DATA_TYPE)) + ScalarValue::new_primitive::(self.value, &T::DATA_TYPE) } fn size(&self) -> usize { @@ -641,9 +641,9 @@ where .values .iter() .map(|x| ScalarValue::new_primitive::(Some(*x), &T::DATA_TYPE)) - .collect(); + .collect::>>(); - vec![ScalarValue::new_list(Some(values), T::DATA_TYPE)] + vec![ScalarValue::new_list(Some(values?), T::DATA_TYPE)] }; Ok(state_out) } @@ -684,7 +684,7 @@ where acc = acc ^ *distinct_value; } let v = (!self.values.is_empty()).then_some(acc); - Ok(ScalarValue::new_primitive::(v, &T::DATA_TYPE)) + ScalarValue::new_primitive::(v, &T::DATA_TYPE) } fn size(&self) -> usize { diff --git a/datafusion/physical-expr/src/aggregate/median.rs b/datafusion/physical-expr/src/aggregate/median.rs index 1ec4124026381..61e15ffacbb85 100644 --- a/datafusion/physical-expr/src/aggregate/median.rs +++ b/datafusion/physical-expr/src/aggregate/median.rs @@ -150,8 +150,8 @@ impl Accumulator for MedianAccumulator { .all_values .iter() .map(|x| ScalarValue::new_primitive::(Some(*x), &self.data_type)) - .collect(); - let state = ScalarValue::new_list(Some(all_values), self.data_type.clone()); + .collect::>>(); + let state = ScalarValue::new_list(Some(all_values?), self.data_type.clone()); Ok(vec![state]) } @@ -188,7 +188,7 @@ impl Accumulator for MedianAccumulator { let (_, median, _) = d.select_nth_unstable_by(len / 2, cmp); Some(*median) }; - Ok(ScalarValue::new_primitive::(median, &self.data_type)) + ScalarValue::new_primitive::(median, &self.data_type) } fn size(&self) -> usize { diff --git a/datafusion/physical-expr/src/aggregate/sum.rs b/datafusion/physical-expr/src/aggregate/sum.rs index 5cc8e933324e3..d6c23d0dfafde 100644 --- a/datafusion/physical-expr/src/aggregate/sum.rs +++ b/datafusion/physical-expr/src/aggregate/sum.rs @@ -205,7 +205,7 @@ impl Accumulator for SumAccumulator { } fn evaluate(&self) -> Result { - Ok(ScalarValue::new_primitive::(self.sum, &self.data_type)) + ScalarValue::new_primitive::(self.sum, &self.data_type) } fn size(&self) -> usize { @@ -265,7 +265,7 @@ impl Accumulator for SlidingSumAccumulator { fn evaluate(&self) -> Result { let v = (self.count != 0).then_some(self.sum); - Ok(ScalarValue::new_primitive::(v, &self.data_type)) + ScalarValue::new_primitive::(v, &self.data_type) } fn size(&self) -> usize { diff --git a/datafusion/physical-expr/src/aggregate/sum_distinct.rs b/datafusion/physical-expr/src/aggregate/sum_distinct.rs index c3d8d5e870683..fee4178baad44 100644 --- a/datafusion/physical-expr/src/aggregate/sum_distinct.rs +++ b/datafusion/physical-expr/src/aggregate/sum_distinct.rs @@ -159,15 +159,16 @@ impl Accumulator for DistinctSumAccumulator { // 1. Stores aggregate state in `ScalarValue::List` // 2. Constructs `ScalarValue::List` state from distinct numeric stored in hash set let state_out = { - let mut distinct_values = Vec::new(); - self.values.iter().for_each(|distinct_value| { - distinct_values.push(ScalarValue::new_primitive::( - Some(distinct_value.0), - &self.data_type, - )) - }); + let distinct_values = self + .values + .iter() + .map(|value| { + ScalarValue::new_primitive::(Some(value.0), &self.data_type) + }) + .collect::>>(); + vec![ScalarValue::new_list( - Some(distinct_values), + Some(distinct_values?), self.data_type.clone(), )] }; @@ -206,7 +207,7 @@ impl Accumulator for DistinctSumAccumulator { acc = acc.add_wrapping(distinct_value.0) } let v = (!self.values.is_empty()).then_some(acc); - Ok(ScalarValue::new_primitive::(v, &self.data_type)) + ScalarValue::new_primitive::(v, &self.data_type) } fn size(&self) -> usize {