feat: add z-order optimize #1429
Conversation
Took the liberty of resolving the merge conflicts I caused :)
Very nice ❤️ - folks are going to be quite excited about this ...
Left a few minor comments, mainly to also better understand the inner workings :).
rust/src/operations/optimize.rs
Outdated
read_stream: impl Future<
        Output = Result<
            BoxStream<'static, Result<RecordBatch, ParquetError>>,
            DeltaTableError,
        >,
    > + Send
    + 'static,
nit: should this be defined as a separate type?
LMK what you think of the type I added.
rust/src/operations/optimize.rs
Outdated
@@ -414,6 +439,56 @@ impl MergePlan {
        Ok((partial_actions, partial_metrics))
    }

    /// Creates a stream of batches that are zordered
    ///
    /// Currently requires loading all the data into memory.
Sounds a bit like loading all data from the table. For a partitioned table, this would be loading a single partition?
Yeah, if it's non-partitioned, it's everything. If it's partitioned, then only max_concurrent_tasks partitions are loaded at once.
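The idea of capping memory by capping concurrency can be sketched with scoped threads. This is only an illustration (the PR itself works with async streams, and process_in_waves is a hypothetical name, not code from the PR): at most max_concurrent_tasks partitions are in flight, and therefore in memory, at a time.

```rust
use std::thread;

// Hypothetical sketch: process partitions in waves of at most
// `max_concurrent_tasks`, so only that many are resident at once.
// The actual PR uses async streams with bounded concurrency instead.
fn process_in_waves<T, R>(
    partitions: Vec<T>,
    max_concurrent_tasks: usize,
    f: impl Fn(T) -> R + Sync,
) -> Vec<R>
where
    T: Send,
    R: Send,
{
    let mut results = Vec::with_capacity(partitions.len());
    let mut iter = partitions.into_iter();
    loop {
        // Take the next wave of at most `max_concurrent_tasks` partitions.
        let wave: Vec<T> = iter.by_ref().take(max_concurrent_tasks).collect();
        if wave.is_empty() {
            break;
        }
        thread::scope(|s| {
            let f = &f;
            let handles: Vec<_> = wave
                .into_iter()
                .map(|p| s.spawn(move || f(p)))
                .collect();
            // Joining in spawn order keeps results in input order.
            for handle in handles {
                results.push(handle.join().unwrap());
            }
        });
    }
    results
}

fn main() {
    let doubled = process_in_waves(vec![1, 2, 3, 4, 5], 2, |x| x * 2);
    assert_eq!(doubled, vec![2, 4, 6, 8, 10]);
}
```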
Added a comment for this.
rust/src/operations/optimize.rs
Outdated
        .as_slice(),
    )
    .unwrap();
    let indices = arrow::compute::lexsort_to_indices(
I may be completely off here, but we may just get away with sorting an enumerate()'ed vector of values here. IIUC, we should not have null values at this point, since this is already the encoded array, and the byte values can be compared using the native methods. If that is true, we could avoid the non-trivial internals of the lexsort_to_indices function.
I guess the optimizations on the CPU level mentioned in the arrow row blog posts should take effect either way, since we are comparing bytes nonetheless. So I leave it to your expertise whether this is worthwhile :)
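The suggestion amounts to a plain argsort over the already-encoded byte values. A minimal sketch (argsort_bytes is a hypothetical name; in the PR the values are arrow Row encodings, which are designed to sort correctly under byte comparison):

```rust
// Hypothetical sketch of the suggestion above: instead of
// arrow::compute::lexsort_to_indices, sort an enumerated vector of
// indices by the encoded byte values, which compare correctly as
// plain byte slices.
fn argsort_bytes(values: &[&[u8]]) -> Vec<usize> {
    let mut indices: Vec<usize> = (0..values.len()).collect();
    // Stable sort preserves the original order of equal keys.
    indices.sort_by(|&a, &b| values[a].cmp(values[b]));
    indices
}

fn main() {
    let rows: Vec<&[u8]> = vec![&b"bb"[..], &b"aa"[..], &b"ab"[..]];
    assert_eq!(argsort_bytes(&rows), vec![1, 2, 0]);
}
```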
Oh I think you are right! I'll give that a shot.
Nicely simplified :)
Great Work!
Description
Implements Z-order in Rust. This is a very basic version that requires loading the whole partition into memory for sorting. In the future, we can implement a DataFusion-based code path that allows sorting with spilling to disk for even larger optimize jobs.
The Z-order function here is based on Arrow's row format. We truncate to take the first 16 bytes of data (padding with zeros at the end as necessary). So for variable-width columns like strings, this means that we are only using the first 15 bytes of the string (the first byte is used to differentiate null and empty strings, see the row format docs). If a user has a string column where all values share the same prefix, this z-order function won't work well for them. But in many common cases it will work.
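A rough sketch of the key construction described above, under stated assumptions: the function names are hypothetical, and whether the actual implementation interleaves bits or bytes of the row encoding is not covered here; this sketch interleaves bytes for simplicity. Each column's row-encoded value is truncated (or zero-padded) to 16 bytes, then the columns are interleaved so all of them contribute to the most significant positions of the sort key.

```rust
const KEY_LEN: usize = 16;

// Truncate or zero-pad a row-encoded value to a fixed 16-byte key.
fn truncated_key(encoded: &[u8]) -> [u8; KEY_LEN] {
    let mut key = [0u8; KEY_LEN];
    let n = encoded.len().min(KEY_LEN);
    key[..n].copy_from_slice(&encoded[..n]);
    key
}

// Hypothetical sketch: interleave the truncated per-column keys one
// byte at a time so every column contributes equally to the high-order
// positions of the combined z-order key.
fn zorder_key(columns: &[&[u8]]) -> Vec<u8> {
    let keys: Vec<[u8; KEY_LEN]> =
        columns.iter().map(|c| truncated_key(c)).collect();
    let mut out = Vec::with_capacity(KEY_LEN * keys.len());
    for i in 0..KEY_LEN {
        for key in &keys {
            out.push(key[i]);
        }
    }
    out
}

fn main() {
    let key = zorder_key(&[&[0xAA; 16], &[0xBB; 16]]);
    assert_eq!(key.len(), 32);
    // Bytes alternate between the two columns.
    assert_eq!(&key[..4], &[0xAA, 0xBB, 0xAA, 0xBB]);
}
```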
We'll also expose this in Python as a follow-up.
Related Issue(s)
Documentation