perf: Use primitive accumulator for min and max, and backport CountGroupsAccumulator

Note that we avoid the upstream DataFusion bugs in the min/max groups accumulators on float32 and float64,
where, in the max case, max({-infinity}) returns `f32::MIN` or `f64::MIN`, respectively, neither of which is negative infinity.
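For context, here is a small standalone Rust illustration (not part of this commit) of the pitfall described above, assuming the buggy upstream path effectively seeds the float max with `f32::MIN`/`f64::MIN` instead of negative infinity:

```rust
// Hypothetical repro of the upstream pitfall: f32::MIN is the smallest
// *finite* f32, so a max fold seeded with it can never return -inf.
fn main() {
    let values = [f32::NEG_INFINITY];

    // Buggy seed: the result is f32::MIN, not negative infinity.
    let bad_max = values.iter().copied().fold(f32::MIN, f32::max);
    assert_eq!(bad_max, f32::MIN);

    // Correct seed (or start from the first value): -inf is preserved.
    let good_max = values.iter().copied().fold(f32::NEG_INFINITY, f32::max);
    assert_eq!(good_max, f32::NEG_INFINITY);
}
```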
srh authored Nov 27, 2024
1 parent 64ae03e commit 8d4663b
Showing 3 changed files with 500 additions and 23 deletions.
221 changes: 212 additions & 9 deletions datafusion/src/physical_plan/expressions/count.rs
@@ -18,15 +18,18 @@
//! Defines physical expressions that can be evaluated at runtime during query execution
use std::any::Any;
use std::mem::size_of;
use std::sync::Arc;

use crate::error::Result;
use crate::physical_plan::groups_accumulator::GroupsAccumulator;
use crate::physical_plan::groups_accumulator_flat_adapter::GroupsAccumulatorFlatAdapter;
use crate::error::{DataFusionError, Result};
use crate::physical_plan::groups_accumulator::{EmitTo, GroupsAccumulator};
use crate::physical_plan::null_state::accumulate_indices;
use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr};
use crate::scalar::ScalarValue;
use arrow::array::{Array, ArrayData, BooleanArray, PrimitiveArray};
use arrow::buffer::Buffer;
use arrow::compute;
use arrow::datatypes::DataType;
use arrow::datatypes::{DataType, UInt64Type};
use arrow::{
array::{ArrayRef, UInt64Array},
datatypes::Field,
@@ -99,11 +102,7 @@ impl AggregateExpr for Count {
fn create_groups_accumulator(
&self,
) -> arrow::error::Result<Option<Box<dyn GroupsAccumulator>>> {
Ok(Some(Box::new(GroupsAccumulatorFlatAdapter::<
CountAccumulator,
>::new(move || {
Ok(CountAccumulator::new())
}))))
Ok(Some(Box::new(CountGroupsAccumulator::new())))
}

fn name(&self) -> &str {
@@ -170,6 +169,210 @@ impl Accumulator for CountAccumulator {
}
}

/// An accumulator that computes per-group row counts for `COUNT`.
/// Counts are stored as native `u64` values.
///
/// Unlike most other accumulators, COUNT never produces NULLs. If no
/// non-null values are seen in any group the output is 0. Thus, this
/// accumulator has no additional null or seen filter tracking.
#[derive(Debug)]
struct CountGroupsAccumulator {
/// Count per group.
///
/// Note that in upstream this is a Vec<i64>, and the count output and intermediate data type is
/// `DataType::Int64`. But here we are still using UInt64.
counts: Vec<u64>,
}

impl CountGroupsAccumulator {
pub fn new() -> Self {
Self { counts: vec![] }
}
}

impl GroupsAccumulator for CountGroupsAccumulator {
fn update_batch(
&mut self,
values: &[ArrayRef],
group_indices: &[usize],
opt_filter: Option<&BooleanArray>,
total_num_groups: usize,
) -> Result<()> {
assert_eq!(values.len(), 1, "single argument to update_batch");
let values = &values[0];

// Add one to each group's counter for each non null, non
// filtered value
self.counts.resize(total_num_groups, 0);
accumulate_indices(
group_indices,
values
.data_ref()
.null_bitmap()
.as_ref()
.map(|bitmap| (bitmap, values.offset(), values.len())),
opt_filter,
|group_index| {
self.counts[group_index] += 1;
},
);

Ok(())
}
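The helper `accumulate_indices` used above is crate-internal; as a rough, hypothetical model of the contract relied on here (slice-based stand-ins replace the Arrow null bitmap and `BooleanArray` filter), it invokes the closure once per row that is non-null and passes the optional filter:

```rust
// Simplified, illustrative model of the accumulate_indices contract; the real
// helper works on Arrow null bitmaps and is specialized per (nulls, filter) case.
fn accumulate_indices_sketch<F: FnMut(usize)>(
    group_indices: &[usize],
    valid: Option<&[bool]>,              // stand-in for the value null bitmap
    opt_filter: Option<&[Option<bool>]>, // stand-in for the optional filter
    mut index_fn: F,
) {
    for (row, &group_index) in group_indices.iter().enumerate() {
        let is_valid = valid.map_or(true, |v| v[row]);
        let passes_filter = opt_filter.map_or(true, |f| f[row] == Some(true));
        if is_valid && passes_filter {
            index_fn(group_index);
        }
    }
}

fn main() {
    // Rows map to groups [0, 1, 0, 2]; the third value is null.
    let mut counts = vec![0u64; 3];
    accumulate_indices_sketch(
        &[0, 1, 0, 2],
        Some(&[true, true, false, true]),
        None,
        |g| counts[g] += 1,
    );
    assert_eq!(counts, vec![1, 1, 1]);
}
```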

fn merge_batch(
&mut self,
values: &[ArrayRef],
group_indices: &[usize],
opt_filter: Option<&BooleanArray>,
total_num_groups: usize,
) -> Result<()> {
assert_eq!(values.len(), 1, "one argument to merge_batch");
// values[0] is the single state array containing the partial counts
let partial_counts = match values[0]
.as_any()
.downcast_ref::<PrimitiveArray<UInt64Type>>()
{
Some(x) => x,
None => {
panic!("values[0] is of unexpected type {:?}, expecting UInt64Type for intermediate count batch", values[0].data_type());
}
};

// intermediate counts are always created as non null
assert_eq!(partial_counts.null_count(), 0);
let partial_counts = partial_counts.values();

// Add the partial counts to the running per-group counts
self.counts.resize(total_num_groups, 0);
match opt_filter {
Some(filter) => filter
.iter()
.zip(group_indices.iter())
.zip(partial_counts.iter())
.for_each(|((filter_value, &group_index), partial_count)| {
if let Some(true) = filter_value {
self.counts[group_index] += partial_count;
}
}),
None => group_indices.iter().zip(partial_counts.iter()).for_each(
|(&group_index, partial_count)| {
self.counts[group_index] += partial_count;
},
),
}

Ok(())
}

fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
let counts = emit_to.take_needed(&mut self.counts);

// Count is always non null (null inputs just don't contribute to the overall values)

// TODO: This copies. Ideally, don't. Note: Avoiding this memcpy had minimal effect in PrimitiveGroupsAccumulator
let buffers = vec![Buffer::from_slice_ref(&counts)];

let data = ArrayData::new(
DataType::UInt64,
counts.len(),
None,
None,
0, /* offset */
buffers,
vec![],
);
Ok(Arc::new(PrimitiveArray::<UInt64Type>::from(data)))
}

// return arrays for counts
fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
let counts = emit_to.take_needed(&mut self.counts);
// Backporting note: upstream this conversion is zero-copy; in old DF, UInt64Array::from actually copies.
let counts: PrimitiveArray<UInt64Type> = UInt64Array::from(counts); // no nulls
Ok(vec![Arc::new(counts) as ArrayRef])
}

/// Converts an input batch directly to a state batch
///
/// The state of `COUNT` is always a single Int64Array upstream (a UInt64Array in this backport), containing:
/// * `1` (for non-null, non filtered values)
/// * `0` (for null values)
fn convert_to_state(
&self,
_values: &[ArrayRef],
_opt_filter: Option<&BooleanArray>,
) -> Result<Vec<ArrayRef>> {
// convert_to_state is only used in upstream DataFusion, and we set
// supports_convert_to_state to false. Because values.data_ref().offset() and the null
// bitmap handling differ in ways that require care to backport, the upstream
// implementation is left commented out below instead.
return Err(DataFusionError::NotImplemented(
"Input batch conversion to state not implemented".to_owned(),
));
/*
let values = &values[0];
let state_array = match (values.logical_nulls(), opt_filter) {
(None, None) => {
// In case there is no nulls in input and no filter, returning array of 1
Arc::new(Int64Array::from_value(1, values.len()))
}
(Some(nulls), None) => {
// If there are any nulls in input values -- casting `nulls` (true for values, false for nulls)
// of input array to Int64
let nulls = BooleanArray::new(nulls.into_inner(), None);
compute::cast(&nulls, &DataType::Int64)?
}
(None, Some(filter)) => {
// If there is only filter
// - applying filter null mask to filter values by bitand filter values and nulls buffers
// (using buffers guarantees absence of nulls in result)
// - casting result of bitand to Int64 array
let (filter_values, filter_nulls) = filter.clone().into_parts();
let state_buf = match filter_nulls {
Some(filter_nulls) => &filter_values & filter_nulls.inner(),
None => filter_values,
};
let boolean_state = BooleanArray::new(state_buf, None);
compute::cast(&boolean_state, &DataType::Int64)?
}
(Some(nulls), Some(filter)) => {
// For both input nulls and filter
// - applying filter null mask to filter values by bitand filter values and nulls buffers
// (using buffers guarantees absence of nulls in result)
// - applying values null mask to filter buffer by another bitand on filter result and
// nulls from input values
// - casting result to Int64 array
let (filter_values, filter_nulls) = filter.clone().into_parts();
let filter_buf = match filter_nulls {
Some(filter_nulls) => &filter_values & filter_nulls.inner(),
None => filter_values,
};
let state_buf = &filter_buf & nulls.inner();
let boolean_state = BooleanArray::new(state_buf, None);
compute::cast(&boolean_state, &DataType::Int64)?
}
};
Ok(vec![state_array])
*/
}

fn supports_convert_to_state(&self) -> bool {
// Upstream sets this to true (convert_to_state is implemented there), but
// convert_to_state is not used in this branch anyway.
false
}

fn size(&self) -> usize {
self.counts.capacity() * size_of::<u64>()
}
}

#[cfg(test)]
mod tests {
use super::*;
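Putting the pieces together, this is a minimal, dependency-free sketch of the two-phase flow the accumulator participates in; the group indices and partial counts are invented for illustration, and in the real code `state()` and `merge_batch` exchange `UInt64Array` values:

```rust
// Dependency-free sketch of the two-phase COUNT flow: each partition emits
// partial per-group counts as its state, and the final phase merges them and
// evaluates the result. All data here is made up for illustration.
fn main() {
    // Partial phase: what state() would emit for two partitions.
    // Partition 1 saw groups [0, 1, 0] with no null values.
    // Partition 2 saw groups [1, 2], where the group-2 value was null.
    let partial1: Vec<u64> = vec![2, 1, 0];
    let partial2: Vec<u64> = vec![0, 1, 0];

    // Final phase: merge_batch adds each partial count into its group slot.
    let mut counts = vec![0u64; 3];
    for (group, c) in partial1.iter().enumerate() {
        counts[group] += *c;
    }
    for (group, c) in partial2.iter().enumerate() {
        counts[group] += *c;
    }

    // evaluate(EmitTo::All) would then emit these counts as a UInt64Array.
    assert_eq!(counts, vec![2, 2, 0]);
}
```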
