Support vectorized append and compare for multi group by #12996
Conversation
@@ -128,6 +132,15 @@ impl<T: ArrowPrimitiveType, const NULLABLE: bool> GroupColumn
        }
    }

    fn append_non_nullable_val(&mut self, array: &ArrayRef, row: usize) {
        if NULLABLE {
            self.nulls.append(false);
This could be optimized to append nulls for the entire batch instead of per value
Yes, I plan to refactor the interface to support taking a `rows: &[usize]` input, make all parts of the appending vectorized, and then check the performance again.
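A minimal sketch of the interface shape being described here (the trait name is hypothetical; `vectorized_append` follows the naming used later in this PR, but the exact signature may differ):

use arrow::array::ArrayRef;

trait GroupColumnVectorized {
    /// Append the values at `rows` in `array` in one pass, instead of
    /// calling a single-row `append_val` once per row.
    fn vectorized_append(&mut self, array: &ArrayRef, rows: &[usize]);
}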
(i.e. remove it here and call it in such a way that we use https://docs.rs/arrow/latest/arrow/array/struct.BooleanBufferBuilder.html#method.append_n)
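A minimal sketch of that suggestion (`BooleanBufferBuilder::append_n` is the real arrow API; the wrapper function is illustrative and assumes the whole batch is non-null):

use arrow::array::BooleanBufferBuilder;

/// Mark `n_rows` appended values as valid with a single call, instead of
/// appending the validity bits one by one.
fn append_non_null_batch(nulls: &mut BooleanBufferBuilder, n_rows: usize) {
    nulls.append_n(n_rows, true);
}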
Cool :)
I added the `append_batch` function to better support vectorized append. But the improvement still seems not obvious. #12996 (comment)
🤔 I guess it is likely due to the newly introduced branch in `equal_to`:
if *group_idx < group_values_len {
    for (i, group_val) in self.group_values.iter().enumerate() {
        if !check_row_equal(group_val.as_ref(), *group_idx, &cols[i], row) {
            return false;
        }
    }
} else {
    let row_idx_offset = group_idx - group_values_len;
    let row_idx = self.append_rows_buffer[row_idx_offset];
    return is_rows_eq(cols, row, cols, row_idx).unwrap();
}
To eliminate this extra branch, I think we need to refactor the intern process mentioned in #12821 (comment). I am trying it.
The latest benchmark numbers:
core(array, row);

struct AggregationHashTable<T: AggregationHashTableEntry> {
    /// Raw table storing values in a `Vec`
    raw_table: Vec<T>,
Based on some experiments in changing the hash join algorithm, I think it's likely `hashbrown` performs much better than implementing a hash table ourselves, although I would like to be surprised 🙂
🤔 Even if we can perform something like vectorized compare or vectorized append in our hashtable? I found that in the multi group by case we perform the compare for each row, leading to the array downcasting again and again... and the downcast operation actually compiles to quite a few asm instructions... And I found we can't eliminate it and perform the vectorized compare with `hashbrown`...
fn equal_to_inner(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool {
let array = array.as_byte_view::<B>();
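To illustrate the downcast cost being discussed, here is a minimal sketch (not the PR's code; `Int64Type` and the function names are assumptions for the example): the scalar path re-downcasts the column for every row, while the vectorized path downcasts once per batch.

use arrow::array::{ArrayRef, AsArray};
use arrow::datatypes::Int64Type;

// Scalar path: the downcast (type check + cast) runs once per row.
fn sum_scalar(col: &ArrayRef, rows: &[usize]) -> i64 {
    rows.iter()
        .map(|&row| col.as_primitive::<Int64Type>().value(row))
        .sum()
}

// Vectorized path: the downcast is hoisted out of the loop, once per batch.
fn sum_vectorized(col: &ArrayRef, rows: &[usize]) -> i64 {
    let typed = col.as_primitive::<Int64Type>();
    rows.iter().map(|&row| typed.value(row)).sum()
}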
We can still do "vectorized compare" by doing the lookup in the hashtable (based on hash value only) and the vectorized equality check separately. That way you can still use the fast hashtable, but move the equality check to a separate/vectorized step.
That's at least what is done in the vectorized hash join implementation :). I changed it before to use a `Vec`-based index like you did here, but that performed significantly worse.
The reason, I think, is that the lookup is incredibly well optimized using the swiss table design and you get fewer "false" candidates to check, while we can still use the vectorized/type-specialized equality check.
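A minimal sketch of that two-phase pattern, assuming a hash-keyed `RawTable` (hashbrown's `raw` feature; all other names are illustrative):

use hashbrown::raw::RawTable;

struct Groups {
    /// (hash, group index) pairs.
    map: RawTable<(u64, usize)>,
}

impl Groups {
    /// Phase 1: probe by hash only, collecting candidate (row, group) pairs
    /// and deferring the real equality check.
    fn collect_candidates(&self, hashes: &[u64]) -> Vec<(usize, usize)> {
        let mut candidates = Vec::with_capacity(hashes.len());
        for (row, &hash) in hashes.iter().enumerate() {
            if let Some(&(_, group_idx)) = self.map.get(hash, |(h, _)| *h == hash) {
                candidates.push((row, group_idx));
            }
        }
        candidates
    }
    // Phase 2 (not shown): one type-specialized, vectorized equality pass over
    // all candidate pairs, downcasting each column a single time.
}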
Makes sense, thank you!
The logic is a bit complex; I plan to finish it and benchmark it today.
I started going through this code -- I am finding it a really nice read. Nice work @Rachelint @jayzhan211 and @Dandandan
My only real high level concern here is that we have to retain `GroupValuesColumn` as well -- not only does this now have more code to maintain, but the number of paths to test / verify is getting larger too.
Is it possible to somehow unify `GroupValuesColumn` and `VectorizedGroupValuesColumn`?
I plan to keep reviewing this over the weekend.
/// is used to store the rows that will be processed in the next round.
scalarized_indices: Vec<usize>,

/// The `vectorized_equal_to` row indices buffer
Maybe we can rename these to "buffer" or something to make it clear they are temp processing space to avoid re-allocations, rather than actual state. Something like:
buffer_equal_to_row_indices: Vec<usize>,
Or maybe we can even put all the scratch space into its own struct to make it clear:
struct ScratchSpace {
    /// The `vectorized_equal_to` row indices buffer
    vectorized_equal_to_row_indices: Vec<usize>,
    /// The `vectorized_equal_to` group indices buffer
    vectorized_equal_to_group_indices: Vec<usize>,
    /// The `vectorized_equal_to` result buffer
    vectorized_equal_to_results: Vec<bool>,
    /// The `vectorized append` row indices buffer
    vectorized_append_row_indices: Vec<usize>,
}
Or something
Good idea for readability. I defined `VectorizedOperationBuffers` to hold such buffers.
groups.clear();
groups.resize(n_rows, usize::MAX);

let mut batch_hashes = mem::take(&mut self.hashes_buffer);
👍
@@ -143,8 +148,12 @@ pub fn new_group_values(schema: SchemaRef) -> Result<Box<dyn GroupValues>> {
    }
}

if GroupValuesColumn::supported_schema(schema.as_ref()) {
    Ok(Box::new(GroupValuesColumn::try_new(schema)?))
if column::supported_schema(schema.as_ref()) {
Can you explain here why `GroupOrdering::None` is required? Is it because the `VectorizedGroupValuesColumn` doesn't keep the groups in order?
If that is the case, it seems like maybe `emit_n` would never be called 🤔
Is it because the VectorizedGroupValuesColumn doesn't keep the groups in order?

Yes, because we now process all the rows at once (not one by one like before), some rows are appended beforehand, so they are not kept in order.

If that is the case, it seems like maybe emit_n would never be called

`emit_early_if_necessary` may be called.
The situation is just as @jayzhan211 mentioned, and the detail about why `GroupOrdering::None` is needed can also be seen here:
https://github.com/Rachelint/arrow-datafusion/blob/406acb4983efe0c2072c5d7759674eec9db9404a/datafusion/physical-plan/src/aggregates/group_values/column.rs#L792-L834
🤔 I think it can be unified simply. The alternative is that we support a dedicated ... 🤔 I personally prefer the second one? What do you think about it @alamb?
/// And we use [`GroupIndexView`] to represent such `group indices` in table.
///
map: RawTable<(u64, GroupIndexView)>,
Is it the case that:
- If group1 and group2 have exactly the same hash value, `GroupIndexView` will use chaining to resolve the collision
- If group1 and group2 have different hash values but map to the same slot in the hash table, `hashbrown` will handle the collision for you with probing
Totally right.
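A minimal sketch of the chaining idea (the bit layout is an assumption for illustration, not necessarily the PR's exact encoding): a `GroupIndexView` either stores a single group index inline, or marks that the slot chains to a list of group indices sharing the same hash.

/// Packed view: high bit set => `value()` is an offset into a side table of
/// group-index lists; high bit clear => `value()` is the group index itself.
#[derive(Clone, Copy)]
struct GroupIndexView(u64);

const LIST_FLAG: u64 = 1 << 63;

impl GroupIndexView {
    fn inline(group_idx: u64) -> Self {
        Self(group_idx)
    }
    fn chained(list_offset: u64) -> Self {
        Self(list_offset | LIST_FLAG)
    }
    fn is_chained(&self) -> bool {
        self.0 & LIST_FLAG != 0
    }
    fn value(&self) -> u64 {
        self.0 & !LIST_FLAG
    }
}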
…d operations to make code clearer.
I think this makes sense -- thank you
BTW I think this code is fairly well covered by the aggregate fuzz tester (also added by @Rachelint :)). Also, @LeslieKid is adding additional data type coverage, which is great: #13226
cargo test --test fuzz -- aggregate
Have unified the `GroupValuesColumn` and `VectorizedGroupValuesColumn`.
This is top of my list to review tomorrow morning
👍
I am giving this a final review now
Performance results:
🚀
👏 @Rachelint @jayzhan211 @2010YOUY01 and @Dandandan. What great teamwork.
This PR is really nice in my opinion. It makes a super tricky and performance-sensitive part of the code about as clear as I could imagine it to be.
I also ran some code coverage on this:
nice cargo llvm-cov --html test --test fuzz -- aggregate
nice cargo llvm-cov --html test -p datafusion-physical-plan -- group_values
And verified that the new code was well covered.
@@ -75,55 +148,653 @@ pub struct GroupValuesColumn {
    random_state: RandomState,
}

impl GroupValuesColumn {
    /// Buffers to store intermediate results in `vectorized_append`
👍
// ========================================================================
// Initialization functions
// ========================================================================

/// Create a new instance of GroupValuesColumn if supported for the specified schema
pub fn try_new(schema: SchemaRef) -> Result<Self> {
    let map = RawTable::with_capacity(0);
This `with_capacity` can probably be improved (as a follow-on PR) to avoid some smaller allocations.
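A minimal sketch of that follow-on idea (where the hint comes from is an assumption; any reasonable batch-size estimate would do):

use hashbrown::raw::RawTable;

/// Pre-size the table from an expected group count so the first inserts do
/// not trigger a cascade of small re-allocations (capacity 0 grows repeatedly).
fn make_map(expected_groups: usize) -> RawTable<(u64, u64)> {
    RawTable::with_capacity(expected_groups)
}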
/// `Group indices` order is inconsistent with their input order, and this will lead to errors
/// in `streaming aggregation`.
///
fn scalarized_intern(
this is basically the same as `GroupValuesColumn::intern` was previously, which makes sense to me
@@ -56,14 +59,40 @@ pub trait GroupColumn: Send + Sync {
    ///
    /// Note that this comparison returns true if both elements are NULL
    fn equal_to(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool;

    /// Appends the row at `row` in `array` to this builder
    fn append_val(&mut self, array: &ArrayRef, row: usize);
Maybe as a follow-on we can consider removing `append_val` and `equal_to` and simply change all codepaths to use the vectorized versions
I am a bit worried that if we merge them, some extra `if else` branches will be introduced. That hurts performance a lot for the row-level operations.
A good thing to benchmark (as a follow-on PR) perhaps
/// it will record the `true` result at the corresponding
/// position in `equal_to_results`.
///
/// And if found nth result in `equal_to_results` is already
this is quite clever to pass in the existing "is equal to results"
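A minimal sketch of that pattern, using plain slices instead of the PR's builder types (all names illustrative): each column's pass skips pairs already marked unequal and can only flip `true` slots to `false`.

/// One column's contribution to a batched equality check. `equal_to_results`
/// is shared across columns, so a pair proven unequal by an earlier column is
/// skipped and its comparison work saved.
fn vectorized_equal_to_column(
    stored: &[u64],     // this builder's group values for one column
    lhs_rows: &[usize], // group-side row indices
    input: &[u64],      // the incoming column's values
    rhs_rows: &[usize], // input-side row indices
    equal_to_results: &mut [bool],
) {
    for (i, (&l, &r)) in lhs_rows.iter().zip(rhs_rows.iter()).enumerate() {
        if !equal_to_results[i] {
            continue; // already unequal on a previous column
        }
        equal_to_results[i] = stored[l] == input[r];
    }
}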
(false, _) => {
    for &row in rows {
        self.group_values.push(arr.value(row));
😆 I think we can even do more, like checking if `rows.len() == array.len()`; if so, we can just perform `extend`.
I think we could already use `extend` instead of `push`? `extend` on `Vec` is somewhat faster than `push`, as the capacity check / allocation is done once instead of once per value.
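A minimal sketch of the difference, using plain `u64` values instead of the builder's actual storage (names illustrative):

/// Gather `rows` from `values` into `group_values`.
fn append_rows(group_values: &mut Vec<u64>, values: &[u64], rows: &[usize]) {
    // Per-row push: a capacity check on every iteration.
    // for &row in rows { group_values.push(values[row]); }

    // extend: the iterator's size hint lets `Vec` reserve once up front.
    group_values.extend(rows.iter().map(|&row| values[row]));
}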
I think there are several things that could be done to make the append even faster (see the sketch after this list):
1. `extend_from_slice` if `rows.len() == array.len()`
2. use `extend` rather than `push` for values
3. Speed up appending nulls (don't append bits one by one)
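A minimal sketch combining items 1 and 2 (assuming, as the length check implies, that `rows.len() == values.len()` means the batch covers the whole array in order; all names illustrative):

fn append_batch(group_values: &mut Vec<u64>, values: &[u64], rows: &[usize]) {
    if rows.len() == values.len() {
        // Whole array appended: one bulk, memcpy-like copy.
        group_values.extend_from_slice(values);
    } else {
        // Partial batch: gather only the selected rows, still reserving once.
        group_values.extend(rows.iter().map(|&row| values[row]));
    }
}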
I think we could already use `extend` instead of `push`? `extend` on `Vec` is somewhat faster than `push`, as the capacity check / allocation is done once instead of once per value.

Ok, I got it. I thought about it again and found it is indeed simple to do!
I think there are several things that could be done to make the append even faster:
1. `extend_from_slice` if `rows.len() == array.len()`
2. use `extend` rather than `push` for values
3. Speed up appending nulls (don't append bits one by one)

I filed an issue to track the potential improvements for vectorized operations: #13275
@@ -287,6 +469,63 @@ where
    };
}

fn vectorized_equal_to(
What I have been dreaming about with @XiangpengHao is maybe something like adding `take` / `filter` to arrow array builders.
I took this opportunity to write up the idea (finally) for your amusement:
As my admittedly sparse help for this PR, I have filed some additional tickets for follow-on work after this PR is merged:
I don't think we need to wait on this PR anymore; let's merge it in and keep moving forward. Thank you everyone again!
Which issue does this PR close?
Closes #.
Related to
Rationale for this change
Although `GroupValuesColumn` stores the multi group by values in a column-oriented way, it still uses a row-oriented approach to perform `append` and `equal to`. The most obvious overhead is that we need to downcast the `array` when processing each row; the instructions for the downcast are actually not few, and even worse, they introduce branches. And I guess the row-oriented approach will also increase random memory accesses, but I am not sure.
What changes are included in this PR?
This PR introduces the vectorized append and vectorized equal to for `GroupValuesColumn`. But such a vectorized approach is not compatible with streaming aggregation, which depends on the order between input rows and their corresponding group indices. So I define a new `VectorizedGroupValuesColumn` for optimizing the non-streaming aggregation cases, and keep the original `GroupValuesColumn` for the streaming aggregation cases.
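A high-level sketch of the vectorized intern flow this describes (the step ordering is an assumption pieced together from the review discussion above, not the exact function structure):

// Pseudocode outline, one input batch at a time:
// 1. Hash every input row in a single pass into `batch_hashes`.
// 2. Probe the hash table per row:
//    - hit: queue (input row, candidate group index) for a later equality check
//    - miss: queue the input row for a batched append as a new group
// 3. Run `vectorized_equal_to` column by column over all queued pairs,
//    downcasting each column once and skipping already-failed pairs.
// 4. Run `vectorized_append` for every row that matched no existing group.
// 5. Rows whose candidate failed the equality check (a hash collision) go to
//    `scalarized_indices` and are handled in the next round.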
Are these changes tested?
Yes, I think enough new unit tests are added.
Are there any user-facing changes?
No.