diff --git a/datafusion/src/physical_plan/hash_aggregate.rs b/datafusion/src/physical_plan/hash_aggregate.rs index fc0bf4af89d8..95d443b257a3 100644 --- a/datafusion/src/physical_plan/hash_aggregate.rs +++ b/datafusion/src/physical_plan/hash_aggregate.rs @@ -79,7 +79,7 @@ use super::{ use crate::cube_ext; use crate::cube_ext::ordfloat::{OrdF32, OrdF64}; -use crate::physical_plan::sorted_aggregate::SortedAggState; +use crate::physical_plan::sorted_aggregate::{agg_key_equals, SortedAggState}; use compute::cast; use smallvec::smallvec; use smallvec::SmallVec; @@ -440,13 +440,20 @@ pub(crate) fn group_aggregate_batch( // Keys received in this batch let mut batch_keys = BinaryBuilder::new(0); - for row in 0..batch.num_rows() { + let mut row: usize = 0; + + while row < batch.num_rows() { if skip_row(&batch, row) { continue; } // 1.1 create_key(&group_values, row, &mut key) .map_err(DataFusionError::into_arrow_external_error)?; + let start = row; + row += 1; + while row < batch.num_rows() && agg_key_equals(&key, &group_values, row)? { + row += 1; + } accumulators .raw_entry_mut() @@ -456,7 +463,7 @@ pub(crate) fn group_aggregate_batch( if v.is_empty() { batch_keys.append_value(&key).expect("must not fail"); }; - v.push(row as u32) + v.extend((start as u32..row as u32).into_iter()) }) // 1.2 .or_insert_with(|| { @@ -469,7 +476,11 @@ pub(crate) fn group_aggregate_batch( std::mem::swap(&mut taken_values, &mut group_by_values); ( key.clone(), - (taken_values, accumulator_set, smallvec![row as u32]), + ( + taken_values, + accumulator_set, + smallvec![(start as u32..row as u32).into_iter()], + ), ) }); } diff --git a/datafusion/src/physical_plan/sorted_aggregate.rs b/datafusion/src/physical_plan/sorted_aggregate.rs index c4dba9f61d1b..7e92ec058bb3 100644 --- a/datafusion/src/physical_plan/sorted_aggregate.rs +++ b/datafusion/src/physical_plan/sorted_aggregate.rs @@ -192,7 +192,7 @@ impl SortedAggState { } } -fn agg_key_equals( +pub fn agg_key_equals( key: &[GroupByScalar], key_columns: &[ArrayRef], row: usize,