Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DataFusion 44.0.0 #335

Merged
merged 13 commits into from
Jan 9, 2025
Merged
415 changes: 210 additions & 205 deletions Cargo.lock

Large diffs are not rendered by default.

36 changes: 8 additions & 28 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,16 @@ iana-time-zone = "0.1.61"
# The versions of the following dependencies are managed manually.
######

datafusion = { version = "43.0.0", features = ["serde", "pyarrow", "avro"] }
datafusion-common = { version = "43.0.0", features = ["object_store", "pyarrow", "avro"] }
datafusion-expr = "43.0.0"
datafusion-proto = "43.0.0"
datafusion-functions-nested = "43.0.0"
datafusion-functions-json = "0.43.0"
datafusion = { version = "44.0.0", features = ["serde", "pyarrow", "avro"] }
datafusion-common = { version = "44.0.0", features = ["object_store", "pyarrow", "avro"] }
datafusion-expr = { version = "44.0.0" }
datafusion-expr-common = { version = "44.0.0" }
datafusion-proto = { version = "44.0.0" }
datafusion-functions-nested = { version = "44.0.0" }
datafusion-functions-json = { git = "https://github.com/lakehq/datafusion-functions-json.git", rev = "7bcca26" }
# auto-initialize: Changes [`Python::with_gil`] to automatically initialize the Python interpreter if needed.
pyo3 = { version = "0.22.0", features = ["auto-initialize", "serde"] }
arrow-flight = { version = "53.1.0" }
arrow-flight = { version = "53.3.0" }
# The `object_store` version must match the one used in DataFusion.
object_store = { version = "0.11.0", features = ["aws", "gcp", "azure", "http"] }
# We use a patched latest version of sqlparser. The version may be different from the one used in DataFusion.
Expand All @@ -106,27 +107,6 @@ sqlparser = { git = "https://github.com/lakehq/sqlparser-rs.git", rev = "69514bb
[patch.crates-io]
# Override dependencies to use our forked versions.
# You can use `path = "..."` to temporarily point to your local copy of the crates to speed up local development.
datafusion = { git = "https://github.com/apache/datafusion.git", rev = "6d8313e" }
datafusion-catalog = { git = "https://github.com/apache/datafusion.git", rev = "6d8313e" }
datafusion-common = { git = "https://github.com/apache/datafusion.git", rev = "6d8313e" }
datafusion-common-runtime = { git = "https://github.com/apache/datafusion.git", rev = "6d8313e" }
datafusion-execution = { git = "https://github.com/apache/datafusion.git", rev = "6d8313e" }
datafusion-expr = { git = "https://github.com/apache/datafusion.git", rev = "6d8313e" }
datafusion-expr-common = { git = "https://github.com/apache/datafusion.git", rev = "6d8313e" }
datafusion-functions = { git = "https://github.com/apache/datafusion.git", rev = "6d8313e" }
datafusion-functions-aggregate = { git = "https://github.com/apache/datafusion.git", rev = "6d8313e" }
datafusion-functions-aggregate-common = { git = "https://github.com/apache/datafusion.git", rev = "6d8313e" }
datafusion-functions-nested = { git = "https://github.com/apache/datafusion.git", rev = "6d8313e" }
datafusion-functions-window = { git = "https://github.com/apache/datafusion.git", rev = "6d8313e" }
datafusion-functions-window-common = { git = "https://github.com/apache/datafusion.git", rev = "6d8313e" }
datafusion-optimizer = { git = "https://github.com/apache/datafusion.git", rev = "6d8313e" }
datafusion-physical-expr = { git = "https://github.com/apache/datafusion.git", rev = "6d8313e" }
datafusion-physical-expr-common = { git = "https://github.com/apache/datafusion.git", rev = "6d8313e" }
datafusion-physical-optimizer = { git = "https://github.com/apache/datafusion.git", rev = "6d8313e" }
datafusion-physical-plan = { git = "https://github.com/apache/datafusion.git", rev = "6d8313e" }
datafusion-proto = { git = "https://github.com/apache/datafusion.git", rev = "6d8313e" }
datafusion-proto-common = { git = "https://github.com/apache/datafusion.git", rev = "6d8313e" }
datafusion-sql = { git = "https://github.com/apache/datafusion.git", rev = "6d8313e" }

[profile.release]
# https://doc.rust-lang.org/cargo/reference/profiles.html#release
Expand Down
10 changes: 6 additions & 4 deletions crates/sail-execution/src/plan/shuffle_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@ use datafusion::common::{exec_datafusion_err, internal_err, Result};
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
use datafusion::physical_expr::expressions::UnKnownColumn;
use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, PlanProperties,
};
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
use futures::future::try_join_all;
use futures::TryStreamExt;
use log::warn;
Expand Down Expand Up @@ -47,7 +46,10 @@ impl ShuffleReadExec {
let properties = PlanProperties::new(
EquivalenceProperties::new(schema.clone()),
partitioning,
ExecutionMode::Unbounded,
EmissionType::Both,
linhr marked this conversation as resolved.
Show resolved Hide resolved
Boundedness::Unbounded {
requires_infinite_memory: true,
linhr marked this conversation as resolved.
Show resolved Hide resolved
},
);
Self {
stage,
Expand Down
9 changes: 6 additions & 3 deletions crates/sail-execution/src/plan/shuffle_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ use datafusion::common::{exec_datafusion_err, exec_err, plan_err, Result};
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
use datafusion::physical_expr::expressions::UnKnownColumn;
use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
use datafusion::physical_plan::execution_plan::Boundedness;
use datafusion::physical_plan::repartition::BatchPartitioner;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, ExecutionPlanProperties,
PlanProperties,
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
};
use futures::future::try_join_all;
use futures::StreamExt;
Expand Down Expand Up @@ -66,7 +66,10 @@ impl ShuffleWriteExec {
// These output streams are written to locations managed by the worker,
// while the return value of `.execute()` is always an empty stream.
input_partitioning,
ExecutionMode::Unbounded,
plan.pipeline_behavior(),
linhr marked this conversation as resolved.
Show resolved Hide resolved
Boundedness::Unbounded {
requires_infinite_memory: true,
linhr marked this conversation as resolved.
Show resolved Hide resolved
},
);
let locations = vec![vec![]; input_partition_count];
Self {
Expand Down
1 change: 1 addition & 0 deletions crates/sail-plan/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ sail-python-udf = { path = "../sail-python-udf" }
datafusion = { workspace = true }
datafusion-common = { workspace = true }
datafusion-expr = { workspace = true }
datafusion-expr-common = { workspace = true }
datafusion-functions-nested = { workspace = true }
datafusion-functions-json = { workspace = true }
thiserror = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion crates/sail-plan/src/catalog/function.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::sync::Arc;

use datafusion::datasource::function::TableFunctionImpl;
use datafusion::catalog::TableFunctionImpl;
use datafusion_common::{DFSchema, DFSchemaRef, Result, TableReference};
use datafusion_expr::{DdlStatement, DropFunction, LogicalPlan, ScalarUDF};
use serde::{Deserialize, Serialize};
Expand Down
6 changes: 5 additions & 1 deletion crates/sail-plan/src/extension/function/kurtosis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use datafusion::logical_expr::function::{AccumulatorArgs, StateFieldsArgs};
use datafusion::logical_expr::{Accumulator, AggregateUDFImpl, Signature, Volatility};
use datafusion::scalar::ScalarValue;
use datafusion_common::types::logical_float64;
use datafusion_expr_common::signature::TypeSignatureClass;

pub struct KurtosisFunction {
signature: Signature,
Expand All @@ -34,7 +35,10 @@ impl Default for KurtosisFunction {
impl KurtosisFunction {
pub fn new() -> Self {
Self {
signature: Signature::coercible(vec![logical_float64()], Volatility::Immutable),
signature: Signature::coercible(
vec![TypeSignatureClass::Native(logical_float64())],
Volatility::Immutable,
),
}
}
}
Expand Down
6 changes: 5 additions & 1 deletion crates/sail-plan/src/extension/function/skewness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use datafusion::common::ScalarValue;
use datafusion::logical_expr::function::{AccumulatorArgs, StateFieldsArgs};
use datafusion::logical_expr::{Accumulator, AggregateUDFImpl, Signature, Volatility};
use datafusion_common::types::logical_float64;
use datafusion_expr_common::signature::TypeSignatureClass;

pub struct SkewnessFunc {
name: String,
Expand All @@ -34,7 +35,10 @@ impl SkewnessFunc {
pub fn new() -> Self {
Self {
name: "skewness".to_string(),
signature: Signature::coercible(vec![logical_float64()], Volatility::Immutable),
signature: Signature::coercible(
vec![TypeSignatureClass::Native(logical_float64())],
Volatility::Immutable,
),
}
}
}
Expand Down
14 changes: 6 additions & 8 deletions crates/sail-plan/src/extension/function/spark_aes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ impl ScalarUDFImpl for SparkAESEncrypt {
Ok(DataType::Binary)
}

fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
fn invoke_batch(&self, args: &[ColumnarValue], _number_rows: usize) -> Result<ColumnarValue> {
if args.len() < 2 || args.len() > 6 {
return exec_err!(
"Spark `aes_encrypt` function requires 2 to 6 arguments, got {}",
Expand Down Expand Up @@ -579,7 +579,7 @@ impl ScalarUDFImpl for SparkAESDecrypt {
Ok(DataType::Binary)
}

fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
fn invoke_batch(&self, args: &[ColumnarValue], _number_rows: usize) -> Result<ColumnarValue> {
if args.len() < 2 || args.len() > 5 {
return exec_err!(
"Spark `aes_decrypt` function requires 2 to 5 arguments, got {}",
Expand Down Expand Up @@ -958,9 +958,8 @@ impl ScalarUDFImpl for SparkTryAESEncrypt {
Ok(DataType::Binary)
}

fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
#[allow(deprecated)] // TODO use invoke_batch
let result = SparkAESEncrypt::new().invoke(args);
fn invoke_batch(&self, args: &[ColumnarValue], number_rows: usize) -> Result<ColumnarValue> {
let result = SparkAESEncrypt::new().invoke_batch(args, number_rows);
match result {
Ok(result) => Ok(result),
Err(_) => Ok(ColumnarValue::Scalar(ScalarValue::Binary(None))),
Expand Down Expand Up @@ -1004,9 +1003,8 @@ impl ScalarUDFImpl for SparkTryAESDecrypt {
Ok(DataType::Binary)
}

fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
#[allow(deprecated)] // TODO use invoke_batch
let result = SparkAESDecrypt::new().invoke(args);
fn invoke_batch(&self, args: &[ColumnarValue], number_rows: usize) -> Result<ColumnarValue> {
let result = SparkAESDecrypt::new().invoke_batch(args, number_rows);
match result {
Ok(result) => Ok(result),
Err(_) => Ok(ColumnarValue::Scalar(ScalarValue::Binary(None))),
Expand Down
8 changes: 6 additions & 2 deletions crates/sail-plan/src/extension/function/spark_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use datafusion::arrow::array::{
};
use datafusion::arrow::buffer::OffsetBuffer;
use datafusion::arrow::datatypes::{DataType, Field};
use datafusion_common::utils::array_into_list_array_nullable;
use datafusion_common::utils::SingleRowListArrayBuilder;
use datafusion_common::{internal_err, plan_err, ExprSchema, Result};
use datafusion_expr::type_coercion::binary::comparison_coercion;
use datafusion_expr::{ColumnarValue, Expr, ScalarUDFImpl, Signature, TypeSignature, Volatility};
Expand Down Expand Up @@ -136,7 +136,11 @@ pub fn make_array_inner(arrays: &[ArrayRef]) -> Result<ArrayRef> {
let length = arrays.iter().map(|a| a.len()).sum();
// By default Int32
let array = new_null_array(&DataType::Int32, length);
Ok(Arc::new(array_into_list_array_nullable(array)))
Ok(Arc::new(
SingleRowListArrayBuilder::new(array)
.with_nullable(true)
.build_list_array(),
))
}
DataType::LargeList(..) => array_array::<i64>(arrays, data_type),
_ => array_array::<i32>(arrays, data_type),
Expand Down
8 changes: 3 additions & 5 deletions crates/sail-plan/src/extension/function/spark_concat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,16 +94,14 @@ impl ScalarUDFImpl for SparkConcat {
}
}

fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
fn invoke_batch(&self, args: &[ColumnarValue], number_rows: usize) -> Result<ColumnarValue> {
if args
.iter()
.any(|arg| matches!(arg.data_type(), DataType::List(_)))
{
#[allow(deprecated)] // TODO use invoke_batch
ArrayConcat::new().invoke(args)
ArrayConcat::new().invoke_batch(args, number_rows)
} else {
#[allow(deprecated)] // TODO use invoke_batch
ConcatFunc::new().invoke(args)
ConcatFunc::new().invoke_batch(args, number_rows)
}
}
}
5 changes: 2 additions & 3 deletions crates/sail-plan/src/extension/function/spark_reverse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,13 @@ impl ScalarUDFImpl for SparkReverse {
Ok(arg_types[0].clone())
}

fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
fn invoke_batch(&self, args: &[ColumnarValue], number_rows: usize) -> Result<ColumnarValue> {
if args.len() != 1 {
return exec_err!("array_reverse needs one argument");
}
match &args[0].data_type() {
DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => {
#[allow(deprecated)] // TODO use invoke_batch
ReverseFunc::new().invoke(args)
ReverseFunc::new().invoke_batch(args, number_rows)
}
_ => make_scalar_function(array_reverse_inner)(args),
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl ScalarUDFImpl for SparkUnixTimestamp {
Ok(DataType::Int64)
}

fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
fn invoke_batch(&self, args: &[ColumnarValue], number_rows: usize) -> Result<ColumnarValue> {
if args.is_empty() {
return exec_err!("spark_unix_timestamp function requires 1 or more arguments");
}
Expand All @@ -66,10 +66,9 @@ impl ScalarUDFImpl for SparkUnixTimestamp {
None,
)?
.cast_to(&DataType::Int64, None),
#[allow(deprecated)] // TODO use invoke_batch
DataType::Utf8View | DataType::LargeUtf8 | DataType::Utf8 => {
ToTimestampSecondsFunc::new()
.invoke(args)?
.invoke_batch(args, number_rows)?
.cast_to(
&DataType::Timestamp(TimeUnit::Second, Some(self.timezone.clone())),
None,
Expand Down
5 changes: 2 additions & 3 deletions crates/sail-plan/src/extension/function/spark_weekofyear.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl ScalarUDFImpl for SparkWeekOfYear {
Ok(DataType::UInt32)
}

fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
fn invoke_batch(&self, args: &[ColumnarValue], number_rows: usize) -> Result<ColumnarValue> {
if args.is_empty() {
return exec_err!("Spark `weekofyear` function requires 1 argument, got 0");
}
Expand All @@ -70,10 +70,9 @@ impl ScalarUDFImpl for SparkWeekOfYear {
None,
)?
.cast_to(&DataType::Int64, None),
#[allow(deprecated)] // TODO use invoke_batch
DataType::Utf8View | DataType::LargeUtf8 | DataType::Utf8 => {
ToTimestampNanosFunc::new()
.invoke(args)?
.invoke_batch(args, number_rows)?
.cast_to(
&DataType::Timestamp(TimeUnit::Nanosecond, Some(self.timezone.clone())),
None,
Expand Down
3 changes: 2 additions & 1 deletion crates/sail-plan/src/extension/physical/map_partitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ impl MapPartitionsExec {
let properties = PlanProperties::new(
EquivalenceProperties::new(schema.clone()),
input.output_partitioning().clone(),
input.execution_mode(),
input.pipeline_behavior(),
linhr marked this conversation as resolved.
Show resolved Hide resolved
input.boundedness(),
);
Self {
input,
Expand Down
6 changes: 4 additions & 2 deletions crates/sail-plan/src/extension/physical/range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{DisplayAs, ExecutionMode, ExecutionPlan, PlanProperties};
use datafusion::physical_plan::{DisplayAs, ExecutionPlan, PlanProperties};
use datafusion_common::{exec_err, internal_err, Result};

use crate::extension::logical::Range;
Expand All @@ -27,7 +28,8 @@ impl RangeExec {
let properties = PlanProperties::new(
EquivalenceProperties::new(schema.clone()),
Partitioning::RoundRobinBatch(num_partitions),
ExecutionMode::Bounded,
EmissionType::Both,
linhr marked this conversation as resolved.
Show resolved Hide resolved
Boundedness::Bounded,
);
Self {
range,
Expand Down
3 changes: 2 additions & 1 deletion crates/sail-plan/src/extension/physical/schema_pivot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ impl SchemaPivotExec {
let properties = PlanProperties::new(
EquivalenceProperties::new(schema.clone()),
partitioning,
input.execution_mode(),
input.pipeline_behavior(),
linhr marked this conversation as resolved.
Show resolved Hide resolved
input.boundedness(),
);
Self {
input,
Expand Down
6 changes: 4 additions & 2 deletions crates/sail-plan/src/extension/physical/show_string.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ use datafusion::arrow::compute::concat_batches;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
use datafusion::physical_expr::{Distribution, EquivalenceProperties, Partitioning};
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::{
DisplayAs, ExecutionMode, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
DisplayAs, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
};
use datafusion_common::{exec_err, internal_datafusion_err, DataFusionError, Result};
use futures::{Stream, StreamExt};
Expand Down Expand Up @@ -39,7 +40,8 @@ impl ShowStringExec {
let properties = PlanProperties::new(
EquivalenceProperties::new(schema.clone()),
Partitioning::RoundRobinBatch(1),
ExecutionMode::Bounded,
EmissionType::Final,
linhr marked this conversation as resolved.
Show resolved Hide resolved
Boundedness::Bounded,
);
Self {
input,
Expand Down
2 changes: 1 addition & 1 deletion crates/sail-plan/src/function/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::collections::HashMap;
use std::sync::Arc;

use datafusion::datasource::function::TableFunction;
use datafusion::catalog::TableFunction;
use lazy_static::lazy_static;

use crate::error::{PlanError, PlanResult};
Expand Down
2 changes: 1 addition & 1 deletion crates/sail-plan/src/function/table/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::sync::Arc;

use datafusion::datasource::function::TableFunction;
use datafusion::catalog::TableFunction;

use crate::function::table::range::RangeTableFunction;

Expand Down
3 changes: 1 addition & 2 deletions crates/sail-plan/src/function/table/range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ use std::fmt::Debug;
use std::sync::Arc;

use datafusion::arrow::datatypes::SchemaRef;
use datafusion::catalog::{Session, TableProvider};
use datafusion::datasource::function::TableFunctionImpl;
use datafusion::catalog::{Session, TableFunctionImpl, TableProvider};
use datafusion::physical_plan::ExecutionPlan;
use datafusion_common::{exec_err, Result};
use datafusion_expr::{logical_plan, Expr, LogicalPlan, TableType, UserDefinedLogicalNodeCore};
Expand Down
Loading
Loading