From aa28d730e1d69ed419f2dc22404c5bbab8e98647 Mon Sep 17 00:00:00 2001 From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com> Date: Mon, 5 Aug 2024 00:12:18 +0200 Subject: [PATCH] chore: re-enable struct support update cdf --- crates/core/src/delta_datafusion/mod.rs | 8 ++--- crates/core/src/delta_datafusion/planner.rs | 4 +-- crates/core/src/operations/cdc.rs | 37 ++------------------- crates/core/src/operations/delete.rs | 3 +- crates/core/src/operations/update.rs | 4 --- crates/core/src/operations/write.rs | 1 - 6 files changed, 9 insertions(+), 48 deletions(-) diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs index 9cb10b7461..c2b410cb74 100644 --- a/crates/core/src/delta_datafusion/mod.rs +++ b/crates/core/src/delta_datafusion/mod.rs @@ -523,10 +523,10 @@ impl<'a> DeltaScanBuilder<'a> { None => DeltaScanConfigBuilder::new().build(self.snapshot)?, }; - let schema = config - .schema - .clone() - .unwrap_or(self.snapshot.arrow_schema()?); + let schema = match config.schema.clone() { + Some(value) => Ok(value), + None => self.snapshot.arrow_schema(), + }?; let logical_schema = df_logical_schema(self.snapshot, &config)?; diff --git a/crates/core/src/delta_datafusion/planner.rs b/crates/core/src/delta_datafusion/planner.rs index 9c31410cec..f0af1092ca 100644 --- a/crates/core/src/delta_datafusion/planner.rs +++ b/crates/core/src/delta_datafusion/planner.rs @@ -41,9 +41,7 @@ pub struct DeltaPlanner { } #[async_trait] -impl QueryPlanner - for DeltaPlanner -{ +impl QueryPlanner for DeltaPlanner { async fn create_physical_plan( &self, logical_plan: &LogicalPlan, diff --git a/crates/core/src/operations/cdc.rs b/crates/core/src/operations/cdc.rs index 75874cd5f9..42a33cbcab 100644 --- a/crates/core/src/operations/cdc.rs +++ b/crates/core/src/operations/cdc.rs @@ -15,20 +15,14 @@ use tracing::log::*; /// The CDCTracker is useful for hooking reads/writes in a manner nececessary to create CDC files /// associated with 
commits pub(crate) struct CDCTracker { - schema: SchemaRef, pre_dataframe: DataFrame, post_dataframe: DataFrame, } impl CDCTracker { /// construct - pub(crate) fn new( - schema: SchemaRef, - pre_dataframe: DataFrame, - post_dataframe: DataFrame, - ) -> Self { + pub(crate) fn new(pre_dataframe: DataFrame, post_dataframe: DataFrame) -> Self { Self { - schema, pre_dataframe, post_dataframe, } @@ -44,27 +38,6 @@ impl CDCTracker { let preimage = pre_df.clone().except(post_df.clone())?; let postimage = post_df.except(pre_df)?; - // Create a new schema which represents the input batch along with the CDC - // columns - let fields: Vec<Arc<Field>> = self.schema.fields().to_vec().clone(); - - let mut has_struct = false; - for field in fields.iter() { - match field.data_type() { - DataType::Struct(_) => { - has_struct = true; - } - DataType::List(_) => { - has_struct = true; - } - _ => {} - } - } - - if has_struct { - warn!("The schema contains a Struct or List type, which unfortunately means a change data file cannot be captured in this release of delta-rs: .
The write operation will complete properly, but no CDC data will be generated for schema: {fields:?}"); - } - let preimage = preimage.with_column( "_change_type", lit(ScalarValue::Utf8(Some("update_preimage".to_string()))), @@ -253,7 +226,7 @@ mod tests { Arc::new(MemTable::try_new(schema.clone(), vec![vec![updated_batch]]).unwrap()); let updated_df = ctx.read_table(table_provider_updated).unwrap(); - let tracker = CDCTracker::new(schema, source_df, updated_df); + let tracker = CDCTracker::new(source_df, updated_df); match tracker.collect() { Ok(df) => { @@ -276,8 +249,6 @@ mod tests { } } - // This cannot be re-enabled until DataFrame.except() works: - #[ignore] #[tokio::test] async fn test_sanity_check_with_pure_df() { let nested_schema = Arc::new(Schema::new(vec![ @@ -354,8 +325,6 @@ mod tests { assert_eq!(diff.len(), 1); } - // This cannot be re-enabled until DataFrame.except() works: - #[ignore] #[tokio::test] async fn test_sanity_check_with_struct() { let ctx = SessionContext::new(); @@ -423,7 +392,7 @@ mod tests { Arc::new(MemTable::try_new(schema.clone(), vec![vec![updated_batch]]).unwrap()); let updated_df = ctx.read_table(table_provider_updated).unwrap(); - let tracker = CDCTracker::new(schema, source_df, updated_df); + let tracker = CDCTracker::new(source_df, updated_df); match tracker.collect() { Ok(df) => { diff --git a/crates/core/src/operations/delete.rs b/crates/core/src/operations/delete.rs index 6e9ad5d725..692c1b303b 100644 --- a/crates/core/src/operations/delete.rs +++ b/crates/core/src/operations/delete.rs @@ -43,7 +43,6 @@ use serde::Serialize; use super::cdc::should_write_cdc; use super::datafusion_utils::Expression; use super::transaction::{CommitBuilder, CommitProperties, PROTOCOL}; -use super::write::WriterStatsConfig; use crate::delta_datafusion::expr::fmt_expr_to_sql; use crate::delta_datafusion::{ @@ -52,7 +51,7 @@ use crate::delta_datafusion::{ }; use crate::errors::DeltaResult; use crate::kernel::{Action, Add, Remove}; -use 
crate::operations::write::{write_execution_plan, write_execution_plan_cdc, SchemaMode}; +use crate::operations::write::{write_execution_plan, write_execution_plan_cdc, WriterStatsConfig}; use crate::protocol::DeltaOperation; use crate::table::state::DeltaTableState; use crate::{DeltaTable, DeltaTableError}; diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs index d34a4986dc..2a947f486f 100644 --- a/crates/core/src/operations/update.rs +++ b/crates/core/src/operations/update.rs @@ -292,9 +292,6 @@ async fn execute( .with_files(candidates.candidates.clone()), ); - // Create a projection for a new column with the predicate evaluated - let input_schema = snapshot.input_schema()?; - let target_provider = provider_as_source(target_provider); let plan = LogicalPlanBuilder::scan("target", target_provider.clone(), None)?.build()?; @@ -349,7 +346,6 @@ async fn execute( ); let tracker = CDCTracker::new( - input_schema.clone(), df, updated_df.drop_columns(&vec![UPDATE_PREDICATE_COLNAME])?, ); diff --git a/crates/core/src/operations/write.rs b/crates/core/src/operations/write.rs index c3b8781708..923eadeeaf 100644 --- a/crates/core/src/operations/write.rs +++ b/crates/core/src/operations/write.rs @@ -693,7 +693,6 @@ pub(crate) async fn execute_non_empty_expr_cdc( None, writer_properties, false, - Some(SchemaMode::Overwrite), // If not overwrite, the plan schema is not taken but table schema, however we need the plan schema since it has the _change_type_col writer_stats_config, None, )