-
Notifications
You must be signed in to change notification settings - Fork 406
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor: move vacuum command to operations module #1045
Conversation
I do agree that the builder interface makes the style more consistent and the argument passing less error prone 👍 That said, I think having an extra thin wrapper API on the table struct like below will provide a better developer UX: let result = table.vacuum()
.with_retention_period(Duration::hours(169))
.with_dry_run(true)
.await
.unwrap(); |
Absoluetely, even more so on the python side of things. This unfortuantely lead me down a little bit of a rabbit hole :D. Essentially again the question on how to most efficiently share the log for commands etc. Right now in the code we just clone that state, but to my understanding this is not the way to go ... Currently I am exploring a few options without a clear winner yet. However I do want to avoid making the |
rust/src/delta.rs
Outdated
let state = std::mem::take(&mut self.state); | ||
let mut plan = VacuumBuilder::new(self.object_store(), state) | ||
.with_dry_run(dry_run) | ||
.with_enforce_retention_duration(enforce_retention_duration); | ||
if let Some(hours) = retention_hours { | ||
plan = plan.with_retention_period(Duration::hours(hours as i64)); | ||
} | ||
|
||
let (table, metrics) = plan.await?; | ||
self.state = table.state; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here I am a little bit concernbed about the error handling. If we fail duing the execution, we are still left with a DeltaTable with an empty state. If users wanted to recover from a failure and keep using the table, they would probably see unexpexted results.
Once we manage the state in arrow, we can likely have near zero-cost clones of the state and mitigate the issue. For now one a way to handle this could be to handle the error internally, and load the previous table state again. But not sure what users wold expect / want to see.
The same applies on the python side.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could make it immutable and put it behind an Arc, so we can easily clone pointers to it. Not sure how viable that is though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That would make the mutation operations like merge()
and process_action()
less efficient.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we clone the state for now, and as a follow up let's figure out how to do this more efficiently?
@@ -330,6 +345,39 @@ impl DeltaTableState { | |||
|
|||
Ok(()) | |||
} | |||
|
|||
/// Obtain Add actions for files that match the filter | |||
pub fn get_active_add_actions_by_partitions<'a>( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moving this function to the state was part of an experiment. I did not move it back, since it exclusiveliy works on the state and from what I can see we are going to need functionality as such as we are evolving operations capabilities.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a nice improvement.
However I do want to avoid making the DeltaTableState mutable when being shared
I'm thinking maybe the builders need to take &mut DeltaTable
or &mut DeltaTableState
. Then the borrow checker will enforce this. DeltaTableState
is clone-able, so that means users can always clone themselves if they don't want to deal with the lifetimes.
let expired_tombstones = get_stale_files(&self.snapshot, retention_period, now_millis); | ||
let valid_files = self.snapshot.file_paths_iter().collect::<HashSet<Path>>(); | ||
|
||
let mut files_to_delete = vec![]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It might be nice to eventually make this an iterator, so we don't have to wait to resolve the entire file list to start deleting.
.ok_or(DeltaTableError::NoMetadata)? | ||
.partition_columns | ||
.iter() | ||
.any(|partition_column| path_name.starts_with(partition_column))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we need this line. I worry it will accidentally exclude a directory that shouldn't be there. And the partition directories should already be handled by the tombstones, right? Plus, Hive-partitioned directory structure isn't guaranteed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel like we should take this out and see if the _date=2022-07-03/delete_me.parquet
test case passes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just deleting the line unfortunately led to failing tests, but I did not dive deeper. My general feeling is we have to re-visit the logic anyhow, since we are not yet cleaning up "associated" files. I can look further into this here, but would prefer in a follow up :) - maybe when doing #688.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay. Thanks for looking into it.
rust/src/delta.rs
Outdated
let state = std::mem::take(&mut self.state); | ||
let mut plan = VacuumBuilder::new(self.object_store(), state) | ||
.with_dry_run(dry_run) | ||
.with_enforce_retention_duration(enforce_retention_duration); | ||
if let Some(hours) = retention_hours { | ||
plan = plan.with_retention_period(Duration::hours(hours as i64)); | ||
} | ||
|
||
let (table, metrics) = plan.await?; | ||
self.state = table.state; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could make it immutable and put it behind an Arc, so we can easily clone pointers to it. Not sure how viable that is though.
rust/src/delta.rs
Outdated
let state = std::mem::take(&mut self.state); | ||
let mut plan = VacuumBuilder::new(self.object_store(), state) | ||
.with_dry_run(dry_run) | ||
.with_enforce_retention_duration(enforce_retention_duration); | ||
if let Some(hours) = retention_hours { | ||
plan = plan.with_retention_period(Duration::hours(hours as i64)); | ||
} | ||
|
||
let (table, metrics) = plan.await?; | ||
self.state = table.state; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That would make the mutation operations like merge()
and process_action()
less efficient.
rust/src/delta.rs
Outdated
let state = std::mem::take(&mut self.state); | ||
let mut plan = VacuumBuilder::new(self.object_store(), state) | ||
.with_dry_run(dry_run) | ||
.with_enforce_retention_duration(enforce_retention_duration); | ||
if let Some(hours) = retention_hours { | ||
plan = plan.with_retention_period(Duration::hours(hours as i64)); | ||
} | ||
|
||
let (table, metrics) = plan.await?; | ||
self.state = table.state; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we clone the state for now, and as a follow up let's figure out how to do this more efficiently?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Though if you want, hiding that one function would be good.
Co-authored-by: Will Jones <[email protected]>
@wjones127 - applied your suggestions! Could you re-approve? :) |
# Description Moving the `vacuum` operation into the operations module and adopting `IntoFuture` for the command builder. This is breaking the APIs for the builder (now with consistent setter names) but we are able to keep the APIs for `DeltaTable` in rust and python. In a follow up I would like to move th optimize command as well, This however may require refactoring the `PartitionValue` since we can only deal with `static` lifetimes when using `IntoFuture`, A while back we talked about pulling in `ScalarValue` from datafusion to optimize that implementation and maybe that's a good opportunitiy to look into that as well. # Related Issue(s) <!--- For example: - closes delta-io#106 ---> # Documentation <!--- Share links to useful documentation ---> Co-authored-by: Will Jones <[email protected]>
Description
Moving the
vacuum
operation into the operations module and adoptingIntoFuture
for the command builder. This is breaking the APIs for the builder (now with consistent setter names) but we are able to keep the APIs forDeltaTable
in rust and python.In a follow up I would like to move th optimize command as well, This however may require refactoring the
PartitionValue
since we can only deal withstatic
lifetimes when usingIntoFuture
, A while back we talked about pulling inScalarValue
from datafusion to optimize that implementation and maybe that's a good opportunitiy to look into that as well.Related Issue(s)
Documentation