Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove GroupByScalar and use ScalarValue in preparation for supporting null values in GroupBy #786

Merged
merged 1 commit into from
Jul 30, 2021

Conversation

alamb
Copy link
Contributor

@alamb alamb commented Jul 27, 2021

Which issue does this PR close?

Fixes #376

Rationale for this change

This proposal is both both a code cleanup and a step towards computing the correct answers when group by has nulls (#781 and #782).

Since GroupByScalar does not have any notion of a NULL or None now, but ScalarValue does, rather than trying to teach GroupByScalar about Nulls it seemed like a good opportunity to remove it entirely.

In parallel I am working on a proposal for the rest of #781

What changes are included in this PR?

  1. Remove GroupByScalar and related conversion code
  2. Add Eq and Hash implementations for ScalarValue, based on OrderedFloat (which was used by GroupByScalar)
  3. Add impl From<Option<..>> for ScalarValue to make it easier to use

Are there any user-facing changes?

Not functionally (though more memory will be used when grouping)

Benchmark results

TLDR; I don't think there is any measurable performance difference with this change in the microbenchmarks

I used the following command (with the baseline name changed):

cargo bench --bench aggregate_query_sql -- --save-baseline unify_scalars2

Here are the benchmark results of master and this branch using https://github.com/BurntSushi/critcmp to compare

(arrow_dev) alamb@MacBook-Pro:~/Software/arrow-datafusion$ critcmp master unify_scalars
group                                                master                                 unify_scalars
-----                                                ------                                 -------------
aggregate_query_group_by                             1.00      2.9±0.02ms        ? ?/sec    1.00      2.9±0.05ms        ? ?/sec
aggregate_query_group_by_u64 15 12                   1.06      3.3±0.05ms        ? ?/sec    1.00      3.1±0.18ms        ? ?/sec
aggregate_query_group_by_with_filter                 1.04      2.1±0.06ms        ? ?/sec    1.00  1978.6±17.07µs        ? ?/sec
aggregate_query_group_by_with_filter_u64 15 12       1.00      2.2±0.03ms        ? ?/sec    1.03      2.3±0.03ms        ? ?/sec
aggregate_query_no_group_by 15 12                    1.03  1180.0±36.04µs        ? ?/sec    1.00  1147.6±12.87µs        ? ?/sec
aggregate_query_no_group_by_count_distinct_narrow    1.00      5.3±0.02ms        ? ?/sec    1.01      5.4±0.14ms        ? ?/sec
aggregate_query_no_group_by_count_distinct_wide      1.02      7.7±0.34ms        ? ?/sec    1.00      7.5±0.18ms        ? ?/sec
aggregate_query_no_group_by_min_max_f64              1.05  1183.1±65.49µs        ? ?/sec    1.00  1123.4±35.90µs        ? ?/sec

To see how consistent the measurements were, I gathered numbers twice on master and twice on this branch. My conclusion is that all changes are within the margin of error of my measurement setup

group                                                master                                 master2                                unify_scalars                          unify_scalars2
-----                                                ------                                 -------                                -------------                          --------------
aggregate_query_group_by                             1.00      2.9±0.02ms        ? ?/sec    1.01      3.0±0.04ms        ? ?/sec    1.00      2.9±0.05ms        ? ?/sec    1.06      3.1±0.11ms        ? ?/sec
aggregate_query_group_by_u64 15 12                   1.06      3.3±0.05ms        ? ?/sec    1.11      3.4±0.06ms        ? ?/sec    1.00      3.1±0.18ms        ? ?/sec    1.13      3.5±0.07ms        ? ?/sec
aggregate_query_group_by_with_filter                 1.04      2.1±0.06ms        ? ?/sec    1.15      2.3±0.07ms        ? ?/sec    1.00  1978.6±17.07µs        ? ?/sec    1.19      2.4±0.06ms        ? ?/sec
aggregate_query_group_by_with_filter_u64 15 12       1.00      2.2±0.03ms        ? ?/sec    1.04      2.3±0.05ms        ? ?/sec    1.03      2.3±0.03ms        ? ?/sec    1.03      2.3±0.04ms        ? ?/sec
aggregate_query_no_group_by 15 12                    1.03  1180.0±36.04µs        ? ?/sec    1.00  1151.5±32.25µs        ? ?/sec    1.00  1147.6±12.87µs        ? ?/sec    1.03  1180.9±13.72µs        ? ?/sec
aggregate_query_no_group_by_count_distinct_narrow    1.00      5.3±0.02ms        ? ?/sec    1.01      5.4±0.13ms        ? ?/sec    1.01      5.4±0.14ms        ? ?/sec    1.03      5.5±0.16ms        ? ?/sec
aggregate_query_no_group_by_count_distinct_wide      1.02      7.7±0.34ms        ? ?/sec    1.00      7.5±0.24ms        ? ?/sec    1.00      7.5±0.18ms        ? ?/sec    1.05      7.9±0.25ms        ? ?/sec
aggregate_query_no_group_by_min_max_f64              1.06  1183.1±65.49µs        ? ?/sec    1.00  1116.3±24.45µs        ? ?/sec    1.01  1123.4±35.90µs        ? ?/sec    1.05  1174.9±12.77µs        ? ?/sec

Concerns

This change increases the size of each potential group key by a factor of 4 in HashAggregates to 64, the size_of(ScalarValue) up from from 16 (the size of GroupScalar).

UPDATE: I can shrink the overhead to an extra 32 bytes in this PR: can #788;

This optimization was added by @Dandandan in 1ecdf3f / apache/arrow#8765 and I would be interested in hearing his thoughts on the matter here. In any event we will likely need more than the existing 16 bytes per group key to handle null values, but we probably don't need an extra 56 bytes per key.

Some options I can think of

  1. Take the increased memory hit for now (this PR) and re-optimize later
  2. Take the approach in ARROW-10722: [Rust][DataFusion] Reduce overhead of some data types in aggregations / joins, improve benchmarks arrow#8765 and Box/Arc the non primitive values in ScalarValue (which might have some small overhead)

Perhaps @jorgecarleitao has some additional thoughts

@github-actions github-actions bot added the datafusion Changes in the datafusion crate label Jul 27, 2021
/// Extract the value in `col[row]` as a GroupByScalar
fn create_group_by_value(col: &ArrayRef, row: usize) -> Result<GroupByScalar> {
match col.data_type() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that none of this code that created GroupByScalar checks col.is_valid(row) and thus is using whatever value happens to be in the array slots when they are NULL

@Dandandan
Copy link
Contributor

In any event we will likely need more than the existing 16 bytes per group key to handle null values, but we probably don't need an extra 56 bytes per key.

I think in the longer run, we can keep the keys in a contiguous (mutable) array instead and keep offsets/pointers to the values in this array (and null values can be stored in a bitmap, so only 1 bit per value). This will only need roughly 8 bytes for the pointer + the key value in Arrow format. This will also enable other optimizations.

The worst case is now something like max(id) from t group by id where the id is unique and has a key like u64.

@alamb
Copy link
Contributor Author

alamb commented Jul 27, 2021

we can keep the keys in a contiguous (mutable) array instead and keep offsets/pointers to the values in this array (and null values can be stored in a bitmap, so only 1 bit per value).

@Dandandan this is a great idea - I will write up in more detail what I think you are proposing to make sure we are on the same page

@alamb alamb marked this pull request as ready for review July 27, 2021 21:24
@alamb
Copy link
Contributor Author

alamb commented Jul 27, 2021

I played around a bit and I can shrink the key size overhead from 16 bytes --> 32 bytes with #788

I also think there is a possibility to shrink it even farther, but I owe a writeup of how that will work

@alamb
Copy link
Contributor Author

alamb commented Jul 27, 2021

FYI @tustvold

@Dandandan
Copy link
Contributor

Dandandan commented Jul 28, 2021

we can keep the keys in a contiguous (mutable) array instead and keep offsets/pointers to the values in this array (and null values can be stored in a bitmap, so only 1 bit per value).

@Dandandan this is a great idea - I will write up in more detail what I think you are proposing to make sure we are on the same page

Some more details to get you started, this is the rough idea that is a similar to how hash join currently works (there it's a bit easier as we can concatenate the data upfront):

  • We can store both group by keys and aggregate values in typed / mutable Arrays. This means the memory overhead is kept to a minimum and is much more vectorization / cache friendly.
    E.g. for a count grouped on u64 values, the state we keep for the group by values is MutableArray<U64Type> and for the count state another MutableArray<U64Type> (no bitmap needed here). The group by values can have null values which are stored in a bitmap.

  • A hashmap that only contains hashes / offsets: Hashmap<u64, SmallVec<[u64; 1]>

    • first u64 is the hash of the group by values -> this can be calculated in a vectorized manner for the entire batch
    • second u64 is the offset in both the group by value(s) and aggregate state arrays -> inserted to the mutable arrays when there is no match
  • hash collisions (group by values mapping to same u64 should/can be handled by comparing values in the arrays at the group by offset and scanning the SmallVec

Above structure will reduce the memory needed for the state (only needs about 32 bytes total per value + data itself and most of it will be stack allocated / allocated in large chunks). Currently total usage per value will be at least a few times higher. It should also reduce the time to create / (re)hash the keys and will reduce the amount of intermediate vec / small allocations that are needed in the hash aggregate code.

There are maybe slightly different ways to encode the data in the hashmap / check collisions, and above structure makes some further optimizations / vectorization possible.

@alamb
Copy link
Contributor Author

alamb commented Jul 28, 2021

Given this PR is a potential memory usage regression and we are considering some different approaches (e.g what @Dandandan has described in #786 (comment)) marking this PR as a draft so it is clear I am not sure it should be merged.

@alamb alamb marked this pull request as draft July 28, 2021 11:43
@Dandandan
Copy link
Contributor

Btw, I think it is totally fine for now to have a small regression in order to support null values and clean up the code base a bit.

@alamb
Copy link
Contributor Author

alamb commented Jul 28, 2021

Btw, I think it is totally fine for now to have a small regression in order to support null values and clean up the code base a bit.

👍 I want to ponder / try out some of the ideas you have explained too before coming to a conclusion

@alamb
Copy link
Contributor Author

alamb commented Jul 28, 2021

@Dandandan here is my proposal: #790 for reworking group by hash, along the lines you proposed (I think). I plan to spend some time prototyping it later today, and any comments / concerns / corrections are most welcome

@alamb alamb marked this pull request as ready for review July 30, 2021 17:40
@alamb
Copy link
Contributor Author

alamb commented Jul 30, 2021

This one seems like an incremental change towards #790 so I am going to merge this in and rebase 790 so the changes are in that PR are smaller.

@alamb alamb merged commit a4941ee into apache:master Jul 30, 2021
@alamb alamb deleted the alamb/unify_scalars branch July 30, 2021 17:41
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
datafusion Changes in the datafusion crate
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Combine GroupByScalar and ScalarValue
2 participants