Skip to content

Commit

Permalink
Deprecate invoke and invoke_no_args in favor of invoke_batch (#13174)
Browse files Browse the repository at this point in the history
* Deprecate invoke and invoke_no_args in favor of invoke_batch

`invoke_batch` covers all needs, so let's deprecate and eventually
remove the redundant variants.

* Migrate test_function to invoke_batch

* Migrate regexpcount tests to invoke_batch

* Migrate log tests to invoke_batch

* Migrate tests to use invoke_batch

* Migrate ToUnixtimeFunc to implement invoke_batch

* Suppress deprecation warnings in tests

To be followed-up on.

* Migrate random benchmark to invoke_batch

* fixup! Suppress deprecation warnings in tests

* Fix docstring
  • Loading branch information
findepi authored Nov 4, 2024
1 parent 2482ff4 commit 274b222
Show file tree
Hide file tree
Showing 21 changed files with 199 additions and 102 deletions.
31 changes: 29 additions & 2 deletions datafusion/expr/src/udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ impl ScalarUDF {
/// See [`ScalarUDFImpl::invoke`] for more details.
#[deprecated(since = "42.1.0", note = "Use `invoke_batch` instead")]
pub fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
#[allow(deprecated)]
self.inner.invoke(args)
}

Expand All @@ -218,6 +219,7 @@ impl ScalarUDF {
/// See [`ScalarUDFImpl::invoke_no_args`] for more details.
#[deprecated(since = "42.1.0", note = "Use `invoke_batch` instead")]
pub fn invoke_no_args(&self, number_rows: usize) -> Result<ColumnarValue> {
#[allow(deprecated)]
self.inner.invoke_no_args(number_rows)
}

Expand All @@ -226,6 +228,7 @@ impl ScalarUDF {
#[deprecated(since = "42.0.0", note = "Use `invoke_batch` instead")]
pub fn fun(&self) -> ScalarFunctionImplementation {
let captured = Arc::clone(&self.inner);
#[allow(deprecated)]
Arc::new(move |args| captured.invoke(args))
}

Expand Down Expand Up @@ -480,6 +483,7 @@ pub trait ScalarUDFImpl: Debug + Send + Sync {
/// to arrays, which will likely be simpler code, but be slower.
///
/// [invoke_no_args]: ScalarUDFImpl::invoke_no_args
#[deprecated(since = "42.1.0", note = "Use `invoke_batch` instead")]
fn invoke(&self, _args: &[ColumnarValue]) -> Result<ColumnarValue> {
not_impl_err!(
"Function {} does not implement invoke but called",
Expand All @@ -489,19 +493,40 @@ pub trait ScalarUDFImpl: Debug + Send + Sync {

/// Invoke the function with `args` and the number of rows,
/// returning the appropriate result.
///
/// The function will be invoked with the slice of [`ColumnarValue`]
/// (either scalar or array).
///
/// # Performance
///
/// For the best performance, the implementations should handle the common case
/// when one or more of their arguments are constant values (aka
/// [`ColumnarValue::Scalar`]).
///
/// [`ColumnarValue::values_to_arrays`] can be used to convert the arguments
/// to arrays, which will likely be simpler code, but be slower.
fn invoke_batch(
&self,
args: &[ColumnarValue],
number_rows: usize,
) -> Result<ColumnarValue> {
match args.is_empty() {
true => self.invoke_no_args(number_rows),
false => self.invoke(args),
true =>
{
#[allow(deprecated)]
self.invoke_no_args(number_rows)
}
false =>
{
#[allow(deprecated)]
self.invoke(args)
}
}
}

/// Invoke the function without `args`, instead the number of rows are provided,
/// returning the appropriate result.
#[deprecated(since = "42.1.0", note = "Use `invoke_batch` instead")]
fn invoke_no_args(&self, _number_rows: usize) -> Result<ColumnarValue> {
not_impl_err!(
"Function {} does not implement invoke_no_args but called",
Expand Down Expand Up @@ -725,10 +750,12 @@ impl ScalarUDFImpl for AliasedScalarUDFImpl {
}

fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
#[allow(deprecated)]
self.inner.invoke(args)
}

fn invoke_no_args(&self, number_rows: usize) -> Result<ColumnarValue> {
#[allow(deprecated)]
self.inner.invoke_no_args(number_rows)
}

Expand Down
4 changes: 2 additions & 2 deletions datafusion/functions/benches/random.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ fn criterion_benchmark(c: &mut Criterion) {
c.bench_function("random_1M_rows_batch_8192", |b| {
b.iter(|| {
for _ in 0..iterations {
black_box(random_func.invoke_no_args(8192).unwrap());
black_box(random_func.invoke_batch(&[], 8192).unwrap());
}
})
});
Expand All @@ -39,7 +39,7 @@ fn criterion_benchmark(c: &mut Criterion) {
c.bench_function("random_1M_rows_batch_128", |b| {
b.iter(|| {
for _ in 0..iterations_128 {
black_box(random_func.invoke_no_args(128).unwrap());
black_box(random_func.invoke_batch(&[], 128).unwrap());
}
})
});
Expand Down
2 changes: 2 additions & 0 deletions datafusion/functions/src/datetime/date_bin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,7 @@ mod tests {
use chrono::TimeDelta;

#[test]
#[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
fn test_date_bin() {
let res = DateBinFunc::new().invoke(&[
ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(IntervalDayTime {
Expand Down Expand Up @@ -781,6 +782,7 @@ mod tests {
.map(|s| Some(string_to_timestamp_nanos(s).unwrap()))
.collect::<TimestampNanosecondArray>()
.with_timezone_opt(tz_opt.clone());
#[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
let result = DateBinFunc::new()
.invoke(&[
ColumnarValue::Scalar(ScalarValue::new_interval_dt(1, 0)),
Expand Down
2 changes: 2 additions & 0 deletions datafusion/functions/src/datetime/date_trunc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -724,6 +724,7 @@ mod tests {
.map(|s| Some(string_to_timestamp_nanos(s).unwrap()))
.collect::<TimestampNanosecondArray>()
.with_timezone_opt(tz_opt.clone());
#[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
let result = DateTruncFunc::new()
.invoke(&[
ColumnarValue::Scalar(ScalarValue::from("day")),
Expand Down Expand Up @@ -882,6 +883,7 @@ mod tests {
.map(|s| Some(string_to_timestamp_nanos(s).unwrap()))
.collect::<TimestampNanosecondArray>()
.with_timezone_opt(tz_opt.clone());
#[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
let result = DateTruncFunc::new()
.invoke(&[
ColumnarValue::Scalar(ScalarValue::from("hour")),
Expand Down
2 changes: 2 additions & 0 deletions datafusion/functions/src/datetime/from_unixtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ mod test {
fn test_without_timezone() {
let args = [ColumnarValue::Scalar(Int64(Some(1729900800)))];

#[allow(deprecated)] // TODO use invoke_batch
let result = FromUnixtimeFunc::new().invoke(&args).unwrap();

match result {
Expand All @@ -181,6 +182,7 @@ mod test {
))),
];

#[allow(deprecated)] // TODO use invoke_batch
let result = FromUnixtimeFunc::new().invoke(&args).unwrap();

match result {
Expand Down
8 changes: 8 additions & 0 deletions datafusion/functions/src/datetime/make_date.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ mod tests {

#[test]
fn test_make_date() {
#[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
let res = MakeDateFunc::new()
.invoke(&[
ColumnarValue::Scalar(ScalarValue::Int32(Some(2024))),
Expand All @@ -248,6 +249,7 @@ mod tests {
panic!("Expected a scalar value")
}

#[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
let res = MakeDateFunc::new()
.invoke(&[
ColumnarValue::Scalar(ScalarValue::Int64(Some(2024))),
Expand All @@ -262,6 +264,7 @@ mod tests {
panic!("Expected a scalar value")
}

#[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
let res = MakeDateFunc::new()
.invoke(&[
ColumnarValue::Scalar(ScalarValue::Utf8(Some("2024".to_string()))),
Expand All @@ -279,6 +282,7 @@ mod tests {
let years = Arc::new((2021..2025).map(Some).collect::<Int64Array>());
let months = Arc::new((1..5).map(Some).collect::<Int32Array>());
let days = Arc::new((11..15).map(Some).collect::<UInt32Array>());
#[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
let res = MakeDateFunc::new()
.invoke(&[
ColumnarValue::Array(years),
Expand All @@ -304,6 +308,7 @@ mod tests {
//

// invalid number of arguments
#[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
let res = MakeDateFunc::new()
.invoke(&[ColumnarValue::Scalar(ScalarValue::Int32(Some(1)))]);
assert_eq!(
Expand All @@ -312,6 +317,7 @@ mod tests {
);

// invalid type
#[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
let res = MakeDateFunc::new().invoke(&[
ColumnarValue::Scalar(ScalarValue::IntervalYearMonth(Some(1))),
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
Expand All @@ -323,6 +329,7 @@ mod tests {
);

// overflow of month
#[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
let res = MakeDateFunc::new().invoke(&[
ColumnarValue::Scalar(ScalarValue::Int32(Some(2023))),
ColumnarValue::Scalar(ScalarValue::UInt64(Some(u64::MAX))),
Expand All @@ -334,6 +341,7 @@ mod tests {
);

// overflow of day
#[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
let res = MakeDateFunc::new().invoke(&[
ColumnarValue::Scalar(ScalarValue::Int32(Some(2023))),
ColumnarValue::Scalar(ScalarValue::Int32(Some(22))),
Expand Down
6 changes: 6 additions & 0 deletions datafusion/functions/src/datetime/to_char.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,7 @@ mod tests {
];

for (value, format, expected) in scalar_data {
#[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
let result = ToCharFunc::new()
.invoke(&[ColumnarValue::Scalar(value), ColumnarValue::Scalar(format)])
.expect("that to_char parsed values without error");
Expand Down Expand Up @@ -458,6 +459,7 @@ mod tests {
];

for (value, format, expected) in scalar_array_data {
#[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
let result = ToCharFunc::new()
.invoke(&[
ColumnarValue::Scalar(value),
Expand Down Expand Up @@ -583,6 +585,7 @@ mod tests {
];

for (value, format, expected) in array_scalar_data {
#[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
let result = ToCharFunc::new()
.invoke(&[
ColumnarValue::Array(value as ArrayRef),
Expand All @@ -599,6 +602,7 @@ mod tests {
}

for (value, format, expected) in array_array_data {
#[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
let result = ToCharFunc::new()
.invoke(&[
ColumnarValue::Array(value),
Expand All @@ -619,6 +623,7 @@ mod tests {
//

// invalid number of arguments
#[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
let result = ToCharFunc::new()
.invoke(&[ColumnarValue::Scalar(ScalarValue::Int32(Some(1)))]);
assert_eq!(
Expand All @@ -627,6 +632,7 @@ mod tests {
);

// invalid type
#[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
let result = ToCharFunc::new().invoke(&[
ColumnarValue::Scalar(ScalarValue::Int32(Some(1))),
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
Expand Down
8 changes: 8 additions & 0 deletions datafusion/functions/src/datetime/to_date.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ mod tests {
}

fn test_scalar(sv: ScalarValue, tc: &TestCase) {
#[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
let to_date_result = ToDateFunc::new().invoke(&[ColumnarValue::Scalar(sv)]);

match to_date_result {
Expand All @@ -233,6 +234,7 @@ mod tests {
A: From<Vec<&'static str>> + Array + 'static,
{
let date_array = A::from(vec![tc.date_str]);
#[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
let to_date_result =
ToDateFunc::new().invoke(&[ColumnarValue::Array(Arc::new(date_array))]);

Expand Down Expand Up @@ -323,6 +325,7 @@ mod tests {
fn test_scalar(sv: ScalarValue, tc: &TestCase) {
let format_scalar = ScalarValue::Utf8(Some(tc.format_str.to_string()));

#[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
let to_date_result = ToDateFunc::new().invoke(&[
ColumnarValue::Scalar(sv),
ColumnarValue::Scalar(format_scalar),
Expand All @@ -347,6 +350,7 @@ mod tests {
let date_array = A::from(vec![tc.formatted_date]);
let format_array = A::from(vec![tc.format_str]);

#[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
let to_date_result = ToDateFunc::new().invoke(&[
ColumnarValue::Array(Arc::new(date_array)),
ColumnarValue::Array(Arc::new(format_array)),
Expand Down Expand Up @@ -382,6 +386,7 @@ mod tests {
let format1_scalar = ScalarValue::Utf8(Some("%Y-%m-%d".into()));
let format2_scalar = ScalarValue::Utf8(Some("%Y/%m/%d".into()));

#[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
let to_date_result = ToDateFunc::new().invoke(&[
ColumnarValue::Scalar(formatted_date_scalar),
ColumnarValue::Scalar(format1_scalar),
Expand Down Expand Up @@ -410,6 +415,7 @@ mod tests {
for date_str in test_cases {
let formatted_date_scalar = ScalarValue::Utf8(Some(date_str.into()));

#[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
let to_date_result =
ToDateFunc::new().invoke(&[ColumnarValue::Scalar(formatted_date_scalar)]);

Expand All @@ -428,6 +434,7 @@ mod tests {
let date_str = "20241231";
let date_scalar = ScalarValue::Utf8(Some(date_str.into()));

#[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
let to_date_result =
ToDateFunc::new().invoke(&[ColumnarValue::Scalar(date_scalar)]);

Expand All @@ -449,6 +456,7 @@ mod tests {
let date_str = "202412311";
let date_scalar = ScalarValue::Utf8(Some(date_str.into()));

#[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
let to_date_result =
ToDateFunc::new().invoke(&[ColumnarValue::Scalar(date_scalar)]);

Expand Down
5 changes: 3 additions & 2 deletions datafusion/functions/src/datetime/to_local_time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,7 @@ mod tests {

fn test_to_local_time_helper(input: ScalarValue, expected: ScalarValue) {
let res = ToLocalTimeFunc::new()
.invoke(&[ColumnarValue::Scalar(input)])
.invoke_batch(&[ColumnarValue::Scalar(input)], 1)
.unwrap();
match res {
ColumnarValue::Scalar(res) => {
Expand Down Expand Up @@ -616,8 +616,9 @@ mod tests {
.iter()
.map(|s| Some(string_to_timestamp_nanos(s).unwrap()))
.collect::<TimestampNanosecondArray>();
let batch_size = input.len();
let result = ToLocalTimeFunc::new()
.invoke(&[ColumnarValue::Array(Arc::new(input))])
.invoke_batch(&[ColumnarValue::Array(Arc::new(input))], batch_size)
.unwrap();
if let ColumnarValue::Array(result) = result {
assert_eq!(
Expand Down
5 changes: 2 additions & 3 deletions datafusion/functions/src/datetime/to_timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -636,7 +636,6 @@ mod tests {
use arrow::array::{ArrayRef, Int64Array, StringBuilder};
use arrow::datatypes::TimeUnit;
use chrono::Utc;

use datafusion_common::{assert_contains, DataFusionError, ScalarValue};
use datafusion_expr::ScalarFunctionImplementation;

Expand Down Expand Up @@ -1011,7 +1010,7 @@ mod tests {
assert!(matches!(rt, Timestamp(_, Some(_))));

let res = udf
.invoke(&[array.clone()])
.invoke_batch(&[array.clone()], 1)
.expect("that to_timestamp parsed values without error");
let array = match res {
ColumnarValue::Array(res) => res,
Expand Down Expand Up @@ -1054,7 +1053,7 @@ mod tests {
assert!(matches!(rt, Timestamp(_, None)));

let res = udf
.invoke(&[array.clone()])
.invoke_batch(&[array.clone()], 1)
.expect("that to_timestamp parsed values without error");
let array = match res {
ColumnarValue::Array(res) => res,
Expand Down
Loading

0 comments on commit 274b222

Please sign in to comment.