diff --git a/rust/src/action/mod.rs b/rust/src/action/mod.rs
index b03b9abb30..4bd6916527 100644
--- a/rust/src/action/mod.rs
+++ b/rust/src/action/mod.rs
@@ -490,7 +490,11 @@ pub enum DeltaOperation {
         predicate: Option<String>,
         /// Target optimize size
         target_size: DeltaDataTypeLong,
-    }, // TODO: Add more operations
+    },
+    #[serde(rename_all = "camelCase")]
+    /// Represents a `FileSystemCheck` operation
+    FileSystemCheck {},
+    // TODO: Add more operations
 }
 
 impl DeltaOperation {
@@ -502,6 +506,7 @@ impl DeltaOperation {
             DeltaOperation::Write { .. } => "delta-rs.Write",
             DeltaOperation::StreamingUpdate { .. } => "delta-rs.StreamingUpdate",
             DeltaOperation::Optimize { .. } => "delta-rs.Optimize",
+            DeltaOperation::FileSystemCheck { .. } => "delta-rs.FileSystemCheck",
         };
         commit_info.insert(
             "operation".to_string(),
diff --git a/rust/src/operations/filesystem_check.rs b/rust/src/operations/filesystem_check.rs
new file mode 100644
index 0000000000..279748c19d
--- /dev/null
+++ b/rust/src/operations/filesystem_check.rs
@@ -0,0 +1,221 @@
+//! Audit the Delta Table for active files that do not exist in the underlying filesystem and remove them.
+//!
+//! Active files are ones that have an add action in the log, but no corresponding remove action.
+//! This operation creates a new transaction containing a remove action for each of the missing files.
+//!
+//! This can be used to repair tables where a data file has been deleted accidentally or
+//! purposefully, if the file was corrupted.
+//!
+//! # Example
+//! ```rust ignore
+//! let mut table = open_table("../path/to/table")?;
+//! let (table, metrics) = FileSystemCheckBuilder::new(table.object_store(), table.state).await?;
+//! ````
+use crate::action::{Action, Add, DeltaOperation, Remove};
+use crate::operations::transaction::commit;
+use crate::storage::DeltaObjectStore;
+use crate::table_state::DeltaTableState;
+use crate::DeltaDataTypeVersion;
+use crate::{DeltaDataTypeLong, DeltaResult, DeltaTable, DeltaTableError};
+use futures::future::BoxFuture;
+use futures::StreamExt;
+pub use object_store::path::Path;
+use object_store::ObjectStore;
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::sync::Arc;
+use std::time::SystemTime;
+use std::time::UNIX_EPOCH;
+use url::{ParseError, Url};
+
+/// Audit the Delta Table's active files with the underlying file system.
+/// See this module's documentation for more information
+#[derive(Debug)]
+pub struct FileSystemCheckBuilder {
+    /// A snapshot of the to-be-checked table's state
+    state: DeltaTableState,
+    /// Delta object store for handling data files
+    store: Arc<DeltaObjectStore>,
+    /// Don't commit remove actions to the table log. Just determine which files can be removed
+    dry_run: bool,
+}
+
+/// Details of the FSCK operation including which files were removed from the log
+#[derive(Debug)]
+pub struct FileSystemCheckMetrics {
+    /// Was this a dry run
+    pub dry_run: bool,
+    /// Files that were removed successfully
+    pub files_removed: Vec<String>,
+}
+
+struct FileSystemCheckPlan {
+    /// Version of the snapshot provided
+    version: DeltaDataTypeVersion,
+    /// Delta object store for handling data files
+    store: Arc<DeltaObjectStore>,
+    /// Files that no longer exist in the underlying ObjectStore but have active add actions
+    pub files_to_remove: Vec<Add>,
+}
+
+fn is_absolute_path(path: &str) -> DeltaResult<bool> {
+    match Url::parse(path) {
+        Ok(_) => Ok(true),
+        Err(ParseError::RelativeUrlWithoutBase) => Ok(false),
+        Err(_) => Err(DeltaTableError::Generic(format!(
+            "Unable to parse path: {}",
+            &path
+        ))),
+    }
+}
+
+impl FileSystemCheckBuilder {
+    /// Create a new [`FileSystemCheckBuilder`]
+    pub fn new(store: Arc<DeltaObjectStore>, state: DeltaTableState) -> Self {
+        FileSystemCheckBuilder {
+            state,
+            store,
+            dry_run: false,
+        }
+    }
+
+    /// Only determine which add actions should be removed. A dry run will not commit actions to the Delta log
+    pub fn with_dry_run(mut self, dry_run: bool) -> Self {
+        self.dry_run = dry_run;
+        self
+    }
+
+    async fn create_fsck_plan(&self) -> DeltaResult<FileSystemCheckPlan> {
+        let mut files_relative: HashMap<&str, &Add> =
+            HashMap::with_capacity(self.state.files().len());
+        let version = self.state.version();
+        let store = self.store.clone();
+
+        for active in self.state.files() {
+            if is_absolute_path(&active.path)? {
+                return Err(DeltaTableError::Generic(
+                    "Filesystem check does not support absolute paths".to_string(),
+                ));
+            } else {
+                files_relative.insert(&active.path, active);
+            }
+        }
+
+        let mut files = self.store.list(None).await?;
+        while let Some(result) = files.next().await {
+            let file = result?;
+            files_relative.remove(file.location.as_ref());
+
+            if files_relative.is_empty() {
+                break;
+            }
+        }
+
+        let files_to_remove: Vec<Add> = files_relative
+            .into_values()
+            .map(|file| file.to_owned())
+            .collect();
+
+        Ok(FileSystemCheckPlan {
+            files_to_remove,
+            version,
+            store,
+        })
+    }
+}
+
+impl FileSystemCheckPlan {
+    pub async fn execute(self) -> DeltaResult<FileSystemCheckMetrics> {
+        if self.files_to_remove.is_empty() {
+            return Ok(FileSystemCheckMetrics {
+                dry_run: false,
+                files_removed: Vec::new(),
+            });
+        }
+
+        let mut actions = Vec::with_capacity(self.files_to_remove.len());
+        let mut removed_file_paths = Vec::with_capacity(self.files_to_remove.len());
+        let version = self.version;
+        let store = &self.store;
+
+        for file in self.files_to_remove {
+            let deletion_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
+            let deletion_time = deletion_time.as_millis() as DeltaDataTypeLong;
+            removed_file_paths.push(file.path.clone());
+            actions.push(Action::remove(Remove {
+                path: file.path,
+                deletion_timestamp: Some(deletion_time),
+                data_change: true,
+                extended_file_metadata: None,
+                partition_values: Some(file.partition_values),
+                size: Some(file.size),
+                tags: file.tags,
+            }));
+        }
+
+        commit(
+            store,
+            version + 1,
+            actions,
+            DeltaOperation::FileSystemCheck {},
+            None,
+        )
+        .await?;
+
+        Ok(FileSystemCheckMetrics {
+            dry_run: false,
+            files_removed: removed_file_paths,
+        })
+    }
+}
+
+impl std::future::IntoFuture for FileSystemCheckBuilder {
+    type Output = DeltaResult<(DeltaTable, FileSystemCheckMetrics)>;
+    type IntoFuture = BoxFuture<'static, Self::Output>;
+
+    fn into_future(self) -> Self::IntoFuture {
+        let this = self;
+
+        Box::pin(async move {
+            let plan = this.create_fsck_plan().await?;
+            if this.dry_run {
+                return Ok((
+                    DeltaTable::new_with_state(this.store, this.state),
+                    FileSystemCheckMetrics {
+                        files_removed: plan.files_to_remove.into_iter().map(|f| f.path).collect(),
+                        dry_run: true,
+                    },
+                ));
+            }
+
+            let metrics = plan.execute().await?;
+            let mut table = DeltaTable::new_with_state(this.store, this.state);
+            table.update().await?;
+            Ok((table, metrics))
+        })
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn absolute_path() {
+        assert!(!is_absolute_path(
+            "part-00003-53f42606-6cda-4f13-8d07-599a21197296-c000.snappy.parquet"
+        )
+        .unwrap());
+        assert!(!is_absolute_path(
+            "x=9/y=9.9/part-00007-3c50fba1-4264-446c-9c67-d8e24a1ccf83.c000.snappy.parquet"
+        )
+        .unwrap());
+
+        assert!(is_absolute_path("abfss://container@account_name.blob.core.windows.net/full/part-00000-a72b1fb3-f2df-41fe-a8f0-e65b746382dd-c000.snappy.parquet").unwrap());
+        assert!(is_absolute_path("file:///C:/my_table/windows.parquet").unwrap());
+        assert!(is_absolute_path("file:///home/my_table/unix.parquet").unwrap());
+        assert!(is_absolute_path("s3://container/path/file.parquet").unwrap());
+        assert!(is_absolute_path("gs://container/path/file.parquet").unwrap());
+        assert!(is_absolute_path("scheme://table/file.parquet").unwrap());
+    }
+}
diff --git a/rust/src/operations/mod.rs b/rust/src/operations/mod.rs
index b49fd44b6d..212bf34414 100644
--- a/rust/src/operations/mod.rs
+++ b/rust/src/operations/mod.rs
@@ -8,11 +8,13 @@
 //! if the operation returns data as well.
 
 use self::create::CreateBuilder;
+use self::filesystem_check::FileSystemCheckBuilder;
 use self::vacuum::VacuumBuilder;
 use crate::builder::DeltaTableBuilder;
 use crate::{DeltaResult, DeltaTable, DeltaTableError};
 
 pub mod create;
+pub mod filesystem_check;
 pub mod transaction;
 pub mod vacuum;
 
@@ -115,6 +117,12 @@ impl DeltaOps {
     pub fn vacuum(self) -> VacuumBuilder {
         VacuumBuilder::new(self.0.object_store(), self.0.state)
     }
+
+    /// Audit active files with files present on the filesystem
+    #[must_use]
+    pub fn filesystem_check(self) -> FileSystemCheckBuilder {
+        FileSystemCheckBuilder::new(self.0.object_store(), self.0.state)
+    }
 }
 
 impl From<DeltaTable> for DeltaOps {
diff --git a/rust/src/test_utils.rs b/rust/src/test_utils.rs
index 639ec7dc22..13a350095c 100644
--- a/rust/src/test_utils.rs
+++ b/rust/src/test_utils.rs
@@ -53,7 +53,6 @@ impl IntegrationContext {
             StorageIntegration::Google => format!("gs://{}", &bucket),
             StorageIntegration::Local => format!("file://{}", &bucket),
         };
-
         // the "storage_backend" will always point to the root ofg the object store.
         // TODO should we provide the store via object_Store builders?
         let store = match integration {
@@ -89,6 +88,12 @@ impl IntegrationContext {
         }
     }
 
+    pub fn table_builder(&self, table: TestTables) -> DeltaTableBuilder {
+        let name = table.as_name();
+        let table_uri = format!("{}/{}", self.root_uri(), &name);
+        DeltaTableBuilder::from_uri(table_uri).with_allow_http(true)
+    }
+
     pub fn uri_for_table(&self, table: TestTables) -> String {
         format!("{}/{}", self.root_uri(), table.as_name())
     }
@@ -186,6 +191,7 @@ pub enum TestTables {
     Simple,
     SimpleCommit,
     Golden,
+    Delta0_8_0Partitioned,
     Custom(String),
 }
 
@@ -204,6 +210,11 @@ impl TestTables {
                 .to_str()
                 .unwrap()
                 .to_owned(),
+            Self::Delta0_8_0Partitioned => data_path
+                .join("delta-0.8.0-partitioned")
+                .to_str()
+                .unwrap()
+                .to_owned(),
             // the data path for upload does not apply to custom tables.
             Self::Custom(_) => todo!(),
         }
@@ -214,12 +225,14 @@ impl TestTables {
             Self::Simple => "simple".into(),
             Self::SimpleCommit => "simple_commit".into(),
             Self::Golden => "golden".into(),
+            Self::Delta0_8_0Partitioned => "delta-0.8.0-partitioned".into(),
             Self::Custom(name) => name.to_owned(),
         }
     }
 }
 
-fn set_env_if_not_set(key: impl AsRef<str>, value: impl AsRef<str>) {
+/// Set environment variable if it is not set
+pub fn set_env_if_not_set(key: impl AsRef<str>, value: impl AsRef<str>) {
     if std::env::var(key.as_ref()).is_err() {
         std::env::set_var(key.as_ref(), value.as_ref())
     };
diff --git a/rust/tests/command_filesystem_check.rs b/rust/tests/command_filesystem_check.rs
new file mode 100644
index 0000000000..602371f99f
--- /dev/null
+++ b/rust/tests/command_filesystem_check.rs
@@ -0,0 +1,142 @@
+#![cfg(feature = "integration_test")]
+
+use deltalake::test_utils::{
+    set_env_if_not_set, IntegrationContext, StorageIntegration, TestResult, TestTables,
+};
+use deltalake::Path;
+use deltalake::{DeltaOps, DeltaTableError};
+use serial_test::serial;
+
+mod common;
+
+#[tokio::test]
+#[serial]
+async fn test_filesystem_check_local() -> TestResult {
+    test_filesystem_check(StorageIntegration::Local).await
+}
+
+#[cfg(any(feature = "s3", feature = "s3-rustls"))]
+#[tokio::test]
+#[serial]
+async fn test_filesystem_check_aws() -> TestResult {
+    set_env_if_not_set("AWS_S3_ALLOW_UNSAFE_RENAME", "true");
+    set_env_if_not_set("AWS_S3_LOCKING_PROVIDER", "none");
+    test_filesystem_check(StorageIntegration::Amazon).await
+}
+
+#[cfg(feature = "azure")]
+#[tokio::test]
+#[serial]
+async fn test_filesystem_check_azure() -> TestResult {
+    test_filesystem_check(StorageIntegration::Microsoft).await
+}
+
+#[cfg(feature = "gcs")]
+#[tokio::test]
+#[serial]
+async fn test_filesystem_check_gcp() -> TestResult {
+    test_filesystem_check(StorageIntegration::Google).await
+}
+
+async fn test_filesystem_check(storage: StorageIntegration) -> TestResult {
+    let context = IntegrationContext::new(storage)?;
+    context.load_table(TestTables::Simple).await?;
+    let file = "part-00000-2befed33-c358-4768-a43c-3eda0d2a499d-c000.snappy.parquet";
+    let path = Path::from_iter([&TestTables::Simple.as_name(), file]);
+
+    // Delete an active file from underlying storage without an update to the log to simulate an external fault
+    context.object_store().delete(&path).await?;
+
+    let table = context.table_builder(TestTables::Simple).load().await?;
+    let version = table.state.version();
+    let active = table.state.files().len();
+
+    // Validate a dry run does not mutate the table log and identifies orphaned add actions
+    let op = DeltaOps::from(table);
+    let (table, metrics) = op.filesystem_check().with_dry_run(true).await?;
+    assert_eq!(version, table.state.version());
+    assert_eq!(active, table.state.files().len());
+    assert_eq!(vec![file.to_string()], metrics.files_removed);
+
+    // Validate a run updates the table version with proper remove actions
+    let op = DeltaOps::from(table);
+    let (table, metrics) = op.filesystem_check().await?;
+    assert_eq!(version + 1, table.state.version());
+    assert_eq!(active - 1, table.state.files().len());
+    assert_eq!(vec![file.to_string()], metrics.files_removed);
+
+    let remove = table.state.all_tombstones().get(file).unwrap();
+    assert!(remove.data_change);
+
+    // An additional run should return an empty list of orphaned actions
+    let op = DeltaOps::from(table);
+    let (table, metrics) = op.filesystem_check().await?;
+    assert_eq!(version + 1, table.state.version());
+    assert_eq!(active - 1, table.state.files().len());
+    assert!(metrics.files_removed.is_empty());
+
+    Ok(())
+}
+
+#[tokio::test]
+#[serial]
+async fn test_filesystem_check_partitioned() -> TestResult {
+    let storage = StorageIntegration::Local;
+    let context = IntegrationContext::new(storage)?;
+    context
+        .load_table(TestTables::Delta0_8_0Partitioned)
+        .await?;
+    let file = "year=2021/month=12/day=4/part-00000-6dc763c0-3e8b-4d52-b19e-1f92af3fbb25.c000.snappy.parquet";
+    let path = Path::parse(TestTables::Delta0_8_0Partitioned.as_name() + "/" + file).unwrap();
+
+    // Delete an active file from underlying storage without an update to the log to simulate an external fault
+    context.object_store().delete(&path).await?;
+
+    let table = context
+        .table_builder(TestTables::Delta0_8_0Partitioned)
+        .load()
+        .await?;
+    let version = table.state.version();
+    let active = table.state.files().len();
+
+    // Validate a run updates the table version with proper remove actions
+    let op = DeltaOps::from(table);
+    let (table, metrics) = op.filesystem_check().await?;
+    assert_eq!(version + 1, table.state.version());
+    assert_eq!(active - 1, table.state.files().len());
+    assert_eq!(vec![file.to_string()], metrics.files_removed);
+
+    let remove = table.state.all_tombstones().get(file).unwrap();
+    assert!(remove.data_change);
+    Ok(())
+}
+
+#[tokio::test]
+#[serial]
+async fn test_filesystem_check_outdated() -> TestResult {
+    // Validate that a non-dry run fails unless it is executed against the latest version
+    let context = IntegrationContext::new(StorageIntegration::Local)?;
+    context.load_table(TestTables::Simple).await?;
+    let file = "part-00003-53f42606-6cda-4f13-8d07-599a21197296-c000.snappy.parquet";
+    let path = Path::from_iter([&TestTables::Simple.as_name(), file]);
+
+    // Delete an active file from underlying storage without an update to the log to simulate an external fault
+    context.object_store().delete(&path).await?;
+
+    let table = context
+        .table_builder(TestTables::Simple)
+        .with_version(2)
+        .load()
+        .await?;
+
+    let op = DeltaOps::from(table);
+    let res = op.filesystem_check().with_dry_run(false).await;
+
+    if let Err(DeltaTableError::VersionAlreadyExists(version)) = res {
+        assert_eq!(version, 3);
+    } else {
+        panic!("expected DeltaTableError::VersionAlreadyExists");
+    }
+
+    Ok(())
+}