Skip to content

Commit

Permalink
chore: upgrade to datafusion datafusion-40
Browse files Browse the repository at this point in the history
  • Loading branch information
rtyler committed Jul 25, 2024
1 parent 292e3c2 commit 88c102e
Show file tree
Hide file tree
Showing 10 changed files with 47 additions and 14 deletions.
16 changes: 8 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@ object_store = { version = "0.10.1" }
parquet = { version = "52" }

# datafusion
datafusion = { version = "39" }
datafusion-expr = { version = "39" }
datafusion-common = { version = "39" }
datafusion-proto = { version = "39" }
datafusion-sql = { version = "39" }
datafusion-physical-expr = { version = "39" }
datafusion-functions = { version = "39" }
datafusion-functions-array = { version = "39" }
datafusion = { version = "40" }
datafusion-expr = { version = "40" }
datafusion-common = { version = "40" }
datafusion-proto = { version = "40" }
datafusion-sql = { version = "40" }
datafusion-physical-expr = { version = "40" }
datafusion-functions = { version = "40" }
datafusion-functions-array = { version = "40" }

# serde
serde = { version = "1.0.194", features = ["derive"] }
Expand Down
4 changes: 4 additions & 0 deletions crates/core/src/delta_datafusion/cdf/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ impl DisplayAs for DeltaCdfScan {
}

impl ExecutionPlan for DeltaCdfScan {
fn name(&self) -> &str {
Self::static_name()
}

fn as_any(&self) -> &dyn Any {
self
}
Expand Down
9 changes: 8 additions & 1 deletion crates/core/src/delta_datafusion/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use std::{
use arrow_schema::DataType;
use chrono::{DateTime, NaiveDate};
use datafusion::execution::context::SessionState;
use datafusion::execution::FunctionRegistry;
use datafusion_common::Result as DFResult;
use datafusion_common::{config::ConfigOptions, DFSchema, Result, ScalarValue, TableReference};
use datafusion_expr::{
Expand Down Expand Up @@ -107,9 +108,15 @@ pub(crate) fn parse_predicate_expression(
})?;

let context_provider = DeltaContextProvider { state: df_state };
let sql_to_rel =
let mut sql_to_rel =
SqlToRel::new_with_options(&context_provider, DeltaParserOptions::default().into());

// NOTE: This can be probably removed with Datafusion 41 once
// <https://github.com/apache/datafusion/pull/11485> is released
for planner in context_provider.state.expr_planners() {
sql_to_rel = sql_to_rel.with_user_defined_planner(planner.clone());
}

Ok(sql_to_rel.sql_to_expr(sql, schema, &mut Default::default())?)
}

Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/delta_datafusion/find_files/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ async fn scan_table_by_files(

// Identify which columns we need to project
let mut used_columns = expression
.to_columns()?
.column_refs()
.into_iter()
.map(|column| logical_schema.index_of(&column.name))
.collect::<std::result::Result<Vec<usize>, ArrowError>>()?;
Expand Down
4 changes: 4 additions & 0 deletions crates/core/src/delta_datafusion/find_files/physical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ impl DisplayAs for FindFilesExec {
}

impl ExecutionPlan for FindFilesExec {
fn name(&self) -> &str {
Self::static_name()
}

fn as_any(&self) -> &dyn Any {
self
}
Expand Down
13 changes: 9 additions & 4 deletions crates/core/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ use datafusion_common::scalar::ScalarValue;
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
use datafusion_common::{
config::ConfigOptions, Column, DFSchema, DataFusionError, Result as DataFusionResult,
ToDFSchema,
TableReference, ToDFSchema,
};
use datafusion_expr::logical_plan::CreateExternalTable;
use datafusion_expr::utils::conjunction;
Expand Down Expand Up @@ -845,6 +845,10 @@ impl DisplayAs for DeltaScan {
}

impl ExecutionPlan for DeltaScan {
fn name(&self) -> &str {
Self::static_name()
}

fn as_any(&self) -> &dyn Any {
self
}
Expand Down Expand Up @@ -1316,6 +1320,7 @@ impl LogicalExtensionCodec for DeltaLogicalCodec {
fn try_decode_table_provider(
&self,
buf: &[u8],
_table_ref: &TableReference,
_schema: SchemaRef,
_ctx: &SessionContext,
) -> Result<Arc<dyn TableProvider>, DataFusionError> {
Expand All @@ -1326,6 +1331,7 @@ impl LogicalExtensionCodec for DeltaLogicalCodec {

fn try_encode_table_provider(
&self,
_table_ref: &TableReference,
node: Arc<dyn TableProvider>,
buf: &mut Vec<u8>,
) -> Result<(), DataFusionError> {
Expand Down Expand Up @@ -1506,7 +1512,7 @@ pub(crate) async fn find_files_scan<'a>(

// Identify which columns we need to project
let mut used_columns = expression
.to_columns()?
.column_refs()
.into_iter()
.map(|column| logical_schema.index_of(&column.name))
.collect::<Result<Vec<usize>, ArrowError>>()?;
Expand Down Expand Up @@ -1756,9 +1762,8 @@ mod tests {
use datafusion::assert_batches_sorted_eq;
use datafusion::datasource::physical_plan::ParquetExec;
use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_plan::{visit_execution_plan, ExecutionPlanVisitor};
use datafusion::physical_plan::{visit_execution_plan, ExecutionPlanVisitor, PhysicalExpr};
use datafusion_expr::lit;
use datafusion_physical_expr::PhysicalExpr;
use datafusion_proto::physical_plan::AsExecutionPlan;
use datafusion_proto::protobuf;
use object_store::path::Path;
Expand Down
4 changes: 4 additions & 0 deletions crates/core/src/delta_datafusion/physical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ impl DisplayAs for MetricObserverExec {
}

impl ExecutionPlan for MetricObserverExec {
fn name(&self) -> &str {
Self::static_name()
}

fn as_any(&self) -> &dyn std::any::Any {
self
}
Expand Down
4 changes: 4 additions & 0 deletions crates/core/src/operations/cdc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,10 @@ impl DisplayAs for CDCObserver {
}

impl ExecutionPlan for CDCObserver {
fn name(&self) -> &str {
Self::static_name()
}

fn as_any(&self) -> &dyn std::any::Any {
self
}
Expand Down
4 changes: 4 additions & 0 deletions crates/core/src/operations/merge/barrier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ impl MergeBarrierExec {
}

impl ExecutionPlan for MergeBarrierExec {
fn name(&self) -> &str {
Self::static_name()
}

fn as_any(&self) -> &dyn std::any::Any {
self
}
Expand Down
1 change: 1 addition & 0 deletions crates/sql/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ impl<'a, S: ContextProvider> DeltaSqlToRel<'a, S> {
ParserOptions {
parse_float_as_decimal: self.options.parse_float_as_decimal,
enable_ident_normalization: self.options.enable_ident_normalization,
support_varchar_with_length: false,
},
);
planner.statement_to_plan(s)
Expand Down

0 comments on commit 88c102e

Please sign in to comment.