From 95cbb8833fd35266b3184bdb99414dfd769c4f2a Mon Sep 17 00:00:00 2001 From: Robert Pack Date: Thu, 1 Dec 2022 08:53:03 +0100 Subject: [PATCH 1/2] feat: check invariants in write command --- Cargo.lock | 10 +++--- rust/src/delta_datafusion.rs | 1 + rust/src/operations/write.rs | 64 ++++++++++++++++++++++++++++++++++-- 3 files changed, 67 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ba38bb95f2..2a07db296d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -902,7 +902,7 @@ dependencies = [ "datafusion-common", "datafusion-expr", "datafusion-proto", - "dotenv", + "dotenvy", "dynamodb_lock", "errno", "fs_extra", @@ -942,7 +942,7 @@ dependencies = [ [[package]] name = "deltalake-python" -version = "0.6.3" +version = "0.6.4" dependencies = [ "arrow-schema", "chrono", @@ -1021,10 +1021,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" [[package]] -name = "dotenv" -version = "0.15.0" +name = "dotenvy" +version = "0.15.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77c90badedccf4105eca100756a0b1289e191f6fcbdadd3cee1d2f614f97da8f" +checksum = "03d8c417d7a8cb362e0c37e5d815f5eb7c37f79ff93707329d5a194e42e54ca0" [[package]] name = "dynamodb_lock" diff --git a/rust/src/delta_datafusion.rs b/rust/src/delta_datafusion.rs index eac1da7900..3644b25e61 100644 --- a/rust/src/delta_datafusion.rs +++ b/rust/src/delta_datafusion.rs @@ -695,6 +695,7 @@ fn left_larger_than_right(left: ScalarValue, right: ScalarValue) -> Option } /// Responsible for checking batches of data conform to table's invariants. +#[derive(Clone)] pub struct DeltaDataChecker { invariants: Vec, ctx: SessionContext, diff --git a/rust/src/operations/write.rs b/rust/src/operations/write.rs index c18a1428ff..58b4bc1b8c 100644 --- a/rust/src/operations/write.rs +++ b/rust/src/operations/write.rs @@ -27,6 +27,7 @@ use super::{transaction::commit, CreateBuilder}; use crate::action::{Action, Add, DeltaOperation, Remove, SaveMode}; use crate::builder::DeltaTableBuilder; use crate::delta::{DeltaResult, DeltaTable, DeltaTableError}; +use crate::delta_datafusion::DeltaDataChecker; use crate::schema::Schema; use crate::storage::DeltaObjectStore; use crate::writer::record_batch::divide_by_partition_values; @@ -319,6 +320,12 @@ impl std::future::IntoFuture for WriteBuilder { Err(WriteError::MissingData) }?; + let invariants = table + .get_metadata() + .and_then(|meta| meta.schema.get_invariants()) + .unwrap_or(vec![]); + let checker = DeltaDataChecker::new(invariants); + // Write data to disk let mut tasks = vec![]; for i in 0..plan.output_partitioning().partition_count() { @@ -333,12 +340,14 @@ impl std::future::IntoFuture for WriteBuilder { this.write_batch_size, ); let mut writer = DeltaWriter::new(object_store.clone(), config); - + let checker_stream = checker.clone(); let mut stream = inner_plan.execute(i, task_ctx)?; let handle: tokio::task::JoinHandle>> = tokio::task::spawn(async move { - while let Some(batch) = stream.next().await { - writer.write(&batch?).await?; + while let Some(maybe_batch) = stream.next().await { + let batch = maybe_batch?; + checker_stream.check_batch(&batch).await?; + writer.write(&batch).await?; } writer.close().await }); @@ -429,6 +438,7 @@ mod tests { use super::*; use crate::operations::DeltaOps; use crate::writer::test_utils::{get_delta_schema, get_record_batch}; + use serde_json::json; #[tokio::test] async fn test_create_write() { @@ -503,4 +513,52 @@ mod tests { assert_eq!(table.version(), 0); assert_eq!(table.get_file_uris().count(), 4) } + + #[tokio::test] + async fn test_check_invariants() { + let batch = get_record_batch(None, false); + let schema: Schema = serde_json::from_value(json!({ + "type": "struct", + "fields": [ + {"name": "id", "type": "string", "nullable": true, "metadata": {}}, + {"name": "value", "type": "integer", "nullable": true, "metadata": { + "delta.invariants": "{\"expression\": { \"expression\": \"value < 12\"} }" + }}, + {"name": "modified", "type": "string", "nullable": true, "metadata": {}}, + ] + })) + .unwrap(); + let table = DeltaOps::new_in_memory() + .create() + .with_save_mode(SaveMode::ErrorIfExists) + .with_columns(schema.get_fields().clone()) + .await + .unwrap(); + assert_eq!(table.version(), 0); + + let table = DeltaOps(table).write(vec![batch.clone()]).await.unwrap(); + assert_eq!(table.version(), 1); + + let schema: Schema = serde_json::from_value(json!({ + "type": "struct", + "fields": [ + {"name": "id", "type": "string", "nullable": true, "metadata": {}}, + {"name": "value", "type": "integer", "nullable": true, "metadata": { + "delta.invariants": "{\"expression\": { \"expression\": \"value < 6\"} }" + }}, + {"name": "modified", "type": "string", "nullable": true, "metadata": {}}, + ] + })) + .unwrap(); + let table = DeltaOps::new_in_memory() + .create() + .with_save_mode(SaveMode::ErrorIfExists) + .with_columns(schema.get_fields().clone()) + .await + .unwrap(); + assert_eq!(table.version(), 0); + + let table = DeltaOps(table).write(vec![batch.clone()]).await; + assert!(table.is_err()) + } } From 94261e905888d3bf2583e39d0336eaa6f8931846 Mon Sep 17 00:00:00 2001 From: Robert Pack Date: Thu, 1 Dec 2022 09:04:08 +0100 Subject: [PATCH 2/2] chore: clippy --- rust/src/operations/write.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/src/operations/write.rs b/rust/src/operations/write.rs index 58b4bc1b8c..823f1d466d 100644 --- a/rust/src/operations/write.rs +++ b/rust/src/operations/write.rs @@ -323,7 +323,7 @@ impl std::future::IntoFuture for WriteBuilder { let invariants = table .get_metadata() .and_then(|meta| meta.schema.get_invariants()) - .unwrap_or(vec![]); + .unwrap_or_default(); let checker = DeltaDataChecker::new(invariants); // Write data to disk