Skip to content

Commit

Permalink
Refactor ScalarValue::new_primitive to return Result
Browse files Browse the repository at this point in the history
  • Loading branch information
Evgeny Maruschenko committed Oct 16, 2023
1 parent fa2bb6c commit 6de516d
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 23 deletions.
6 changes: 3 additions & 3 deletions datafusion/common/src/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -744,13 +744,13 @@ impl ScalarValue {
pub fn new_primitive<T: ArrowPrimitiveType>(
a: Option<T::Native>,
d: &DataType,
) -> Self {
) -> Result<Self> {
match a {
None => d.try_into().unwrap(),
None => d.try_into(),
Some(v) => {
let array = PrimitiveArray::<T>::new(vec![v].into(), None)
.with_data_type(d.clone());
Self::try_from_array(&array, 0).unwrap()
Self::try_from_array(&array, 0)
}
}
}
Expand Down
12 changes: 6 additions & 6 deletions datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ where
}

fn evaluate(&self) -> Result<ScalarValue> {
Ok(ScalarValue::new_primitive::<T>(self.value, &T::DATA_TYPE))
ScalarValue::new_primitive::<T>(self.value, &T::DATA_TYPE)
}

fn size(&self) -> usize {
Expand Down Expand Up @@ -355,7 +355,7 @@ where
}

fn evaluate(&self) -> Result<ScalarValue> {
Ok(ScalarValue::new_primitive::<T>(self.value, &T::DATA_TYPE))
ScalarValue::new_primitive::<T>(self.value, &T::DATA_TYPE)
}

fn size(&self) -> usize {
Expand Down Expand Up @@ -516,7 +516,7 @@ where
}

fn evaluate(&self) -> Result<ScalarValue> {
Ok(ScalarValue::new_primitive::<T>(self.value, &T::DATA_TYPE))
ScalarValue::new_primitive::<T>(self.value, &T::DATA_TYPE)
}

fn size(&self) -> usize {
Expand Down Expand Up @@ -641,9 +641,9 @@ where
.values
.iter()
.map(|x| ScalarValue::new_primitive::<T>(Some(*x), &T::DATA_TYPE))
.collect();
.collect::<Result<Vec<_>>>();

vec![ScalarValue::new_list(Some(values), T::DATA_TYPE)]
vec![ScalarValue::new_list(Some(values?), T::DATA_TYPE)]
};
Ok(state_out)
}
Expand Down Expand Up @@ -684,7 +684,7 @@ where
acc = acc ^ *distinct_value;
}
let v = (!self.values.is_empty()).then_some(acc);
Ok(ScalarValue::new_primitive::<T>(v, &T::DATA_TYPE))
ScalarValue::new_primitive::<T>(v, &T::DATA_TYPE)
}

fn size(&self) -> usize {
Expand Down
6 changes: 3 additions & 3 deletions datafusion/physical-expr/src/aggregate/median.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,8 @@ impl<T: ArrowNumericType> Accumulator for MedianAccumulator<T> {
.all_values
.iter()
.map(|x| ScalarValue::new_primitive::<T>(Some(*x), &self.data_type))
.collect();
let state = ScalarValue::new_list(Some(all_values), self.data_type.clone());
.collect::<Result<Vec<_>>>();
let state = ScalarValue::new_list(Some(all_values?), self.data_type.clone());

Ok(vec![state])
}
Expand Down Expand Up @@ -188,7 +188,7 @@ impl<T: ArrowNumericType> Accumulator for MedianAccumulator<T> {
let (_, median, _) = d.select_nth_unstable_by(len / 2, cmp);
Some(*median)
};
Ok(ScalarValue::new_primitive::<T>(median, &self.data_type))
ScalarValue::new_primitive::<T>(median, &self.data_type)
}

fn size(&self) -> usize {
Expand Down
4 changes: 2 additions & 2 deletions datafusion/physical-expr/src/aggregate/sum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ impl<T: ArrowNumericType> Accumulator for SumAccumulator<T> {
}

fn evaluate(&self) -> Result<ScalarValue> {
Ok(ScalarValue::new_primitive::<T>(self.sum, &self.data_type))
ScalarValue::new_primitive::<T>(self.sum, &self.data_type)
}

fn size(&self) -> usize {
Expand Down Expand Up @@ -265,7 +265,7 @@ impl<T: ArrowNumericType> Accumulator for SlidingSumAccumulator<T> {

fn evaluate(&self) -> Result<ScalarValue> {
let v = (self.count != 0).then_some(self.sum);
Ok(ScalarValue::new_primitive::<T>(v, &self.data_type))
ScalarValue::new_primitive::<T>(v, &self.data_type)
}

fn size(&self) -> usize {
Expand Down
19 changes: 10 additions & 9 deletions datafusion/physical-expr/src/aggregate/sum_distinct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,15 +159,16 @@ impl<T: ArrowPrimitiveType> Accumulator for DistinctSumAccumulator<T> {
// 1. Stores aggregate state in `ScalarValue::List`
// 2. Constructs `ScalarValue::List` state from distinct numeric stored in hash set
let state_out = {
let mut distinct_values = Vec::new();
self.values.iter().for_each(|distinct_value| {
distinct_values.push(ScalarValue::new_primitive::<T>(
Some(distinct_value.0),
&self.data_type,
))
});
let distinct_values = self
.values
.iter()
.map(|value| {
ScalarValue::new_primitive::<T>(Some(value.0), &self.data_type)
})
.collect::<Result<Vec<_>>>();

vec![ScalarValue::new_list(
Some(distinct_values),
Some(distinct_values?),
self.data_type.clone(),
)]
};
Expand Down Expand Up @@ -206,7 +207,7 @@ impl<T: ArrowPrimitiveType> Accumulator for DistinctSumAccumulator<T> {
acc = acc.add_wrapping(distinct_value.0)
}
let v = (!self.values.is_empty()).then_some(acc);
Ok(ScalarValue::new_primitive::<T>(v, &self.data_type))
ScalarValue::new_primitive::<T>(v, &self.data_type)
}

fn size(&self) -> usize {
Expand Down

0 comments on commit 6de516d

Please sign in to comment.