Skip to content

Commit

Permalink
perf: backport CountGroupsAccumulator
Browse files Browse the repository at this point in the history
  • Loading branch information
srh committed Nov 27, 2024
1 parent d3fa8ef commit 7940cf9
Showing 1 changed file with 212 additions and 9 deletions.
221 changes: 212 additions & 9 deletions datafusion/src/physical_plan/expressions/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,17 @@
use std::any::Any;
use std::sync::Arc;
use std::mem::size_of;

use crate::error::Result;
use crate::physical_plan::groups_accumulator::GroupsAccumulator;
use crate::physical_plan::groups_accumulator_flat_adapter::GroupsAccumulatorFlatAdapter;
use crate::error::{DataFusionError, Result};
use crate::physical_plan::groups_accumulator::{EmitTo, GroupsAccumulator};
use crate::physical_plan::null_state::accumulate_indices;
use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr};
use crate::scalar::ScalarValue;
use arrow::array::{Array, ArrayData, BooleanArray, PrimitiveArray};
use arrow::buffer::Buffer;
use arrow::compute;
use arrow::datatypes::DataType;
use arrow::datatypes::{DataType, UInt64Type};
use arrow::{
array::{ArrayRef, UInt64Array},
datatypes::Field,
Expand Down Expand Up @@ -99,11 +102,7 @@ impl AggregateExpr for Count {
fn create_groups_accumulator(
&self,
) -> arrow::error::Result<Option<Box<dyn GroupsAccumulator>>> {
Ok(Some(Box::new(GroupsAccumulatorFlatAdapter::<
CountAccumulator,
>::new(move || {
Ok(CountAccumulator::new())
}))))
Ok(Some(Box::new(CountGroupsAccumulator::new())))
}

fn name(&self) -> &str {
Expand Down Expand Up @@ -170,6 +169,210 @@ impl Accumulator for CountAccumulator {
}
}

/// An accumulator to compute the counts of [`PrimitiveArray<T>`].
/// Stores values as native types, and does overflow checking
///
/// Unlike most other accumulators, COUNT never produces NULLs. If no
/// non-null values are seen in any group the output is 0. Thus, this
/// accumulator has no additional null or seen filter tracking.
#[derive(Debug)]
struct CountGroupsAccumulator {
/// Count per group.
///
/// Note that in upstream this is a Vec<i64>, and the count output and intermediate data type is
/// `DataType::Int64`. But here we are still using UInt64.
counts: Vec<u64>,
}

impl CountGroupsAccumulator {
pub fn new() -> Self {
Self { counts: vec![] }
}
}

impl GroupsAccumulator for CountGroupsAccumulator {
fn update_batch(
&mut self,
values: &[ArrayRef],
group_indices: &[usize],
opt_filter: Option<&BooleanArray>,
total_num_groups: usize,
) -> Result<()> {
assert_eq!(values.len(), 1, "single argument to update_batch");
let values = &values[0];

// Add one to each group's counter for each non null, non
// filtered value
self.counts.resize(total_num_groups, 0);
accumulate_indices(
group_indices,
values
.data_ref()
.null_bitmap()
.as_ref()
.map(|bitmap| (bitmap, values.offset(), values.len())),
opt_filter,
|group_index| {
self.counts[group_index] += 1;
},
);

Ok(())
}

fn merge_batch(
&mut self,
values: &[ArrayRef],
group_indices: &[usize],
opt_filter: Option<&BooleanArray>,
total_num_groups: usize,
) -> Result<()> {
assert_eq!(values.len(), 1, "one argument to merge_batch");
// first batch is counts, second is partial sums
let partial_counts = match values[0]
.as_any()
.downcast_ref::<PrimitiveArray<UInt64Type>>()
{
Some(x) => x,
None => {
panic!("values[0] is of unexpected type {:?}, expecting UInt64Type for intermediate count batch", values[0].data_type());
}
};

// intermediate counts are always created as non null
assert_eq!(partial_counts.null_count(), 0);
let partial_counts = partial_counts.values();

// Adds the counts with the partial counts
self.counts.resize(total_num_groups, 0);
match opt_filter {
Some(filter) => filter
.iter()
.zip(group_indices.iter())
.zip(partial_counts.iter())
.for_each(|((filter_value, &group_index), partial_count)| {
if let Some(true) = filter_value {
self.counts[group_index] += partial_count;
}
}),
None => group_indices.iter().zip(partial_counts.iter()).for_each(
|(&group_index, partial_count)| {
self.counts[group_index] += partial_count;
},
),
}

Ok(())
}

fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
let counts = emit_to.take_needed(&mut self.counts);

// Count is always non null (null inputs just don't contribute to the overall values)

// TODO: This copies. Ideally, don't. Note: Avoiding this memcpy had minimal effect in PrimitiveGroupsAccumulator
let buffers = vec![Buffer::from_slice_ref(&counts)];

let data = ArrayData::new(
DataType::UInt64,
counts.len(),
None,
None,
0, /* offset */
buffers,
vec![],
);
Ok(Arc::new(PrimitiveArray::<UInt64Type>::from(data)))
}

// return arrays for counts
fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
let counts = emit_to.take_needed(&mut self.counts);
// Backporting note: UInt64Array::from actually does copy here in old DF.
let counts: PrimitiveArray<UInt64Type> = UInt64Array::from(counts); // zero copy, no nulls
Ok(vec![Arc::new(counts) as ArrayRef])
}

/// Converts an input batch directly to a state batch
///
/// The state of `COUNT` is always a single Int64Array (backporting note: it is a UInt64Array):
/// * `1` (for non-null, non filtered values)
/// * `0` (for null values)
fn convert_to_state(
&self,
_values: &[ArrayRef],
_opt_filter: Option<&BooleanArray>,
) -> Result<Vec<ArrayRef>> {
// convert_to_state only gets used in upstream datafusion, and we set
// supports_convert_to_state to false. Because values.data_ref().offset() and the null
// bitmap have differences that require care to backport, we comment this out instead.
return Err(DataFusionError::NotImplemented(
"Input batch conversion to state not implemented".to_owned(),
));
/*
let values = &values[0];
let state_array = match (values.logical_nulls(), opt_filter) {
(None, None) => {
// In case there is no nulls in input and no filter, returning array of 1
Arc::new(Int64Array::from_value(1, values.len()))
}
(Some(nulls), None) => {
// If there are any nulls in input values -- casting `nulls` (true for values, false for nulls)
// of input array to Int64
let nulls = BooleanArray::new(nulls.into_inner(), None);
compute::cast(&nulls, &DataType::Int64)?
}
(None, Some(filter)) => {
// If there is only filter
// - applying filter null mask to filter values by bitand filter values and nulls buffers
// (using buffers guarantees absence of nulls in result)
// - casting result of bitand to Int64 array
let (filter_values, filter_nulls) = filter.clone().into_parts();
let state_buf = match filter_nulls {
Some(filter_nulls) => &filter_values & filter_nulls.inner(),
None => filter_values,
};
let boolean_state = BooleanArray::new(state_buf, None);
compute::cast(&boolean_state, &DataType::Int64)?
}
(Some(nulls), Some(filter)) => {
// For both input nulls and filter
// - applying filter null mask to filter values by bitand filter values and nulls buffers
// (using buffers guarantees absence of nulls in result)
// - applying values null mask to filter buffer by another bitand on filter result and
// nulls from input values
// - casting result to Int64 array
let (filter_values, filter_nulls) = filter.clone().into_parts();
let filter_buf = match filter_nulls {
Some(filter_nulls) => &filter_values & filter_nulls.inner(),
None => filter_values,
};
let state_buf = &filter_buf & nulls.inner();
let boolean_state = BooleanArray::new(state_buf, None);
compute::cast(&boolean_state, &DataType::Int64)?
}
};
Ok(vec![state_array])
*/
}

fn supports_convert_to_state(&self) -> bool {
// Is set to true in upstream (as it's implemented above in upstream). But convert_to_state
// is not used in this branch anyway.
false
}

fn size(&self) -> usize {
self.counts.capacity() * size_of::<usize>()
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down

0 comments on commit 7940cf9

Please sign in to comment.