-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Avoid RowConverter for multi column grouping (10% faster clickbench queries) #12269
Changes from 29 commits
37d1382
f40a164
9c61a8b
f20780e
d3f54ed
6e0b179
10d3d18
7602a18
7a4dbd5
05fb466
8efcf07
bc16d55
fb94485
07ed966
925166d
f59d11e
e9f9abc
720c343
b6cd012
5b8aceb
ed9b78e
fb1b745
2cab4c2
0090008
e6d1890
77efb1a
ef274a4
864bdfa
103c45d
0bd5251
bf5dcc4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,315 @@ | ||
// Licensed to the Apache Software Foundation (ASF) under one | ||
// or more contributor license agreements. See the NOTICE file | ||
// distributed with this work for additional information | ||
// regarding copyright ownership. The ASF licenses this file | ||
// to you under the Apache License, Version 2.0 (the | ||
// "License"); you may not use this file except in compliance | ||
// with the License. You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
use crate::aggregates::group_values::group_value_row::{ | ||
ArrayRowEq, ByteGroupValueBuilder, PrimitiveGroupValueBuilder, | ||
}; | ||
use crate::aggregates::group_values::GroupValues; | ||
use ahash::RandomState; | ||
use arrow::compute::cast; | ||
use arrow::datatypes::{ | ||
Date32Type, Date64Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, | ||
Int8Type, UInt16Type, UInt32Type, UInt64Type, UInt8Type, | ||
}; | ||
use arrow::record_batch::RecordBatch; | ||
use arrow_array::{Array, ArrayRef}; | ||
use arrow_schema::{DataType, SchemaRef}; | ||
use datafusion_common::hash_utils::create_hashes; | ||
use datafusion_common::{DataFusionError, Result}; | ||
use datafusion_execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt}; | ||
use datafusion_expr::EmitTo; | ||
use datafusion_physical_expr::binary_map::OutputType; | ||
|
||
use hashbrown::raw::RawTable; | ||
|
||
/// Compare GroupValue Rows column by column | ||
pub struct GroupValuesColumn { | ||
/// The output schema | ||
schema: SchemaRef, | ||
|
||
/// Logically maps group values to a group_index in | ||
/// [`Self::group_values`] and in each accumulator | ||
/// | ||
/// Uses the raw API of hashbrown to avoid actually storing the | ||
/// keys (group values) in the table | ||
/// | ||
/// keys: u64 hashes of the GroupValue | ||
/// values: (hash, group_index) | ||
map: RawTable<(u64, usize)>, | ||
|
||
/// The size of `map` in bytes | ||
map_size: usize, | ||
|
||
/// The actual group by values, stored column-wise. Compare from | ||
/// the left to right, each column is stored as `ArrayRowEq`. | ||
/// This is shown faster than the row format | ||
group_values: Vec<Box<dyn ArrayRowEq>>, | ||
|
||
/// reused buffer to store hashes | ||
hashes_buffer: Vec<u64>, | ||
|
||
/// Random state for creating hashes | ||
random_state: RandomState, | ||
} | ||
|
||
impl GroupValuesColumn { | ||
pub fn try_new(schema: SchemaRef) -> Result<Self> { | ||
let map = RawTable::with_capacity(0); | ||
Ok(Self { | ||
schema, | ||
map, | ||
map_size: 0, | ||
group_values: vec![], | ||
hashes_buffer: Default::default(), | ||
random_state: Default::default(), | ||
}) | ||
} | ||
} | ||
|
||
impl GroupValues for GroupValuesColumn { | ||
fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) -> Result<()> { | ||
let n_rows = cols[0].len(); | ||
|
||
if self.group_values.is_empty() { | ||
let len = cols.len(); | ||
let mut v = Vec::with_capacity(len); | ||
|
||
for f in self.schema.fields().iter() { | ||
let nullable = f.is_nullable(); | ||
match f.data_type() { | ||
&DataType::Int8 => { | ||
let b = PrimitiveGroupValueBuilder::<Int8Type>::new(nullable); | ||
v.push(Box::new(b) as _) | ||
} | ||
&DataType::Int16 => { | ||
let b = PrimitiveGroupValueBuilder::<Int16Type>::new(nullable); | ||
v.push(Box::new(b) as _) | ||
} | ||
&DataType::Int32 => { | ||
let b = PrimitiveGroupValueBuilder::<Int32Type>::new(nullable); | ||
v.push(Box::new(b) as _) | ||
} | ||
&DataType::Int64 => { | ||
let b = PrimitiveGroupValueBuilder::<Int64Type>::new(nullable); | ||
v.push(Box::new(b) as _) | ||
} | ||
&DataType::UInt8 => { | ||
let b = PrimitiveGroupValueBuilder::<UInt8Type>::new(nullable); | ||
v.push(Box::new(b) as _) | ||
} | ||
&DataType::UInt16 => { | ||
let b = PrimitiveGroupValueBuilder::<UInt16Type>::new(nullable); | ||
v.push(Box::new(b) as _) | ||
} | ||
&DataType::UInt32 => { | ||
let b = PrimitiveGroupValueBuilder::<UInt32Type>::new(nullable); | ||
v.push(Box::new(b) as _) | ||
} | ||
&DataType::UInt64 => { | ||
let b = PrimitiveGroupValueBuilder::<UInt64Type>::new(nullable); | ||
v.push(Box::new(b) as _) | ||
} | ||
&DataType::Float32 => { | ||
let b = PrimitiveGroupValueBuilder::<Float32Type>::new(nullable); | ||
v.push(Box::new(b) as _) | ||
} | ||
&DataType::Float64 => { | ||
let b = PrimitiveGroupValueBuilder::<Float64Type>::new(nullable); | ||
v.push(Box::new(b) as _) | ||
} | ||
&DataType::Date32 => { | ||
let b = PrimitiveGroupValueBuilder::<Date32Type>::new(nullable); | ||
v.push(Box::new(b) as _) | ||
} | ||
&DataType::Date64 => { | ||
let b = PrimitiveGroupValueBuilder::<Date64Type>::new(nullable); | ||
v.push(Box::new(b) as _) | ||
} | ||
&DataType::Utf8 => { | ||
let b = ByteGroupValueBuilder::<i32>::new(OutputType::Utf8); | ||
v.push(Box::new(b) as _) | ||
} | ||
&DataType::LargeUtf8 => { | ||
let b = ByteGroupValueBuilder::<i64>::new(OutputType::Utf8); | ||
v.push(Box::new(b) as _) | ||
} | ||
&DataType::Binary => { | ||
let b = ByteGroupValueBuilder::<i32>::new(OutputType::Binary); | ||
v.push(Box::new(b) as _) | ||
} | ||
&DataType::LargeBinary => { | ||
let b = ByteGroupValueBuilder::<i64>::new(OutputType::Binary); | ||
v.push(Box::new(b) as _) | ||
} | ||
dt => todo!("{dt} not impl"), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It would be nice if this was an internal error rather than a panic There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. in #12620 |
||
} | ||
} | ||
self.group_values = v; | ||
} | ||
|
||
// tracks to which group each of the input rows belongs | ||
groups.clear(); | ||
|
||
// 1.1 Calculate the group keys for the group values | ||
let batch_hashes = &mut self.hashes_buffer; | ||
batch_hashes.clear(); | ||
batch_hashes.resize(n_rows, 0); | ||
create_hashes(cols, &self.random_state, batch_hashes)?; | ||
|
||
for (row, &target_hash) in batch_hashes.iter().enumerate() { | ||
let entry = self.map.get_mut(target_hash, |(exist_hash, group_idx)| { | ||
// Somewhat surprisingly, this closure can be called even if the | ||
// hash doesn't match, so check the hash first with an integer | ||
// comparison first avoid the more expensive comparison with | ||
// group value. https://github.com/apache/datafusion/pull/11718 | ||
if target_hash != *exist_hash { | ||
return false; | ||
} | ||
|
||
fn check_row_equal( | ||
array_row: &dyn ArrayRowEq, | ||
lhs_row: usize, | ||
array: &ArrayRef, | ||
rhs_row: usize, | ||
) -> bool { | ||
array_row.equal_to(lhs_row, array, rhs_row) | ||
} | ||
|
||
for (i, group_val) in self.group_values.iter().enumerate() { | ||
if !check_row_equal(group_val.as_ref(), *group_idx, &cols[i], row) { | ||
return false; | ||
} | ||
} | ||
|
||
true | ||
}); | ||
|
||
let group_idx = match entry { | ||
// Existing group_index for this group value | ||
Some((_hash, group_idx)) => *group_idx, | ||
// 1.2 Need to create new entry for the group | ||
None => { | ||
// Add new entry to aggr_state and save newly created index | ||
// let group_idx = group_values.num_rows(); | ||
// group_values.push(group_rows.row(row)); | ||
|
||
let mut checklen = 0; | ||
let group_idx = self.group_values[0].len(); | ||
for (i, group_value) in self.group_values.iter_mut().enumerate() { | ||
group_value.append_val(&cols[i], row); | ||
let len = group_value.len(); | ||
if i == 0 { | ||
checklen = len; | ||
} else { | ||
debug_assert_eq!(checklen, len); | ||
} | ||
} | ||
|
||
// for hasher function, use precomputed hash value | ||
self.map.insert_accounted( | ||
(target_hash, group_idx), | ||
|(hash, _group_index)| *hash, | ||
&mut self.map_size, | ||
); | ||
group_idx | ||
} | ||
}; | ||
groups.push(group_idx); | ||
} | ||
|
||
Ok(()) | ||
} | ||
|
||
fn size(&self) -> usize { | ||
let group_values_size = self.group_values.len(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My understanding is that size should return allocated size in bytes, so this does not look correct for
and add a size method to |
||
group_values_size + self.map_size + self.hashes_buffer.allocated_size() | ||
} | ||
|
||
fn is_empty(&self) -> bool { | ||
self.len() == 0 | ||
} | ||
|
||
fn len(&self) -> usize { | ||
if self.group_values.is_empty() { | ||
return 0; | ||
} | ||
|
||
self.group_values[0].len() | ||
} | ||
|
||
fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> { | ||
let mut output = match emit_to { | ||
EmitTo::All => { | ||
let group_values = std::mem::take(&mut self.group_values); | ||
debug_assert!(self.group_values.is_empty()); | ||
|
||
group_values | ||
.into_iter() | ||
.map(|v| v.build()) | ||
.collect::<Vec<_>>() | ||
} | ||
EmitTo::First(n) => { | ||
let output = self | ||
.group_values | ||
.iter_mut() | ||
.map(|v| v.take_n(n)) | ||
.collect::<Vec<_>>(); | ||
|
||
// SAFETY: self.map outlives iterator and is not modified concurrently | ||
unsafe { | ||
for bucket in self.map.iter() { | ||
// Decrement group index by n | ||
match bucket.as_ref().1.checked_sub(n) { | ||
// Group index was >= n, shift value down | ||
Some(sub) => bucket.as_mut().1 = sub, | ||
// Group index was < n, so remove from table | ||
None => self.map.erase(bucket), | ||
} | ||
} | ||
} | ||
|
||
output | ||
} | ||
}; | ||
|
||
// TODO: Materialize dictionaries in group keys (#7647) | ||
for (field, array) in self.schema.fields.iter().zip(&mut output) { | ||
let expected = field.data_type(); | ||
if let DataType::Dictionary(_, v) = expected { | ||
let actual = array.data_type(); | ||
if v.as_ref() != actual { | ||
return Err(DataFusionError::Internal(format!( | ||
"Converted group rows expected dictionary of {v} got {actual}" | ||
))); | ||
} | ||
*array = cast(array.as_ref(), expected)?; | ||
} | ||
} | ||
|
||
Ok(output) | ||
} | ||
|
||
fn clear_shrink(&mut self, batch: &RecordBatch) { | ||
let count = batch.num_rows(); | ||
self.group_values.clear(); | ||
self.map.clear(); | ||
self.map.shrink_to(count, |_| 0); // hasher does not matter since the map is cleared | ||
self.map_size = self.map.capacity() * std::mem::size_of::<(u64, usize)>(); | ||
self.hashes_buffer.clear(); | ||
self.hashes_buffer.shrink_to(count); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should give `len` a better name like `n_cols`, or just "inline" it (e.g. `Vec::with_capacity(cols.len())`), because for me it currently just hurts readability.