chore: upgrade to DataFusion 35.0 #2121

Merged (8 commits) on Jan 30, 2024

39 changes: 19 additions & 20 deletions Cargo.toml
@@ -19,28 +19,27 @@ debug = "line-tables-only"

 [workspace.dependencies]
 # arrow
-arrow = { version = "49" }
-arrow-arith = { version = "49" }
-arrow-array = { version = "49" }
-arrow-buffer = { version = "49" }
-arrow-cast = { version = "49" }
-arrow-ipc = { version = "49" }
-arrow-json = { version = "49" }
-arrow-ord = { version = "49" }
-arrow-row = { version = "49" }
-arrow-schema = { version = "49" }
-arrow-select = { version = "49" }
-object_store = { version = "0.8" }
-parquet = { version = "49" }
+arrow = { version = "50" }
+arrow-arith = { version = "50" }
+arrow-array = { version = "50" }
+arrow-buffer = { version = "50" }
+arrow-cast = { version = "50" }
+arrow-ipc = { version = "50" }
+arrow-json = { version = "50" }
+arrow-ord = { version = "50" }
+arrow-row = { version = "50" }
+arrow-schema = { version = "50" }
+arrow-select = { version = "50" }
+object_store = { version = "0.9" }
+parquet = { version = "50" }

 # datafusion
-datafusion = { version = "34" }
-datafusion-expr = { version = "34" }
-datafusion-common = { version = "34" }
-datafusion-proto = { version = "34" }
-datafusion-sql = { version = "34" }
-datafusion-physical-expr = { version = "34" }
+datafusion = { version = "35" }
+datafusion-expr = { version = "35" }
+datafusion-common = { version = "35" }
+datafusion-proto = { version = "35" }
+datafusion-sql = { version = "35" }
+datafusion-physical-expr = { version = "35" }

 # serde
 serde = { version = "1.0.194", features = ["derive"] }

1 change: 1 addition & 0 deletions crates/azure/src/config.rs
@@ -35,6 +35,7 @@ enum AzureCredential {
     /// Authorizing with secret
     ClientSecret,
     /// Using a managed identity
+    #[allow(dead_code)]
     ManagedIdentity,
     /// Using a shared access signature
     SasKey,

1 change: 1 addition & 0 deletions crates/azure/src/error.rs
@@ -4,6 +4,7 @@ pub(crate) type Result<T, E = Error> = std::result::Result<T, E>;

 #[derive(thiserror::Error, Debug)]
 pub(crate) enum Error {
+    #[allow(dead_code)]
     #[error("failed to parse config: {0}")]
     Parse(String),

2 changes: 1 addition & 1 deletion crates/core/Cargo.toml
@@ -97,7 +97,7 @@ reqwest = { version = "0.11.18", default-features = false, features = [
     "rustls-tls",
     "json",
 ], optional = true }
-sqlparser = { version = "0.40", optional = true }
+sqlparser = { version = "0.41", optional = true }

 [dev-dependencies]
 criterion = "0.5"

8 changes: 3 additions & 5 deletions crates/core/src/delta_datafusion/mod.rs
@@ -90,7 +90,7 @@ pub mod physical;
 impl From<DeltaTableError> for DataFusionError {
     fn from(err: DeltaTableError) -> Self {
         match err {
-            DeltaTableError::Arrow { source } => DataFusionError::ArrowError(source),
+            DeltaTableError::Arrow { source } => DataFusionError::ArrowError(source, None),
             DeltaTableError::Io { source } => DataFusionError::IoError(source),
             DeltaTableError::ObjectStore { source } => DataFusionError::ObjectStore(source),
             DeltaTableError::Parquet { source } => DataFusionError::ParquetError(source),
@@ -102,7 +102,7 @@ impl From<DataFusionError> for DeltaTableError {
 impl From<DataFusionError> for DeltaTableError {
     fn from(err: DataFusionError) -> Self {
         match err {
-            DataFusionError::ArrowError(source) => DeltaTableError::Arrow { source },
+            DataFusionError::ArrowError(source, _) => DeltaTableError::Arrow { source },
             DataFusionError::IoError(source) => DeltaTableError::Io { source },
             DataFusionError::ObjectStore(source) => DeltaTableError::ObjectStore { source },
             DataFusionError::ParquetError(source) => DeltaTableError::Parquet { source },
@@ -430,7 +430,6 @@ impl<'a> DeltaScanBuilder<'a> {
                 limit: self.limit,
                 table_partition_cols,
                 output_ordering: vec![],
-                infinite_source: false,
             },
             logical_filter.as_ref(),
         )
@@ -808,7 +807,7 @@ pub(crate) fn logical_expr_to_physical_expr(
 ) -> Arc<dyn PhysicalExpr> {
     let df_schema = schema.clone().to_dfschema().unwrap();
     let execution_props = ExecutionProps::new();
-    create_physical_expr(expr, &df_schema, schema, &execution_props).unwrap()
+    create_physical_expr(expr, &df_schema, &execution_props).unwrap()
 }

 pub(crate) async fn execute_plan_to_batch(
@@ -1238,7 +1237,6 @@ pub(crate) async fn find_files_scan<'a>(
     let predicate_expr = create_physical_expr(
         &Expr::IsTrue(Box::new(expression.clone())),
         &input_dfschema,
-        &input_schema,
         state.execution_props(),
     )?;

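For context: DataFusion 35 added an optional context message as a second field on DataFusionError::ArrowError, which is why every construction and match above gains a field. A minimal standalone sketch of both directions, using only the arrow-schema and datafusion-common crates pinned in Cargo.toml (the wrap/unwrap_arrow helper names are illustrative, not from this PR):

use arrow_schema::ArrowError;
use datafusion_common::DataFusionError;

// Construction: `None` means "no extra context", matching the PR.
fn wrap(err: ArrowError) -> DataFusionError {
    DataFusionError::ArrowError(err, None)
}

// Matching: the context field is ignored with `_` when converting back.
fn unwrap_arrow(err: DataFusionError) -> Option<ArrowError> {
    match err {
        DataFusionError::ArrowError(source, _context) => Some(source),
        _ => None,
    }
}

fn main() {
    let err = wrap(ArrowError::ComputeError("example".to_string()));
    assert!(unwrap_arrow(err).is_some());
}
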
2 changes: 1 addition & 1 deletion crates/core/src/kernel/snapshot/log_data.rs
@@ -245,7 +245,7 @@ impl LogicalFile<'_> {
         self.deletion_vector.as_ref().and_then(|arr| {
             arr.storage_type
                 .is_valid(self.index)
-                .then(|| DeletionVectorView {
+                .then_some(DeletionVectorView {
                     data: arr,
                     index: self.index,
                 })

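The .then(|| …) to .then_some(…) change above is a lint-driven cleanup rather than a DataFusion API change: bool::then takes a closure and builds the value lazily, while bool::then_some takes an already-built value, which is fine when construction is as cheap as this DeletionVectorView struct literal. A tiny standalone illustration of the equivalence:

fn main() {
    let is_valid = true;
    // Lazy: the closure only runs when the bool is true.
    let lazy: Option<i32> = is_valid.then(|| 40 + 2);
    // Eager: the value is built up front; equivalent here, and it satisfies
    // clippy's `unnecessary_lazy_evaluations` lint.
    let eager: Option<i32> = is_valid.then_some(42);
    assert_eq!(lazy, eager);
}
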
1 change: 0 additions & 1 deletion crates/core/src/operations/delete.rs
@@ -152,7 +152,6 @@ async fn excute_non_empty_expr(
     let predicate_expr = create_physical_expr(
         &negated_expression,
         &input_dfschema,
-        &input_schema,
         state.execution_props(),
     )?;
     let filter: Arc<dyn ExecutionPlan> =

4 changes: 3 additions & 1 deletion crates/core/src/operations/merge/barrier.rs
@@ -293,7 +293,9 @@ impl Stream for MergeBarrierStream {
                         .iter()
                         .map(|c| {
                             arrow::compute::take(c.as_ref(), &indices, None)
-                                .map_err(DataFusionError::ArrowError)
+                                .map_err(|err| {
+                                    DataFusionError::ArrowError(err, None)
+                                })
                         })
                         .collect::<DataFusionResult<Vec<ArrayRef>>>()?;

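The closure here is forced by the same ArrowError change: with one field, DataFusionError::ArrowError could be passed to map_err directly as a function; with two fields it no longer fits map_err's one-argument shape, so a closure supplies the None. A minimal sketch (the lift helper is illustrative, not from this PR):

use arrow_schema::ArrowError;
use datafusion_common::DataFusionError;

fn lift(res: Result<u32, ArrowError>) -> Result<u32, DataFusionError> {
    // DataFusion 34 allowed `res.map_err(DataFusionError::ArrowError)` because
    // the unary variant was itself a fn(ArrowError) -> DataFusionError.
    res.map_err(|err| DataFusionError::ArrowError(err, None))
}

fn main() {
    assert!(lift(Err(ArrowError::ComputeError("boom".into()))).is_err());
}
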
6 changes: 1 addition & 5 deletions crates/core/src/operations/merge/mod.rs
@@ -32,7 +32,6 @@ use std::collections::HashMap;
 use std::sync::Arc;
 use std::time::Instant;

-use arrow_schema::Schema as ArrowSchema;
 use async_trait::async_trait;
 use datafusion::datasource::provider_as_source;
 use datafusion::error::Result as DataFusionResult;
@@ -657,11 +656,10 @@ impl ExtensionPlanner for MergeMetricExtensionPlanner {

         if let Some(barrier) = node.as_any().downcast_ref::<MergeBarrier>() {
             let schema = barrier.input.schema();
-            let exec_schema: ArrowSchema = schema.as_ref().to_owned().into();
             return Ok(Some(Arc::new(MergeBarrierExec::new(
                 physical_inputs.first().unwrap().clone(),
                 barrier.file_column.clone(),
-                planner.create_physical_expr(&barrier.expr, schema, &exec_schema, session_state)?,
+                planner.create_physical_expr(&barrier.expr, schema, session_state)?,
             ))));
         }

@@ -1418,9 +1416,7 @@ impl std::future::IntoFuture for MergeBuilder {
         PROTOCOL.can_write_to(&this.snapshot)?;

         let state = this.state.unwrap_or_else(|| {
-            //TODO: Datafusion's Hashjoin has some memory issues. Running with all cores results in a OoM. Can be removed when upstream improvemetns are made.
             let config: SessionConfig = DeltaSessionConfig::default().into();
-            let config = config.with_target_partitions(1);
             let session = SessionContext::new_with_config(config);

             // If a user provides their own DF state then they must register the store themselves

46 changes: 32 additions & 14 deletions crates/core/src/operations/optimize.rs
@@ -552,7 +552,7 @@ impl MergePlan {
             use datafusion::prelude::{col, ParquetReadOptions};
             use datafusion_common::Column;
             use datafusion_expr::expr::ScalarFunction;
-            use datafusion_expr::Expr;
+            use datafusion_expr::{Expr, ScalarUDF};

             let locations = files
                 .iter()
@@ -578,7 +578,7 @@ impl MergePlan {
                 .map(|col| Expr::Column(Column::from_qualified_name_ignore_case(col)))
                 .collect_vec();
             let expr = Expr::ScalarFunction(ScalarFunction::new_udf(
-                Arc::new(zorder::datafusion::zorder_key_udf()),
+                Arc::new(ScalarUDF::from(zorder::datafusion::ZOrderUDF)),
                 cols,
             ));
             let df = df.with_column(ZORDER_KEY_COLUMN, expr)?;
@@ -1139,10 +1139,10 @@ pub(super) mod zorder {
     use arrow_schema::DataType;
     use datafusion_common::DataFusionError;
     use datafusion_expr::{
-        ColumnarValue, ReturnTypeFunction, ScalarFunctionImplementation, ScalarUDF, Signature,
-        TypeSignature, Volatility,
+        ColumnarValue, ScalarUDF, ScalarUDFImpl, Signature, TypeSignature, Volatility,
     };
     use itertools::Itertools;
+    use std::any::Any;

     pub const ZORDER_UDF_NAME: &str = "zorder_key";

@@ -1166,20 +1166,38 @@

         use url::Url;
         let ctx = SessionContext::new_with_config_rt(SessionConfig::default(), runtime);
-        ctx.register_udf(datafusion::zorder_key_udf());
+        ctx.register_udf(ScalarUDF::from(datafusion::ZOrderUDF));
         Ok(Self { columns, ctx })
     }
 }

-    /// Get the DataFusion UDF struct for zorder_key
-    pub fn zorder_key_udf() -> ScalarUDF {
-        let signature = Signature {
-            type_signature: TypeSignature::VariadicAny,
-            volatility: Volatility::Immutable,
-        };
-        let return_type: ReturnTypeFunction = Arc::new(|_| Ok(Arc::new(DataType::Binary)));
-        let fun: ScalarFunctionImplementation = Arc::new(zorder_key_datafusion);
-        ScalarUDF::new(ZORDER_UDF_NAME, &signature, &return_type, &fun)
-    }
+    // DataFusion UDF impl for zorder_key
+    #[derive(Debug)]
+    pub struct ZOrderUDF;
+
+    impl ScalarUDFImpl for ZOrderUDF {
+        fn as_any(&self) -> &dyn Any {
+            self
+        }
+
+        fn name(&self) -> &str {
+            ZORDER_UDF_NAME
+        }
+
+        fn signature(&self) -> &Signature {
+            &Signature {
+                type_signature: TypeSignature::VariadicAny,
+                volatility: Volatility::Immutable,
+            }
+        }
+
+        fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType, DataFusionError> {
+            Ok(DataType::Binary)
+        }
+
+        fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue, DataFusionError> {
+            zorder_key_datafusion(args)
+        }
+    }

     /// Datafusion zorder UDF body

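The rewrite above tracks DataFusion 35's deprecation of the function-pointer ScalarUDF::new constructor in favor of the ScalarUDFImpl trait. A sketch of how such a trait-based UDF is then used from the expression API, mirroring the optimize.rs hunk earlier (assumes the ZOrderUDF type from this diff is in scope; zorder_expr is an illustrative helper):

use std::sync::Arc;

use datafusion::prelude::col;
use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::{Expr, ScalarUDF};

fn zorder_expr(columns: &[&str]) -> Expr {
    // `ScalarUDF::from` wraps any `ScalarUDFImpl`, replacing the old
    // `ScalarUDF::new(name, &signature, &return_type, &fun)` calls.
    let udf = Arc::new(ScalarUDF::from(ZOrderUDF));
    let args: Vec<Expr> = columns.iter().map(|c| col(*c)).collect();
    Expr::ScalarFunction(ScalarFunction::new_udf(udf, args))
}
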
15 changes: 14 additions & 1 deletion crates/core/src/operations/transaction/state.rs
@@ -1,6 +1,7 @@
 use std::collections::HashSet;
 use std::sync::Arc;

-use arrow::array::ArrayRef;
+use arrow::array::{ArrayRef, BooleanArray};
 use arrow::datatypes::{
     DataType, Field as ArrowField, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef,
 };
@@ -296,6 +297,12 @@ impl<'a> PruningStatistics for AddContainer<'a> {
         });
         ScalarValue::iter_to_array(values).ok()
     }
+
+    // This function is required since DataFusion 35.0, but is implemented as a no-op
+    // https://github.com/apache/arrow-datafusion/blob/ec6abece2dcfa68007b87c69eefa6b0d7333f628/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs#L550
+    fn contained(&self, _column: &Column, _value: &HashSet<ScalarValue>) -> Option<BooleanArray> {
+        None
+    }
 }

 impl PruningStatistics for DeltaTableState {
@@ -333,6 +340,12 @@ impl PruningStatistics for DeltaTableState {
         let container = AddContainer::new(&files, partition_columns, self.arrow_schema().ok()?);
         container.null_counts(column)
     }
+
+    // This function is required since DataFusion 35.0, but is implemented as a no-op
+    // https://github.com/apache/arrow-datafusion/blob/ec6abece2dcfa68007b87c69eefa6b0d7333f628/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs#L550
+    fn contained(&self, _column: &Column, _value: &HashSet<ScalarValue>) -> Option<BooleanArray> {
+        None
+    }
 }

 #[cfg(test)]

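For context on the two no-op methods just added: contained became a required method of DataFusion's PruningStatistics trait in 35.0. Per the linked upstream source it reports, per container (here, per Add file), whether the column's values are provably all inside, or all outside, a set of literals (e.g. from col IN (...)); None means "unknown", which opts out of that pruning path while min/max pruning keeps working. A reduced, illustrative mirror of the contract (not the real trait, which has several more methods):

use std::collections::HashSet;

use arrow::array::BooleanArray;
use datafusion_common::{Column, ScalarValue};

trait ContainedSketch {
    /// One entry per container: Some(true) = every value is in `values`,
    /// Some(false) = no value is, None = unknown.
    fn contained(&self, column: &Column, values: &HashSet<ScalarValue>) -> Option<BooleanArray>;
}

/// Opting out, exactly as the two impls above do.
struct OptOut;

impl ContainedSketch for OptOut {
    fn contained(&self, _column: &Column, _values: &HashSet<ScalarValue>) -> Option<BooleanArray> {
        None
    }
}
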
10 changes: 2 additions & 8 deletions crates/core/src/operations/update.rs
@@ -263,12 +263,7 @@ async fn execute(

     let predicate_null =
         when(predicate.clone(), lit(true)).otherwise(lit(ScalarValue::Boolean(None)))?;
-    let predicate_expr = create_physical_expr(
-        &predicate_null,
-        &input_dfschema,
-        &input_schema,
-        execution_props,
-    )?;
+    let predicate_expr = create_physical_expr(&predicate_null, &input_dfschema, execution_props)?;
     expressions.push((predicate_expr, "__delta_rs_update_predicate".to_string()));

     let projection_predicate: Arc<dyn ExecutionPlan> =
@@ -315,8 +310,7 @@ async fn execute(
         let expr = case(col("__delta_rs_update_predicate"))
             .when(lit(true), expr.to_owned())
             .otherwise(col(column.to_owned()))?;
-        let predicate_expr =
-            create_physical_expr(&expr, &input_dfschema, &input_schema, execution_props)?;
+        let predicate_expr = create_physical_expr(&expr, &input_dfschema, execution_props)?;
         map.insert(column.name.clone(), expressions.len());
         let c = "__delta_rs_".to_string() + &column.name;
         expressions.push((predicate_expr, c.clone()));

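Every create_physical_expr call site in this PR shrinks the same way: DataFusion 35 dropped the redundant &Schema parameter because the DFSchema argument already carries the Arrow schema. A standalone sketch of the three-argument form with a toy schema and predicate (import paths follow the datafusion-* crates pinned in Cargo.toml, though exact re-export locations may differ):

use arrow_schema::{DataType, Field, Schema};
use datafusion::execution::context::ExecutionProps;
use datafusion::prelude::{col, lit};
use datafusion_common::ToDFSchema;
use datafusion_physical_expr::create_physical_expr;

fn main() -> datafusion_common::Result<()> {
    let schema = Schema::new(vec![Field::new("x", DataType::Int64, false)]);
    let df_schema = schema.to_dfschema()?;
    let expr = col("x").gt(lit(5_i64));
    // DataFusion 34 took (&expr, &df_schema, &schema, &props); the &schema
    // argument is gone in 35.
    let physical = create_physical_expr(&expr, &df_schema, &ExecutionProps::new())?;
    println!("{physical}");
    Ok(())
}
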
1 change: 1 addition & 0 deletions crates/gcp/src/error.rs
@@ -4,6 +4,7 @@ pub(crate) type Result<T, E = Error> = std::result::Result<T, E>;

 #[derive(thiserror::Error, Debug)]
 pub(crate) enum Error {
+    #[allow(dead_code)]
     #[error("failed to parse config: {0}")]
     Parse(String),