Skip to content

Commit

Permalink
Refactor ScalarValue::new_primitive to return Result
Browse files Browse the repository at this point in the history
  • Loading branch information
Evgeny Maruschenko committed Oct 17, 2023
1 parent cb2d03c commit a7f55b8
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 26 deletions.
8 changes: 4 additions & 4 deletions datafusion/common/src/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -744,21 +744,21 @@ macro_rules! eq_array_primitive {
}

impl ScalarValue {
/// Create a [`ScalarValue`] with the provided value and datatype
/// Create a [`Result<ScalarValue>`] with the provided value and datatype
///
/// # Panics
///
/// Panics if d is not compatible with T
pub fn new_primitive<T: ArrowPrimitiveType>(
a: Option<T::Native>,
d: &DataType,
) -> Self {
) -> Result<Self> {
match a {
None => d.try_into().unwrap(),
None => d.try_into(),
Some(v) => {
let array = PrimitiveArray::<T>::new(vec![v].into(), None)
.with_data_type(d.clone());
Self::try_from_array(&array, 0).unwrap()
Self::try_from_array(&array, 0)
}
}
}
Expand Down
14 changes: 7 additions & 7 deletions datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ where
}

fn evaluate(&self) -> Result<ScalarValue> {
Ok(ScalarValue::new_primitive::<T>(self.value, &T::DATA_TYPE))
ScalarValue::new_primitive::<T>(self.value, &T::DATA_TYPE)
}

fn size(&self) -> usize {
Expand Down Expand Up @@ -356,7 +356,7 @@ where
}

fn evaluate(&self) -> Result<ScalarValue> {
Ok(ScalarValue::new_primitive::<T>(self.value, &T::DATA_TYPE))
ScalarValue::new_primitive::<T>(self.value, &T::DATA_TYPE)
}

fn size(&self) -> usize {
Expand Down Expand Up @@ -517,7 +517,7 @@ where
}

fn evaluate(&self) -> Result<ScalarValue> {
Ok(ScalarValue::new_primitive::<T>(self.value, &T::DATA_TYPE))
ScalarValue::new_primitive::<T>(self.value, &T::DATA_TYPE)
}

fn size(&self) -> usize {
Expand Down Expand Up @@ -638,13 +638,13 @@ where
// 1. Stores aggregate state in `ScalarValue::List`
// 2. Constructs `ScalarValue::List` state from distinct numeric stored in hash set
let state_out = {
let values: Vec<ScalarValue> = self
let values = self
.values
.iter()
.map(|x| ScalarValue::new_primitive::<T>(Some(*x), &T::DATA_TYPE))
.collect();
.collect::<Result<Vec<_>>>();

let arr = ScalarValue::new_list(&values, &T::DATA_TYPE);
let arr = ScalarValue::new_list(&values?, &T::DATA_TYPE);
vec![ScalarValue::List(arr)]
};
Ok(state_out)
Expand Down Expand Up @@ -685,7 +685,7 @@ where
acc = acc ^ *distinct_value;
}
let v = (!self.values.is_empty()).then_some(acc);
Ok(ScalarValue::new_primitive::<T>(v, &T::DATA_TYPE))
ScalarValue::new_primitive::<T>(v, &T::DATA_TYPE)
}

fn size(&self) -> usize {
Expand Down
8 changes: 4 additions & 4 deletions datafusion/physical-expr/src/aggregate/median.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,13 +146,13 @@ impl<T: ArrowNumericType> std::fmt::Debug for MedianAccumulator<T> {

impl<T: ArrowNumericType> Accumulator for MedianAccumulator<T> {
fn state(&self) -> Result<Vec<ScalarValue>> {
let all_values: Vec<ScalarValue> = self
let all_values = self
.all_values
.iter()
.map(|x| ScalarValue::new_primitive::<T>(Some(*x), &self.data_type))
.collect();
.collect::<Result<Vec<_>>>();

let arr = ScalarValue::new_list(&all_values, &self.data_type);
let arr = ScalarValue::new_list(&all_values?, &self.data_type);
Ok(vec![ScalarValue::List(arr)])
}

Expand Down Expand Up @@ -188,7 +188,7 @@ impl<T: ArrowNumericType> Accumulator for MedianAccumulator<T> {
let (_, median, _) = d.select_nth_unstable_by(len / 2, cmp);
Some(*median)
};
Ok(ScalarValue::new_primitive::<T>(median, &self.data_type))
ScalarValue::new_primitive::<T>(median, &self.data_type)
}

fn size(&self) -> usize {
Expand Down
4 changes: 2 additions & 2 deletions datafusion/physical-expr/src/aggregate/sum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ impl<T: ArrowNumericType> Accumulator for SumAccumulator<T> {
}

fn evaluate(&self) -> Result<ScalarValue> {
Ok(ScalarValue::new_primitive::<T>(self.sum, &self.data_type))
ScalarValue::new_primitive::<T>(self.sum, &self.data_type)
}

fn size(&self) -> usize {
Expand Down Expand Up @@ -265,7 +265,7 @@ impl<T: ArrowNumericType> Accumulator for SlidingSumAccumulator<T> {

fn evaluate(&self) -> Result<ScalarValue> {
let v = (self.count != 0).then_some(self.sum);
Ok(ScalarValue::new_primitive::<T>(v, &self.data_type))
ScalarValue::new_primitive::<T>(v, &self.data_type)
}

fn size(&self) -> usize {
Expand Down
19 changes: 10 additions & 9 deletions datafusion/physical-expr/src/aggregate/sum_distinct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,15 +159,16 @@ impl<T: ArrowPrimitiveType> Accumulator for DistinctSumAccumulator<T> {
// 1. Stores aggregate state in `ScalarValue::List`
// 2. Constructs `ScalarValue::List` state from distinct numeric stored in hash set
let state_out = {
let mut distinct_values = Vec::new();
self.values.iter().for_each(|distinct_value| {
distinct_values.push(ScalarValue::new_primitive::<T>(
Some(distinct_value.0),
&self.data_type,
))
});
let distinct_values = self
.values
.iter()
.map(|value| {
ScalarValue::new_primitive::<T>(Some(value.0), &self.data_type)
})
.collect::<Result<Vec<_>>>();

vec![ScalarValue::List(ScalarValue::new_list(
&distinct_values,
&distinct_values?,
&self.data_type,
))]
};
Expand Down Expand Up @@ -206,7 +207,7 @@ impl<T: ArrowPrimitiveType> Accumulator for DistinctSumAccumulator<T> {
acc = acc.add_wrapping(distinct_value.0)
}
let v = (!self.values.is_empty()).then_some(acc);
Ok(ScalarValue::new_primitive::<T>(v, &self.data_type))
ScalarValue::new_primitive::<T>(v, &self.data_type)
}

fn size(&self) -> usize {
Expand Down

0 comments on commit a7f55b8

Please sign in to comment.