From 399ab8fdd982e6ad06373fe6a9a15e2b68a76177 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 3 Oct 2019 18:10:19 -0600 Subject: [PATCH] ARROW-6736: [Rust] [DataFusion] Evaluate the input to the aggregate expression just once per batch The current implementation of aggregate expressions in the new physical plan had a flaw where the input to the aggregate expression was repeatedly being evaluated (once per row instead of once per batch). This PR fixes this. Closes #5542 from andygrove/ARROW-6736 and squashes the following commits: f0fadafb1 Evaluate the input to the aggregate expression just once per batch Authored-by: Andy Grove Signed-off-by: Andy Grove --- .../execution/physical_plan/expressions.rs | 15 ++++++++-- .../execution/physical_plan/hash_aggregate.rs | 29 +++++++++++++++++-- .../src/execution/physical_plan/mod.rs | 9 +++++- 3 files changed, 46 insertions(+), 7 deletions(-) diff --git a/rust/datafusion/src/execution/physical_plan/expressions.rs b/rust/datafusion/src/execution/physical_plan/expressions.rs index 9e0741c263ca6..f63b40cb688ff 100644 --- a/rust/datafusion/src/execution/physical_plan/expressions.rs +++ b/rust/datafusion/src/execution/physical_plan/expressions.rs @@ -106,6 +106,10 @@ impl AggregateExpr for Sum { } } + fn evaluate_input(&self, batch: &RecordBatch) -> Result { + self.expr.evaluate(batch) + } + fn create_accumulator(&self) -> Rc> { Rc::new(RefCell::new(SumAccumulator { expr: self.expr.clone(), @@ -149,8 +153,12 @@ struct SumAccumulator { } impl Accumulator for SumAccumulator { - fn accumulate(&mut self, batch: &RecordBatch, row_index: usize) -> Result<()> { - let array = self.expr.evaluate(batch)?; + fn accumulate( + &mut self, + batch: &RecordBatch, + array: &ArrayRef, + row_index: usize, + ) -> Result<()> { match self.expr.data_type(batch.schema())? 
{ DataType::Int8 => { sum_accumulate!(self, array, row_index, Int8Array, Int64, i64) @@ -697,9 +705,10 @@ mod tests { fn do_sum(batch: &RecordBatch) -> Result> { let sum = sum(col(0)); let accum = sum.create_accumulator(); + let input = sum.evaluate_input(batch)?; let mut accum = accum.borrow_mut(); for i in 0..batch.num_rows() { - accum.accumulate(&batch, i)?; + accum.accumulate(&batch, &input, i)?; } accum.get_value() } diff --git a/rust/datafusion/src/execution/physical_plan/hash_aggregate.rs b/rust/datafusion/src/execution/physical_plan/hash_aggregate.rs index 491a81af8557e..0f593616446f8 100644 --- a/rust/datafusion/src/execution/physical_plan/hash_aggregate.rs +++ b/rust/datafusion/src/execution/physical_plan/hash_aggregate.rs @@ -281,6 +281,13 @@ impl BatchIterator for GroupedHashAggregateIterator { .map(|expr| expr.evaluate(&batch)) .collect::>>()?; + // evaluate the inputs to the aggregate expressions for this batch + let aggr_input_values = self + .aggr_expr + .iter() + .map(|expr| expr.evaluate_input(&batch)) + .collect::>>()?; + // iterate over each row in the batch for row in 0..batch.num_rows() { // create grouping key for this row @@ -290,7 +297,10 @@ impl BatchIterator for GroupedHashAggregateIterator { Some(accumulators) => { let _ = accumulators .iter() - .map(|accum| accum.borrow_mut().accumulate(&batch, row)) + .zip(aggr_input_values.iter()) + .map(|(accum, input)| { + accum.borrow_mut().accumulate(&batch, input, row) + }) .collect::>>()?; Ok(true) } @@ -306,7 +316,10 @@ impl BatchIterator for GroupedHashAggregateIterator { let _ = accumulators .iter() - .map(|accum| accum.borrow_mut().accumulate(&batch, row)) + .zip(aggr_input_values.iter()) + .map(|(accum, input)| { + accum.borrow_mut().accumulate(&batch, input, row) + }) .collect::>>()?; map.insert(key.clone(), accumulators); @@ -511,11 +524,21 @@ impl BatchIterator for HashAggregateIterator { // iterate over input and perform aggregation while let Some(batch) = input.next()? 
{ + // evaluate the inputs to the aggregate expressions for this batch + let aggr_input_values = self + .aggr_expr + .iter() + .map(|expr| expr.evaluate_input(&batch)) + .collect::>>()?; + // iterate over each row in the batch for row in 0..batch.num_rows() { let _ = accumulators .iter() - .map(|accum| accum.borrow_mut().accumulate(&batch, row)) + .zip(aggr_input_values.iter()) + .map(|(accum, input)| { + accum.borrow_mut().accumulate(&batch, input, row) + }) .collect::>>()?; } } diff --git a/rust/datafusion/src/execution/physical_plan/mod.rs b/rust/datafusion/src/execution/physical_plan/mod.rs index b820af86b2699..08a2737391b58 100644 --- a/rust/datafusion/src/execution/physical_plan/mod.rs +++ b/rust/datafusion/src/execution/physical_plan/mod.rs @@ -65,6 +65,8 @@ pub trait AggregateExpr: Send + Sync { fn name(&self) -> String; /// Get the data type of this expression, given the schema of the input fn data_type(&self, input_schema: &Schema) -> Result; + /// Evaluate the expression being aggregated + fn evaluate_input(&self, batch: &RecordBatch) -> Result; /// Create an accumulator for this aggregate expression fn create_accumulator(&self) -> Rc>; /// Create an aggregate expression for combining the results of accumulators from partitions. @@ -76,7 +78,12 @@ pub trait AggregateExpr: Send + Sync { /// Aggregate accumulator pub trait Accumulator { /// Update the accumulator based on a row in a batch - fn accumulate(&mut self, batch: &RecordBatch, row_index: usize) -> Result<()>; + fn accumulate( + &mut self, + batch: &RecordBatch, + input: &ArrayRef, + row_index: usize, + ) -> Result<()>; /// Get the final value for the accumulator fn get_value(&self) -> Result>; }