diff --git a/rust/src/protocol/checkpoints.rs b/rust/src/protocol/checkpoints.rs index b8b60ae0f4..532f1cf68c 100644 --- a/rust/src/protocol/checkpoints.rs +++ b/rust/src/protocol/checkpoints.rs @@ -74,7 +74,7 @@ pub async fn create_checkpoint(table: &DeltaTable) -> Result<(), ProtocolError> /// Delete expires log files before given version from table. The table log retention is based on /// the `logRetentionDuration` property of the Delta Table, 30 days by default. -pub async fn cleanup_metadata(table: &DeltaTable) -> Result { +pub async fn cleanup_metadata(table: &DeltaTable) -> Result { let log_retention_timestamp = Utc::now().timestamp_millis() - table.get_state().log_retention_millis(); cleanup_expired_logs_for( @@ -146,54 +146,49 @@ pub async fn cleanup_expired_logs_for( until_version: i64, storage: &DeltaObjectStore, cutoff_timestamp: i64, -) -> Result { +) -> Result { lazy_static! { static ref DELTA_LOG_REGEX: Regex = Regex::new(r"_delta_log/(\d{20})\.(json|checkpoint).*$").unwrap(); } - let mut deleted_log_num = 0; - - // Get file objects from table. - let mut candidates = Vec::new(); - let mut stream = storage.list(Some(storage.log_path())).await?; - while let Some(obj_meta) = stream.try_next().await? { - let ts = obj_meta.last_modified.timestamp_millis(); - - if let Some(captures) = DELTA_LOG_REGEX.captures(obj_meta.location.as_ref()) { - let log_ver_str = captures.get(1).unwrap().as_str(); - let log_ver: i64 = log_ver_str.parse().unwrap(); - if log_ver < until_version && ts <= cutoff_timestamp { - candidates.push(obj_meta); - } - } - - // Perform rolling deletes if more than 1000 objects are up for deletion - if candidates.len() > 1000 { - let deleted = storage - .delete_stream( - futures::stream::iter(candidates.iter().cloned().map(|f| Ok(f.location))) - .boxed(), - ) - .try_collect::>() - .await?; - deleted_log_num += deleted.len() as i32; - candidates.clear(); - } - } - - if !candidates.is_empty() { - let deleted = storage - .delete_stream( - futures::stream::iter(candidates.iter().cloned().map(|f| Ok(f.location))).boxed(), - ) - .try_collect::>() - .await?; - deleted_log_num += deleted.len() as i32; - candidates.clear(); - } + // Feed a stream of candidate deletion files directly into the delete_stream + // function to try to improve the speed of cleanup and reduce the need for + // intermediate memory. + let deleted = storage + .delete_stream( + storage + .list(Some(storage.log_path())) + .await? + // Pass along only the Ok results from storage.list + .filter(|res| futures::future::ready(res.is_ok())) + // This predicate function will filter out any locations that don't + // match the given timestamp range + .filter_map(|meta: Result| async move { + let meta = meta.unwrap(); + let ts = meta.last_modified.timestamp_millis(); + + match DELTA_LOG_REGEX.captures(meta.location.as_ref()) { + Some(captures) => { + let log_ver_str = captures.get(1).unwrap().as_str(); + let log_ver: i64 = log_ver_str.parse().unwrap(); + if log_ver < until_version && ts <= cutoff_timestamp { + // This location is ready to be deleted + Some(Ok(meta.location)) + } else { + None + } + } + None => None, + } + }) + .boxed(), + ) + .try_collect::>() + .await?; - Ok(deleted_log_num) + debug!("Deleted {} expired logs", deleted.len()); + Ok(deleted.len()) } fn parquet_bytes_from_state(