Fix S3 list_objs and cleanup_metadata (#518)
mosyp authored Dec 3, 2021
1 parent 04c846f commit cba4e3d
Showing 9 changed files with 167 additions and 105 deletions.
25 changes: 13 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default.

3 changes: 2 additions & 1 deletion aws/delta-checkpoint/src/main.rs
@@ -46,7 +46,8 @@ async fn process_event(event: &Value) -> Result<(), CheckPointLambdaError> {
table_uri, version
);

checkpoints::create_checkpoint_from_table_uri(&table_uri, version).await?;
checkpoints::create_checkpoint_from_table_uri_and_cleanup(&table_uri, version, None)
.await?;
} else {
info!(
"Not writing checkpoint for table uri {} at delta version {}.",
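The Lambda now calls the combined create-and-cleanup entry point. A minimal sketch of the third parameter's semantics, based on the doc comment added in rust/src/checkpoints.rs below (not part of this diff): `None` defers to the table's `enableExpiredLogCleanup` setting, while an explicit value overrides it.

    // Sketch only: force a checkpoint without log cleanup, regardless of table config.
    checkpoints::create_checkpoint_from_table_uri_and_cleanup(&table_uri, version, Some(false))
        .await?;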
164 changes: 109 additions & 55 deletions rust/src/checkpoints.rs
@@ -86,75 +86,62 @@ impl From<CheckpointError> for ArrowError {
}
}

/// Loads table from given `table_uri` at given `version` and creates checkpoints for it.
pub async fn create_checkpoint_from_table_uri(
table_uri: &str,
version: DeltaDataTypeVersion,
) -> Result<(), CheckpointError> {
let table = open_table_with_version(table_uri, version).await?;
create_checkpoint(
version,
/// Creates a checkpoint at `table.version` for the given `table`.
pub async fn create_checkpoint(table: &DeltaTable) -> Result<(), CheckpointError> {
create_checkpoint_for(
table.version,
table.get_state(),
table.storage.as_ref(),
table_uri,
&table.table_uri,
)
.await?;

if table.version >= 0 && table.get_state().enable_expired_log_cleanup() {
let deleted_log_num = cleanup_expired_logs(
table.version + 1,
table.storage.as_ref(),
table.get_state(),
table_uri,
)
.await?;
debug!("Deleted {:?} log files.", deleted_log_num);
}

Ok(())
}

/// Creates checkpoint at `table.version` for given `table`.
pub async fn create_checkpoint_from_table(table: &DeltaTable) -> Result<(), CheckpointError> {
create_checkpoint(
table.version,
table.get_state(),
/// Deletes expired log files before the given version from the table. The table log retention
/// is based on the `logRetentionDuration` property of the Delta Table, 30 days by default.
pub async fn cleanup_metadata(table: &DeltaTable) -> Result<i32, DeltaTableError> {
let log_retention_timestamp =
Utc::now().timestamp_millis() - table.get_state().log_retention_millis();
cleanup_expired_logs_for(
table.version + 1,
table.storage.as_ref(),
log_retention_timestamp,
&table.table_uri,
)
.await?;

if table.version >= 0 && table.get_state().enable_expired_log_cleanup() {
let deleted_log_num = cleanup_expired_logs(
table.version + 1,
table.storage.as_ref(),
table.get_state(),
&table.table_uri,
)
.await?;
debug!("Deleted {:?} log files.", deleted_log_num);
}

Ok(())
.await
}
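As a worked illustration of the cutoff computed above (an aside, assuming the 30-day default retention named in the doc comment):

    // Illustration only: the cutoff is "now minus logRetentionDuration" in epoch millis.
    let thirty_days_ms: i64 = 30 * 24 * 60 * 60 * 1000; // 2_592_000_000
    let log_retention_timestamp = chrono::Utc::now().timestamp_millis() - thirty_days_ms;
    // Logs below `table.version + 1` and modified at or before this cutoff are candidates.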

/// Creates checkpoint at `table.version` for given `table`, without deleting expired log files.
/// Exposed for tests.
pub async fn create_checkpoint_from_table_without_cleaning_logs(
table: &DeltaTable,
/// Loads the table from the given `table_uri` at the given `version` and creates a checkpoint for it.
/// The `cleanup` parameter decides whether to run metadata cleanup of obsolete logs.
/// If it is `None`, the table's `enableExpiredLogCleanup` setting is used.
pub async fn create_checkpoint_from_table_uri_and_cleanup(
table_uri: &str,
version: DeltaDataTypeVersion,
cleanup: Option<bool>,
) -> Result<(), CheckpointError> {
create_checkpoint(
table.version,
let table = open_table_with_version(table_uri, version).await?;
create_checkpoint_for(
version,
table.get_state(),
table.storage.as_ref(),
&table.table_uri,
table_uri,
)
.await?;

let enable_expired_log_cleanup =
cleanup.unwrap_or_else(|| table.get_state().enable_expired_log_cleanup());

if table.version >= 0 && enable_expired_log_cleanup {
let deleted_log_num = cleanup_metadata(&table).await?;
debug!("Deleted {:?} log files.", deleted_log_num);
}

Ok(())
}
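A hedged usage sketch of the reshaped public API; the table path is illustrative, and `open_table` is the crate's usual entry point:

    let table = deltalake::open_table("./tests/data/checkpoints").await?;
    // Write a checkpoint at the current version without touching the logs...
    checkpoints::create_checkpoint(&table).await?;
    // ...then, independently, delete logs that fall outside the retention window.
    let removed = checkpoints::cleanup_metadata(&table).await?;
    debug!("Deleted {:?} log files.", removed);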

async fn create_checkpoint(
async fn create_checkpoint_for(
version: DeltaDataTypeVersion,
state: &DeltaTableState,
storage: &dyn StorageBackend,
@@ -224,20 +211,17 @@ async fn flush_delete_files<T: Fn(&(DeltaDataTypeVersion, ObjectMeta)) -> bool>(
Ok(deleted_num)
}

/// Delete expires log files before given version from table. The table log retention is based on
/// the `logRetentionDuration` property of the Delta Table, 30 days by default.
async fn cleanup_expired_logs(
version: DeltaDataTypeVersion,
async fn cleanup_expired_logs_for(
until_version: DeltaDataTypeVersion,
storage: &dyn StorageBackend,
state: &DeltaTableState,
log_retention_timestamp: i64,
table_uri: &str,
) -> Result<i32, DeltaTableError> {
lazy_static! {
static ref DELTA_LOG_REGEX: Regex =
Regex::new(r#"^*[/\\]_delta_log[/\\](\d{20})\.(json|checkpoint)*$"#).unwrap();
}

let log_retention_timestamp = Utc::now().timestamp_millis() - state.log_retention_millis();
let mut deleted_log_num = 0;

// Get file objects from table.
@@ -252,7 +236,7 @@ async fn cleanup_expired_logs(
if let Some(captures) = DELTA_LOG_REGEX.captures(&obj_meta.path) {
let log_ver_str = captures.get(1).unwrap().as_str();
let log_ver: DeltaDataTypeVersion = log_ver_str.parse().unwrap();
if log_ver < version && ts <= log_retention_timestamp {
if log_ver < until_version && ts <= log_retention_timestamp {
candidates.push((log_ver, obj_meta));
}
}
@@ -275,7 +259,7 @@ };
};

let should_delete_file = |file: &(i64, ObjectMeta)| {
file.1.modified.timestamp() <= log_retention_timestamp && file.0 < version
file.1.modified.timestamp() <= log_retention_timestamp && file.0 < until_version
};

let mut maybe_delete_files: Vec<(DeltaDataTypeVersion, ObjectMeta)> = Vec::new();
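An aside on the naming scheme the regex relies on: delta log versions are zero-padded to 20 digits in file names, which is exactly what the `\d{20}` group in `DELTA_LOG_REGEX` captures and parses back for the `log_ver < until_version` comparison.

    // Illustration only.
    let version: i64 = 1;
    assert_eq!(format!("{:020}.json", version), "00000000000000000001.json");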
Expand Down Expand Up @@ -609,6 +593,8 @@ fn apply_stats_conversion(
mod tests {
use super::*;
use lazy_static::lazy_static;
use std::time::Duration;
use uuid::Uuid;

#[test]
fn typed_partition_value_from_string_test() {
Expand Down Expand Up @@ -870,4 +856,72 @@ mod tests {
}
});
}

// Last-Modified for S3 objects cannot be altered by the user, so we rely on real pauses
// between writes, which makes the test slower but reliable.
async fn cleanup_metadata_test(table_path: &str) {
let log_path = |version| format!("{}/_delta_log/{:020}.json", table_path, version);
let backend = crate::storage::get_backend_for_uri(&table_path).unwrap();

// We don't need to populate the files with real content, since cleanup only inspects file metadata.
backend.put_obj(&log_path(0), &[]).await.unwrap();

// Since we cannot alter S3 object metadata directly, we mimic distinct timestamps with pauses.
// Pauses of at least 2 seconds are required because Last-Modified has one-second resolution.
std::thread::sleep(Duration::from_secs(2));
backend.put_obj(&log_path(1), &[]).await.unwrap();

std::thread::sleep(Duration::from_secs(3));
backend.put_obj(&log_path(2), &[]).await.unwrap();

let v0time = backend.head_obj(&log_path(0)).await.unwrap().modified;
let v1time = backend.head_obj(&log_path(1)).await.unwrap().modified;
let v2time = backend.head_obj(&log_path(2)).await.unwrap().modified;

// Choose the retention timestamp between v1 and v2, so v2 is kept and the older logs are removed.
let retention_timestamp =
v1time.timestamp_millis() + (v2time.timestamp_millis() - v1time.timestamp_millis()) / 2;

assert!(retention_timestamp > v0time.timestamp_millis());
assert!(retention_timestamp > v1time.timestamp_millis());
assert!(retention_timestamp < v2time.timestamp_millis());

let removed = crate::checkpoints::cleanup_expired_logs_for(
3,
backend.as_ref(),
retention_timestamp,
&table_path,
)
.await
.unwrap();

assert_eq!(removed, 2);
assert!(backend.head_obj(&log_path(0)).await.is_err());
assert!(backend.head_obj(&log_path(1)).await.is_err());
assert!(backend.head_obj(&log_path(2)).await.is_ok());

// Clean up after the test.
backend.delete_obj(&log_path(2)).await.unwrap();
}

#[tokio::test]
async fn cleanup_metadata_fs_test() {
let table_path = format!("./tests/data/md_cleanup/{}", Uuid::new_v4());
cleanup_metadata_test(&table_path).await;
std::fs::remove_dir_all(&table_path).unwrap();
}

#[cfg(feature = "s3")]
mod cleanup_metadata_s3_test {
use super::*;

#[tokio::test]
async fn cleanup_metadata_s3_test() {
std::env::set_var("AWS_ACCESS_KEY_ID", "test");
std::env::set_var("AWS_SECRET_ACCESS_KEY", "test");
std::env::set_var("AWS_ENDPOINT_URL", "http://localhost:4566");
let table_path = format!("s3://deltars/md_cleanup/{}", Uuid::new_v4());
cleanup_metadata_test(&table_path).await;
}
}
}
4 changes: 2 additions & 2 deletions rust/src/delta_arrow.rs
@@ -16,7 +16,7 @@ impl TryFrom<&schema::Schema> for ArrowSchema {
let fields = s
.get_fields()
.iter()
.map(|field| <ArrowField as TryFrom<&schema::SchemaField>>::try_from(field))
.map(<ArrowField as TryFrom<&schema::SchemaField>>::try_from)
.collect::<Result<Vec<ArrowField>, ArrowError>>()?;

Ok(ArrowSchema::new(fields))
@@ -125,7 +125,7 @@ impl TryFrom<&schema::SchemaDataType> for ArrowDataType {
schema::SchemaDataType::r#struct(s) => Ok(ArrowDataType::Struct(
s.get_fields()
.iter()
.map(|f| <ArrowField as TryFrom<&schema::SchemaField>>::try_from(f))
.map(<ArrowField as TryFrom<&schema::SchemaField>>::try_from)
.collect::<Result<Vec<ArrowField>, ArrowError>>()?,
)),
schema::SchemaDataType::array(a) => {
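Both hunks here are the same clippy-style redundant_closure cleanup: a closure that only forwards its argument can be replaced by the function path itself, fully qualified when the `TryFrom` target would otherwise be ambiguous. A standalone sketch with standard-library types:

    use std::convert::TryFrom;

    let xs: Vec<i64> = vec![1, 2, 3];
    // Before: a closure that just forwards its argument.
    let a: Result<Vec<i32>, _> = xs.iter().map(|x| i32::try_from(*x)).collect();
    // After: pass the conversion function directly.
    let b: Result<Vec<i32>, _> = xs.iter().copied().map(i32::try_from).collect();
    assert_eq!(a.unwrap(), b.unwrap());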
1 change: 1 addition & 0 deletions rust/src/storage/mod.rs
@@ -466,6 +466,7 @@ impl From<AzureError> for StorageError {
}

/// Describes metadata of a storage object.
#[derive(Debug)]
pub struct ObjectMeta {
/// The path where the object is stored. This is the path component of the object URI.
///
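Deriving `Debug` is what allows `ObjectMeta` values (and tuples containing them, like the cleanup candidates above) to be printed with the `{:?}` formatter. A minimal stand-in sketch; field types are simplified, and the real struct stores the modified time as a `DateTime<Utc>`:

    #[derive(Debug)]
    struct ObjectMeta {
        path: String,
        modified: i64, // simplified stand-in for the real DateTime<Utc> field
    }

    let meta = ObjectMeta {
        path: "_delta_log/00000000000000000000.json".to_string(),
        modified: 0,
    };
    println!("{:?}", meta);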
