Skip to content

Commit

Permalink
Consolidate BoundedAggregateStream (#6932)
Browse files Browse the repository at this point in the history
* Consolidate `BoundedAggregateStream`

* Clarify end of input

* Apply suggestions from code review

Co-authored-by: Mustafa Akur <[email protected]>

* Update diagram for partial sort

* assert input is not done

* Apply suggestions from code review

Co-authored-by: Raphael Taylor-Davies <[email protected]>

* clarify text

* Add more comments about delta memory accounting

---------

Co-authored-by: Mustafa Akur <[email protected]>
Co-authored-by: Raphael Taylor-Davies <[email protected]>
  • Loading branch information
3 people authored Jul 19, 2023
1 parent 7a029a5 commit 1810a15
Show file tree
Hide file tree
Showing 17 changed files with 983 additions and 1,398 deletions.
1,080 changes: 0 additions & 1,080 deletions datafusion/core/src/physical_plan/aggregates/bounded_aggregate_stream.rs

This file was deleted.

19 changes: 4 additions & 15 deletions datafusion/core/src/physical_plan/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@
//! Aggregates functionalities

use crate::physical_plan::aggregates::{
bounded_aggregate_stream::BoundedAggregateStream, no_grouping::AggregateStream,
row_hash::GroupedHashAggregateStream,
no_grouping::AggregateStream, row_hash::GroupedHashAggregateStream,
};
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::physical_plan::{
Expand All @@ -46,10 +45,9 @@ use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;

mod bounded_aggregate_stream;
mod no_grouping;
mod order;
mod row_hash;
mod utils;

pub use datafusion_expr::AggregateFunction;
use datafusion_physical_expr::aggregate::is_order_sensitive;
Expand Down Expand Up @@ -95,7 +93,7 @@ pub enum AggregateMode {
/// Specifically, each distinct combination of the relevant columns
/// are contiguous in the input, and once a new combination is seen
/// previous combinations are guaranteed never to appear again
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GroupByOrderMode {
/// The input is not (known to be) ordered by any of the
/// expressions in the GROUP BY clause.
Expand Down Expand Up @@ -218,15 +216,13 @@ impl PartialEq for PhysicalGroupBy {
enum StreamType {
AggregateStream(AggregateStream),
GroupedHashAggregateStream(GroupedHashAggregateStream),
BoundedAggregate(BoundedAggregateStream),
}

impl From<StreamType> for SendableRecordBatchStream {
fn from(stream: StreamType) -> Self {
match stream {
StreamType::AggregateStream(stream) => Box::pin(stream),
StreamType::GroupedHashAggregateStream(stream) => Box::pin(stream),
StreamType::BoundedAggregate(stream) => Box::pin(stream),
}
}
}
Expand Down Expand Up @@ -725,14 +721,6 @@ impl AggregateExec {
Ok(StreamType::AggregateStream(AggregateStream::new(
self, context, partition,
)?))
} else if let Some(aggregation_ordering) = &self.aggregation_ordering {
let aggregation_ordering = aggregation_ordering.clone();
Ok(StreamType::BoundedAggregate(BoundedAggregateStream::new(
self,
context,
partition,
aggregation_ordering,
)?))
} else {
Ok(StreamType::GroupedHashAggregateStream(
GroupedHashAggregateStream::new(self, context, partition)?,
Expand Down Expand Up @@ -1116,6 +1104,7 @@ fn create_accumulators(
.collect::<Result<Vec<_>>>()
}

#[allow(dead_code)]
fn create_row_accumulators(
aggr_expr: &[Arc<dyn AggregateExpr>],
) -> Result<Vec<RowAccumulatorItem>> {
Expand Down
163 changes: 163 additions & 0 deletions datafusion/core/src/physical_plan/aggregates/order/full.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use datafusion_execution::memory_pool::proxy::VecAllocExt;

use crate::physical_expr::EmitTo;

/// Tracks grouping state when the data is ordered entirely by its
/// group keys
///
/// When the group values are sorted, as soon as we see group `n+1` we
/// know we will never see any rows for group `n again and thus they
/// can be emitted.
///
/// For example, given `SUM(amt) GROUP BY id` if the input is sorted
/// by `id` as soon as a new `id` value is seen all previous values
/// can be emitted.
///
/// The state is tracked like this:
///
/// ```text
/// ┌─────┐ ┌──────────────────┐
/// │┌───┐│ │ ┌──────────────┐ │ ┏━━━━━━━━━━━━━━┓
/// ││ 0 ││ │ │ 123 │ │ ┌─────┃ 13 ┃
/// │└───┘│ │ └──────────────┘ │ │ ┗━━━━━━━━━━━━━━┛
/// │ ... │ │ ... │ │
/// │┌───┐│ │ ┌──────────────┐ │ │ current
/// ││12 ││ │ │ 234 │ │ │
/// │├───┤│ │ ├──────────────┤ │ │
/// ││12 ││ │ │ 234 │ │ │
/// │├───┤│ │ ├──────────────┤ │ │
/// ││13 ││ │ │ 456 │◀┼───┘
/// │└───┘│ │ └──────────────┘ │
/// └─────┘ └──────────────────┘
///
/// group indices group_values current tracks the most
/// (in group value recent group index
/// order)
/// ```
///
/// In this diagram, the current group is `13`, and thus groups
/// `0..12` can be emitted. Note that `13` can not yet be emitted as
/// there may be more values in the next batch with the same group_id.
#[derive(Debug)]
pub(crate) struct GroupOrderingFull {
state: State,
/// Hash values for groups in 0..current
hashes: Vec<u64>,
}

#[derive(Debug)]
enum State {
/// Seen no input yet
Start,

/// Data is in progress. `current is the current group for which
/// values are being generated. Can emit `current` - 1
InProgress { current: usize },

/// Seen end of input: all groups can be emitted
Complete,
}

impl GroupOrderingFull {
pub fn new() -> Self {
Self {
state: State::Start,
hashes: vec![],
}
}

// How many groups be emitted, or None if no data can be emitted
pub fn emit_to(&self) -> Option<EmitTo> {
match &self.state {
State::Start => None,
State::InProgress { current, .. } => {
if *current == 0 {
// Can not emit if still on the first row
None
} else {
// otherwise emit all rows prior to the current group
Some(EmitTo::First(*current))
}
}
State::Complete { .. } => Some(EmitTo::All),
}
}

/// remove the first n groups from the internal state, shifting
/// all existing indexes down by `n`. Returns stored hash values
pub fn remove_groups(&mut self, n: usize) -> &[u64] {
match &mut self.state {
State::Start => panic!("invalid state: start"),
State::InProgress { current } => {
// shift down by n
assert!(*current >= n);
*current -= n;
self.hashes.drain(0..n);
}
State::Complete { .. } => panic!("invalid state: complete"),
};
&self.hashes
}

/// Note that the input is complete so any outstanding groups are done as well
pub fn input_done(&mut self) {
self.state = State::Complete;
}

/// Called when new groups are added in a batch. See documentation
/// on [`super::GroupOrdering::new_groups`]
pub fn new_groups(
&mut self,
group_indices: &[usize],
batch_hashes: &[u64],
total_num_groups: usize,
) {
assert_ne!(total_num_groups, 0);
assert_eq!(group_indices.len(), batch_hashes.len());

// copy any hash values
self.hashes.resize(total_num_groups, 0);
for (&group_index, &hash) in group_indices.iter().zip(batch_hashes.iter()) {
self.hashes[group_index] = hash;
}

// Update state
let max_group_index = total_num_groups - 1;
self.state = match self.state {
State::Start => State::InProgress {
current: max_group_index,
},
State::InProgress { current } => {
// expect to see new group indexes when called again
assert!(current <= max_group_index, "{current} <= {max_group_index}");
State::InProgress {
current: max_group_index,
}
}
State::Complete { .. } => {
panic!("Saw new group after input was complete");
}
};
}

pub(crate) fn size(&self) -> usize {
std::mem::size_of::<Self>() + self.hashes.allocated_size()
}
}
139 changes: 139 additions & 0 deletions datafusion/core/src/physical_plan/aggregates/order/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow_array::ArrayRef;
use arrow_schema::Schema;
use datafusion_common::Result;
use datafusion_physical_expr::EmitTo;

use super::{AggregationOrdering, GroupByOrderMode};

mod full;
mod partial;

pub(crate) use full::GroupOrderingFull;
pub(crate) use partial::GroupOrderingPartial;

/// Ordering information for each group in the hash table
#[derive(Debug)]
pub(crate) enum GroupOrdering {
/// Groups are not ordered
None,
/// Groups are ordered by some pre-set of the group keys
Partial(GroupOrderingPartial),
/// Groups are entirely contiguous,
Full(GroupOrderingFull),
}

impl GroupOrdering {
/// Create a `GroupOrdering` for the the specified ordering
pub fn try_new(
input_schema: &Schema,
ordering: &AggregationOrdering,
) -> Result<Self> {
let AggregationOrdering {
mode,
order_indices,
ordering,
} = ordering;

Ok(match mode {
GroupByOrderMode::None => GroupOrdering::None,
GroupByOrderMode::PartiallyOrdered => {
let partial =
GroupOrderingPartial::try_new(input_schema, order_indices, ordering)?;
GroupOrdering::Partial(partial)
}
GroupByOrderMode::FullyOrdered => {
GroupOrdering::Full(GroupOrderingFull::new())
}
})
}

// How many groups be emitted, or None if no data can be emitted
pub fn emit_to(&self) -> Option<EmitTo> {
match self {
GroupOrdering::None => None,
GroupOrdering::Partial(partial) => partial.emit_to(),
GroupOrdering::Full(full) => full.emit_to(),
}
}

/// Updates the state the input is done
pub fn input_done(&mut self) {
match self {
GroupOrdering::None => {}
GroupOrdering::Partial(partial) => partial.input_done(),
GroupOrdering::Full(full) => full.input_done(),
}
}

/// remove the first n groups from the internal state, shifting
/// all existing indexes down by `n`. Returns stored hash values
pub fn remove_groups(&mut self, n: usize) -> &[u64] {
match self {
GroupOrdering::None => &[],
GroupOrdering::Partial(partial) => partial.remove_groups(n),
GroupOrdering::Full(full) => full.remove_groups(n),
}
}

/// Called when new groups are added in a batch
///
/// * `total_num_groups`: total number of groups (so max
/// group_index is total_num_groups - 1).
///
/// * `group_values`: group key values for *each row* in the batch
///
/// * `group_indices`: indices for each row in the batch
///
/// * `hashes`: hash values for each row in the batch
pub fn new_groups(
&mut self,
batch_group_values: &[ArrayRef],
group_indices: &[usize],
batch_hashes: &[u64],
total_num_groups: usize,
) -> Result<()> {
match self {
GroupOrdering::None => {}
GroupOrdering::Partial(partial) => {
partial.new_groups(
batch_group_values,
group_indices,
batch_hashes,
total_num_groups,
)?;
}

GroupOrdering::Full(full) => {
full.new_groups(group_indices, batch_hashes, total_num_groups);
}
};
Ok(())
}

/// Return the size of memory used by the ordering state, in bytes
pub(crate) fn size(&self) -> usize {
std::mem::size_of::<Self>()
+ match self {
GroupOrdering::None => 0,
GroupOrdering::Partial(partial) => partial.size(),
GroupOrdering::Full(full) => full.size(),
}
}
}
Loading

0 comments on commit 1810a15

Please sign in to comment.