Skip to content

Commit

Permalink
re-design the sketch.
Browse files Browse the repository at this point in the history
  • Loading branch information
Rachelint committed Aug 14, 2024
1 parent fd237f8 commit ab92626
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 10 deletions.
9 changes: 9 additions & 0 deletions datafusion/expr-common/src/groups_accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ pub enum EmitTo {
/// For example, if `n=10`, group_index `0, 1, ... 9` are emitted
/// and group indexes '`10, 11, 12, ...` become `0, 1, 2, ...`.
First(usize),
/// Emit all groups managed by blocks
CurrentBlock(bool),
}

impl EmitTo {
Expand All @@ -52,6 +54,7 @@ impl EmitTo {
std::mem::swap(v, &mut t);
t
}
EmitTo::CurrentBlock(_) => unimplemented!(),
}
}
}
Expand Down Expand Up @@ -143,6 +146,12 @@ pub trait GroupsAccumulator: Send {
/// [`Accumulator::state`]: crate::accumulator::Accumulator::state
fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>>;

/// Returns `true` if blocked emission is supported
/// The blocked emission is possible to avoid result splitting in aggregation.
fn supports_blocked_emission(&self) -> bool {
false
}

/// Merges intermediate state (the output from [`Self::state`])
/// into this accumulator's current state.
///
Expand Down
15 changes: 13 additions & 2 deletions datafusion/physical-plan/src/aggregates/group_values/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use bytes_view::GroupValuesBytesView;
use datafusion_common::Result;

pub(crate) mod primitive;
use datafusion_expr::EmitTo;
use datafusion_expr::{groups_accumulator::GroupIndicesType, EmitTo};
use primitive::GroupValuesPrimitive;

mod row;
Expand All @@ -36,7 +36,12 @@ use datafusion_physical_expr::binary_map::OutputType;
/// An interning store for group keys
pub trait GroupValues: Send {
/// Calculates the `groups` for each input row of `cols`
fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) -> Result<()>;
fn intern(
&mut self,
cols: &[ArrayRef],
groups: &mut Vec<u64>,
group_type: GroupIndicesType,
) -> Result<()>;

/// Returns the number of bytes used by this [`GroupValues`]
fn size(&self) -> usize;
Expand All @@ -52,6 +57,12 @@ pub trait GroupValues: Send {

/// Clear the contents and shrink the capacity to the size of the batch (free up memory usage)
fn clear_shrink(&mut self, batch: &RecordBatch);

/// Returns `true` if blocked emission is supported
/// The blocked emission is possible to avoid result splitting in aggregation.
fn supports_blocked_emission(&self) -> bool {
false
}
}

pub fn new_group_values(schema: SchemaRef) -> Result<Box<dyn GroupValues>> {
Expand Down
71 changes: 63 additions & 8 deletions datafusion/physical-plan/src/aggregates/row_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ pub(crate) enum ExecutionState {
/// When producing output, the remaining rows to output are stored
/// here and are sliced off as needed in batch_size chunks
ProducingOutput(RecordBatch),
ProducingBlocks(Option<usize>),
/// Produce intermediate aggregate state for each input row without
/// aggregation.
///
Expand Down Expand Up @@ -387,6 +388,10 @@ pub(crate) struct GroupedHashAggregateStream {
/// Optional probe for skipping data aggregation, if supported by
/// current stream.
skip_aggregation_probe: Option<SkipAggregationProbe>,

enable_blocked_group_states: bool,

block_size: usize,
}

impl GroupedHashAggregateStream {
Expand Down Expand Up @@ -676,6 +681,43 @@ impl Stream for GroupedHashAggregateStream {
)));
}

ExecutionState::ProducingBlocks(blocks) => {
if let Some(blk) = blocks {
if blk > 0 {
self.exec_state = ExecutionState::ProducingBlocks(Some(*blk - 1));
} else {
self.exec_state = if self.input_done {
ExecutionState::Done
} else if self.should_skip_aggregation() {
ExecutionState::SkippingAggregation
} else {
ExecutionState::ReadingInput
};
continue;
}
}

let emit_result = self.emit(EmitTo::CurrentBlock(true), false);
if emit_result.is_err() {
return Poll::Ready(Some(emit_result));
}

let emit_batch = emit_result.unwrap();
if emit_batch.num_rows() == 0 {
self.exec_state = if self.input_done {
ExecutionState::Done
} else if self.should_skip_aggregation() {
ExecutionState::SkippingAggregation
} else {
ExecutionState::ReadingInput
};
}

return Poll::Ready(Some(Ok(
emit_batch.record_output(&self.baseline_metrics)
)));
}

ExecutionState::Done => {
// release the memory reservation since sending back output batch itself needs
// some memory reservation, so make some room for it.
Expand Down Expand Up @@ -900,10 +942,15 @@ impl GroupedHashAggregateStream {
&& matches!(self.group_ordering, GroupOrdering::None)
&& matches!(self.mode, AggregateMode::Partial)
&& self.update_memory_reservation().is_err()
{
let n = self.group_values.len() / self.batch_size * self.batch_size;
let batch = self.emit(EmitTo::First(n), false)?;
self.exec_state = ExecutionState::ProducingOutput(batch);
{
if self.enable_blocked_group_states {
let n = self.group_values.len() / self.batch_size * self.batch_size;
let batch = self.emit(EmitTo::First(n), false)?;
self.exec_state = ExecutionState::ProducingOutput(batch);
} else {
let blocks = self.group_values.len() / self.block_size;
self.exec_state = ExecutionState::ProducingBlocks(Some(blocks));
}
}
Ok(())
}
Expand Down Expand Up @@ -961,8 +1008,12 @@ impl GroupedHashAggregateStream {
let elapsed_compute = self.baseline_metrics.elapsed_compute().clone();
let timer = elapsed_compute.timer();
self.exec_state = if self.spill_state.spills.is_empty() {
let batch = self.emit(EmitTo::All, false)?;
ExecutionState::ProducingOutput(batch)
if !self.enable_blocked_group_states {
let batch = self.emit(EmitTo::All, false)?;
ExecutionState::ProducingOutput(batch)
} else {
ExecutionState::ProducingBlocks(None)
}
} else {
// If spill files exist, stream-merge them.
self.update_merged_stream()?;
Expand Down Expand Up @@ -994,8 +1045,12 @@ impl GroupedHashAggregateStream {
fn switch_to_skip_aggregation(&mut self) -> Result<()> {
if let Some(probe) = self.skip_aggregation_probe.as_mut() {
if probe.should_skip() {
let batch = self.emit(EmitTo::All, false)?;
self.exec_state = ExecutionState::ProducingOutput(batch);
if !self.enable_blocked_group_states {
let batch = self.emit(EmitTo::All, false)?;
self.exec_state = ExecutionState::ProducingOutput(batch);
} else {
self.exec_state = ExecutionState::ProducingBlocks(None);
}
}
}

Expand Down

0 comments on commit ab92626

Please sign in to comment.