Skip to content

Commit

Permalink
chore: re-enable struct support update cdf
Browse files Browse the repository at this point in the history
  • Loading branch information
ion-elgreco committed Aug 7, 2024
1 parent 73c354f commit aa28d73
Show file tree
Hide file tree
Showing 6 changed files with 9 additions and 48 deletions.
8 changes: 4 additions & 4 deletions crates/core/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -523,10 +523,10 @@ impl<'a> DeltaScanBuilder<'a> {
None => DeltaScanConfigBuilder::new().build(self.snapshot)?,
};

let schema = config
.schema
.clone()
.unwrap_or(self.snapshot.arrow_schema()?);
let schema = match config.schema.clone() {
Some(value) => Ok(value),
None => self.snapshot.arrow_schema(),
}?;

let logical_schema = df_logical_schema(self.snapshot, &config)?;

Expand Down
4 changes: 1 addition & 3 deletions crates/core/src/delta_datafusion/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,7 @@ pub struct DeltaPlanner<T: ExtensionPlanner> {
}

#[async_trait]
impl<T: ExtensionPlanner + std::marker::Send + Sync + 'static + Clone> QueryPlanner
for DeltaPlanner<T>
{
impl<T: ExtensionPlanner + Send + Sync + 'static + Clone> QueryPlanner for DeltaPlanner<T> {
async fn create_physical_plan(
&self,
logical_plan: &LogicalPlan,
Expand Down
37 changes: 3 additions & 34 deletions crates/core/src/operations/cdc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,14 @@ use tracing::log::*;
/// The CDCTracker is useful for hooking reads/writes in a manner necessary to create CDC files
/// associated with commits
pub(crate) struct CDCTracker {
schema: SchemaRef,
pre_dataframe: DataFrame,
post_dataframe: DataFrame,
}

impl CDCTracker {
/// Construct a new `CDCTracker` from the given `pre_dataframe` and `post_dataframe`
pub(crate) fn new(
schema: SchemaRef,
pre_dataframe: DataFrame,
post_dataframe: DataFrame,
) -> Self {
pub(crate) fn new(pre_dataframe: DataFrame, post_dataframe: DataFrame) -> Self {
Self {
schema,
pre_dataframe,
post_dataframe,
}
Expand All @@ -44,27 +38,6 @@ impl CDCTracker {
let preimage = pre_df.clone().except(post_df.clone())?;
let postimage = post_df.except(pre_df)?;

// Create a new schema which represents the input batch along with the CDC
// columns
let fields: Vec<Arc<Field>> = self.schema.fields().to_vec().clone();

let mut has_struct = false;
for field in fields.iter() {
match field.data_type() {
DataType::Struct(_) => {
has_struct = true;
}
DataType::List(_) => {
has_struct = true;
}
_ => {}
}
}

if has_struct {
warn!("The schema contains a Struct or List type, which unfortunately means a change data file cannot be captured in this release of delta-rs: <https://github.com/delta-io/delta-rs/issues/2568>. The write operation will complete properly, but no CDC data will be generated for schema: {fields:?}");
}

let preimage = preimage.with_column(
"_change_type",
lit(ScalarValue::Utf8(Some("update_preimage".to_string()))),
Expand Down Expand Up @@ -253,7 +226,7 @@ mod tests {
Arc::new(MemTable::try_new(schema.clone(), vec![vec![updated_batch]]).unwrap());
let updated_df = ctx.read_table(table_provider_updated).unwrap();

let tracker = CDCTracker::new(schema, source_df, updated_df);
let tracker = CDCTracker::new(source_df, updated_df);

match tracker.collect() {
Ok(df) => {
Expand All @@ -276,8 +249,6 @@ mod tests {
}
}

// This cannot be re-enabled until DataFrame.except() works: <https://github.com/apache/datafusion/issues/10749>
#[ignore]
#[tokio::test]
async fn test_sanity_check_with_pure_df() {
let nested_schema = Arc::new(Schema::new(vec![
Expand Down Expand Up @@ -354,8 +325,6 @@ mod tests {
assert_eq!(diff.len(), 1);
}

// This cannot be re-enabled until DataFrame.except() works: <https://github.com/apache/datafusion/issues/10749>
#[ignore]
#[tokio::test]
async fn test_sanity_check_with_struct() {
let ctx = SessionContext::new();
Expand Down Expand Up @@ -423,7 +392,7 @@ mod tests {
Arc::new(MemTable::try_new(schema.clone(), vec![vec![updated_batch]]).unwrap());
let updated_df = ctx.read_table(table_provider_updated).unwrap();

let tracker = CDCTracker::new(schema, source_df, updated_df);
let tracker = CDCTracker::new(source_df, updated_df);

match tracker.collect() {
Ok(df) => {
Expand Down
3 changes: 1 addition & 2 deletions crates/core/src/operations/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ use serde::Serialize;
use super::cdc::should_write_cdc;
use super::datafusion_utils::Expression;
use super::transaction::{CommitBuilder, CommitProperties, PROTOCOL};
use super::write::WriterStatsConfig;

use crate::delta_datafusion::expr::fmt_expr_to_sql;
use crate::delta_datafusion::{
Expand All @@ -52,7 +51,7 @@ use crate::delta_datafusion::{
};
use crate::errors::DeltaResult;
use crate::kernel::{Action, Add, Remove};
use crate::operations::write::{write_execution_plan, write_execution_plan_cdc, SchemaMode};
use crate::operations::write::{write_execution_plan, write_execution_plan_cdc, WriterStatsConfig};
use crate::protocol::DeltaOperation;
use crate::table::state::DeltaTableState;
use crate::{DeltaTable, DeltaTableError};
Expand Down
4 changes: 0 additions & 4 deletions crates/core/src/operations/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,9 +292,6 @@ async fn execute(
.with_files(candidates.candidates.clone()),
);

// Create a projection for a new column with the predicate evaluated
let input_schema = snapshot.input_schema()?;

let target_provider = provider_as_source(target_provider);
let plan = LogicalPlanBuilder::scan("target", target_provider.clone(), None)?.build()?;

Expand Down Expand Up @@ -349,7 +346,6 @@ async fn execute(
);

let tracker = CDCTracker::new(
input_schema.clone(),
df,
updated_df.drop_columns(&vec![UPDATE_PREDICATE_COLNAME])?,
);
Expand Down
1 change: 0 additions & 1 deletion crates/core/src/operations/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -693,7 +693,6 @@ pub(crate) async fn execute_non_empty_expr_cdc(
None,
writer_properties,
false,
Some(SchemaMode::Overwrite), // If not overwrite, the plan schema is not taken but table schema, however we need the plan schema since it has the _change_type_col
writer_stats_config,
None,
)
Expand Down

0 comments on commit aa28d73

Please sign in to comment.