diff --git a/Cargo.toml b/Cargo.toml index f8e25f1120..5932f52087 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,14 +45,14 @@ object_store = { version = "0.10.1" } parquet = { version = "52" } # datafusion -datafusion = { version = "39" } -datafusion-expr = { version = "39" } -datafusion-common = { version = "39" } -datafusion-proto = { version = "39" } -datafusion-sql = { version = "39" } -datafusion-physical-expr = { version = "39" } -datafusion-functions = { version = "39" } -datafusion-functions-array = { version = "39" } +datafusion = { version = "40" } +datafusion-expr = { version = "40" } +datafusion-common = { version = "40" } +datafusion-proto = { version = "40" } +datafusion-sql = { version = "40" } +datafusion-physical-expr = { version = "40" } +datafusion-functions = { version = "40" } +datafusion-functions-array = { version = "40" } # serde serde = { version = "1.0.194", features = ["derive"] } diff --git a/crates/core/src/delta_datafusion/cdf/scan.rs b/crates/core/src/delta_datafusion/cdf/scan.rs index ea34855b77..bd7488899f 100644 --- a/crates/core/src/delta_datafusion/cdf/scan.rs +++ b/crates/core/src/delta_datafusion/cdf/scan.rs @@ -26,6 +26,10 @@ impl DisplayAs for DeltaCdfScan { } impl ExecutionPlan for DeltaCdfScan { + fn name(&self) -> &str { + Self::static_name() + } + fn as_any(&self) -> &dyn Any { self } diff --git a/crates/core/src/delta_datafusion/expr.rs b/crates/core/src/delta_datafusion/expr.rs index 2d48f7873e..2577d1a1db 100644 --- a/crates/core/src/delta_datafusion/expr.rs +++ b/crates/core/src/delta_datafusion/expr.rs @@ -29,6 +29,7 @@ use std::{ use arrow_schema::DataType; use chrono::{DateTime, NaiveDate}; use datafusion::execution::context::SessionState; +use datafusion::execution::FunctionRegistry; use datafusion_common::Result as DFResult; use datafusion_common::{config::ConfigOptions, DFSchema, Result, ScalarValue, TableReference}; use datafusion_expr::{ @@ -107,9 +108,15 @@ pub(crate) fn parse_predicate_expression( })?; let context_provider = DeltaContextProvider { state: df_state }; - let sql_to_rel = + let mut sql_to_rel = SqlToRel::new_with_options(&context_provider, DeltaParserOptions::default().into()); + // NOTE: This can be probably removed with Datafusion 41 once + // is released + for planner in context_provider.state.expr_planners() { + sql_to_rel = sql_to_rel.with_user_defined_planner(planner.clone()); + } + Ok(sql_to_rel.sql_to_expr(sql, schema, &mut Default::default())?) } diff --git a/crates/core/src/delta_datafusion/find_files/mod.rs b/crates/core/src/delta_datafusion/find_files/mod.rs index 347925f31f..d25d0765ee 100644 --- a/crates/core/src/delta_datafusion/find_files/mod.rs +++ b/crates/core/src/delta_datafusion/find_files/mod.rs @@ -141,7 +141,7 @@ async fn scan_table_by_files( // Identify which columns we need to project let mut used_columns = expression - .to_columns()? + .column_refs() .into_iter() .map(|column| logical_schema.index_of(&column.name)) .collect::, ArrowError>>()?; diff --git a/crates/core/src/delta_datafusion/find_files/physical.rs b/crates/core/src/delta_datafusion/find_files/physical.rs index eb295912a2..e23a561e5b 100644 --- a/crates/core/src/delta_datafusion/find_files/physical.rs +++ b/crates/core/src/delta_datafusion/find_files/physical.rs @@ -85,6 +85,10 @@ impl DisplayAs for FindFilesExec { } impl ExecutionPlan for FindFilesExec { + fn name(&self) -> &str { + Self::static_name() + } + fn as_any(&self) -> &dyn Any { self } diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs index bff1210fea..760b7380c6 100644 --- a/crates/core/src/delta_datafusion/mod.rs +++ b/crates/core/src/delta_datafusion/mod.rs @@ -61,7 +61,7 @@ use datafusion_common::scalar::ScalarValue; use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor}; use datafusion_common::{ config::ConfigOptions, Column, DFSchema, DataFusionError, Result as DataFusionResult, - ToDFSchema, + TableReference, ToDFSchema, }; use datafusion_expr::logical_plan::CreateExternalTable; use datafusion_expr::utils::conjunction; @@ -845,6 +845,10 @@ impl DisplayAs for DeltaScan { } impl ExecutionPlan for DeltaScan { + fn name(&self) -> &str { + Self::static_name() + } + fn as_any(&self) -> &dyn Any { self } @@ -1316,6 +1320,7 @@ impl LogicalExtensionCodec for DeltaLogicalCodec { fn try_decode_table_provider( &self, buf: &[u8], + _table_ref: &TableReference, _schema: SchemaRef, _ctx: &SessionContext, ) -> Result, DataFusionError> { @@ -1326,6 +1331,7 @@ impl LogicalExtensionCodec for DeltaLogicalCodec { fn try_encode_table_provider( &self, + _table_ref: &TableReference, node: Arc, buf: &mut Vec, ) -> Result<(), DataFusionError> { @@ -1506,7 +1512,7 @@ pub(crate) async fn find_files_scan<'a>( // Identify which columns we need to project let mut used_columns = expression - .to_columns()? + .column_refs() .into_iter() .map(|column| logical_schema.index_of(&column.name)) .collect::, ArrowError>>()?; @@ -1756,9 +1762,8 @@ mod tests { use datafusion::assert_batches_sorted_eq; use datafusion::datasource::physical_plan::ParquetExec; use datafusion::physical_plan::empty::EmptyExec; - use datafusion::physical_plan::{visit_execution_plan, ExecutionPlanVisitor}; + use datafusion::physical_plan::{visit_execution_plan, ExecutionPlanVisitor, PhysicalExpr}; use datafusion_expr::lit; - use datafusion_physical_expr::PhysicalExpr; use datafusion_proto::physical_plan::AsExecutionPlan; use datafusion_proto::protobuf; use object_store::path::Path; diff --git a/crates/core/src/delta_datafusion/physical.rs b/crates/core/src/delta_datafusion/physical.rs index bfc220cf86..adbb7fb4fe 100644 --- a/crates/core/src/delta_datafusion/physical.rs +++ b/crates/core/src/delta_datafusion/physical.rs @@ -74,6 +74,10 @@ impl DisplayAs for MetricObserverExec { } impl ExecutionPlan for MetricObserverExec { + fn name(&self) -> &str { + Self::static_name() + } + fn as_any(&self) -> &dyn std::any::Any { self } diff --git a/crates/core/src/operations/cdc.rs b/crates/core/src/operations/cdc.rs index cc8bff2359..5c4b3be0f5 100644 --- a/crates/core/src/operations/cdc.rs +++ b/crates/core/src/operations/cdc.rs @@ -196,6 +196,10 @@ impl DisplayAs for CDCObserver { } impl ExecutionPlan for CDCObserver { + fn name(&self) -> &str { + Self::static_name() + } + fn as_any(&self) -> &dyn std::any::Any { self } diff --git a/crates/core/src/operations/merge/barrier.rs b/crates/core/src/operations/merge/barrier.rs index 04cde87a19..e9b2f8fd00 100644 --- a/crates/core/src/operations/merge/barrier.rs +++ b/crates/core/src/operations/merge/barrier.rs @@ -67,6 +67,10 @@ impl MergeBarrierExec { } impl ExecutionPlan for MergeBarrierExec { + fn name(&self) -> &str { + Self::static_name() + } + fn as_any(&self) -> &dyn std::any::Any { self } diff --git a/crates/sql/src/planner.rs b/crates/sql/src/planner.rs index e2c76e68fd..90b7827575 100644 --- a/crates/sql/src/planner.rs +++ b/crates/sql/src/planner.rs @@ -42,6 +42,7 @@ impl<'a, S: ContextProvider> DeltaSqlToRel<'a, S> { ParserOptions { parse_float_as_decimal: self.options.parse_float_as_decimal, enable_ident_normalization: self.options.enable_ident_normalization, + support_varchar_with_length: false, }, ); planner.statement_to_plan(s)