Skip to content

Commit

Permalink
Implement filesystem check (delta-io#1103)
Browse files Browse the repository at this point in the history
# Description
Implementation of the filesystem check operation.

The implementation is fairly straight forward with a HEAD call being
made for each active file to check if it exists.
A remove action is then made for each file that is orphaned.

An alternative solution is instead to maintain a hashset with all active
files and then recursively list all files. If the file exists then
remove from the set. All remaining files in the set are then considered
orphaned.
 
Looking for feedback and if the second approach is preferred I can make
the changes

# Related Issue(s)
- closes delta-io#1092

---------

Co-authored-by: Will Jones <[email protected]>
  • Loading branch information
2 people authored and chitralverma committed Mar 17, 2023
1 parent 4afdaef commit 3d25da0
Show file tree
Hide file tree
Showing 5 changed files with 392 additions and 3 deletions.
7 changes: 6 additions & 1 deletion rust/src/action/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,11 @@ pub enum DeltaOperation {
predicate: Option<String>,
/// Target optimize size
target_size: DeltaDataTypeLong,
}, // TODO: Add more operations
},
#[serde(rename_all = "camelCase")]
/// Represents a `FileSystemCheck` operation
FileSystemCheck {},
// TODO: Add more operations
}

impl DeltaOperation {
Expand All @@ -502,6 +506,7 @@ impl DeltaOperation {
DeltaOperation::Write { .. } => "delta-rs.Write",
DeltaOperation::StreamingUpdate { .. } => "delta-rs.StreamingUpdate",
DeltaOperation::Optimize { .. } => "delta-rs.Optimize",
DeltaOperation::FileSystemCheck { .. } => "delta-rs.FileSystemCheck",
};
commit_info.insert(
"operation".to_string(),
Expand Down
221 changes: 221 additions & 0 deletions rust/src/operations/filesystem_check.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
//! Audit the Delta Table for active files that do not exist in the underlying filesystem and remove them.
//!
//! Active files are ones that have an add action in the log, but no corresponding remove action.
//! This operation creates a new transaction containing a remove action for each of the missing files.
//!
//! This can be used to repair tables where a data file has been deleted accidentally or
//! purposefully, if the file was corrupted.
//!
//! # Example
//! ```rust ignore
//! let mut table = open_table("../path/to/table")?;
//! let (table, metrics) = FileSystemCheckBuilder::new(table.object_store(), table.state).await?;
//! ````
use crate::action::{Action, Add, DeltaOperation, Remove};
use crate::operations::transaction::commit;
use crate::storage::DeltaObjectStore;
use crate::table_state::DeltaTableState;
use crate::DeltaDataTypeVersion;
use crate::{DeltaDataTypeLong, DeltaResult, DeltaTable, DeltaTableError};
use futures::future::BoxFuture;
use futures::StreamExt;
pub use object_store::path::Path;
use object_store::ObjectStore;
use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::Arc;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;
use url::{ParseError, Url};

/// Audit the Delta Table's active files with the underlying file system.
/// See this module's documentaiton for more information
#[derive(Debug)]
pub struct FileSystemCheckBuilder {
/// A snapshot of the to-be-checked table's state
state: DeltaTableState,
/// Delta object store for handling data files
store: Arc<DeltaObjectStore>,
/// Don't remove actions to the table log. Just determine which files can be removed
dry_run: bool,
}

/// Details of the FSCK operation including which files were removed from the log
#[derive(Debug)]
pub struct FileSystemCheckMetrics {
/// Was this a dry run
pub dry_run: bool,
/// Files that wrere removed successfully
pub files_removed: Vec<String>,
}

struct FileSystemCheckPlan {
/// Version of the snapshot provided
version: DeltaDataTypeVersion,
/// Delta object store for handling data files
store: Arc<DeltaObjectStore>,
/// Files that no longer exists in undlying ObjectStore but have active add actions
pub files_to_remove: Vec<Add>,
}

fn is_absolute_path(path: &str) -> DeltaResult<bool> {
match Url::parse(path) {
Ok(_) => Ok(true),
Err(ParseError::RelativeUrlWithoutBase) => Ok(false),
Err(_) => Err(DeltaTableError::Generic(format!(
"Unable to parse path: {}",
&path
))),
}
}

impl FileSystemCheckBuilder {
/// Create a new [`FileSystemCheckBuilder`]
pub fn new(store: Arc<DeltaObjectStore>, state: DeltaTableState) -> Self {
FileSystemCheckBuilder {
state,
store,
dry_run: false,
}
}

/// Only determine which add actions should be removed. A dry run will not commit actions to the Delta log
pub fn with_dry_run(mut self, dry_run: bool) -> Self {
self.dry_run = dry_run;
self
}

async fn create_fsck_plan(&self) -> DeltaResult<FileSystemCheckPlan> {
let mut files_relative: HashMap<&str, &Add> =
HashMap::with_capacity(self.state.files().len());
let version = self.state.version();
let store = self.store.clone();

for active in self.state.files() {
if is_absolute_path(&active.path)? {
return Err(DeltaTableError::Generic(
"Filesystem check does not support absolute paths".to_string(),
));
} else {
files_relative.insert(&active.path, active);
}
}

let mut files = self.store.list(None).await?;
while let Some(result) = files.next().await {
let file = result?;
files_relative.remove(file.location.as_ref());

if files_relative.is_empty() {
break;
}
}

let files_to_remove: Vec<Add> = files_relative
.into_values()
.map(|file| file.to_owned())
.collect();

Ok(FileSystemCheckPlan {
files_to_remove,
version,
store,
})
}
}

impl FileSystemCheckPlan {
pub async fn execute(self) -> DeltaResult<FileSystemCheckMetrics> {
if self.files_to_remove.is_empty() {
return Ok(FileSystemCheckMetrics {
dry_run: false,
files_removed: Vec::new(),
});
}

let mut actions = Vec::with_capacity(self.files_to_remove.len());
let mut removed_file_paths = Vec::with_capacity(self.files_to_remove.len());
let version = self.version;
let store = &self.store;

for file in self.files_to_remove {
let deletion_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
let deletion_time = deletion_time.as_millis() as DeltaDataTypeLong;
removed_file_paths.push(file.path.clone());
actions.push(Action::remove(Remove {
path: file.path,
deletion_timestamp: Some(deletion_time),
data_change: true,
extended_file_metadata: None,
partition_values: Some(file.partition_values),
size: Some(file.size),
tags: file.tags,
}));
}

commit(
store,
version + 1,
actions,
DeltaOperation::FileSystemCheck {},
None,
)
.await?;

Ok(FileSystemCheckMetrics {
dry_run: false,
files_removed: removed_file_paths,
})
}
}

impl std::future::IntoFuture for FileSystemCheckBuilder {
type Output = DeltaResult<(DeltaTable, FileSystemCheckMetrics)>;
type IntoFuture = BoxFuture<'static, Self::Output>;

fn into_future(self) -> Self::IntoFuture {
let this = self;

Box::pin(async move {
let plan = this.create_fsck_plan().await?;
if this.dry_run {
return Ok((
DeltaTable::new_with_state(this.store, this.state),
FileSystemCheckMetrics {
files_removed: plan.files_to_remove.into_iter().map(|f| f.path).collect(),
dry_run: true,
},
));
}

let metrics = plan.execute().await?;
let mut table = DeltaTable::new_with_state(this.store, this.state);
table.update().await?;
Ok((table, metrics))
})
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn absolute_path() {
assert!(!is_absolute_path(
"part-00003-53f42606-6cda-4f13-8d07-599a21197296-c000.snappy.parquet"
)
.unwrap());
assert!(!is_absolute_path(
"x=9/y=9.9/part-00007-3c50fba1-4264-446c-9c67-d8e24a1ccf83.c000.snappy.parquet"
)
.unwrap());

assert!(is_absolute_path("abfss://container@account_name.blob.core.windows.net/full/part-00000-a72b1fb3-f2df-41fe-a8f0-e65b746382dd-c000.snappy.parquet").unwrap());
assert!(is_absolute_path("file:///C:/my_table/windows.parquet").unwrap());
assert!(is_absolute_path("file:///home/my_table/unix.parquet").unwrap());
assert!(is_absolute_path("s3://container/path/file.parquet").unwrap());
assert!(is_absolute_path("gs://container/path/file.parquet").unwrap());
assert!(is_absolute_path("scheme://table/file.parquet").unwrap());
}
}
8 changes: 8 additions & 0 deletions rust/src/operations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@
//! if the operation returns data as well.

use self::create::CreateBuilder;
use self::filesystem_check::FileSystemCheckBuilder;
use self::vacuum::VacuumBuilder;
use crate::builder::DeltaTableBuilder;
use crate::{DeltaResult, DeltaTable, DeltaTableError};

pub mod create;
pub mod filesystem_check;
pub mod transaction;
pub mod vacuum;

Expand Down Expand Up @@ -115,6 +117,12 @@ impl DeltaOps {
pub fn vacuum(self) -> VacuumBuilder {
VacuumBuilder::new(self.0.object_store(), self.0.state)
}

/// Audit active files with files present on the filesystem
#[must_use]
pub fn filesystem_check(self) -> FileSystemCheckBuilder {
FileSystemCheckBuilder::new(self.0.object_store(), self.0.state)
}
}

impl From<DeltaTable> for DeltaOps {
Expand Down
17 changes: 15 additions & 2 deletions rust/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ impl IntegrationContext {
StorageIntegration::Google => format!("gs://{}", &bucket),
StorageIntegration::Local => format!("file://{}", &bucket),
};

// the "storage_backend" will always point to the root ofg the object store.
// TODO should we provide the store via object_Store builders?
let store = match integration {
Expand Down Expand Up @@ -89,6 +88,12 @@ impl IntegrationContext {
}
}

pub fn table_builder(&self, table: TestTables) -> DeltaTableBuilder {
let name = table.as_name();
let table_uri = format!("{}/{}", self.root_uri(), &name);
DeltaTableBuilder::from_uri(table_uri).with_allow_http(true)
}

pub fn uri_for_table(&self, table: TestTables) -> String {
format!("{}/{}", self.root_uri(), table.as_name())
}
Expand Down Expand Up @@ -186,6 +191,7 @@ pub enum TestTables {
Simple,
SimpleCommit,
Golden,
Delta0_8_0Partitioned,
Custom(String),
}

Expand All @@ -204,6 +210,11 @@ impl TestTables {
.to_str()
.unwrap()
.to_owned(),
Self::Delta0_8_0Partitioned => data_path
.join("delta-0.8.0-partitioned")
.to_str()
.unwrap()
.to_owned(),
// the data path for upload does not apply to custom tables.
Self::Custom(_) => todo!(),
}
Expand All @@ -214,12 +225,14 @@ impl TestTables {
Self::Simple => "simple".into(),
Self::SimpleCommit => "simple_commit".into(),
Self::Golden => "golden".into(),
Self::Delta0_8_0Partitioned => "delta-0.8.0-partitioned".into(),
Self::Custom(name) => name.to_owned(),
}
}
}

fn set_env_if_not_set(key: impl AsRef<str>, value: impl AsRef<str>) {
/// Set environment variable if it is not set
pub fn set_env_if_not_set(key: impl AsRef<str>, value: impl AsRef<str>) {
if std::env::var(key.as_ref()).is_err() {
std::env::set_var(key.as_ref(), value.as_ref())
};
Expand Down
Loading

0 comments on commit 3d25da0

Please sign in to comment.