refactor: perform bulk deletes during metadata cleanup
In addition to doing bulk deletes, I removed what seems like (at least to me)
unnecessary code. At its core, a file is considered up for deletion
when its last_modified time is older than the cutoff time AND its version
is less than the specified version (usually the latest version).
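In practice that rule is just a two-clause predicate. Here is a minimal standalone sketch of it (a hypothetical helper, not code from this commit); the names until_version and cutoff_timestamp mirror the parameters in the diff below, and the sample values are made up:

// Hypothetical helper illustrating the retention rule described above.
// A log file is deleted only when BOTH clauses hold.
fn is_expired(log_version: i64, last_modified_ms: i64, until_version: i64, cutoff_timestamp_ms: i64) -> bool {
    last_modified_ms <= cutoff_timestamp_ms && log_version < until_version
}

fn main() {
    let cutoff_ms = 1_698_192_000_000; // hypothetical cutoff (2023-10-25 UTC, in millis)
    assert!(is_expired(4, cutoff_ms - 1, 10, cutoff_ms)); // older and superseded: delete
    assert!(!is_expired(4, cutoff_ms + 1, 10, cutoff_ms)); // newer than cutoff: keep
    assert!(!is_expired(10, cutoff_ms - 1, 10, cutoff_ms)); // not below until_version: keep
}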
cmackenzie1 authored and rtyler committed Oct 25, 2023
1 parent 3d85b9b commit 8a1b5d6
151 changes: 31 additions & 120 deletions rust/src/protocol/checkpoints.rs
@@ -3,17 +3,16 @@
 use std::collections::HashMap;
 use std::convert::TryFrom;
 use std::iter::Iterator;
-use std::ops::Add;
 
 use arrow::datatypes::Schema as ArrowSchema;
 use arrow::error::ArrowError;
 use arrow::json::ReaderBuilder;
 
-use chrono::{DateTime, Datelike, Duration, Utc};
-use futures::StreamExt;
+use chrono::{Datelike, Utc};
+use futures::{StreamExt, TryStreamExt};
 use lazy_static::lazy_static;
 use log::*;
-use object_store::{path::Path, ObjectMeta, ObjectStore};
+use object_store::ObjectStore;
 use parquet::arrow::ArrowWriter;
 use parquet::errors::ParquetError;
 use regex::Regex;
@@ -141,44 +140,12 @@ pub async fn create_checkpoint_for(
     Ok(())
 }
 
-async fn flush_delete_files<T: Fn(&(i64, ObjectMeta)) -> bool>(
-    storage: &DeltaObjectStore,
-    maybe_delete_files: &mut Vec<(i64, ObjectMeta)>,
-    files_to_delete: &mut Vec<(i64, ObjectMeta)>,
-    should_delete_file: T,
-) -> Result<i32, ProtocolError> {
-    if !maybe_delete_files.is_empty() && should_delete_file(maybe_delete_files.last().unwrap()) {
-        files_to_delete.append(maybe_delete_files);
-    }
-
-    let deleted = files_to_delete
-        .iter_mut()
-        .map(|file| async move {
-            match storage.delete(&file.1.location).await {
-                Ok(_) => Ok(1),
-                Err(e) => Err(ProtocolError::from(e)),
-            }
-        })
-        .collect::<Vec<_>>();
-
-    let mut deleted_num = 0;
-    for x in deleted {
-        match x.await {
-            Ok(_) => deleted_num += 1,
-            Err(e) => return Err(e),
-        }
-    }
-
-    files_to_delete.clear();
-
-    Ok(deleted_num)
-}
-
-/// exposed only for integration testing - DO NOT USE otherwise
+/// Deletes all delta log commits that are older than the cutoff time
+/// and less than the specified version.
 pub async fn cleanup_expired_logs_for(
     until_version: i64,
     storage: &DeltaObjectStore,
-    log_retention_timestamp: i64,
+    cutoff_timestamp: i64,
 ) -> Result<i32, ProtocolError> {
     lazy_static! {
         static ref DELTA_LOG_REGEX: Regex =
@@ -188,101 +155,45 @@ pub async fn cleanup_expired_logs_for(
     let mut deleted_log_num = 0;
 
     // Get file objects from table.
-    let mut candidates: Vec<(i64, ObjectMeta)> = Vec::new();
+    let mut candidates = Vec::new();
     let mut stream = storage.list(Some(storage.log_path())).await?;
-    while let Some(obj_meta) = stream.next().await {
-        let obj_meta = obj_meta?;
-
+    while let Some(obj_meta) = stream.try_next().await? {
         let ts = obj_meta.last_modified.timestamp_millis();
 
         if let Some(captures) = DELTA_LOG_REGEX.captures(obj_meta.location.as_ref()) {
             let log_ver_str = captures.get(1).unwrap().as_str();
             let log_ver: i64 = log_ver_str.parse().unwrap();
-            if log_ver < until_version && ts <= log_retention_timestamp {
-                candidates.push((log_ver, obj_meta));
+            if log_ver < until_version && ts <= cutoff_timestamp {
+                candidates.push(obj_meta);
             }
         }
-    }
-
-    // Sort files by file object version.
-    candidates.sort_by(|a, b| a.0.cmp(&b.0));
-
-    let mut last_file: (i64, ObjectMeta) = (
-        0,
-        ObjectMeta {
-            location: Path::from(""),
-            last_modified: DateTime::<Utc>::MIN_UTC,
-            size: 0,
-            e_tag: None,
-        },
-    );
-    let file_needs_time_adjustment =
-        |current_file: &(i64, ObjectMeta), last_file: &(i64, ObjectMeta)| {
-            last_file.0 < current_file.0
-                && last_file.1.last_modified.timestamp() >= current_file.1.last_modified.timestamp()
-        };
-
-    let should_delete_file = |file: &(i64, ObjectMeta)| {
-        file.1.last_modified.timestamp() <= log_retention_timestamp && file.0 < until_version
-    };
-
-    let mut maybe_delete_files: Vec<(i64, ObjectMeta)> = Vec::new();
-    let mut files_to_delete: Vec<(i64, ObjectMeta)> = Vec::new();
-
-    // Init
-    if !candidates.is_empty() {
-        let removed = candidates.remove(0);
-        last_file = (removed.0, removed.1.clone());
-        maybe_delete_files.push(removed);
-    }
-
-    let mut current_file: (i64, ObjectMeta);
-    loop {
-        if candidates.is_empty() {
-            deleted_log_num += flush_delete_files(
-                storage,
-                &mut maybe_delete_files,
-                &mut files_to_delete,
-                should_delete_file,
-            )
-            .await?;
-
-            return Ok(deleted_log_num);
-        }
-        current_file = candidates.remove(0);
-
-        if file_needs_time_adjustment(&current_file, &last_file) {
-            let updated = (
-                current_file.0,
-                ObjectMeta {
-                    location: current_file.1.location.clone(),
-                    last_modified: last_file.1.last_modified.add(Duration::seconds(1)),
-                    size: 0,
-                    e_tag: None,
-                },
-            );
-            maybe_delete_files.push(updated);
-            last_file = (
-                maybe_delete_files.last().unwrap().0,
-                maybe_delete_files.last().unwrap().1.clone(),
-            );
-        } else {
-            let deleted = flush_delete_files(
-                storage,
-                &mut maybe_delete_files,
-                &mut files_to_delete,
-                should_delete_file,
-            )
-            .await?;
-            if deleted == 0 {
-                return Ok(deleted_log_num);
-            }
-            deleted_log_num += deleted;
-
-            maybe_delete_files.push(current_file.clone());
-            last_file = current_file;
-        }
-    }
+
+        // Perform rolling deletes if more than 1000 objects are up for deletion
+        if candidates.len() > 1000 {
+            let deleted = storage
+                .delete_stream(
+                    futures::stream::iter(candidates.iter().cloned().map(|f| Ok(f.location)))
+                        .boxed(),
+                )
+                .try_collect::<Vec<_>>()
+                .await?;
+            deleted_log_num += deleted.len() as i32;
+            candidates.clear();
+        }
+    }
+
+    if !candidates.is_empty() {
+        let deleted = storage
+            .delete_stream(
+                futures::stream::iter(candidates.iter().cloned().map(|f| Ok(f.location))).boxed(),
+            )
+            .try_collect::<Vec<_>>()
+            .await?;
+        deleted_log_num += deleted.len() as i32;
+        candidates.clear();
+    }
+
+    Ok(deleted_log_num)
 }
 
 fn parquet_bytes_from_state(
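For reference, the bulk-delete pattern the diff adopts can be exercised against any ObjectStore implementation. Below is a rough standalone sketch (not part of the commit) using the object_store crate's in-memory backend; it assumes the 0.7-era API current at the time of this commit, where put takes Bytes. delete_stream issues the deletes and yields each deleted path, so counting successes is a try_collect away:

use bytes::Bytes;
use futures::{stream, StreamExt, TryStreamExt};
use object_store::{memory::InMemory, path::Path, ObjectStore};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Stage a few fake commit files in an in-memory store.
    let store = InMemory::new();
    let paths: Vec<Path> = (0..3)
        .map(|v| Path::from(format!("_delta_log/{v:020}.json")))
        .collect();
    for p in &paths {
        store.put(p, Bytes::from_static(b"{}")).await?;
    }

    // Bulk delete: feed a stream of paths to delete_stream and collect the
    // successfully deleted locations, mirroring the loop body in the diff.
    let deleted: Vec<Path> = store
        .delete_stream(stream::iter(paths.into_iter().map(Ok)).boxed())
        .try_collect()
        .await?;
    println!("deleted {} objects", deleted.len());
    Ok(())
}

The rolling flush in the new cleanup_expired_logs_for applies this same pattern every 1000 candidates, so the candidate list stays bounded even on tables with very long log histories.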
