Make an attempt at improving the utilization of delete_stream for cleaning up expired logs

This change builds on @cmackenzie1's work and feeds the list stream directly
into the delete_stream with a predicate function to identify paths for deletion.
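
The pattern in play, as a minimal self-contained sketch against the object_store
crate (the delete_matching name and predicate parameter are illustrative rather
than part of this commit, and the async list signature assumes the 0.7-era API
delta-rs used at the time):

    use futures::{StreamExt, TryStreamExt};
    use object_store::{path::Path, ObjectStore};

    async fn delete_matching(
        store: &dyn ObjectStore,
        prefix: &Path,
        predicate: impl Fn(&Path) -> bool + Copy + Send,
    ) -> object_store::Result<usize> {
        let deleted = store
            .delete_stream(
                store
                    .list(Some(prefix))
                    .await?
                    // Drop listing errors and keep only paths the predicate accepts
                    .filter_map(|res| async move {
                        match res {
                            Ok(meta) if predicate(&meta.location) => Some(Ok(meta.location)),
                            _ => None,
                        }
                    })
                    .boxed(),
            )
            // delete_stream yields one Result per path; collecting them surfaces
            // any deletion error and gives a count of what was removed
            .try_collect::<Vec<_>>()
            .await?;
        Ok(deleted.len())
    }

No intermediate Vec of candidates is built up: paths flow from the listing,
through the filter, and into deletion as a single stream.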
rtyler committed Oct 25, 2023
1 parent 8a1b5d6 commit 1847b90
Showing 1 changed file with 38 additions and 43 deletions.
81 changes: 38 additions & 43 deletions rust/src/protocol/checkpoints.rs
@@ -74,7 +74,7 @@ pub async fn create_checkpoint(table: &DeltaTable) -> Result<(), ProtocolError>

 /// Delete expired log files before the given version from the table. The table log retention is based on
 /// the `logRetentionDuration` property of the Delta Table, 30 days by default.
-pub async fn cleanup_metadata(table: &DeltaTable) -> Result<i32, ProtocolError> {
+pub async fn cleanup_metadata(table: &DeltaTable) -> Result<usize, ProtocolError> {
     let log_retention_timestamp =
         Utc::now().timestamp_millis() - table.get_state().log_retention_millis();
     cleanup_expired_logs_for(
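
With the 30-day logRetentionDuration default, the cutoff passed down to
cleanup_expired_logs_for works out as below (a sketch of the arithmetic, with a
hypothetical helper name, not code from the diff):

    use chrono::Utc;

    fn default_cutoff_millis() -> i64 {
        // 30 days, the `logRetentionDuration` default, in milliseconds
        let log_retention_millis: i64 = 30 * 24 * 60 * 60 * 1000; // 2_592_000_000
        // Log files last modified at or before this instant (and below the
        // target version) become deletion candidates
        Utc::now().timestamp_millis() - log_retention_millis
    }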
@@ -146,54 +146,49 @@ pub async fn cleanup_expired_logs_for(
     until_version: i64,
     storage: &DeltaObjectStore,
     cutoff_timestamp: i64,
-) -> Result<i32, ProtocolError> {
+) -> Result<usize, ProtocolError> {
     lazy_static! {
         static ref DELTA_LOG_REGEX: Regex =
             Regex::new(r"_delta_log/(\d{20})\.(json|checkpoint).*$").unwrap();
     }

-    let mut deleted_log_num = 0;
-
-    // Get file objects from table.
-    let mut candidates = Vec::new();
-    let mut stream = storage.list(Some(storage.log_path())).await?;
-    while let Some(obj_meta) = stream.try_next().await? {
-        let ts = obj_meta.last_modified.timestamp_millis();
-
-        if let Some(captures) = DELTA_LOG_REGEX.captures(obj_meta.location.as_ref()) {
-            let log_ver_str = captures.get(1).unwrap().as_str();
-            let log_ver: i64 = log_ver_str.parse().unwrap();
-            if log_ver < until_version && ts <= cutoff_timestamp {
-                candidates.push(obj_meta);
-            }
-        }
-
-        // Perform rolling deletes if more than 1000 objects are up for deletion
-        if candidates.len() > 1000 {
-            let deleted = storage
-                .delete_stream(
-                    futures::stream::iter(candidates.iter().cloned().map(|f| Ok(f.location)))
-                        .boxed(),
-                )
-                .try_collect::<Vec<_>>()
-                .await?;
-            deleted_log_num += deleted.len() as i32;
-            candidates.clear();
-        }
-    }
-
-    if !candidates.is_empty() {
-        let deleted = storage
-            .delete_stream(
-                futures::stream::iter(candidates.iter().cloned().map(|f| Ok(f.location))).boxed(),
-            )
-            .try_collect::<Vec<_>>()
-            .await?;
-        deleted_log_num += deleted.len() as i32;
-        candidates.clear();
-    }
+    // Feed a stream of candidate deletion files directly into the delete_stream
+    // function to try to improve the speed of cleanup and reduce the need for
+    // intermediate memory.
+    let deleted = storage
+        .delete_stream(
+            storage
+                .list(Some(storage.log_path()))
+                .await?
+                // Pass along only the Ok results from storage.list
+                .filter(|res| futures::future::ready(res.is_ok()))
+                // This predicate keeps only locations whose log version precedes
+                // `until_version` and whose timestamp is at or before the cutoff
+                .filter_map(|meta: Result<crate::ObjectMeta, _>| async move {
+                    let meta = meta.unwrap();
+                    let ts = meta.last_modified.timestamp_millis();
+
+                    match DELTA_LOG_REGEX.captures(meta.location.as_ref()) {
+                        Some(captures) => {
+                            let log_ver_str = captures.get(1).unwrap().as_str();
+                            let log_ver: i64 = log_ver_str.parse().unwrap();
+                            if log_ver < until_version && ts <= cutoff_timestamp {
+                                // This location is ready to be deleted
+                                Some(Ok(meta.location))
+                            } else {
+                                None
+                            }
+                        }
+                        None => None,
+                    }
+                })
+                .boxed(),
+        )
+        .try_collect::<Vec<_>>()
+        .await?;

-    Ok(deleted_log_num)
+    debug!("Deleted {} expired logs", deleted.len());
+    Ok(deleted.len())
 }
 
 fn parquet_bytes_from_state(
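For reference, a quick standalone check (not part of the diff) of what
DELTA_LOG_REGEX captures: the zero-padded version number that is compared
against until_version. The table path here is hypothetical:

    use regex::Regex;

    fn main() {
        // Same pattern as DELTA_LOG_REGEX above
        let re = Regex::new(r"_delta_log/(\d{20})\.(json|checkpoint).*$").unwrap();
        // Matches both commit JSON and checkpoint parquet files
        let path = "my_table/_delta_log/00000000000000000010.checkpoint.parquet";
        let caps = re.captures(path).expect("path should match");
        let log_ver: i64 = caps.get(1).unwrap().as_str().parse().unwrap();
        assert_eq!(log_ver, 10); // candidates need log_ver < until_version
    }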
