Skip to content

Commit

Permalink
rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove committed Oct 4, 2019
1 parent ebc3acb commit cc32f22
Showing 1 changed file with 12 additions and 3 deletions.
15 changes: 12 additions & 3 deletions rust/datafusion/src/execution/physical_plan/expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,10 @@ impl AggregateExpr for Avg {
}
}

fn evaluate_input(&self, batch: &RecordBatch) -> Result<ArrayRef> {
self.expr.evaluate(batch)
}

fn create_accumulator(&self) -> Rc<RefCell<dyn Accumulator>> {
Rc::new(RefCell::new(AvgAccumulator {
expr: self.expr.clone(),
Expand Down Expand Up @@ -288,8 +292,12 @@ struct AvgAccumulator {
}

impl Accumulator for AvgAccumulator {
fn accumulate(&mut self, batch: &RecordBatch, row_index: usize) -> Result<()> {
let array = self.expr.evaluate(batch)?;
fn accumulate(
&mut self,
batch: &RecordBatch,
array: &ArrayRef,
row_index: usize,
) -> Result<()> {
match self.expr.data_type(batch.schema())? {
DataType::Int8 => avg_accumulate!(self, array, row_index, Int8Array),
DataType::Int16 => avg_accumulate!(self, array, row_index, Int16Array),
Expand Down Expand Up @@ -1501,9 +1509,10 @@ mod tests {
fn do_avg(batch: &RecordBatch) -> Result<Option<ScalarValue>> {
let avg = avg(col(0));
let accum = avg.create_accumulator();
let input = avg.evaluate_input(batch)?;
let mut accum = accum.borrow_mut();
for i in 0..batch.num_rows() {
accum.accumulate(&batch, i)?;
accum.accumulate(&batch, &input, i)?;
}
accum.get_value()
}
Expand Down

0 comments on commit cc32f22

Please sign in to comment.