Comparable Row Format #2593
Conversation
Possibly related: http://wwwlgis.informatik.uni-kl.de/archiv/wwwdvs.informatik.uni-kl.de/courses/DBSREAL/SS2005/Vorlesungsunterlagen/Implementing_Sorting.pdf — see the discussion in the "Normalized Keys" section
looking very cool
arrow/src/row/mod.rs
Outdated
///
/// A null is encoded as a big endian encoded `0_u32`
///
/// A valid value is encoded as a big endian length, with the most significant bit set, followed
/// by the byte values
Likewise there are probably some corner cases here related to NULLS FIRST / NULLS LAST
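To make the corner case concrete, here is a minimal sketch (not this PR's actual encoding; `encode_nullable` is a hypothetical helper) of how a leading sentinel byte can respect either null ordering under memcmp:

```rust
/// Hypothetical helper: encode an optional u32 so that memcmp places nulls
/// where the sort options ask for them. Not the PR's actual encoding.
fn encode_nullable(value: Option<u32>, nulls_first: bool, out: &mut Vec<u8>) {
    match value {
        None => {
            // Null sentinel: 0x00 sorts before any valid marker, 0xFF after
            out.push(if nulls_first { 0x00 } else { 0xFF });
            out.extend_from_slice(&[0; 4]); // padding keeps the row fixed-width
        }
        Some(v) => {
            out.push(0x01); // valid marker: above 0x00, below 0xFF
            out.extend_from_slice(&v.to_be_bytes()); // big endian => memcmp order
        }
    }
}

fn main() {
    let enc = |v, nf| { let mut b = Vec::new(); encode_nullable(v, nf, &mut b); b };
    assert!(enc(None, true) < enc(Some(0), true)); // NULLS FIRST
    assert!(enc(None, false) > enc(Some(u32::MAX), false)); // NULLS LAST
}
```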
arrow/src/row/mod.rs
Outdated
impl RowBatch {
    /// Create a [`RowBatch`] from the provided [`RecordBatch`]
    pub fn new(batch: &RecordBatch) -> Self {
I recommend making this API look like https://docs.rs/arrow/21.0.0/arrow/compute/kernels/sort/fn.lexsort_to_indices.html. Namely, rather than taking a RecordBatch as input, it should take `columns: &[SortColumn]`, to be consistent with the sorting kernels.
Yeah, I was being lazy, supporting SortOptions is on my radar 👍
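For reference, the kernel linked above has the following shape as of arrow 21; the sample columns here are made up:

```rust
use std::sync::Arc;
use arrow::array::{Array, ArrayRef, Int32Array, StringArray};
use arrow::compute::{lexsort_to_indices, SortColumn, SortOptions};

fn main() -> arrow::error::Result<()> {
    let a: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(1)]));
    let b: ArrayRef = Arc::new(StringArray::from(vec![Some("x"), Some("y"), None]));

    // Each column carries its own SortOptions, matching the sort kernels
    let indices = lexsort_to_indices(
        &[
            SortColumn { values: a, options: None },
            SortColumn {
                values: b,
                options: Some(SortOptions { descending: true, nulls_first: false }),
            },
        ],
        None,
    )?;
    assert_eq!(indices.len(), 3);
    Ok(())
}
```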
arrow/src/row/mod.rs
Outdated
let buffer = vec![0_u8; cur_offset];

let mut out = Self {
    buffer: buffer.into(),
I am just curious why make this a `Box<[]>` rather than a `Vec`, given they are `Vec` to start with.
Makes it a bit clearer that it isn't being resized, maybe??
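A toy illustration of the design choice: `Box<[u8]>` sheds `Vec`'s capacity field and resize API, so the type itself says the buffer is frozen:

```rust
fn main() {
    let buffer: Vec<u8> = vec![0_u8; 16];
    // Box<[u8]> has no capacity field and no push/resize methods, making it
    // explicit that the buffer is never resized after construction
    let frozen: Box<[u8]> = buffer.into_boxed_slice(); // what `buffer.into()` does
    assert_eq!(frozen.len(), 16);
}
```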
arrow/src/row/mod.rs
Outdated
assert_eq!(*out.offsets.last().unwrap(), out.buffer.len());
out.offsets
    .windows(2)
    .for_each(|w| assert!(w[0] <= w[1], "offsets should be monotonic"));
Suggested change:
-    .for_each(|w| assert!(w[0] <= w[1], "offsets should be monotonic"));
+    .for_each(|w| assert!(w[0] < w[1], "offsets should be monotonic"));
They should be strictly increasing, right?
I am not sure if it is possible, but figuring out how to share the same RowFormat machinery in DataFusion -- https://github.com/apache/arrow-datafusion/blob/master/datafusion/row/src/lib.rs -- would be awesome. Maybe the DataFusion one could wrap this one, or maybe this one could offer some generic framework that we could refactor DataFusion to use. I feel like the APIs are quite similar, and with a bit more effort we could avoid having two copies of almost the same code 🤔
My plan is to get this working well for SortPreservingMerge and then see about whether we can unify DataFusion onto a singular row format. I see no obvious reason this wouldn't be possible.
Looks very cool.
My feelings for the comparable row format go to the third variant `RawComparable` of the `RowLayout` enum: https://github.com/apache/arrow-datafusion/blob/master/datafusion/row/src/layout.rs#L74. Each variant serves its own circumstances with a different usage pattern: `RawComparable` requires extra byte flips or float encoding/decoding, `WordAligned` needs fast in-place updates, and `Compact` needs to be memory-efficient to buffer a large number of records in memory for pipeline breakers.
DuckDB's sort (https://duckdb.org/2021/08/27/external-sorting.html) and its implementation (https://github.com/duckdb/duckdb/blob/master/src/common/sort/radix_sort.cpp#L274) are a nice read.
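The "float encode/decode" mentioned there is the classic order-preserving trick; a minimal sketch (not code from either repo):

```rust
/// Classic order-preserving f64 encoding: flip the sign bit of positives and
/// all bits of negatives, so memcmp on the big endian bytes matches numeric
/// order. Sketch only, not code from arrow-rs or DataFusion.
fn encode_f64(v: f64) -> [u8; 8] {
    let bits = v.to_bits();
    let flipped = if bits >> 63 == 0 {
        bits ^ (1u64 << 63) // positive: set the sign bit so it sorts above negatives
    } else {
        !bits // negative: flip everything so larger magnitudes sort lower
    };
    flipped.to_be_bytes()
}

fn main() {
    assert!(encode_f64(-1.5) < encode_f64(-0.5));
    assert!(encode_f64(-0.5) < encode_f64(0.0));
    assert!(encode_f64(0.0) < encode_f64(2.5));
}
```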
arrow/src/row/mod.rs
Outdated
/// A null is encoded as a big endian encoded `0_u32`
///
/// A valid value is encoded as a big endian length, with the most significant bit set, followed
/// by the byte values
The current encoding seems flawed for variable-length attribute comparison, since it compares the length bytes before the content: a longer string therefore always compares greater than a shorter string. So the currently passing assertions in `test_variable_width` contradict string ordering:
assert!(&"foo" < &"he");
assert!(rows.row(3) > rows.row(1));
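A self-contained demonstration of the flaw, using a hypothetical helper that follows the doc comment quoted above:

```rust
/// Hypothetical helper mirroring the doc comment above: big endian length
/// with the most significant bit set, followed by the bytes.
fn encode_len_prefixed(data: &[u8]) -> Vec<u8> {
    let len = data.len() as u32 | (1u32 << 31); // MSB marks a valid (non-null) value
    let mut out = len.to_be_bytes().to_vec();
    out.extend_from_slice(data);
    out
}

fn main() {
    // "foo" < "he" lexicographically, but the length prefix is compared first,
    // so the encoding sorts the longer string after the shorter one
    assert!("foo" < "he");
    assert!(encode_len_prefixed(b"foo") > encode_len_prefixed(b"he"));
}
```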
Darn, guess I need to do null termination instead
🤦 It sounded so good when we discussed this. This however also means that you need to escape NUL bytes within the string, otherwise your termination marker can be confused with actual string content.
Yup - you will need some framing protocol, of which COBS is one possibility
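For illustration, a simple byte-stuffing scheme (simpler than full COBS, and not necessarily what this PR ends up using) that keeps the encoding order-preserving while escaping embedded NULs:

```rust
/// Sketch of an order-preserving, self-terminating encoding: each 0x00 in
/// the payload becomes [0x00, 0xFF], and [0x00, 0x00] terminates the value.
/// memcmp on the encodings then matches the order of the raw byte strings,
/// and embedded NULs cannot be confused with the terminator.
fn encode_terminated(data: &[u8], out: &mut Vec<u8>) {
    for &b in data {
        if b == 0 {
            out.extend_from_slice(&[0x00, 0xFF]); // escaped NUL
        } else {
            out.push(b);
        }
    }
    out.extend_from_slice(&[0x00, 0x00]); // terminator sorts below any payload byte
}

fn main() {
    let enc = |d: &[u8]| { let mut v = Vec::new(); encode_terminated(d, &mut v); v };
    assert!(enc(b"foo") < enc(b"he")); // length no longer dominates
    assert!(enc(b"a") < enc(b"ab")); // a prefix sorts first
    assert!(enc(b"a\x00") > enc(b"a")); // embedded NUL handled correctly
}
```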
Force-pushed from 1cca91e to a0acad3
I think this is now ready for review, I intend to experiment with integration in DataFusion's SortPreservingMerge in the coming days and report back.
I plan to review this later today
Could the `sort` kernel in arrow-rs leverage the row format to improve performance?
Yes, switching lexsort to performing a radix sort on this format will likely yield significant returns.
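To make that concrete, a toy sketch (the `Rows` type here is a stand-in, not this PR's API) of how a multi-column lexicographic sort collapses to byte-slice comparisons on the row format; a radix sort would exploit the same property byte by byte:

```rust
// Toy stand-in for the PR's row storage: a flat buffer plus offsets.
struct Rows {
    buffer: Vec<u8>,
    offsets: Vec<usize>,
}

impl Rows {
    fn row(&self, i: usize) -> &[u8] {
        &self.buffer[self.offsets[i]..self.offsets[i + 1]]
    }
    fn num_rows(&self) -> usize {
        self.offsets.len() - 1
    }
}

/// A multi-column lexicographic sort becomes a single byte-wise comparison
fn sort_indices(rows: &Rows) -> Vec<usize> {
    let mut indices: Vec<usize> = (0..rows.num_rows()).collect();
    indices.sort_unstable_by(|&a, &b| rows.row(a).cmp(&rows.row(b)));
    indices
}

fn main() {
    let rows = Rows { buffer: vec![3, 1, 2], offsets: vec![0, 1, 2, 3] };
    assert_eq!(sort_indices(&rows), vec![1, 2, 0]);
}
```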
There is currently a bug concerning nulls in primitive dictionaries, working on a fix.
For what it is worth, I think there will likely be cases where the cost of creating the row format (copying bytes around) -- both CPU and memory -- will outweigh the sorting performance improvement, so we will have to think carefully about when to use one or the other. For example, I suspect sorting a single UInt64 array will likely work faster without copying the data into a row format (that is the same).
Agreed, we already have a special case for lexsort which falls back on the single column sort kernel. I do think that the performance of the row format will almost always outweigh the costs of the conversion.
Urgh... fixing the null handling for dictionaries is now causing other false positives, because LexicographicalComparator is wrong 😭
This code looks very cool. I have some questions about how the variable length and dictionary encoding work, but I think this PR is quite close to mergeable.
😓 I also haven't completed reviewing the tests in row/mod.rs, as I found the comparison code hard to read as written. I left a comment about how that might be easier to verify.
Another random thought is how we might put the row format behind a feature flag (or maybe this would be a good initial set of code to break out into its own crate, e.g. arrow-row or something)?
arrow/src/row/mod.rs
Outdated
/// ## Reconstruction
///
/// Given a schema it would theoretically be possible to reconstruct the columnar data from
/// the row format, however, this is currently not supported. It is recommended that the row
Suggested change:
-/// the row format, however, this is currently not supported. It is recommended that the row
+/// the row format, however, this is currently not implemented. It is recommended that the row
arrow/src/row/mod.rs
Outdated
    }
}

/// Convert `cols` into [`Rows`]
Suggested change:
-/// Convert `cols` into [`Rows`]
+/// Convert [`ArrayRef`]s into [`Rows`]
/// A byte array interner that generates normalized keys that are sorted with respect
/// to the interned values, e.g. `intern(a) < intern(b) => a < b`
#[derive(Debug, Default)]
It would help me review this code if the algorithm could be described, specifically the relationship between `keys`, `values` and `Bucket`. I especially find it confusing that keys and values are both interned.
        )
    }
}
}
I recommend also testing interning the values in a single call as part of this test.
arrow/src/row/interner.rs
Outdated
Ok(_) => unreachable!("value already exists"),
Err(idx) => {
    let slot = &mut self.slots[idx];
    // Must always spill final slot so can always create a value less than
I don't understand this comment
    ]
);

assert!(rows.row(3) < rows.row(6));
This test would be easier for me to verify if it sorted the rows based on values and then validated the new data order. As in something like:
let sorted_indexes = rows.lex_sort_indices();
assert_eq!(sorted_indexes, vec![0, 3, 4, 1, 2]);
    .collect()
}

fn generate_dictionary<K>(
Can we move these data generator functions into `test_util`, perhaps?
The "issue" is that the test_util variants are fixed seeds. I would rather keep it simple for now and potentially revisit if we can unify these as a follow up PR
Great to see this happening! I suggest we move the majority of the code in this PR to the DataFusion repo and only keep the API changes to the arrow sort compute kernel (the visibility changes) in arrow-rs. My suggestion mainly has two motivations: we could ease development by iterating in a single repo in DataFusion instead of waiting on a separate arrow-rs release, and we could minimize the confusion of having two row modules in two repos. After checking the usage of this comparable row format in apache/datafusion#3386, I think it's still valid for us to have three variants of the row format to serve different purposes: one for storage efficiency, one for update efficiency, and one for sort efficiency. For example, if we used this comparable format for an aggregation buffer, we would need to repeatedly flip bytes back and forth for each cell update.
I covered this in the issue, but I wish for this code to live here so that it can be used within the arrow kernels, in particular lexsort, and benefit use-cases outside DataFusion.
I see no issue with DataFusion retaining its own row formats for these purposes, I'm personally unsure that these bit flips will add up to a significant overhead, but that's a somewhat ancillary discussion to be had down the line. I want to provide a better story for multi-column operations within arrow-rs, what aspects of that DataFusion chooses to use is tbd 😅
The comments explaining how Bucket works really help me. I think there are some outstanding small items from my perspective, but I think they could be done as follow-on PRs (or not at all).
Very nice work @tustvold
let to_write = &mut out.buffer[*offset..end_offset];

// Set validity
to_write[0] = 2;
It is still 2 -- I read the comments as saying the validity byte is a `1`. Sorry for my ignorance, but I don't understand this particular constant -- can you elaborate?
/// the process for that bucket.
///
/// The final key will consist of the slot indexes visited incremented by 1,
/// with the final value incremented by 2, followed by a null terminator.
Ah, the null terminator idea is the key thing I was missing in my initial reading. This is a very clever idea 👍
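If I read that rule correctly, a mechanical illustration with made-up slot numbers:

```rust
fn main() {
    // Hypothetical lookup: descends through child slots 3 and 7, then
    // terminates at slot 5 of the final bucket
    let visited = [3u8, 7];
    let final_slot = 5u8;

    let mut key: Vec<u8> = visited.iter().map(|s| s + 1).collect();
    key.push(final_slot + 2);
    key.push(0); // the null terminator keeps keys prefix-free

    assert_eq!(key, vec![4, 8, 7, 0]);
}
```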
fn test_interner() {
    test_intern_values(&[8, 6, 5, 7]);

    let mut values: Vec<_> = (0_u64..2000).collect();
I wonder if there is any value (as a separate integration test, perhaps) in inserting enough values to result in at least two bucket levels (so a bucket has an entry that is a child, which itself has an entry with a child). Or maybe 2000 is enough to do this already.
Integration failure is unrelated, so going to get this one in. Thank you all for the reviews, if I've missed something please shout and I'll address it in a follow-on PR 😄
Benchmark runs are scheduled for baseline = 41e0187 and contender = a1d24e4. a1d24e4 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Which issue does this PR close?
Part of #2677
Rationale for this change
I'm trying to improve the performance of the SortPreservingMerge operator in DataFusion, and one of the major bottlenecks is performing multi-column comparisons across RecordBatch. I started work on a JITed comparator, but not only was this rather complicated and hard to validate, the number of unpredictable branches in the generated code made me unsure the performance would justify the complexity.

Taking a step back, with some bit-twiddling it is possible to construct a row format that represents the values in a row in a manner that can be compared with memcmp. This will likely yield the fastest comparator possible, at the cost of the time and memory overheads of assembling the row format.

I need to run some benchmarks, but I'm fairly happy that the conversion code as implemented in this PR avoids bump-allocating and large amounts of dynamic dispatch, so I'm optimistic the performance will be reasonable.
It is possible this might also improve the performance of multi-column sorts.
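As a minimal illustration of the core idea (a hypothetical two-column row, not this PR's actual layout): concatenate order-preserving per-column encodings so that a single memcmp compares the whole row:

```rust
/// Hypothetical two-column row (u32, nullable i32); not the PR's layout.
fn encode_row(a: u32, b: Option<i32>) -> Vec<u8> {
    let mut row = Vec::new();
    row.extend_from_slice(&a.to_be_bytes()); // unsigned: big endian bytes sort correctly
    match b {
        None => {
            row.push(0); // null marker sorts before any valid value
            row.extend_from_slice(&[0; 4]);
        }
        Some(v) => {
            row.push(1);
            // signed: flip the sign bit so negatives sort before positives
            row.extend_from_slice(&((v as u32) ^ (1u32 << 31)).to_be_bytes());
        }
    }
    row
}

fn main() {
    assert!(encode_row(1, Some(-5)) < encode_row(1, Some(3)));
    assert!(encode_row(1, None) < encode_row(1, Some(i32::MIN))); // nulls first
    assert!(encode_row(0, Some(9)) < encode_row(1, None)); // first column dominates
}
```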
What changes are included in this PR?
Are there any user-facing changes?
FYI @yjshen @alamb @crepererum