fix: ensure cleanup_metadata never removes current version
Previously, if a table had no new commits but log_retention_expiry
lapsed for the latest commit or checkpoint file, the file would be
removed, leaving the table in a corrupted state.

To prevent this, cleanup_metadata should only run with `current_version`
equal to the table state's current version. For example, `cleanup_metadata`
for v10 would only touch files for v0-v9.
cmackenzie1 committed Jun 1, 2023
1 parent 4d862a0 commit 3dda954
Showing 2 changed files with 110 additions and 3 deletions.
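As a worked version of the v10 example in the message, here is a minimal sketch of the bound the fix enforces. The helper below is hypothetical (it is not part of delta-rs); it only encodes the rule that a _delta_log file may be deleted only when its version is strictly below the table's current version and its timestamp is past the retention cutoff:

// Hypothetical helper for illustration only; not the crate's internals.
fn is_deletion_candidate(file_version: i64, current_version: i64, file_ts_ms: i64, cutoff_ms: i64) -> bool {
    // Strictly below the current version AND past retention: the current
    // commit and its checkpoint can therefore never be removed.
    file_version < current_version && file_ts_ms < cutoff_ms
}

fn main() {
    let current_version = 10;
    let cutoff_ms = 1_000;
    // An expired v9 log file is a candidate for cleanup...
    assert!(is_deletion_candidate(9, current_version, 500, cutoff_ms));
    // ...but v10 itself must survive even if its timestamp has lapsed.
    assert!(!is_deletion_candidate(10, current_version, 500, cutoff_ms));
}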
2 changes: 1 addition & 1 deletion rust/src/checkpoints.rs
@@ -103,7 +103,7 @@ pub async fn cleanup_metadata(table: &DeltaTable) -> Result<i32, DeltaTableError
     let log_retention_timestamp =
         Utc::now().timestamp_millis() - table.get_state().log_retention_millis();
     cleanup_expired_logs_for(
-        table.version() + 1,
+        table.version(),
         table.storage.as_ref(),
         log_retention_timestamp,
     )
111 changes: 109 additions & 2 deletions rust/tests/integration_checkpoint.rs
@@ -1,10 +1,15 @@
 #![cfg(feature = "integration_test")]
 
-use deltalake::checkpoints::cleanup_expired_logs_for;
+use chrono::Utc;
+use deltalake::checkpoints::{cleanup_expired_logs_for, create_checkpoint};
 use deltalake::test_utils::{IntegrationContext, StorageIntegration, TestResult};
-use deltalake::{DeltaTableBuilder, ObjectStore};
+use deltalake::writer::{DeltaWriter, JsonWriter};
+use deltalake::{DeltaOps, DeltaResult, DeltaTableBuilder, ObjectStore, SchemaDataType};
 use object_store::path::Path;
+use serde_json::json;
 use serial_test::serial;
+use std::time::Duration;
+use tokio::time::sleep;
 
 #[tokio::test]
 async fn cleanup_metadata_fs_test() -> TestResult {
@@ -104,3 +109,105 @@ async fn cleanup_metadata_test(context: &IntegrationContext) -> TestResult {
 
     Ok(())
 }
+
+#[tokio::test]
+async fn test_issue_1420_cleanup_expired_logs_for() -> DeltaResult<()> {
+    let _ = std::fs::remove_dir_all("./tests/data/issue_1420");
+
+    let mut table = DeltaOps::try_from_uri("./tests/data/issue_1420")
+        .await?
+        .create()
+        .with_column(
+            "id",
+            SchemaDataType::primitive("integer".to_string()),
+            false,
+            None,
+        )
+        .await?;
+
+    let mut writer = JsonWriter::for_table(&table)?;
+    writer.write(vec![json!({"id": 1})]).await?;
+    writer.flush_and_commit(&mut table).await?; // v1
+
+    let ts = Utc::now(); // use this ts for log retention expiry
+    sleep(Duration::from_secs(1)).await;
+
+    writer.write(vec![json!({"id": 2})]).await?;
+    writer.flush_and_commit(&mut table).await?; // v2
+    assert_eq!(table.version(), 2);
+
+    create_checkpoint(&table).await.unwrap(); // v2.checkpoint.parquet
+
+    // Should delete v1 but not v2 or v2.checkpoint.parquet
+    cleanup_expired_logs_for(
+        table.version(),
+        table.object_store().as_ref(),
+        ts.timestamp_millis(),
+    )
+    .await?;
+
+    assert!(
+        table
+            .object_store()
+            .head(&Path::from(format!("_delta_log/{:020}.json", 1)))
+            .await
+            .is_err(),
+        "v1 commit should have been removed"
+    );
+
+    assert!(
+        table
+            .object_store()
+            .head(&Path::from(format!("_delta_log/{:020}.json", 2)))
+            .await
+            .is_ok(),
+        "commit should exist"
+    );
+
+    assert!(
+        table
+            .object_store()
+            .head(&Path::from(format!(
+                "_delta_log/{:020}.checkpoint.parquet",
+                2
+            )))
+            .await
+            .is_ok(),
+        "checkpoint should exist"
+    );
+
+    // Pretend time advanced, but there are no new versions after v2;
+    // v2 and v2.checkpoint.parquet should still be there.
+    let ts = Utc::now();
+    sleep(Duration::from_secs(1)).await;
+
+    cleanup_expired_logs_for(
+        table.version(),
+        table.object_store().as_ref(),
+        ts.timestamp_millis(),
+    )
+    .await?;
+
+    assert!(
+        table
+            .object_store()
+            .head(&Path::from(format!("_delta_log/{:020}.json", 2)))
+            .await
+            .is_ok(),
+        "commit should exist"
+    );
+
+    assert!(
+        table
+            .object_store()
+            .head(&Path::from(format!(
+                "_delta_log/{:020}.checkpoint.parquet",
+                2
+            )))
+            .await
+            .is_ok(),
+        "checkpoint should exist"
+    );
+
+    Ok(())
+}
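For callers, the public entry point is still cleanup_metadata (shown in the first hunk above), which now forwards table.version() unchanged. A minimal usage sketch, assuming a local table path (the path here is hypothetical):

use deltalake::checkpoints::cleanup_metadata;

#[tokio::main]
async fn main() -> Result<(), deltalake::DeltaTableError> {
    // Load the table at its latest version so the state reflects the true
    // current version before expired log files are cleaned up.
    let table = deltalake::open_table("./tests/data/issue_1420").await?;
    let removed = cleanup_metadata(&table).await?;
    println!("removed {} expired log files", removed);
    Ok(())
}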
