From 4b7cfdbaaa7e157c910c58842b9ff91ce5fd912c Mon Sep 17 00:00:00 2001
From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com>
Date: Thu, 1 Aug 2024 14:38:09 +0300
Subject: [PATCH 01/12] cdc delete

---
 crates/core/src/operations/delete.rs | 51 ++++++++++++++++++++++------
 crates/core/src/operations/write.rs  | 51 +++++++++++++++++++++++++++-
 2 files changed, 90 insertions(+), 12 deletions(-)

diff --git a/crates/core/src/operations/delete.rs b/crates/core/src/operations/delete.rs
index 79a7062f50..dfd398dc5b 100644
--- a/crates/core/src/operations/delete.rs
+++ b/crates/core/src/operations/delete.rs
@@ -21,6 +21,7 @@ use core::panic;
 use std::sync::Arc;
 use std::time::{Instant, SystemTime, UNIX_EPOCH};
 
+use super::writer::DeltaWriter;
 use crate::logstore::LogStoreRef;
 use datafusion::execution::context::{SessionContext, SessionState};
 use datafusion::physical_plan::filter::FilterExec;
@@ -29,18 +30,21 @@ use datafusion::prelude::Expr;
 use datafusion_common::scalar::ScalarValue;
 use datafusion_common::DFSchema;
 use futures::future::BoxFuture;
+use object_store::prefix::PrefixStore;
 use parquet::file::properties::WriterProperties;
 use serde::Serialize;
 
+use super::cdc::should_write_cdc;
 use super::datafusion_utils::Expression;
 use super::transaction::{CommitBuilder, CommitProperties, PROTOCOL};
-use super::write::WriterStatsConfig;
+use super::write::{write_execution_plan_cdc, write_execution_plan_cdf, WriterStatsConfig};
+use super::writer::WriterConfig;
 use crate::delta_datafusion::expr::fmt_expr_to_sql;
 use crate::delta_datafusion::{
     find_files, register_store, DataFusionMixins, DeltaScanBuilder, DeltaSessionContext,
 };
 use crate::errors::DeltaResult;
-use crate::kernel::{Action, Add, Remove};
+use crate::kernel::{Action, Add, AddCDCFile, Remove};
 use crate::operations::write::write_execution_plan;
 use crate::protocol::DeltaOperation;
 use crate::table::state::DeltaTableState;
@@ -130,7 +134,7 @@ async fn excute_non_empty_expr(
     metrics: &mut DeleteMetrics,
     rewrite: &[Add],
     writer_properties: Option<WriterProperties>,
-) -> DeltaResult<Vec<Add>> {
+) -> DeltaResult<Vec<Action>> {
     // For each identified file perform a parquet scan + filter + limit (1) + count.
     // If returned count is not zero then append the file to be rewritten and removed from the log. Otherwise do nothing to the file.
 
@@ -160,7 +164,7 @@ async fn excute_non_empty_expr(
             .map(|v| v.iter().map(|v| v.to_string()).collect::<Vec<String>>()),
     );
 
-    let add_actions = write_execution_plan(
+    let mut actions: Vec<Action> = write_execution_plan(
         Some(snapshot),
         state.clone(),
         filter.clone(),
         table_partition_cols.clone(),
         log_store.object_store(),
         Some(snapshot.table_config().target_file_size() as usize),
         None,
-        writer_properties,
+        writer_properties.clone(),
         false,
         None,
-        writer_stats_config,
+        writer_stats_config.clone(),
         None,
     )
     .await?
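
The hunk above only rewrites the rows that survive the delete; the hunk below adds the CDC branch that also materializes the deleted rows under _change_data. The branch is gated on should_write_cdc (imported above from operations/cdc.rs). A minimal sketch of such a gate, assuming the helper simply consults the table configuration that the tests later in this series set via delta.enableChangeDataFeed:

    // Hedged sketch only; the actual helper body lives in operations/cdc.rs.
    fn should_write_cdc(snapshot: &DeltaTableState) -> DeltaResult<bool> {
        Ok(snapshot.table_config().enable_change_data_feed())
    }
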
@@ -179,8 +183,34 @@ async fn excute_non_empty_expr(
         .map(|a| match a {
             Action::Add(a) => a,
             _ => panic!("Expected Add action"),
-        })
-        .collect::<Vec<Add>>();
+        }).into_iter().map(Action::Add).collect();
+
+    // CDC logic, simply filters data with predicate and adds the _change_type="delete" as literal column
+    match should_write_cdc(&snapshot) {
+        Ok(true) => {
+            let cdc_predicate_expr =
+                state.create_physical_expr(expression.clone(), &input_dfschema)?;
+            let cdc_scan: Arc<dyn ExecutionPlan> =
+                Arc::new(FilterExec::try_new(cdc_predicate_expr, scan.clone())?);
+            let cdc_actions = write_execution_plan_cdc(
+                Some(snapshot),
+                state.clone(),
+                cdc_scan.clone(),
+                table_partition_cols.clone(),
+                log_store.object_store(),
+                Some(snapshot.table_config().target_file_size() as usize),
+                None,
+                writer_properties,
+                false,
+                None,
+                writer_stats_config,
+                None,
+            )
+            .await?;
+            actions.extend(cdc_actions)
+        }
+        _ => (),
+    };
 
     let read_records = scan.parquet_scan.metrics().and_then(|m| m.output_rows());
     let filter_records = filter.metrics().and_then(|m| m.output_rows());
@@ -189,7 +219,7 @@
         .zip(filter_records)
         .map(|(read, filter)| read - filter);
 
-    Ok(add_actions)
+    Ok(actions)
 }
 
 async fn execute(
@@ -209,7 +239,7 @@
 
     let predicate = predicate.unwrap_or(Expr::Literal(ScalarValue::Boolean(Some(true))));
 
-    let add = if candidates.partition_scan {
+    let mut actions = if candidates.partition_scan {
         Vec::new()
     } else {
         let write_start = Instant::now();
@@ -233,7 +263,6 @@
         .unwrap()
         .as_millis() as i64;
 
-    let mut actions: Vec<Action> = add.into_iter().map(Action::Add).collect();
     metrics.num_removed_files = remove.len();
     metrics.num_added_files = actions.len();
 
diff --git a/crates/core/src/operations/write.rs b/crates/core/src/operations/write.rs
index 0606707c19..32b1304a7f 100644
--- a/crates/core/src/operations/write.rs
+++ b/crates/core/src/operations/write.rs
@@ -40,6 +40,7 @@ use datafusion_common::DFSchema;
 use datafusion_expr::Expr;
 use futures::future::BoxFuture;
 use futures::StreamExt;
+use object_store::prefix::PrefixStore;
 use parquet::file::properties::WriterProperties;
 use tracing::log::*;
 
@@ -52,7 +53,7 @@ use crate::delta_datafusion::expr::parse_predicate_expression;
 use crate::delta_datafusion::{find_files, register_store, DeltaScanBuilder};
 use crate::delta_datafusion::{DataFusionMixins, DeltaDataChecker};
 use crate::errors::{DeltaResult, DeltaTableError};
-use crate::kernel::{Action, Add, Metadata, PartitionsExt, Remove, StructType};
+use crate::kernel::{Action, Add, AddCDCFile, Metadata, PartitionsExt, Remove, StructType};
 use crate::logstore::LogStoreRef;
 use crate::operations::cast::{cast_record_batch, merge_schema};
 use crate::protocol::{DeltaOperation, SaveMode};
@@ -461,6 +462,54 @@ async fn write_execution_plan_with_predicate(
     Ok(actions)
 }
 
+pub(crate) async fn write_execution_plan_cdc(
+    snapshot: Option<&DeltaTableState>,
+    state: SessionState,
+    plan: Arc<dyn ExecutionPlan>,
+    partition_columns: Vec<String>,
+    object_store: ObjectStoreRef,
+    target_file_size: Option<usize>,
+    write_batch_size: Option<usize>,
+    writer_properties: Option<WriterProperties>,
+    safe_cast: bool,
+    schema_mode: Option<SchemaMode>,
+    writer_stats_config: WriterStatsConfig,
+    sender: Option<Sender<RecordBatch>>,
+) -> DeltaResult<Vec<Action>> {
+    let cdc_store = Arc::new(PrefixStore::new(
+        object_store,
+        "_change_data",
+    ));
+    Ok(write_execution_plan(
+        snapshot,
+        state,
+        plan,
+        partition_columns,
+        cdc_store,
+        target_file_size,
+        write_batch_size,
+        writer_properties,
+        safe_cast,
+        schema_mode,
+        writer_stats_config,
+        sender).await?.into_iter().map(|add| {
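+            // Everything above was written through `cdc_store`, so each returned
+            // Add carries a path relative to the `_change_data` prefix; the
+            // closure below re-roots it for the commit log.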
+            // Modify add actions into CDC actions
+            match add {
+                Action::Add(add) => { Action::Cdc(AddCDCFile {
+                    // This is a gnarly hack, but the action needs the nested path, not the
+                    // path inside the prefixed store
+                    path: format!("_change_data/{}", add.path),
+                    size: add.size,
+                    partition_values: add.partition_values,
+                    data_change: false,
+                    tags: add.tags,
+                })},
+                _ => panic!("Expected Add action"),
+            }
+        }).collect::<Vec<Action>>())
+}
+
+
 #[allow(clippy::too_many_arguments)]
 pub(crate) async fn write_execution_plan(
     snapshot: Option<&DeltaTableState>,

From 3567d125e848dd19c2da5406bd553e76245dc6ee Mon Sep 17 00:00:00 2001
From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com>
Date: Thu, 1 Aug 2024 14:39:04 +0300
Subject: [PATCH 02/12] fmt

---
 crates/core/src/operations/delete.rs |  5 ++-
 crates/core/src/operations/write.rs  | 59 +++++++++++++++------------
 2 files changed, 35 insertions(+), 29 deletions(-)

diff --git a/crates/core/src/operations/delete.rs b/crates/core/src/operations/delete.rs
index dfd398dc5b..11c1399324 100644
--- a/crates/core/src/operations/delete.rs
+++ b/crates/core/src/operations/delete.rs
@@ -183,7 +183,10 @@ async fn excute_non_empty_expr(
         .map(|a| match a {
             Action::Add(a) => a,
             _ => panic!("Expected Add action"),
-        }).into_iter().map(Action::Add).collect();
+        })
+        .into_iter()
+        .map(Action::Add)
+        .collect();
 
diff --git a/crates/core/src/operations/write.rs b/crates/core/src/operations/write.rs
index 32b1304a7f..92f90393e1 100644
--- a/crates/core/src/operations/write.rs
+++ b/crates/core/src/operations/write.rs
@@ -476,40 +476,43 @@ pub(crate) async fn write_execution_plan_cdc(
     writer_stats_config: WriterStatsConfig,
     sender: Option<Sender<RecordBatch>>,
 ) -> DeltaResult<Vec<Action>> {
-    let cdc_store = Arc::new(PrefixStore::new(
-        object_store,
-        "_change_data",
-    ));
+    let cdc_store = Arc::new(PrefixStore::new(object_store, "_change_data"));
     Ok(write_execution_plan(
-        snapshot,
-        state,
-        plan,
+        snapshot,
+        state,
+        plan,
         partition_columns,
         cdc_store,
-        target_file_size,
-        write_batch_size,
-        writer_properties,
-        safe_cast,
-        schema_mode,
-        writer_stats_config,
-        sender).await?.into_iter().map(|add| {
+        target_file_size,
+        write_batch_size,
+        writer_properties,
+        safe_cast,
+        schema_mode,
+        writer_stats_config,
+        sender,
+    )
+    .await?
+    .into_iter()
+    .map(|add| {
+        // Modify add actions into CDC actions
+        match add {
+            Action::Add(add) => {
+                Action::Cdc(AddCDCFile {
+                    // This is a gnarly hack, but the action needs the nested path, not the
+                    // path inside the prefixed store
+                    path: format!("_change_data/{}", add.path),
+                    size: add.size,
+                    partition_values: add.partition_values,
+                    data_change: false,
+                    tags: add.tags,
+                })
             }
-        }).collect::<Vec<Action>>())
+            _ => panic!("Expected Add action"),
+        }
+    })
+    .collect::<Vec<Action>>())
 }
-
 #[allow(clippy::too_many_arguments)]
 pub(crate) async fn write_execution_plan(
     snapshot: Option<&DeltaTableState>,

From 3f0e6bc57fd24f5122b27c073301d3b2ad7f80c9 Mon Sep 17 00:00:00 2001
From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com>
Date: Thu, 1 Aug 2024 15:35:52 +0300
Subject: [PATCH 03/12] add _change_type projection

---
 crates/core/src/operations/delete.rs | 51 ++++++++++++++++++++++------
 1 file changed, 41 insertions(+), 10 deletions(-)

diff --git a/crates/core/src/operations/delete.rs b/crates/core/src/operations/delete.rs
index 11c1399324..b85aafe85f 100644
--- a/crates/core/src/operations/delete.rs
+++ b/crates/core/src/operations/delete.rs
@@ -17,34 +17,39 @@
 //!     .await?;
 //! ````
 
-use core::panic;
-use std::sync::Arc;
-use std::time::{Instant, SystemTime, UNIX_EPOCH};
-
-use super::writer::DeltaWriter;
 use crate::logstore::LogStoreRef;
+use core::panic;
 use datafusion::execution::context::{SessionContext, SessionState};
 use datafusion::physical_plan::filter::FilterExec;
+use datafusion::physical_plan::projection::ProjectionExec;
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::Expr;
 use datafusion_common::scalar::ScalarValue;
 use datafusion_common::DFSchema;
+use datafusion_expr::lit;
+use datafusion_physical_expr::{
+    expressions::{self},
+    PhysicalExpr,
+};
 use futures::future::BoxFuture;
-use object_store::prefix::PrefixStore;
+use std::iter;
+use std::sync::Arc;
+use std::time::{Instant, SystemTime, UNIX_EPOCH};
+
 use parquet::file::properties::WriterProperties;
 use serde::Serialize;
 
 use super::cdc::should_write_cdc;
 use super::datafusion_utils::Expression;
 use super::transaction::{CommitBuilder, CommitProperties, PROTOCOL};
-use super::write::{write_execution_plan_cdc, write_execution_plan_cdf, WriterStatsConfig};
-use super::writer::WriterConfig;
+use super::write::{write_execution_plan_cdc, WriterStatsConfig};
+
 use crate::delta_datafusion::expr::fmt_expr_to_sql;
 use crate::delta_datafusion::{
     find_files, register_store, DataFusionMixins, DeltaScanBuilder, DeltaSessionContext,
 };
 use crate::errors::DeltaResult;
-use crate::kernel::{Action, Add, AddCDCFile, Remove};
+use crate::kernel::{Action, Add, Remove};
 use crate::operations::write::write_execution_plan;
 use crate::protocol::DeltaOperation;
 use crate::table::state::DeltaTableState;
@@ -191,14 +196,40 @@ async fn excute_non_empty_expr(
     // CDC logic, simply filters data with predicate and adds the _change_type="delete" as literal column
     match should_write_cdc(&snapshot) {
         Ok(true) => {
+            // Create CDC scan
             let cdc_predicate_expr =
                 state.create_physical_expr(expression.clone(), &input_dfschema)?;
             let cdc_scan: Arc<dyn ExecutionPlan> =
                 Arc::new(FilterExec::try_new(cdc_predicate_expr, scan.clone())?);
+
+            // Add literal column "_change_type"
+            let change_type_lit = lit(ScalarValue::Utf8(Some("delete".to_string())));
+            let change_type_expr = state.create_physical_expr(change_type_lit, &input_dfschema)?;
+
+            // Project columns and lit
+            let project_expressions: Vec<(Arc<dyn PhysicalExpr>, String)> = scan
+                .schema()
+                .fields()
+                .into_iter()
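+                // Pass each existing column through as an (expr, name) pair keyed
+                // by its ordinal index; the "_change_type" literal is chained on last.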
+                .enumerate()
+                .map(|(idx, field)| -> (Arc<dyn PhysicalExpr>, String) {
+                    (
+                        Arc::new(expressions::Column::new(field.name(), idx)),
+                        field.name().to_owned(),
+                    )
+                })
+                .chain(iter::once((change_type_expr, "_change_type".to_owned())))
+                .collect();
+
+            let projected_scan: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
+                project_expressions,
+                cdc_scan.clone(),
+            )?);
+
             let cdc_actions = write_execution_plan_cdc(
                 Some(snapshot),
                 state.clone(),
-                cdc_scan.clone(),
+                projected_scan.clone(),
                 table_partition_cols.clone(),
                 log_store.object_store(),
                 Some(snapshot.table_config().target_file_size() as usize),

From 2e1588982500f9a46b90f335f59e823d05f91e41 Mon Sep 17 00:00:00 2001
From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com>
Date: Thu, 1 Aug 2024 15:53:03 +0300
Subject: [PATCH 04/12] allow schema of plan to be used

---
 crates/core/src/operations/delete.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/crates/core/src/operations/delete.rs b/crates/core/src/operations/delete.rs
index b85aafe85f..c086b541a0 100644
--- a/crates/core/src/operations/delete.rs
+++ b/crates/core/src/operations/delete.rs
@@ -50,7 +50,7 @@ use crate::delta_datafusion::{
 };
 use crate::errors::DeltaResult;
 use crate::kernel::{Action, Add, Remove};
-use crate::operations::write::write_execution_plan;
+use crate::operations::write::{write_execution_plan, SchemaMode};
 use crate::protocol::DeltaOperation;
 use crate::table::state::DeltaTableState;
 use crate::DeltaTable;
@@ -236,7 +236,7 @@ async fn excute_non_empty_expr(
                 None,
                 writer_properties,
                 false,
-                None,
+                Some(SchemaMode::Overwrite), // If not overwrite, the plan schema is not taken but table schema, however we need the plan schema since it has the _change_type_col
                 writer_stats_config,
                 None,
             )

From f8fdd0d5d97d37567c349008590d0c634c2dd21e Mon Sep 17 00:00:00 2001
From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com>
Date: Thu, 1 Aug 2024 17:03:59 +0300
Subject: [PATCH 05/12] add test, also write CDC when it was partition_scan

---
 crates/core/src/operations/delete.rs | 89 +++++++++++++++------------
 python/tests/test_cdf.py             | 45 ++++++++++++++-
 2 files changed, 91 insertions(+), 43 deletions(-)

diff --git a/crates/core/src/operations/delete.rs b/crates/core/src/operations/delete.rs
index c086b541a0..7cff8b98d4 100644
--- a/crates/core/src/operations/delete.rs
+++ b/crates/core/src/operations/delete.rs
@@ -139,13 +139,14 @@ async fn excute_non_empty_expr(
     metrics: &mut DeleteMetrics,
     rewrite: &[Add],
     writer_properties: Option<WriterProperties>,
+    partition_scan: bool,
 ) -> DeltaResult<Vec<Action>> {
     // For each identified file perform a parquet scan + filter + limit (1) + count.
     // If returned count is not zero then append the file to be rewritten and removed from the log. Otherwise do nothing to the file.
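+    // `partition_scan == true` means the predicate matched whole partitions, so the
+    // matching files are dropped as-is and no rewrite is needed; the function still
+    // runs so the CDC branch below can record the deleted rows.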
+    let mut actions: Vec<Action> = Vec::new();
     let input_schema = snapshot.input_schema()?;
     let input_dfschema: DFSchema = input_schema.clone().as_ref().clone().try_into()?;
-
     let table_partition_cols = snapshot.metadata().partition_columns.clone();
 
     let scan = DeltaScanBuilder::new(snapshot, log_store.clone(), &state)
@@ -154,13 +155,6 @@ async fn excute_non_empty_expr(
         .await?;
     let scan = Arc::new(scan);
 
-    // Apply the negation of the filter and rewrite files
-    let negated_expression = Expr::Not(Box::new(Expr::IsTrue(Box::new(expression.clone()))));
-
-    let predicate_expr = state.create_physical_expr(negated_expression, &input_dfschema)?;
-    let filter: Arc<dyn ExecutionPlan> =
-        Arc::new(FilterExec::try_new(predicate_expr, scan.clone())?);
-
     let writer_stats_config = WriterStatsConfig::new(
         snapshot.table_config().num_indexed_cols(),
         snapshot
@@ -169,29 +163,47 @@ async fn excute_non_empty_expr(
             .map(|v| v.iter().map(|v| v.to_string()).collect::<Vec<String>>()),
     );
 
+    if !partition_scan {
+        // Apply the negation of the filter and rewrite files
+        let negated_expression = Expr::Not(Box::new(Expr::IsTrue(Box::new(expression.clone()))));
+
+        let predicate_expr = state.create_physical_expr(negated_expression, &input_dfschema)?;
+        let filter: Arc<dyn ExecutionPlan> =
+            Arc::new(FilterExec::try_new(predicate_expr, scan.clone())?);
+
+        let add_actions: Vec<Action> = write_execution_plan(
+            Some(snapshot),
+            state.clone(),
+            filter.clone(),
+            table_partition_cols.clone(),
+            log_store.object_store(),
+            Some(snapshot.table_config().target_file_size() as usize),
+            None,
+            writer_properties.clone(),
+            false,
+            None,
+            writer_stats_config.clone(),
+            None,
+        )
+        .await?
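+        // (write_execution_plan already yields Action values, so the unwrap to Add
+        // and re-wrap below is redundant; the clippy commit later in this series
+        // removes it again.)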
+        .into_iter()
+        .map(|a| match a {
+            Action::Add(a) => a,
+            _ => panic!("Expected Add action"),
+        })
+        .into_iter()
+        .map(Action::Add)
+        .collect();
+
+        actions.extend(add_actions);
+
+        let read_records = scan.parquet_scan.metrics().and_then(|m| m.output_rows());
+        let filter_records = filter.metrics().and_then(|m| m.output_rows());
+        metrics.num_copied_rows = filter_records;
+        metrics.num_deleted_rows = read_records
+            .zip(filter_records)
+            .map(|(read, filter)| read - filter);
+    }
 
     // CDC logic, simply filters data with predicate and adds the _change_type="delete" as literal column
     match should_write_cdc(&snapshot) {
@@ -226,6 +238,8 @@ async fn excute_non_empty_expr(
                 cdc_scan.clone(),
             )?);
 
+            dbg!(table_partition_cols.clone());
+
             let cdc_actions = write_execution_plan_cdc(
                 Some(snapshot),
                 state.clone(),
@@ -245,14 +259,6 @@ async fn excute_non_empty_expr(
         }
         _ => (),
     };
-
-    let read_records = scan.parquet_scan.metrics().and_then(|m| m.output_rows());
-    let filter_records = filter.metrics().and_then(|m| m.output_rows());
-    metrics.num_copied_rows = filter_records;
-    metrics.num_deleted_rows = read_records
-        .zip(filter_records)
-        .map(|(read, filter)| read - filter);
-
     Ok(actions)
 }
 
 async fn execute(
@@ -273,9 +279,7 @@ async fn execute(
 
     let predicate = predicate.unwrap_or(Expr::Literal(ScalarValue::Boolean(Some(true))));
 
-    let mut actions = if candidates.partition_scan {
-        Vec::new()
-    } else {
+    let mut actions = {
         let write_start = Instant::now();
         let add = excute_non_empty_expr(
             &snapshot,
@@ -285,6 +289,7 @@ async fn execute(
             &mut metrics,
             &candidates.candidates,
             writer_properties,
+            candidates.partition_scan,
         )
         .await?;
         metrics.rewrite_time_ms = Instant::now().duration_since(write_start).as_millis() as u64;

diff --git a/python/tests/test_cdf.py b/python/tests/test_cdf.py
index 2292d85cdb..92b23ad77f 100644
--- a/python/tests/test_cdf.py
+++ b/python/tests/test_cdf.py
@@ -1,6 +1,11 @@
+import os
 from datetime import date, datetime
 
-from deltalake import DeltaTable
+import pyarrow as pa
+import pyarrow.compute as pc
+import pyarrow.parquet as pq
+
+from deltalake import DeltaTable, write_deltalake
 
 
 def test_read_cdf_partitioned():
@@ -418,3 +423,41 @@ def test_read_cdf_partitioned_projection():
     dt = DeltaTable("../crates/test/tests/data/cdf-table/")
     columns = ["id", "_change_type", "_commit_version"]
     assert columns == dt.load_cdf(0, 3, columns=columns).schema.names
+
+
+def test_delete_unpartitioned_cdf(tmp_path, sample_data: pa.Table):
+    cdc_path = f"{tmp_path}/_change_data"
+
+    write_deltalake(tmp_path, sample_data, mode='append', configuration={"delta.enableChangeDataFeed":"true"})
+    dt = DeltaTable(tmp_path)
+    dt.delete("int64 > 2")
+
+    expected_data = sample_data.filter(pc.field('int64')>2).append_column(field_=pa.field("_change_type", pa.string(), nullable=False), column=[["delete"]*2])
+    cdc_data = pq.read_table(cdc_path)
+
+    assert os.path.exists(cdc_path), "_change_data doesn't exist"
+    assert cdc_data == expected_data
+
+
+def test_delete_partitioned_cdf(tmp_path, sample_data: pa.Table):
+    cdc_path = f"{tmp_path}/_change_data"
+
+    write_deltalake(tmp_path,
+                    sample_data,
+                    mode='overwrite',
+                    partition_by=["utf8"],
+                    configuration={"delta.enableChangeDataFeed":"true"})
+    dt = DeltaTable(tmp_path)
+    dt.delete("int64 > 2")
+
+    # expected_data = sample_data.filter(pc.field('int64')>2).append_column(field_=pa.field("_change_type", pa.string(), nullable=False), column=[["delete"]*2])
+    # cdc_data = pq.read_table(cdc_path)
+
+    # assert os.path.exists(cdc_path), "_change_data doesn't exist"
+    # assert len(os.listdir(cdc_path)) == 2
+    # assert cdc_data == expected_data
+
+
+
+
\ No newline at end of file

From 34a940416222328a2b0621d27b2d3ad9bc2b36bf Mon Sep 17 00:00:00 2001
From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com>
Date: Thu, 1 Aug 2024 20:30:23 +0300
Subject: [PATCH 06/12] rm dbg

---
 crates/core/src/operations/delete.rs | 2 --
 1 file changed, 2 deletions(-)

diff --git a/crates/core/src/operations/delete.rs b/crates/core/src/operations/delete.rs
index 7cff8b98d4..0fcf05e764 100644
--- a/crates/core/src/operations/delete.rs
+++ b/crates/core/src/operations/delete.rs
@@ -238,8 +238,6 @@ async fn excute_non_empty_expr(
                 cdc_scan.clone(),
             )?);
 
-            dbg!(table_partition_cols.clone());
-
             let cdc_actions = write_execution_plan_cdc(
                 Some(snapshot),
                 state.clone(),

From e1854323153bfa33df0ecb9862789ad5bbe1e075 Mon Sep 17 00:00:00 2001
From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com>
Date: Thu, 1 Aug 2024 22:35:09 +0200
Subject: [PATCH 07/12] fix, don't use wrapped partitions, since we also write with CDC later

---
 crates/core/src/operations/delete.rs | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/crates/core/src/operations/delete.rs b/crates/core/src/operations/delete.rs
index 0fcf05e764..dcab0ab2c9 100644
--- a/crates/core/src/operations/delete.rs
+++ b/crates/core/src/operations/delete.rs
@@ -148,9 +148,11 @@ async fn excute_non_empty_expr(
     let input_schema = snapshot.input_schema()?;
     let input_dfschema: DFSchema = input_schema.clone().as_ref().clone().try_into()?;
     let table_partition_cols = snapshot.metadata().partition_columns.clone();
-
     let scan = DeltaScanBuilder::new(snapshot, log_store.clone(), &state)
         .with_files(rewrite)
+        // Use input schema which doesn't wrap partition values, otherwise divide_by_partition_value won't work on UTF8 partitions
+        // Since it can't fetch a scalar from a dictionary type
+        .with_schema(snapshot.input_schema()?)
         .build()
         .await?;
     let scan = Arc::new(scan);

From d58a6be9c211d467aff4c1fe3e3b262fd3f034bd Mon Sep 17 00:00:00 2001
From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com>
Date: Thu, 1 Aug 2024 22:55:16 +0200
Subject: [PATCH 08/12] add rust test

---
 crates/core/src/operations/delete.rs | 177 +++++++++++++++++++++++++++
 1 file changed, 177 insertions(+)

diff --git a/crates/core/src/operations/delete.rs b/crates/core/src/operations/delete.rs
index dcab0ab2c9..9f16be09bb 100644
--- a/crates/core/src/operations/delete.rs
+++ b/crates/core/src/operations/delete.rs
@@ -395,6 +395,9 @@ impl std::future::IntoFuture for DeleteBuilder {
 
 #[cfg(test)]
 mod tests {
+    use crate::delta_datafusion::cdf::DeltaCdfScan;
+    use crate::kernel::DataType as DeltaDataType;
+    use crate::operations::collect_sendable_stream;
     use crate::operations::DeltaOps;
     use crate::protocol::*;
     use crate::writer::test_utils::datafusion::get_data;
@@ -408,11 +411,15 @@ mod tests {
     use arrow::datatypes::{Field, Schema};
     use arrow::record_batch::RecordBatch;
     use arrow_array::ArrayRef;
+    use arrow_array::StringArray;
     use arrow_array::StructArray;
     use arrow_buffer::NullBuffer;
+    use arrow_schema::DataType;
     use arrow_schema::Fields;
     use datafusion::assert_batches_sorted_eq;
+    use datafusion::physical_plan::ExecutionPlan;
     use datafusion::prelude::*;
+    use delta_kernel::schema::PrimitiveType;
     use serde_json::json;
     use std::sync::Arc;
@@ -868,4 +875,174 @@ mod tests {
         .await;
         assert!(res.is_err());
     }
+
+    #[tokio::test]
+    async fn test_delete_cdc_enabled() {
+        let table: DeltaTable = DeltaOps::new_in_memory()
+            .create()
+            .with_column(
+                "value",
+                DeltaDataType::Primitive(PrimitiveType::Integer),
+                true,
+                None,
+            )
+            .with_configuration_property(DeltaConfigKey::EnableChangeDataFeed, Some("true"))
+            .await
+            .unwrap();
+        assert_eq!(table.version(), 0);
+
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "value",
+            arrow::datatypes::DataType::Int32,
+            true,
+        )]));
+
+        let batch = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)]))],
+        )
+        .unwrap();
+        let table = DeltaOps(table)
+            .write(vec![batch])
+            .await
+            .expect("Failed to write first batch");
+        assert_eq!(table.version(), 1);
+
+        let (table, _metrics) = DeltaOps(table)
+            .delete()
+            .with_predicate(col("value").eq(lit(2)))
+            .await
+            .unwrap();
+        assert_eq!(table.version(), 2);
+
+        let ctx = SessionContext::new();
+        let table = DeltaOps(table)
+            .load_cdf()
+            .with_session_ctx(ctx.clone())
+            .with_starting_version(0)
+            .build()
+            .await
+            .expect("Failed to load CDF");
+
+        let mut batches = collect_batches(
+            table.properties().output_partitioning().partition_count(),
+            table,
+            ctx,
+        )
+        .await
+        .expect("Failed to collect batches");
+
+        // The batches will contain a current _commit_timestamp which shouldn't be checked
+        let _: Vec<_> = batches.iter_mut().map(|b| b.remove_column(3)).collect();
+
+        assert_batches_sorted_eq! {[
+            "+-------+--------------+-----------------+",
+            "| value | _change_type | _commit_version |",
+            "+-------+--------------+-----------------+",
+            "| 1     | insert       | 1               |",
+            "| 2     | delete       | 2               |",
+            "| 2     | insert       | 1               |",
+            "| 3     | insert       | 1               |",
+            "+-------+--------------+-----------------+",
+        ], &batches }
+    }
+
+    #[tokio::test]
+    async fn test_delete_cdc_enabled_partitioned() {
+        let table: DeltaTable = DeltaOps::new_in_memory()
+            .create()
+            .with_column(
+                "year",
+                DeltaDataType::Primitive(PrimitiveType::String),
+                true,
+                None,
+            )
+            .with_column(
+                "value",
+                DeltaDataType::Primitive(PrimitiveType::Integer),
+                true,
+                None,
+            )
+            .with_partition_columns(vec!["year"])
+            .with_configuration_property(DeltaConfigKey::EnableChangeDataFeed, Some("true"))
+            .await
+            .unwrap();
+        assert_eq!(table.version(), 0);
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("year", DataType::Utf8, true),
+            Field::new("value", DataType::Int32, true),
+        ]));
+
+        let batch = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![
+                Arc::new(StringArray::from(vec![
+                    Some("2020"),
+                    Some("2020"),
+                    Some("2024"),
+                ])),
+                Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)])),
+            ],
+        )
+        .unwrap();
+
+        let table = DeltaOps(table)
+            .write(vec![batch])
+            .await
+            .expect("Failed to write first batch");
+        assert_eq!(table.version(), 1);
+
+        let (table, _metrics) = DeltaOps(table)
+            .delete()
+            .with_predicate(col("value").eq(lit(2)))
+            .await
+            .unwrap();
+        assert_eq!(table.version(), 2);
+
+        let ctx = SessionContext::new();
+        let table = DeltaOps(table)
+            .load_cdf()
+            .with_session_ctx(ctx.clone())
+            .with_starting_version(0)
+            .build()
+            .await
+            .expect("Failed to load CDF");
+
+        let mut batches = collect_batches(
+            table.properties().output_partitioning().partition_count(),
+            table,
+            ctx,
+        )
+        .await
+        .expect("Failed to collect batches");
+
+        // The batches will contain a current _commit_timestamp which shouldn't be checked
+        let _: Vec<_> = batches.iter_mut().map(|b| b.remove_column(3)).collect();
+
+        assert_batches_sorted_eq! {[
+            "+-------+--------------+-----------------+------+",
+            "| value | _change_type | _commit_version | year |",
+            "+-------+--------------+-----------------+------+",
+            "| 1     | insert       | 1               | 2020 |",
+            "| 2     | delete       | 2               | 2020 |",
+            "| 2     | insert       | 1               | 2020 |",
+            "| 3     | insert       | 1               | 2024 |",
+            "+-------+--------------+-----------------+------+",
+        ], &batches }
+    }
+
+    async fn collect_batches(
+        num_partitions: usize,
+        stream: DeltaCdfScan,
+        ctx: SessionContext,
+    ) -> Result<Vec<RecordBatch>, Box<dyn std::error::Error>> {
+        let mut batches = vec![];
+        for p in 0..num_partitions {
+            let data: Vec<RecordBatch> =
+                collect_sendable_stream(stream.execute(p, ctx.task_ctx())?).await?;
+            batches.extend_from_slice(&data);
+        }
+        Ok(batches)
+    }
 }

From b9529ceac4d778f32abed7e97f75d411db887694 Mon Sep 17 00:00:00 2001
From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com>
Date: Thu, 1 Aug 2024 23:04:02 +0200
Subject: [PATCH 09/12] finish python tests

---
 python/tests/test_cdf.py | 58 ++++++++++++++++++++++----------------
 1 file changed, 35 insertions(+), 23 deletions(-)

diff --git a/python/tests/test_cdf.py b/python/tests/test_cdf.py
index 92b23ad77f..2774776eb9 100644
--- a/python/tests/test_cdf.py
+++ b/python/tests/test_cdf.py
@@ -427,37 +427,49 @@ def test_read_cdf_partitioned_projection():
 
 def test_delete_unpartitioned_cdf(tmp_path, sample_data: pa.Table):
     cdc_path = f"{tmp_path}/_change_data"
-
-    write_deltalake(tmp_path, sample_data, mode='append', configuration={"delta.enableChangeDataFeed":"true"})
+
+    write_deltalake(
+        tmp_path,
+        sample_data,
+        mode="append",
+        configuration={"delta.enableChangeDataFeed": "true"},
+    )
     dt = DeltaTable(tmp_path)
     dt.delete("int64 > 2")
-
-    expected_data = sample_data.filter(pc.field('int64')>2).append_column(field_=pa.field("_change_type", pa.string(), nullable=False), column=[["delete"]*2])
+
+    expected_data = sample_data.filter(pc.field("int64") > 2).append_column(
+        field_=pa.field("_change_type", pa.string(), nullable=False),
+        column=[["delete"] * 2],
+    )
     cdc_data = pq.read_table(cdc_path)
-
+
     assert os.path.exists(cdc_path), "_change_data doesn't exist"
     assert cdc_data == expected_data
-
+
 
 def test_delete_partitioned_cdf(tmp_path, sample_data: pa.Table):
     cdc_path = f"{tmp_path}/_change_data"
-
-    write_deltalake(tmp_path,
-                    sample_data,
-                    mode='overwrite',
-                    partition_by=["utf8"],
-                    configuration={"delta.enableChangeDataFeed":"true"})
+
+    write_deltalake(
+        tmp_path,
+        sample_data,
+        mode="overwrite",
+        partition_by=["utf8"],
+        configuration={"delta.enableChangeDataFeed": "true"},
+    )
     dt = DeltaTable(tmp_path)
     dt.delete("int64 > 2")
-
-    # expected_data = sample_data.filter(pc.field('int64')>2).append_column(field_=pa.field("_change_type", pa.string(), nullable=False), column=[["delete"]*2])
-    # cdc_data = pq.read_table(cdc_path)
-
-    # assert os.path.exists(cdc_path), "_change_data doesn't exist"
-    # assert len(os.listdir(cdc_path)) == 2
-    # assert cdc_data == expected_data
-
-
-
-
+    expected_data = sample_data.filter(pc.field("int64") > 2).append_column(
+        field_=pa.field("_change_type", pa.string(), nullable=False),
+        column=[["delete"] * 2],
+    )
+    table_schema = dt.schema().to_pyarrow()
+    table_schema = table_schema.insert(
+        len(table_schema), pa.field("_change_type", pa.string(), nullable=False)
+    )
+    cdc_data = pq.read_table(cdc_path, schema=table_schema)
+
+    assert os.path.exists(cdc_path), "_change_data doesn't exist"
+    assert len(os.listdir(cdc_path)) == 2
+    assert cdc_data == expected_data

From e4a35c34b9681a4f44d3806875fa4cd8f763d086 Mon Sep 17 00:00:00 2001
From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com>
Date: Thu, 1 Aug 2024 23:13:38 +0200
Subject: [PATCH 10/12] clippy

---
 crates/core/src/operations/delete.rs | 109 ++++++++++++---------------
 crates/core/src/operations/write.rs  |   1 +
 2 files changed, 50 insertions(+), 60 deletions(-)

diff --git a/crates/core/src/operations/delete.rs b/crates/core/src/operations/delete.rs
index 9f16be09bb..feca7a1ad3 100644
--- a/crates/core/src/operations/delete.rs
+++ b/crates/core/src/operations/delete.rs
@@ -131,6 +131,7 @@ impl DeleteBuilder {
     }
 }
 
+#[allow(clippy::too_many_arguments)]
 async fn excute_non_empty_expr(
     snapshot: &DeltaTableState,
     log_store: LogStoreRef,
@@ -148,7 +149,7 @@ async fn excute_non_empty_expr(
     let input_schema = snapshot.input_schema()?;
     let input_dfschema: DFSchema = input_schema.clone().as_ref().clone().try_into()?;
     let table_partition_cols = snapshot.metadata().partition_columns.clone();
-    let scan = DeltaScanBuilder::new(snapshot, log_store.clone(), &state)
+    let scan = DeltaScanBuilder::new(snapshot, log_store.clone(), state)
         .with_files(rewrite)
         // Use input schema which doesn't wrap partition values, otherwise divide_by_partition_value won't work on UTF8 partitions
         // Since it can't fetch a scalar from a dictionary type
@@ -187,15 +188,7 @@ async fn excute_non_empty_expr(
             writer_stats_config.clone(),
             None,
         )
-        .await?
-        .into_iter()
-        .map(|a| match a {
-            Action::Add(a) => a,
-            _ => panic!("Expected Add action"),
-        })
-        .into_iter()
-        .map(Action::Add)
-        .collect();
+        .await?;
 
         actions.extend(add_actions);
 
@@ -208,56 +201,52 @@ async fn excute_non_empty_expr(
     }
 
     // CDC logic, simply filters data with predicate and adds the _change_type="delete" as literal column
-    match should_write_cdc(&snapshot) {
-        Ok(true) => {
-            // Create CDC scan
-            let cdc_predicate_expr =
-                state.create_physical_expr(expression.clone(), &input_dfschema)?;
-            let cdc_scan: Arc<dyn ExecutionPlan> =
-                Arc::new(FilterExec::try_new(cdc_predicate_expr, scan.clone())?);
-
-            // Add literal column "_change_type"
-            let change_type_lit = lit(ScalarValue::Utf8(Some("delete".to_string())));
-            let change_type_expr = state.create_physical_expr(change_type_lit, &input_dfschema)?;
-
-            // Project columns and lit
-            let project_expressions: Vec<(Arc<dyn PhysicalExpr>, String)> = scan
-                .schema()
-                .fields()
-                .into_iter()
-                .enumerate()
-                .map(|(idx, field)| -> (Arc<dyn PhysicalExpr>, String) {
-                    (
-                        Arc::new(expressions::Column::new(field.name(), idx)),
-                        field.name().to_owned(),
-                    )
-                })
-                .chain(iter::once((change_type_expr, "_change_type".to_owned())))
-                .collect();
-
-            let projected_scan: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
-                project_expressions,
-                cdc_scan.clone(),
-            )?);
-
-            let cdc_actions = write_execution_plan_cdc(
-                Some(snapshot),
-                state.clone(),
-                projected_scan.clone(),
-                table_partition_cols.clone(),
-                log_store.object_store(),
-                Some(snapshot.table_config().target_file_size() as usize),
-                None,
-                writer_properties,
-                false,
-                Some(SchemaMode::Overwrite), // If not overwrite, the plan schema is not taken but table schema, however we need the plan schema since it has the _change_type_col
-                writer_stats_config,
-                None,
-            )
-            .await?;
-            actions.extend(cdc_actions)
-        }
-        _ => (),
+    if let Ok(true) = should_write_cdc(&snapshot) {
+        // Create CDC scan
+        let cdc_predicate_expr = state.create_physical_expr(expression.clone(), &input_dfschema)?;
+        let cdc_scan: Arc<dyn ExecutionPlan> =
+            Arc::new(FilterExec::try_new(cdc_predicate_expr, scan.clone())?);
+
+        // Add literal column "_change_type"
+        let change_type_lit = lit(ScalarValue::Utf8(Some("delete".to_string())));
+        let change_type_expr = state.create_physical_expr(change_type_lit, &input_dfschema)?;
+
+        // Project columns and lit
+        let project_expressions: Vec<(Arc<dyn PhysicalExpr>, String)> = scan
+            .schema()
+            .fields()
+            .into_iter()
+            .enumerate()
+            .map(|(idx, field)| -> (Arc<dyn PhysicalExpr>, String) {
+                (
+                    Arc::new(expressions::Column::new(field.name(), idx)),
+                    field.name().to_owned(),
+                )
+            })
+            .chain(iter::once((change_type_expr, "_change_type".to_owned())))
+            .collect();
+
+        let projected_scan: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
+            project_expressions,
+            cdc_scan.clone(),
+        )?);
+
+        let cdc_actions = write_execution_plan_cdc(
+            Some(snapshot),
+            state.clone(),
+            projected_scan.clone(),
+            table_partition_cols.clone(),
+            log_store.object_store(),
+            Some(snapshot.table_config().target_file_size() as usize),
+            None,
+            writer_properties,
+            false,
+            Some(SchemaMode::Overwrite), // If not overwrite, the plan schema is not taken but table schema, however we need the plan schema since it has the _change_type_col
+            writer_stats_config,
+            None,
+        )
+        .await?;
+        actions.extend(cdc_actions)
     };
     Ok(actions)
 }

diff --git a/crates/core/src/operations/write.rs b/crates/core/src/operations/write.rs
index 92f90393e1..70ccc8c14e 100644
--- a/crates/core/src/operations/write.rs
+++ b/crates/core/src/operations/write.rs
@@ -462,6 +462,7 @@ async fn write_execution_plan_with_predicate(
     Ok(actions)
 }
 
+#[allow(clippy::too_many_arguments)]
 pub(crate) async fn write_execution_plan_cdc(
     snapshot: Option<&DeltaTableState>,
     state: SessionState,

From e313ed9bc3402103903bc482f64cef8c95b705ac Mon Sep 17 00:00:00 2001
From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com>
Date: Thu, 1 Aug 2024 23:26:28 +0200
Subject: [PATCH 11/12] remove rewrite_ms assert

---
 crates/core/src/operations/delete.rs | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/crates/core/src/operations/delete.rs b/crates/core/src/operations/delete.rs
index feca7a1ad3..da8b527be2 100644
--- a/crates/core/src/operations/delete.rs
+++ b/crates/core/src/operations/delete.rs
@@ -483,9 +483,6 @@ mod tests {
         //     serde_json::to_value(&metrics).unwrap()
         // );
 
-        // rewrite is not required
-        assert_eq!(metrics.rewrite_time_ms, 0);
-
         // Deletes with no changes to state must not commit
         let (table, metrics) = DeltaOps(table).delete().await.unwrap();
         assert_eq!(table.version(), 2);
@@ -721,7 +718,6 @@ mod tests {
         assert_eq!(metrics.num_deleted_rows, None);
         assert_eq!(metrics.num_copied_rows, None);
         assert!(metrics.scan_time_ms > 0);
-        assert_eq!(metrics.rewrite_time_ms, 0);
 
         let expected = vec![
             "+----+-------+------------+",

From 68e7b1c17803e8d4392f01a8207b205f09435613 Mon Sep 17 00:00:00 2001
From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com>
Date: Thu, 1 Aug 2024 23:48:05 +0200
Subject: [PATCH 12/12] pyarrow 8 test compat

---
 python/tests/test_cdf.py | 21 +++++++++++++++------
 1 file changed, 15 insertions(+), 6 deletions(-)

diff --git a/python/tests/test_cdf.py b/python/tests/test_cdf.py
index 2774776eb9..6d02158c10 100644
--- a/python/tests/test_cdf.py
+++ b/python/tests/test_cdf.py
@@ -3,6 +3,7 @@
 
 import pyarrow as pa
 import pyarrow.compute as pc
+import pyarrow.dataset as ds
 import pyarrow.parquet as pq
 
 from deltalake import DeltaTable, write_deltalake
@@ -437,9 +438,13 @@ def test_delete_unpartitioned_cdf(tmp_path, sample_data: pa.Table):
     dt = DeltaTable(tmp_path)
     dt.delete("int64 > 2")
 
-    expected_data = sample_data.filter(pc.field("int64") > 2).append_column(
-        field_=pa.field("_change_type", pa.string(), nullable=False),
-        column=[["delete"] * 2],
+    expected_data = (
+        ds.dataset(sample_data)
+        .to_table(filter=(pc.field("int64") > 2))
+        .append_column(
+            field_=pa.field("_change_type", pa.string(), nullable=False),
+            column=[["delete"] * 2],
+        )
     )
     cdc_data = pq.read_table(cdc_path)
 
@@ -460,9 +465,13 @@ def test_delete_partitioned_cdf(tmp_path, sample_data: pa.Table):
     dt = DeltaTable(tmp_path)
     dt.delete("int64 > 2")
 
-    expected_data = sample_data.filter(pc.field("int64") > 2).append_column(
-        field_=pa.field("_change_type", pa.string(), nullable=False),
-        column=[["delete"] * 2],
+    expected_data = (
+        ds.dataset(sample_data)
+        .to_table(filter=(pc.field("int64") > 2))
+        .append_column(
+            field_=pa.field("_change_type", pa.string(), nullable=False),
+            column=[["delete"] * 2],
+        )
     )
     table_schema = dt.schema().to_pyarrow()
     table_schema = table_schema.insert(