feat: delete operation #1176
Conversation
I want to continue to push this forward to have an MVP for deletion. My biggest issue with the current implementation is the Requires further changes when #1303 is merged. Looking forward to the great feedback as always. 😄
This is great work @Blajda!
Left some comments, mostly around some Option treatment. Also had a thought on how we can achieve the parallelism you mentioned when scanning files, but not sure yet if that will actually work :)
rust/src/operations/delete.rs
Outdated
let mut table = DeltaTable::new_with_state(this.store, this.snapshot);
table.update().await?;
Here we can probably avoid some I/O by returning the actions and version from execute and updating the new state directly. Essentially something like:
this.snapshot.merge(DeltaTableState::from_actions(actions, version)?, true, true);
I implemented this and I imagine we would want to use this pattern for other operations. Should we return the version and actions in an optional? Currently I return an empty list + the current version if there are no changes.
rust/src/operations/delete.rs
Outdated
    &execution_props,
)?;
let filter = Arc::new(FilterExec::try_new(predicate_expr, parquet_scan)?);
let limit: Arc<dyn ExecutionPlan> = Arc::new(GlobalLimitExec::new(filter, 0, Some(1)));
Haven't fully thought this through, but maybe one way for us to exploit parallelism is to create the ParquetScan in a way that each file group contains exactly one file and use LocalLimitExec. Looked a little bit through the DataFusion code, and I think the order would be preserved, so we can infer from the partition number which file was read.
If that cannot work and we have to add the file name as a column, that should be feasible by treating it as a "virtual partition column". However this may require some more updates and special cases in how we create the schema / statistics in various places. Including the file name as a partition value in PartitionedFile is straightforward though.
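A rough sketch of the PartitionedFile part of this idea; the helper name is hypothetical, and the struct fields assume the DataFusion / object_store versions in use around this PR (newer object_store releases add fields such as e_tag to ObjectMeta):

```rust
use chrono::Utc;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::scalar::ScalarValue;
use object_store::{path::Path, ObjectMeta};

// Hypothetical helper: attach the file's own path as an extra partition value
// so rows in the scan output can be traced back to the file they came from.
fn partitioned_file_with_path(path: &str, size: usize) -> PartitionedFile {
    PartitionedFile {
        object_meta: ObjectMeta {
            // field set assumed for object_store ~0.5; newer versions add e_tag
            location: Path::from(path),
            last_modified: Utc::now(),
            size,
        },
        // the "virtual partition column" carrying the source file path
        partition_values: vec![ScalarValue::Utf8(Some(path.to_string()))],
        range: None,
        extensions: None,
    }
}

fn main() {
    let file = partitioned_file_with_path("part-00000-abc.snappy.parquet", 1024);
    println!("{:?}", file.partition_values);
}
```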
Thanks. The primary insight that I missed was adding the file path as a partition column. I'll give this a try and report any issues :)
I was able to implement the suggestions. Let me know if you have any suggestions for the new implementation.
I'll take a closer look tomorrow, but provided some initial comments. Thanks for working on this!
rust/src/operations/delete.rs
Outdated
// rewrite phases.
match expr {
    Expr::ScalarVariable(_, _) | Expr::Literal(_) => (),
    Expr::Alias(expr, _) => validate_expr(expr, partition_columns, properties)?,
Perhaps we can avoid recursion using the TreeNode trait methods on Expr?
https://docs.rs/datafusion/23.0.0/datafusion/prelude/enum.Expr.html#impl-TreeNode-for-Expr
Would help avoid stack overflows if there is a very nested expression. I could imagine that happening if someone passes a filter like the example below (a rough sketch of the visitor approach follows it):
x = 1
OR x = 2
OR x = 3
...
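A minimal sketch of that suggestion, assuming DataFusion ~23 module paths; the allow-list is purely illustrative (not the actual checks in validate_expr) and only demonstrates walking the predicate with TreeNode::apply instead of hand-rolled recursion:

```rust
use datafusion::common::tree_node::{TreeNode, VisitRecursion};
use datafusion::error::{DataFusionError, Result};
use datafusion::prelude::{col, lit, Expr};

// Walk every node of the predicate and reject unsupported expression variants.
// The set of allowed variants here is hypothetical and only for illustration.
fn validate_predicate(predicate: &Expr) -> Result<()> {
    predicate.apply(&mut |expr| match expr {
        Expr::Column(_)
        | Expr::Literal(_)
        | Expr::BinaryExpr(_)
        | Expr::Not(_)
        | Expr::IsNull(_) => Ok(VisitRecursion::Continue),
        other => Err(DataFusionError::Plan(format!(
            "unsupported expression in delete predicate: {other}"
        ))),
    })?;
    Ok(())
}

fn main() -> Result<()> {
    // deeply nested OR chains are handled without growing our own call stack
    let predicate = col("x").eq(lit(1)).or(col("x").eq(lit(2)));
    validate_predicate(&predicate)
}
```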
The DataFusion implementation still uses recursion, but overflow issues are now on them. The visitor pattern also makes the code look cleaner!
> The DataFusion implementation still uses recursion but overflow issues are now on them

🤣
ACTION NEEDED: delta-rs follows the Conventional Commits specification for release automation. The PR title and description are used as the merge commit message. Please update your PR title and description to match the specification.
Thanks for the updates. I have a few more suggestions.
#[tokio::test]
async fn test_delete_on_nonpartiton_column() {
    // Delete based on a nonpartition column
How are null values handled? For example, if I have a column `x: [1, 2, null, 4]`, does `DELETE FROM table_name WHERE x > 2` delete just the last row, or also the third row? Seems like it's worth a unit test to verify.
Yeah, great call out.
The previous commit would delete the row. I've added a new test and changed the behavior to not delete the third record in this case.
A record should only be deleted if the predicate evaluates to true; otherwise it is kept. `null > 2` evaluates to UNKNOWN.
I've checked the Spark implementation and the behavior aligns.
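A hypothetical helper illustrating those semantics (not necessarily how this PR implements the rewrite): the rows to keep are the ones where the predicate is false or null.

```rust
use datafusion::prelude::{col, lit, Expr};

// A row is removed only when `predicate` evaluates to TRUE; FALSE and NULL
// (unknown) both keep the row, matching the Spark behaviour noted above.
fn rows_to_keep(predicate: Expr) -> Expr {
    (!predicate.clone()).or(predicate.is_null())
}

fn main() {
    // For DELETE ... WHERE x > 2 on x = [1, 2, null, 4], only the last row is
    // removed; the null row is kept because `null > 2` is unknown.
    let keep = rows_to_keep(col("x").gt(lit(2)));
    println!("{keep}");
}
```

Phrasing the rewrite as "keep when not true" rather than "drop when true" keeps the null handling explicit in the plan.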
rust/src/operations/delete.rs
Outdated
}

// Create a record batch that contains the partition columns plus the path of the file
fn create_partition_record_batch(
Maybe you can re-use this function for now:
delta-rs/rust/src/table_state_arrow.rs, line 51 in e046a77: pub fn add_actions_table(
It handles the different data types for partition values. It has a little more overhead since it also parses out the statistics, but I think that's fine for now. Later on, I expect we'll replace this with expression simplification, which will let us use statistics and remove redundant parts of the predicate.
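A usage sketch of that reuse; the accessor name (get_state), the flatten parameter, and the RecordBatch return type are assumptions about the helper linked above:

```rust
use deltalake::{open_table, DeltaTableError};

// Reuse the existing add-actions-to-Arrow conversion instead of building a
// partition-values RecordBatch by hand.
#[tokio::main]
async fn main() -> Result<(), DeltaTableError> {
    let table = open_table("./data/my_table").await?;
    // flatten = true is assumed to expand nested fields such as partition values
    let adds = table.get_state().add_actions_table(true)?;
    println!("{} add actions, {} columns", adds.num_rows(), adds.num_columns());
    Ok(())
}
```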
Hi @wjones127 @roeap
From my end this looks great - thanks for this excellent contribution @Blajda.
I'll leave it open though for @wjones127 to chime in.
Description
This is a full implementation of the Delta Delete command. Users can now delete records that match a predicate. The predicate is not limited to partition columns; non-partition columns are supported as well.
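A usage sketch, assuming the builder-style API this PR introduces (DeltaOps::delete with a with_predicate option) and the crate's datafusion re-export:

```rust
use deltalake::datafusion::prelude::{col, lit};
use deltalake::operations::DeltaOps;

// Delete every row whose (non-partition) column `value` is greater than 100.
#[tokio::main]
async fn main() -> Result<(), deltalake::DeltaTableError> {
    let table = deltalake::open_table("./data/my_table").await?;
    let (table, _metrics) = DeltaOps(table)
        .delete()
        .with_predicate(col("value").gt(lit(100)))
        .await?;
    println!("table is now at version {}", table.version());
    Ok(())
}
```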
This also implements a `find_files` function which can be used to implement the Update command.

Related Issue(s)
Documentation