Skip to content

Commit

Permalink
Comment adjustment and addressed some warnings
Browse files Browse the repository at this point in the history
  • Loading branch information
srh committed Nov 4, 2024
1 parent 7c71b24 commit 18b3fae
Show file tree
Hide file tree
Showing 8 changed files with 24 additions and 21 deletions.
2 changes: 1 addition & 1 deletion datafusion/src/cube_ext/joinagg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::logical_plan::{DFSchemaRef, Expr, LogicalPlan, UserDefinedLogicalNode
use crate::optimizer::optimizer::OptimizerRule;
use crate::optimizer::utils::from_plan;
use crate::physical_plan::hash_aggregate::{
create_accumulation_state, AccumulationState, Accumulators, AggregateMode,
create_accumulation_state, AggregateMode,
};
use crate::physical_plan::planner::{physical_name, ExtensionPlanner};
use crate::physical_plan::{hash_aggregate, PhysicalPlanner};
Expand Down
1 change: 0 additions & 1 deletion datafusion/src/physical_plan/expressions/average.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use std::sync::Arc;

use crate::error::{DataFusionError, Result};
use crate::physical_plan::groups_accumulator::GroupsAccumulator;
use crate::physical_plan::groups_accumulator_adapter::GroupsAccumulatorAdapter;
use crate::physical_plan::groups_accumulator_flat_adapter::GroupsAccumulatorFlatAdapter;
use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr};
use crate::scalar::ScalarValue;
Expand Down
3 changes: 1 addition & 2 deletions datafusion/src/physical_plan/expressions/sum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use std::sync::Arc;

use crate::error::{DataFusionError, Result};
use crate::physical_plan::groups_accumulator::GroupsAccumulator;
use crate::physical_plan::groups_accumulator_adapter::GroupsAccumulatorAdapter;
use crate::physical_plan::groups_accumulator_flat_adapter::GroupsAccumulatorFlatAdapter;
use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr};
use crate::scalar::ScalarValue;
Expand All @@ -45,7 +44,7 @@ use super::format_state_name;
use smallvec::smallvec;
use smallvec::SmallVec;

