Skip to content

Commit

Permalink
Use GroupsAccumulator exclusively in grouped hash aggregation
Browse files Browse the repository at this point in the history
Makes the other AggregateExprs use GroupsAccumulatorFlatAdapter, and
also uses a GroupsAccumulator implementation that has a Box<dyn
Accumulator> inside as a fallback accumulator if some AggregateExpr
implementation does not support that.

This fully removes the batch keys and hash table iteration and brings
that performance benefit from Sum and Avg to other aggregation types.
  • Loading branch information
srh committed Nov 7, 2024
1 parent a8f045a commit c80644c
Show file tree
Hide file tree
Showing 7 changed files with 142 additions and 140 deletions.
23 changes: 23 additions & 0 deletions datafusion/src/physical_plan/distinct_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ use smallvec::SmallVec;
use std::collections::hash_map::RandomState;
use std::collections::HashSet;

use super::groups_accumulator::GroupsAccumulator;
use super::groups_accumulator_flat_adapter::GroupsAccumulatorFlatAdapter;

#[derive(Debug, PartialEq, Eq, Hash, Clone)]
struct DistinctScalarValues(Vec<GroupByScalar>);

Expand Down Expand Up @@ -122,6 +125,26 @@ impl AggregateExpr for DistinctCount {
}))
}

/// Always `true`: this expression supplies a groups accumulator through
/// `create_groups_accumulator`.
fn uses_groups_accumulator(&self) -> bool {
    true
}

/// Wraps `DistinctCountAccumulator` in a `GroupsAccumulatorFlatAdapter`.
///
/// The adapter receives a factory closure; each call to the factory yields
/// a `DistinctCountAccumulator` with an empty value set and fresh copies of
/// this expression's state data types and count data type.
fn create_groups_accumulator(
    &self,
) -> arrow::error::Result<Option<Box<dyn GroupsAccumulator>>> {
    // Owned copies are moved into the factory, which clones them again on
    // every invocation so it can be called repeatedly.
    let state_types = self.state_data_types.clone();
    let count_type = self.data_type.clone();

    let make_accumulator = move || {
        Ok(DistinctCountAccumulator {
            values: HashSet::default(),
            state_data_types: state_types.clone(),
            count_data_type: count_type.clone(),
        })
    };

    let adapter =
        GroupsAccumulatorFlatAdapter::<DistinctCountAccumulator>::new(make_accumulator);
    Ok(Some(Box::new(adapter)))
}

/// Returns a borrowed view of this expression's `name` field.
fn name(&self) -> &str {
&self.name
}
Expand Down
2 changes: 0 additions & 2 deletions datafusion/src/physical_plan/expressions/average.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,6 @@ impl AggregateExpr for Avg {
return true;
}

/// the groups accumulator used to accumulate values from the expression. If this returns None,
/// create_accumulator must be used.
fn create_groups_accumulator(
&self,
) -> arrow::error::Result<Option<Box<dyn GroupsAccumulator>>> {
Expand Down
16 changes: 16 additions & 0 deletions datafusion/src/physical_plan/expressions/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ use std::any::Any;
use std::sync::Arc;

use crate::error::Result;
use crate::physical_plan::groups_accumulator::GroupsAccumulator;
use crate::physical_plan::groups_accumulator_flat_adapter::GroupsAccumulatorFlatAdapter;
use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr};
use crate::scalar::ScalarValue;
use arrow::compute;
Expand Down Expand Up @@ -90,6 +92,20 @@ impl AggregateExpr for Count {
Ok(Box::new(CountAccumulator::new()))
}

/// Always `true`: this expression supplies a groups accumulator through
/// `create_groups_accumulator`.
fn uses_groups_accumulator(&self) -> bool {
    true
}

/// Wraps `CountAccumulator` in a `GroupsAccumulatorFlatAdapter`.
///
/// The factory handed to the adapter captures nothing; every call simply
/// produces a new `CountAccumulator::new()`.
fn create_groups_accumulator(
    &self,
) -> arrow::error::Result<Option<Box<dyn GroupsAccumulator>>> {
    let adapter =
        GroupsAccumulatorFlatAdapter::<CountAccumulator>::new(|| Ok(CountAccumulator::new()));
    Ok(Some(Box::new(adapter)))
}

/// Returns a borrowed view of this expression's `name` field.
fn name(&self) -> &str {
&self.name
}
Expand Down
34 changes: 34 additions & 0 deletions datafusion/src/physical_plan/expressions/min_max.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ use std::convert::TryFrom;
use std::sync::Arc;

use crate::error::{DataFusionError, Result};
use crate::physical_plan::groups_accumulator::GroupsAccumulator;
use crate::physical_plan::groups_accumulator_flat_adapter::GroupsAccumulatorFlatAdapter;
use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr};
use crate::scalar::ScalarValue;
use arrow::compute;
Expand Down Expand Up @@ -99,6 +101,23 @@ impl AggregateExpr for Max {
Ok(Box::new(MaxAccumulator::try_new(&self.data_type)?))
}

/// Always `true`: this expression supplies a groups accumulator through
/// `create_groups_accumulator`.
fn uses_groups_accumulator(&self) -> bool {
    true
}

/// Wraps `MaxAccumulator` in a `GroupsAccumulatorFlatAdapter`.
///
/// The adapter is given a factory that calls `MaxAccumulator::try_new` with
/// a copy of this expression's data type. This implementation always
/// returns `Some`.
fn create_groups_accumulator(
    &self,
) -> arrow::error::Result<Option<Box<dyn GroupsAccumulator>>> {
    // Owned copy so the factory closure does not borrow from `self`.
    let accumulator_type = self.data_type.clone();
    let factory = move || MaxAccumulator::try_new(&accumulator_type);
    let adapter = GroupsAccumulatorFlatAdapter::<MaxAccumulator>::new(factory);
    Ok(Some(Box::new(adapter)))
}

/// Returns a borrowed view of this expression's `name` field.
fn name(&self) -> &str {
&self.name
}
Expand Down Expand Up @@ -523,6 +542,21 @@ impl AggregateExpr for Min {
Ok(Box::new(MinAccumulator::try_new(&self.data_type)?))
}

/// Always `true`: this expression supplies a groups accumulator through
/// `create_groups_accumulator`.
fn uses_groups_accumulator(&self) -> bool {
    true
}

/// Wraps `MinAccumulator` in a `GroupsAccumulatorFlatAdapter`.
///
/// The adapter is given a factory that calls `MinAccumulator::try_new` with
/// a copy of this expression's data type. This implementation always
/// returns `Some`.
fn create_groups_accumulator(
    &self,
) -> arrow::error::Result<Option<Box<dyn GroupsAccumulator>>> {
    // Owned copy so the factory closure does not borrow from `self`.
    let accumulator_type = self.data_type.clone();
    let factory = move || MinAccumulator::try_new(&accumulator_type);
    let adapter = GroupsAccumulatorFlatAdapter::<MinAccumulator>::new(factory);
    Ok(Some(Box::new(adapter)))
}

/// Returns a borrowed view of this expression's `name` field.
fn name(&self) -> &str {
&self.name
}
Expand Down
2 changes: 0 additions & 2 deletions datafusion/src/physical_plan/expressions/sum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,6 @@ impl AggregateExpr for Sum {
return true;
}

/// the groups accumulator used to accumulate values from the expression. If this returns None,
/// create_accumulator must be used.
fn create_groups_accumulator(
&self,
) -> arrow::error::Result<Option<Box<dyn GroupsAccumulator>>> {
Expand Down
Loading

0 comments on commit c80644c

Please sign in to comment.