Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ColumnarValue::values_to_arrays, deprecate columnar_values_to_array #9114

Merged
merged 3 commits into from
Feb 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions datafusion-examples/examples/simple_udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ use datafusion::error::Result;
use datafusion::prelude::*;
use datafusion_common::cast::as_float64_array;
use datafusion_expr::ColumnarValue;
use datafusion_physical_expr::functions::columnar_values_to_array;
use std::sync::Arc;

/// create local execution context with an in-memory table:
Expand Down Expand Up @@ -71,13 +70,15 @@ async fn main() -> Result<()> {
// this is guaranteed by DataFusion based on the function's signature.
assert_eq!(args.len(), 2);

let args = columnar_values_to_array(args)?;
// Expand the arguments to arrays (this is simple, but inefficient for
// single constant values).
let args = ColumnarValue::values_to_arrays(args)?;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the basic change -- make the logic a function of ColumnarValue


// 1. cast both arguments to f64. These casts MUST be aligned with the signature or this function panics!
let base = as_float64_array(&args[0]).expect("cast failed");
let exponent = as_float64_array(&args[1]).expect("cast failed");

// this is guaranteed by DataFusion. We place it just to make it obvious.
// The array lengths is guaranteed by DataFusion. We assert here to make it obvious.
assert_eq!(exponent.len(), base.len());

// 2. perform the computation
Expand Down
164 changes: 163 additions & 1 deletion datafusion/expr/src/columnar_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
use arrow::array::ArrayRef;
use arrow::array::NullArray;
use arrow::datatypes::DataType;
use datafusion_common::{Result, ScalarValue};
use datafusion_common::{internal_err, DataFusionError, Result, ScalarValue};
use std::sync::Arc;

/// Represents the result of evaluating an expression: either a single
Expand Down Expand Up @@ -75,4 +75,166 @@ impl ColumnarValue {
pub fn create_null_array(num_rows: usize) -> Self {
ColumnarValue::Array(Arc::new(NullArray::new(num_rows)))
}

/// Converts [`ColumnarValue`]s to [`ArrayRef`]s with the same length.
///
/// # Performance Note
///
/// This function expands any [`ScalarValue`] to an array. This expansion
/// permits using a single function in terms of arrays, but it can be
/// inefficient compared to handling the scalar value directly.
///
/// Thus, it is recommended to provide specialized implementations for
/// scalar values if performance is a concern.
///
/// # Errors
///
/// If there are multiple array arguments that have different lengths
pub fn values_to_arrays(args: &[ColumnarValue]) -> Result<Vec<ArrayRef>> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a different algorithm than columnar_values_to_array as it also handles mixed ScalarValue and ArrayRefs

if args.is_empty() {
return Ok(vec![]);
}

let mut array_len = None;
for arg in args {
array_len = match (arg, array_len) {
(ColumnarValue::Array(a), None) => Some(a.len()),
(ColumnarValue::Array(a), Some(array_len)) => {
if array_len == a.len() {
Some(array_len)
} else {
return internal_err!(
"Arguments has mixed length. Expected length: {array_len}, found length: {}", a.len()
);
}
}
(ColumnarValue::Scalar(_), array_len) => array_len,
}
}

// If array_len is none, it means there are only scalars, so make a 1 element array
let inferred_length = array_len.unwrap_or(1);

let args = args
.iter()
.map(|arg| arg.clone().into_array(inferred_length))
.collect::<Result<Vec<_>>>()?;

Ok(args)
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn values_to_arrays() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

new tests

// (input, expected)
let cases = vec![
// empty
TestCase {
input: vec![],
expected: vec![],
},
// one array of length 3
TestCase {
input: vec![ColumnarValue::Array(make_array(1, 3))],
expected: vec![make_array(1, 3)],
},
// two arrays length 3
TestCase {
input: vec![
ColumnarValue::Array(make_array(1, 3)),
ColumnarValue::Array(make_array(2, 3)),
],
expected: vec![make_array(1, 3), make_array(2, 3)],
},
// array and scalar
TestCase {
input: vec![
ColumnarValue::Array(make_array(1, 3)),
ColumnarValue::Scalar(ScalarValue::Int32(Some(100))),
],
expected: vec![
make_array(1, 3),
make_array(100, 3), // scalar is expanded
],
},
// scalar and array
TestCase {
input: vec![
ColumnarValue::Scalar(ScalarValue::Int32(Some(100))),
ColumnarValue::Array(make_array(1, 3)),
],
expected: vec![
make_array(100, 3), // scalar is expanded
make_array(1, 3),
],
},
// multiple scalars and array
TestCase {
input: vec![
ColumnarValue::Scalar(ScalarValue::Int32(Some(100))),
ColumnarValue::Array(make_array(1, 3)),
ColumnarValue::Scalar(ScalarValue::Int32(Some(200))),
],
expected: vec![
make_array(100, 3), // scalar is expanded
make_array(1, 3),
make_array(200, 3), // scalar is expanded
],
},
];
for case in cases {
case.run();
}
}

#[test]
#[should_panic(
expected = "Arguments has mixed length. Expected length: 3, found length: 4"
)]
fn values_to_arrays_mixed_length() {
ColumnarValue::values_to_arrays(&[
ColumnarValue::Array(make_array(1, 3)),
ColumnarValue::Array(make_array(2, 4)),
])
.unwrap();
}

