ScalarUDF with zero arguments should be provided with one null array as parameter #9031

Merged · 6 commits · Jan 30, 2024 · showing changes from 5 commits
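This PR aligns zero-argument UDFs with built-in zero-argument functions: instead of the logical-to-physical planner injecting a `ScalarValue::Null` literal as a fake argument, `ScalarFunctionExpr::evaluate` now synthesizes a single null array sized to the batch whenever the function's signature supports zero arguments. A minimal sketch of what a zero-parameter UDF implementation receives under the new behavior (the one-null-array convention and `ColumnarValue::create_null_array` come from this diff; the function name and body here are illustrative only):

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, Float64Array};
use datafusion_common::Result;
use datafusion_expr::ColumnarValue;

// Body of a hypothetical zero-parameter UDF: `args` holds exactly one
// ColumnarValue, a null array whose length equals the batch's row count,
// so the function can size its output without reading any real column.
fn invoke_zero_arg(args: &[ColumnarValue]) -> Result<ColumnarValue> {
    let len = match &args[0] {
        ColumnarValue::Array(array) => array.len(),
        ColumnarValue::Scalar(_) => 1,
    };
    let values = Float64Array::from_iter_values((0..len).map(|_| 42.0));
    Ok(ColumnarValue::Array(Arc::new(values) as ArrayRef))
}
```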
18 changes: 17 additions & 1 deletion datafusion/core/src/physical_optimizer/projection_pushdown.rs
@@ -1225,7 +1225,7 @@ mod tests {
 use datafusion_common::{JoinSide, JoinType, Result, ScalarValue, Statistics};
 use datafusion_execution::object_store::ObjectStoreUrl;
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
-use datafusion_expr::{ColumnarValue, Operator};
+use datafusion_expr::{ColumnarValue, Operator, Signature, Volatility};
 use datafusion_physical_expr::expressions::{
 BinaryExpr, CaseExpr, CastExpr, Column, Literal, NegativeExpr,
 };
@@ -1270,6 +1270,10 @@
 ],
 DataType::Int32,
 None,
+Signature::exact(
+vec![DataType::Float32, DataType::Float32],
+Volatility::Immutable,
+),
 )),
 Arc::new(CaseExpr::try_new(
 Some(Arc::new(Column::new("d", 2))),
@@ -1336,6 +1340,10 @@
 ],
 DataType::Int32,
 None,
+Signature::exact(
+vec![DataType::Float32, DataType::Float32],
+Volatility::Immutable,
+),
 )),
 Arc::new(CaseExpr::try_new(
 Some(Arc::new(Column::new("d", 3))),
@@ -1405,6 +1413,10 @@
 ],
 DataType::Int32,
 None,
+Signature::exact(
+vec![DataType::Float32, DataType::Float32],
+Volatility::Immutable,
+),
 )),
 Arc::new(CaseExpr::try_new(
 Some(Arc::new(Column::new("d", 2))),
@@ -1471,6 +1483,10 @@
 ],
 DataType::Int32,
 None,
+Signature::exact(
+vec![DataType::Float32, DataType::Float32],
+Volatility::Immutable,
+),
 )),
 Arc::new(CaseExpr::try_new(
 Some(Arc::new(Column::new("d_new", 3))),
122 changes: 116 additions & 6 deletions datafusion/core/tests/user_defined/user_defined_scalar_functions.rs
@@ -16,15 +16,22 @@
 // under the License.

 use arrow::compute::kernels::numeric::add;
-use arrow_array::{ArrayRef, Float64Array, Int32Array, RecordBatch};
+use arrow_array::{
+ArrayRef, Float64Array, Int32Array, Int64Array, RecordBatch, UInt64Array, UInt8Array,
+};
+use arrow_schema::DataType::Float64;
 use arrow_schema::{DataType, Field, Schema};
 use datafusion::prelude::*;
 use datafusion::{execution::registry::FunctionRegistry, test_util};
 use datafusion_common::cast::as_float64_array;
 use datafusion_common::{assert_batches_eq, cast::as_int32_array, Result, ScalarValue};
+use datafusion_expr::TypeSignature::{Any, Variadic};
 use datafusion_expr::{
-create_udaf, create_udf, Accumulator, ColumnarValue, LogicalPlanBuilder, Volatility,
+create_udaf, create_udf, Accumulator, ColumnarValue, LogicalPlanBuilder, ScalarUDF,
+ScalarUDFImpl, Signature, Volatility,
 };
+use rand::{thread_rng, Rng};
+use std::iter;
 use std::sync::Arc;

 /// test that casting happens on udfs.
@@ -166,10 +173,7 @@ async fn scalar_udf_zero_params() -> Result<()> {

 ctx.register_batch("t", batch)?;
 // create function just returns 100 regardless of inp
-let myfunc = Arc::new(|args: &[ColumnarValue]| {
-let ColumnarValue::Scalar(_) = &args[0] else {
-panic!("expect scalar")
-};
+let myfunc = Arc::new(|_args: &[ColumnarValue]| {
 Ok(ColumnarValue::Array(
 Arc::new((0..1).map(|_| 100).collect::<Int32Array>()) as ArrayRef,
 ))
@@ -392,6 +396,112 @@ async fn test_user_defined_functions_with_alias() -> Result<()> {
 Ok(())
 }

+#[derive(Debug)]
+pub struct RandomUDF {
+signature: Signature,
+}
+
+impl RandomUDF {
+pub fn new() -> Self {
+Self {
+signature: Signature::one_of(
+vec![Any(0), Variadic(vec![Float64])],
+Volatility::Volatile,
+),
+}
+}
+}
+
+impl ScalarUDFImpl for RandomUDF {
+fn as_any(&self) -> &dyn std::any::Any {
+self
+}
+
+fn name(&self) -> &str {
+"random_udf"
+}
+
+fn signature(&self) -> &Signature {
+&self.signature
+}
+
+fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
+Ok(Float64)
+}
+
+fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
+let len: usize = match &args[0] {
+ColumnarValue::Array(array) => array.len(),
+_ => {
+return Err(datafusion::error::DataFusionError::Internal(
+"Invalid argument type".to_string(),
+))
+}
+};
+let mut rng = thread_rng();
+let values = iter::repeat_with(|| rng.gen_range(0.1..1.0)).take(len);
+let array = Float64Array::from_iter_values(values);
+Ok(ColumnarValue::Array(Arc::new(array)))
+}
+}
+
+#[tokio::test]
+async fn test_user_defined_functions_zero_argument() -> Result<()> {
+let ctx = SessionContext::new();
+
+let schema = Arc::new(Schema::new(vec![
Review thread (resolved):
Contributor: it doesn't hurt but I wonder if the example table needs 4 columns 🤔
Author: I just copied the reported test case. I think we can reduce the columns.
Author: Reduced to one column.
+Field::new("index", DataType::UInt8, false),
+Field::new("uint", DataType::UInt64, true),
+Field::new("int", DataType::Int64, true),
+Field::new("float", DataType::Float64, true),
+]));
+
+let batch = RecordBatch::try_new(
+schema,
+vec![
+Arc::new(UInt8Array::from_iter_values([1, 2, 3])),
+Arc::new(UInt64Array::from(vec![Some(2), Some(3), None])),
+Arc::new(Int64Array::from(vec![Some(-2), Some(3), None])),
+Arc::new(Float64Array::from(vec![Some(1.0), Some(3.3), None])),
+],
+)?;
+
+ctx.register_batch("data_table", batch)?;
+
+let random_normal_udf = ScalarUDF::from(RandomUDF::new());
+ctx.register_udf(random_normal_udf);
+
+let result = plan_and_collect(
+&ctx,
+"SELECT random_udf() AS random_udf, random() AS native_random FROM data_table",
+)
+.await?;
+
+assert_eq!(result.len(), 1);
+let batch = &result[0];
+let random_udf = batch
+.column(0)
+.as_any()
+.downcast_ref::<Float64Array>()
+.unwrap();
+let native_random = batch
+.column(1)
+.as_any()
+.downcast_ref::<Float64Array>()
+.unwrap();
+
+assert_eq!(random_udf.len(), native_random.len());
+
+let mut previous = 1.0;
Review thread (resolved):
Contributor: Could the random implementation ever actually produce 1.0 (the range is 0..1.0)? Maybe we could start at -1.0 or something, just to be sure this won't ever flake.
Author: I think the range 0..1.0 is exclusive on the end point?
Author: But -1.0 is also good.
Contributor: If so, then this is fine!
Author: Changed to -1.0 to make it more clear.
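The point resolved above can be checked directly: `gen_range` with a `Range` (`..`) is half-open, so 1.0 is never produced. A standalone sketch (assumes the `rand` crate; not part of this diff):

```rust
use rand::{thread_rng, Rng};

fn main() {
    let mut rng = thread_rng();
    // Range 0.1..1.0 is exclusive at the end point, so a `previous`
    // sentinel of 1.0 can never collide with a generated value.
    for _ in 0..1_000 {
        let v: f64 = rng.gen_range(0.1..1.0);
        assert!((0.1..1.0).contains(&v));
    }
}
```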
+for i in 0..random_udf.len() {
+assert!(random_udf.value(i) >= 0.0 && random_udf.value(i) < 1.0);
+assert!(random_udf.value(i) != previous);
+previous = random_udf.value(i);
+}
+
+Ok(())
+}
+
 fn create_udf_context() -> SessionContext {
 let ctx = SessionContext::new();
 // register a custom UDF
1 change: 1 addition & 0 deletions datafusion/physical-expr/src/functions.rs
@@ -81,6 +81,7 @@ pub fn create_physical_expr(
 input_phy_exprs.to_vec(),
 data_type,
 monotonicity,
+fun.signature().clone(),
 )))
 }
18 changes: 6 additions & 12 deletions datafusion/physical-expr/src/planner.rs
@@ -259,7 +259,7 @@ pub fn create_physical_expr(
 }

 Expr::ScalarFunction(ScalarFunction { func_def, args }) => {
-let mut physical_args = args
+let physical_args = args
 .iter()
 .map(|e| create_physical_expr(e, input_dfschema, execution_props))
 .collect::<Result<Vec<_>>>()?;
@@ -272,17 +272,11 @@
 execution_props,
 )
 }
-ScalarFunctionDefinition::UDF(fun) => {
-// udfs with zero params expect null array as input
-if args.is_empty() {
-physical_args.push(Arc::new(Literal::new(ScalarValue::Null)));
-}
-udf::create_physical_expr(
-fun.clone().as_ref(),
-&physical_args,
-input_schema,
-)
-}
+ScalarFunctionDefinition::UDF(fun) => udf::create_physical_expr(
+fun.clone().as_ref(),
+&physical_args,
+input_schema,
+),
 ScalarFunctionDefinition::Name(_) => {
 internal_err!("Function `Expr` with name should be resolved.")
 }
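The hunk above deletes the planner's special case; the placeholder for zero-argument UDFs is now created at evaluation time (see the `scalar_function.rs` change below), sized to the actual batch rather than being a `ScalarValue::Null` literal fixed at planning time. A tiny standalone model of the new dispatch (plain Rust stand-ins, not DataFusion's actual types):

```rust
// Stand-in for ColumnarValue: either a null array of a given length or a column.
#[derive(Debug, PartialEq)]
enum Value {
    NullArray(usize),
    Column(&'static str),
}

// Models ScalarFunctionExpr::evaluate's argument handling: a zero-argument
// function gets one null array sized to the batch; otherwise each argument
// expression is evaluated against the batch.
fn evaluate_args(arg_names: &[&'static str], batch_rows: usize) -> Vec<Value> {
    if arg_names.is_empty() {
        vec![Value::NullArray(batch_rows)]
    } else {
        arg_names.iter().copied().map(Value::Column).collect()
    }
}

fn main() {
    assert_eq!(evaluate_args(&[], 3), vec![Value::NullArray(3)]);
    assert_eq!(evaluate_args(&["a"], 3), vec![Value::Column("a")]);
}
```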
12 changes: 11 additions & 1 deletion datafusion/physical-expr/src/scalar_function.rs
@@ -44,7 +44,7 @@ use arrow::record_batch::RecordBatch;
 use datafusion_common::Result;
 use datafusion_expr::{
 expr_vec_fmt, BuiltinScalarFunction, ColumnarValue, FuncMonotonicity,
-ScalarFunctionImplementation,
+ScalarFunctionImplementation, Signature,
 };

 /// Physical expression of a scalar function
@@ -58,6 +58,8 @@ pub struct ScalarFunctionExpr {
 // and it specifies the effect of an increase or decrease in
 // the corresponding `arg` to the function value.
 monotonicity: Option<FuncMonotonicity>,
+// Signature of the function
+signature: Signature,
Review thread (resolved):
Contributor: It seems like only one field is ever read. I wonder if it would be better to copy just this field rather than the entire signature (which is both larger, with several allocations, and might mislead readers into thinking the signature information is used more broadly in execution plans). I worry that the signature information might start being referred to in physical planning. So perhaps something like:

Suggested change:
-    signature: Signature,
+    // Does this function need to be invoked with zero arguments?
+    supports_zero_argument: bool,
(populated from self.signature.type_signature.supports_zero_argument)

Author: Ok
 }

 impl Debug for ScalarFunctionExpr {
@@ -79,13 +81,15 @@
 args: Vec<Arc<dyn PhysicalExpr>>,
 return_type: DataType,
 monotonicity: Option<FuncMonotonicity>,
+signature: Signature,
 ) -> Self {
 Self {
 fun,
 name: name.to_owned(),
 args,
 return_type,
 monotonicity,
+signature,
 }
 }
@@ -149,6 +153,11 @@ impl PhysicalExpr for ScalarFunctionExpr {
 {
 vec![ColumnarValue::create_null_array(batch.num_rows())]
 }
+// If the function supports zero argument, we pass in a null array indicating the batch size.
Review thread (resolved):
Contributor: I never fully understood why this didn't just check self.args.is_empty() 🤔
Author: Good idea. Changed to self.args.is_empty().

+// This is for user-defined functions.
+(0, Err(_)) if self.signature.type_signature.supports_zero_argument() => {
+vec![ColumnarValue::create_null_array(batch.num_rows())]
+}
 _ => self
 .args
 .iter()
@@ -175,6 +184,7 @@
 children,
 self.return_type().clone(),
 self.monotonicity.clone(),
+self.signature.clone(),
 )))
 }
1 change: 1 addition & 0 deletions datafusion/physical-expr/src/udf.rs
@@ -40,6 +40,7 @@ pub fn create_physical_expr(
 input_phy_exprs.to_vec(),
 fun.return_type(&input_exprs_types)?,
 fun.monotonicity()?,
+fun.signature().clone(),
 )))
 }
19 changes: 8 additions & 11 deletions datafusion/proto/src/physical_plan/from_proto.rs
@@ -340,21 +340,17 @@ pub fn parse_physical_expr(
 // TODO Do not create new the ExecutionProps
 let execution_props = ExecutionProps::new();

-let fun_expr = functions::create_physical_fun(
+functions::create_physical_expr(
 &(&scalar_function).into(),
+&args,
+input_schema,
 &execution_props,
-)?;
-
-Arc::new(ScalarFunctionExpr::new(
-&e.name,
-fun_expr,
-args,
-convert_required!(e.return_type)?,
-None,
-))
+)?
 }
 ExprType::ScalarUdf(e) => {
-let scalar_fun = registry.udf(e.name.as_str())?.fun().clone();
+let udf = registry.udf(e.name.as_str())?;
+let signature = udf.signature();
+let scalar_fun = udf.fun().clone();

 let args = e
 .args
@@ -368,6 +364,7 @@
 args,
 convert_required!(e.return_type)?,
 None,
+signature.clone(),
 ))
 }
 ExprType::LikeExpr(like_expr) => Arc::new(LikeExpr::new(
4 changes: 3 additions & 1 deletion datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -578,8 +578,9 @@ fn roundtrip_builtin_scalar_function() -> Result<()> {
 "acos",
 fun_expr,
 vec![col("a", &schema)?],
-DataType::Int64,
+DataType::Float64,
Review note (@viirya, Member Author, Jan 29, 2024): The existing test was not correct at all: the acos built-in scalar function's return type should be Float64. Previously the roundtrip test passed because from_proto simply took the serde return type and used it as the parameter to ScalarFunctionExpr. But in this PR, from_proto calls create_physical_expr, which gets the return type directly from BuiltinScalarFunction, so the test issue surfaced.
 None,
+Signature::exact(vec![DataType::Int64], Volatility::Immutable),
 );

 let project =
@@ -617,6 +618,7 @@
 vec![col("a", &schema)?],
 DataType::Int64,
 None,
+Signature::exact(vec![DataType::Int64], Volatility::Immutable),
 );

 let project =