// SUM aggregate expression
/// SUM aggregate expression
#[derive(Debug)]
pub struct Sum {
name: String,
Expand Down
7 changes: 4 additions & 3 deletions datafusion/src/physical_plan/groups_accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,10 @@ pub trait GroupsAccumulator: Send {
total_num_groups: usize,
) -> Result<()>;

/// update_batch but where group_indices is already ordered into adjacent groups. `offsets` has
/// the group boundaries, and note that `offsets[0] == 0` (and the last offset is
/// `group_indices.len()`).
/// Like `update_batch`, but `group_indices` is already clumped into runs of equal group
/// indices. `offsets` holds the run boundaries: `offsets[0] == 0`, the last offset is
/// `group_indices.len()`, and `offsets[i] .. offsets[i + 1]` is a half-open interval of equal
/// group indices.
fn update_batch_preordered(
&mut self,
values: &[ArrayRef],
Expand Down
11 changes: 6 additions & 5 deletions datafusion/src/physical_plan/groups_accumulator_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
//! Utilities for implementing GroupsAccumulator
//! Adapter that makes [`GroupsAccumulator`] out of [`Accumulator`]
use std::mem::{size_of, size_of_val};
use std::mem::size_of;
// use std::mem::size_of_val; // TODO: Remove commented Accumulator::size() code?

use crate::arrow_datafusion_err;
use crate::error::{DataFusionError, Result};
Expand Down Expand Up @@ -252,8 +253,8 @@ impl GroupsAccumulatorAdapter {
// RecordBatch(es)
let iter = groups_with_rows.iter().zip(offsets.windows(2));

let mut sizes_pre = 0;
let mut sizes_post = 0;
// let mut sizes_pre = 0;
// let mut sizes_post = 0;
for (&group_idx, offsets) in iter {
let state = &mut self.states[group_idx];
// sizes_pre += state.size(); // TODO: Add Accumulator::size?
Expand Down Expand Up @@ -331,7 +332,7 @@ impl GroupsAccumulator for GroupsAccumulatorAdapter {

let results: Vec<ScalarValue> = states
.into_iter()
.map(|mut state| {
.map(|state| {
// self.free_allocation(state.size()); // TODO: Add Accumulator::size?
state.accumulator.evaluate()
})
Expand All @@ -357,7 +358,7 @@ impl GroupsAccumulator for GroupsAccumulatorAdapter {
// which we need to form into columns
let mut results: Vec<Vec<ScalarValue>> = vec![];

for mut state in states {
for state in states {
// self.free_allocation(state.size()); // TODO: Add Accumulator::size?
let accumulator_state = state.accumulator.state()?;
results.resize_with(accumulator_state.len(), Vec::new);
Expand Down
15 changes: 8 additions & 7 deletions datafusion/src/physical_plan/groups_accumulator_flat_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
//! Utilities for implementing GroupsAccumulator
//! Adapter that makes [`GroupsAccumulator`] out of [`Accumulator`]
use std::mem::{size_of, size_of_val};
use std::mem::size_of;
// use std::mem::size_of_val; // TODO: Remove commented Accumulator::size() code?

use crate::arrow_datafusion_err;
use crate::error::{DataFusionError, Result};
Expand Down Expand Up @@ -189,8 +190,8 @@ impl<AccumulatorType: Accumulator> GroupsAccumulatorFlatAdapter<AccumulatorType>
"asserting values[0].len() == group_indices.len()"
);

let mut sizes_pre = 0;
let mut sizes_post = 0;
// let mut sizes_pre = 0;
// let mut sizes_post = 0;
for offsets in offsets_param.windows(2) {
let group_idx = group_indices[offsets[0]];
let accumulator: &mut AccumulatorType = &mut self.accumulators[group_idx];
Expand Down Expand Up @@ -271,8 +272,8 @@ impl<AccumulatorType: Accumulator> GroupsAccumulatorFlatAdapter<AccumulatorType>
// RecordBatch(es)
let iter = groups_with_rows.iter().zip(offsets.windows(2));

let mut sizes_pre = 0;
let mut sizes_post = 0;
// let mut sizes_pre = 0;
// let mut sizes_post = 0;
for (&group_idx, offsets) in iter {
// sizes_pre += state.size(); // TODO: Add Accumulator::size?

Expand Down Expand Up @@ -373,7 +374,7 @@ impl<AccumulatorType: Accumulator> GroupsAccumulator

let results: Vec<ScalarValue> = accumulators
.into_iter()
.map(|mut accumulator| {
.map(|accumulator| {
// self.free_allocation(state.size()); // TODO: Add Accumulator::size?
accumulator.evaluate()
})
Expand All @@ -400,7 +401,7 @@ impl<AccumulatorType: Accumulator> GroupsAccumulator
// which we need to form into columns
let mut results: Vec<Vec<ScalarValue>> = vec![];

for mut accumulator in accumulators {
for accumulator in accumulators {
// self.free_allocation(state.size()); // TODO: Add Accumulator::size?
let accumulator_state = accumulator.state()?;
results.resize_with(accumulator_state.len(), Vec::new);
Expand Down
5 changes: 4 additions & 1 deletion datafusion/src/physical_plan/hash_aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -940,7 +940,10 @@ pub type KeyVec = SmallVec<[u8; 64]>;
type AccumulatorItem = Box<dyn Accumulator>;
#[allow(missing_docs)]
pub type AccumulatorSet = SmallVec<[AccumulatorItem; 2]>;
pub type SpottyAccumulatorSet = SmallVec<[Option<AccumulatorItem>; 2]>; // TODO: Name?
/// Not really a set: order matters, because this is a parallel array to some array of
/// `GroupsAccumulator`s. An entry is `None` wherever the parallel array (presumably held in
/// some `AccumulationState`) has a groups accumulator.
pub type SpottyAccumulatorSet = SmallVec<[Option<AccumulatorItem>; 2]>;
#[allow(missing_docs)]
pub type Accumulators = HashMap<KeyVec, AccumulationGroupState, RandomState>;

Expand Down
1 change: 0 additions & 1 deletion datafusion/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,6 @@ pub trait AggregateExpr: Send + Sync + Debug {
/// return states with the same description as `state_fields`
fn create_accumulator(&self) -> Result<Box<dyn Accumulator>>;

// TODO: Make create_accumulator just return a Result<Option<Box<..>>> with None in the case this is true.
/// Returns true if and only if `create_groups_accumulator` returns `Ok(Some(_))` (assuming it does not return an `Err(_)`).
fn uses_groups_accumulator(&self) -> bool {
return false;
Expand Down

0 comments on commit 18b3fae

Please sign in to comment.