#[test]
#[should_panic(
expected = "Arguments has mixed length. Expected length: 3, found length: 7"
)]
fn values_to_arrays_mixed_length_and_scalar() {
ColumnarValue::values_to_arrays(&[
ColumnarValue::Array(make_array(1, 3)),
ColumnarValue::Scalar(ScalarValue::Int32(Some(100))),
ColumnarValue::Array(make_array(2, 7)),
])
.unwrap();
}

struct TestCase {
input: Vec<ColumnarValue>,
expected: Vec<ArrayRef>,
}

impl TestCase {
fn run(self) {
let Self { input, expected } = self;

assert_eq!(
ColumnarValue::values_to_arrays(&input).unwrap(),
expected,
"\ninput: {input:?}\nexpected: {expected:?}"
);
}
}

/// Makes an array of length `len` with all elements set to `val`
fn make_array(val: i32, len: usize) -> ArrayRef {
Arc::new(arrow::array::Int32Array::from(vec![val; len]))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1347,7 +1347,6 @@ mod tests {
use datafusion_physical_expr::execution_props::ExecutionProps;

use chrono::{DateTime, TimeZone, Utc};
use datafusion_physical_expr::functions::columnar_values_to_array;

// ------------------------------
// --- ExprSimplifier tests -----
Expand Down Expand Up @@ -1461,7 +1460,7 @@ mod tests {
let return_type = Arc::new(DataType::Int32);

let fun = Arc::new(|args: &[ColumnarValue]| {
let args = columnar_values_to_array(args)?;
let args = ColumnarValue::values_to_arrays(args)?;

let arg0 = as_int32_array(&args[0])?;
let arg1 = as_int32_array(&args[1])?;
Expand Down
44 changes: 2 additions & 42 deletions datafusion/physical-expr/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,49 +173,9 @@ pub(crate) enum Hint {
AcceptsSingular,
}

/// A helper function used to infer the length of arguments of Scalar functions and convert
/// [`ColumnarValue`]s to [`ArrayRef`]s with the inferred length. Note that this function
/// only works for functions that accept either that all arguments are scalars or all arguments
/// are arrays with same length. Otherwise, it will return an error.
#[deprecated(since = "36.0.0", note = "Use ColumarValue::values_to_arrays instead")]
pub fn columnar_values_to_array(args: &[ColumnarValue]) -> Result<Vec<ArrayRef>> {
if args.is_empty() {
return Ok(vec![]);
}

let len = args
.iter()
.fold(Option::<usize>::None, |acc, arg| match arg {
ColumnarValue::Scalar(_) if acc.is_none() => Some(1),
ColumnarValue::Scalar(_) => {
if let Some(1) = acc {
acc
} else {
None
}
}
ColumnarValue::Array(a) => {
if let Some(l) = acc {
if l == a.len() {
acc
} else {
None
}
} else {
Some(a.len())
}
}
});

let inferred_length = len.ok_or(DataFusionError::Internal(
"Arguments has mixed length".to_string(),
))?;

let args = args
.iter()
.map(|arg| arg.clone().into_array(inferred_length))
.collect::<Result<Vec<_>>>()?;

Ok(args)
ColumnarValue::values_to_arrays(args)
}

/// Decorates a function to handle [`ScalarValue`]s by converting them to arrays before calling the function
Expand Down
Loading