-
Notifications
You must be signed in to change notification settings - Fork 413
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: implement update operation #1390
Conversation
This implementation follows the same structure as the delete command. Something different from previous operations is that a new ExecutionPlan implementation is created to expose a count of how many records were updated. I wanted to avoid creating this count in a scan/loop and was able to take advantage of null counts. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is impressive. I appreciate the variety of test cases :)
Had various questions and corrections throughout. I'll be excited to release this.
rust/src/delta_datafusion.rs
Outdated
let partitions = limit.output_partitioning().partition_count(); | ||
let mut tasks = Vec::with_capacity(partitions); | ||
|
||
for i in 0..partitions { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How do we limit the maximum concurrency? If I have 10,000 partitions, will this try to process all of them at the same time?
One way to limit the concurrency might be something like:
let partition_tasks = futures::iter(0..partitions)
.map(|part_i| futures::future::ready(Ok(limit.execute(i, task_ctx.clone())) ))
.try_flatten_unordered(max_concurrent_tasks);
rust/src/delta_datafusion.rs
Outdated
let mut tasks = Vec::with_capacity(partitions); | ||
|
||
for i in 0..partitions { | ||
let stream = limit.execute(i, task_ctx.clone())?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where does this plan execute? in the current thread? If so, we might want to wrap these in spawn_blocking()
instead, so they can be sent to execute across multiple threads.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I changed this area to push parallelism concerns onto Datafusion. I instead call collect and then process each batch for the path that was discovered. It cleans it up quite a bit
rust/src/operations/update.rs
Outdated
// Do not make a commit when there are zero updates to the state | ||
if !actions.is_empty() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't we have eliminating this possibility when finding files?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes good point. I made a change to return early when find files determines zero candidates. Added a test to ensure the metrics are correct too.
let array = batch.column_by_name("__delta_rs_update_predicate").unwrap(); | ||
let copied_rows = array.null_count(); | ||
let num_updated = array.len() - copied_rows; | ||
let c1 = MetricBuilder::new(&self.metrics).global_counter("num_updated_rows"); | ||
c1.add(num_updated); | ||
|
||
let c2 = MetricBuilder::new(&self.metrics).global_counter("num_copied_rows"); | ||
c2.add(copied_rows); | ||
Some(Ok(batch)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is cool!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indeed it is!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks it took me a bit to arrive to this solution but I'm glad ended up this simple.
// Take advantage of how null counts are tracked in arrow arrays use the | ||
// null count to track how many records do NOT statisfy the predicate. The | ||
// count is then exposed through the metrics through the `UpdateCountExec` | ||
// execution plan |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Clever!
rust/src/operations/update.rs
Outdated
#[tokio::test] | ||
async fn test_str_expressions() {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This a TODO?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah my bad. I wanted to add a test to demonstrate that a str can be used for the predicate and update expression but it seemed like a test with little value.
I've updated the null tests to that.
FYI using strings for expressions will require additional work in future PRs.
If you have a predicate like value < 2 or value > 2
DataFusion will return an error about being unable to compare int32
to int64
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you have a predicate like value < 2 or value > 2 DataFusion will return an error about being unable to compare int32 to int64
I stumbled across this one as well, not sure if this is maybe even a datafusion bug, since the expression parser seems to ignore the information in the schema passed to it.
Co-authored-by: Will Jones <[email protected]>
Co-authored-by: Will Jones <[email protected]>
Co-authored-by: Will Jones <[email protected]>
Co-authored-by: Will Jones <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Impressive work!
Left some minor comments, and there are some unwraps
floating around that we maybe can have a look at if we can avoid them...
rust/src/delta_datafusion.rs
Outdated
} | ||
let array = batch | ||
.column_by_name(PATH_COLUMN) | ||
.unwrap() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: can we get rid of this unwrap, or add a comment why its safe?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think now we can just use ?
:)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
😞 Sorry about that.
I factored that entire section into function since two places had the logic should be cleaner now.
let array = batch.column_by_name("__delta_rs_update_predicate").unwrap(); | ||
let copied_rows = array.null_count(); | ||
let num_updated = array.len() - copied_rows; | ||
let c1 = MetricBuilder::new(&self.metrics).global_counter("num_updated_rows"); | ||
c1.add(num_updated); | ||
|
||
let c2 = MetricBuilder::new(&self.metrics).global_counter("num_copied_rows"); | ||
c2.add(copied_rows); | ||
Some(Ok(batch)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indeed it is!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Impressive work!
Left some minor comments, and there are some unwraps
floating around that we maybe can have a look at if we can avoid them...
Co-authored-by: Robert Pack <[email protected]>
Co-authored-by: Robert Pack <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking good! - I'll leave it open for @wjones127 to look at, since he did the bulk of the review.
rust/src/delta_datafusion.rs
Outdated
} | ||
let array = batch | ||
.column_by_name(PATH_COLUMN) | ||
.unwrap() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think now we can just use ?
:)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Almost there! I have a few performance-related suggestions, and one more test case I think we want. After that, I think this is good to go :)
rust/src/delta_datafusion.rs
Outdated
// Given RecordBatches that contains `__delta_rs_path` perform a hash join | ||
// with actions to obtain original add actions | ||
|
||
let mut files = Vec::new(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should know the size ahead of time:
let mut files = Vec::new(); | |
let mut files = Vec::with_capacity(batches.iter().map(|batch| batch.num_rows()).sum()); |
match actions.remove(path) { | ||
Some(action) => files.push(action), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why mutate the hashmap? isn't the path already guaranteed to be unique?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes the current implementation does guarantee that paths are unique but I want to make be defensive against any unexpected changes from Datafusion or future refactoring. My assumption is that removal from the hashmap is O(1) and that rust does not realloc the underlying array when active items go below a certain threshold.
rust/src/operations/update.rs
Outdated
/// Time taken to execute the entire operation. | ||
pub execution_time_ms: u128, | ||
/// Time taken to scan the files for matches. | ||
pub scan_time_ms: u128, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As before, micro (us) or milliseconds (ms)?
Also, u128 seems a little excessive. Even with microseconds, I think u64 gets you at least 10,000 years.
/// Time taken to execute the entire operation. | |
pub execution_time_ms: u128, | |
/// Time taken to scan the files for matches. | |
pub scan_time_ms: u128, | |
/// Time taken to execute the entire operation. | |
pub execution_time_ms: u64, | |
/// Time taken to scan the files for matches. | |
pub scan_time_ms: u64, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So the expected unit is actually ms since I'm just pulling the metrics here
I've changed the call sites to use as_millis()
which returns a u128. Should I really explicitly downcast that to a u64?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes that will always be safe on the timescales we care about.
rust/src/operations/update.rs
Outdated
})) | ||
} | ||
|
||
metrics.execution_time_ms = Instant::now().duration_since(exec_start).as_micros(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did you mean milliseconds or microseconds? IMO milliseconds is plenty, but if you do microseconds, we should use the us
abbreviation:
metrics.execution_time_ms = Instant::now().duration_since(exec_start).as_micros(); | |
metrics.execution_time_us = Instant::now().duration_since(exec_start).as_micros(); |
#[tokio::test] | ||
async fn test_update_partitions() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since a partition-only predicate is a separate code path from one with a mix of partition and normal columns, I think we are missing some coverage on partition column handling in the case where we need to scan in find files.
Could you add either a separate test or just another part in this test where you update a partitioned table with a predicate that is on the partition column and a normal column?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
Co-authored-by: Will Jones <[email protected]>
Description
Users can now update data that matches a predicate.
This operation should be encouraged over the replace write operation since update determines which values require rewriting based on the supplied predicate.
Related Issue(s)