Skip to content

Commit

Permalink
feat: add enforce_retention_duration param to vacuum method
Browse files Browse the repository at this point in the history
This maps to the reference implementation's
spark.databricks.delta.retentionDurationCheck.enabled config.
  • Loading branch information
houqp committed Jun 19, 2022
1 parent 649fdce commit c746008
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 14 deletions.
7 changes: 5 additions & 2 deletions python/deltalake/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,10 @@ def history(self, limit: Optional[int] = None) -> List[Dict[str, Any]]:
]

def vacuum(
self, retention_hours: Optional[int] = None, dry_run: bool = True
self,
retention_hours: Optional[int] = None,
dry_run: bool = True,
enforce_retention_duration: bool = True,
) -> List[str]:
"""
Run the Vacuum command on the Delta Table: list and delete files no longer referenced by the Delta table and are older than the retention threshold.
Expand All @@ -254,7 +257,7 @@ def vacuum(
if retention_hours < 0:
raise ValueError("The retention periods should be positive.")

return self._table.vacuum(dry_run, retention_hours)
return self._table.vacuum(dry_run, retention_hours, enforce_retention_duration)

def pyarrow_schema(self) -> pyarrow.Schema:
"""
Expand Down
13 changes: 11 additions & 2 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,9 +225,18 @@ impl RawDeltaTable {
}

/// Run the Vacuum command on the Delta Table: list and delete files no longer referenced by the Delta table and are older than the retention threshold.
pub fn vacuum(&mut self, dry_run: bool, retention_hours: Option<u64>) -> PyResult<Vec<String>> {
pub fn vacuum(
&mut self,
dry_run: bool,
retention_hours: Option<u64>,
enforce_retention_duration: bool,
) -> PyResult<Vec<String>> {
rt()?
.block_on(self._table.vacuum(retention_hours, dry_run))
.block_on(self._table.vacuum(
retention_hours,
dry_run,
Some(enforce_retention_duration),
))
.map_err(PyDeltaTableError::from_raw)
}

Expand Down
25 changes: 19 additions & 6 deletions rust/src/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,14 @@ pub enum DeltaTableError {
},
/// Error returned when Vacuum retention period is below the safe threshold
#[error(
"Invalid retention period, retention for Vacuum must be greater than 1 week (168 hours)"
"Invalid retention period, retention for Vacuum is configured to be greater than {} milliseconds, got {} milliseconds", .provided, .min
)]
InvalidVacuumRetentionPeriod,
InvalidVacuumRetentionPeriod {
/// User provided retention on vacuum call
provided: DeltaDataTypeLong,
/// Minimal retention configured in delta table config
min: DeltaDataTypeLong,
},
/// Error returned when a line from log record is invalid.
#[error("Failed to read line from log record")]
Io {
Expand Down Expand Up @@ -1154,13 +1159,20 @@ impl DeltaTable {
fn get_stale_files(
&self,
retention_hours: Option<u64>,
enforce_retention_duration: Option<bool>,
) -> Result<HashSet<&str>, DeltaTableError> {
let retention_millis = retention_hours
.map(|hours| 3600000 * hours as i64)
.unwrap_or_else(|| self.state.tombstone_retention_millis());

if retention_millis < self.state.tombstone_retention_millis() {
return Err(DeltaTableError::InvalidVacuumRetentionPeriod);
if enforce_retention_duration.unwrap_or(true) {
let min_retention_mills = self.state.tombstone_retention_millis();
if retention_millis < min_retention_mills {
return Err(DeltaTableError::InvalidVacuumRetentionPeriod {
provided: retention_millis,
min: min_retention_mills,
});
}
}

let tombstone_retention_timestamp = Utc::now().timestamp_millis() - retention_millis;
Expand Down Expand Up @@ -1206,8 +1218,10 @@ impl DeltaTable {
&self,
retention_hours: Option<u64>,
dry_run: bool,
enforce_retention_duration: Option<bool>,
) -> Result<Vec<String>, DeltaTableError> {
let expired_tombstones = self.get_stale_files(retention_hours)?;
let expired_tombstones =
self.get_stale_files(retention_hours, enforce_retention_duration)?;
let valid_files = self.get_file_set();

let mut files_to_delete = vec![];
Expand Down Expand Up @@ -1775,7 +1789,6 @@ mod tests {
tmp_dir.path().to_str().unwrap(),
));
let mut dt = DeltaTable::new(path, backend, DeltaTableConfig::default()).unwrap();
// let mut dt = DeltaTable::new(path, backend, DeltaTableLoadOptions::default()).unwrap();

let mut commit_info = Map::<String, Value>::new();
commit_info.insert(
Expand Down
29 changes: 25 additions & 4 deletions rust/tests/read_delta_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -420,16 +420,34 @@ async fn vacuum_delta_8_0_table() {

assert!(matches!(
table
.vacuum(Some(retention_hours), dry_run)
.vacuum(Some(retention_hours), dry_run, None)
.await
.unwrap_err(),
deltalake::DeltaTableError::InvalidVacuumRetentionPeriod,
deltalake::DeltaTableError::InvalidVacuumRetentionPeriod {
provided,
min,
} if provided == (retention_hours * 3600000) as i64
&& min == table.get_state().tombstone_retention_millis(),
));

// do not enforce retention duration check with 0 hour will purge all files
assert_eq!(
table.vacuum(Some(0), dry_run, Some(false)).await.unwrap(),
vec![backend.join_paths(&[
"tests",
"data",
"delta-0.8.0",
"part-00001-911a94a2-43f6-4acb-8620-5e68c2654989-c000.snappy.parquet",
])]
);

let retention_hours = 169;

assert_eq!(
table.vacuum(Some(retention_hours), dry_run).await.unwrap(),
table
.vacuum(Some(retention_hours), dry_run, None)
.await
.unwrap(),
vec![backend.join_paths(&[
"tests",
"data",
Expand All @@ -446,7 +464,10 @@ async fn vacuum_delta_8_0_table() {
let empty: Vec<String> = Vec::new();

assert_eq!(
table.vacuum(Some(retention_hours), dry_run).await.unwrap(),
table
.vacuum(Some(retention_hours), dry_run, None)
.await
.unwrap(),
empty
);
}
Expand Down

0 comments on commit c746008

Please sign in to comment.