Skip to content

Commit

Permalink
Incorporate dyn scalar kernels (#1685)
Browse files Browse the repository at this point in the history
* Rebase

* impl ToNumeric for ScalarValue

* Update macro to be based on

* Add floats

* Cleanup

* Newline
  • Loading branch information
matthewmturner authored Jan 30, 2022
1 parent fecce97 commit 3494e9c
Showing 1 changed file with 114 additions and 8 deletions.
122 changes: 114 additions & 8 deletions datafusion/src/physical_plan/expressions/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::convert::TryInto;
use std::{any::Any, sync::Arc};

use arrow::array::TimestampMillisecondArray;
Expand All @@ -29,6 +30,18 @@ use arrow::compute::kernels::comparison::{
eq_bool, eq_bool_scalar, gt_bool, gt_bool_scalar, gt_eq_bool, gt_eq_bool_scalar,
lt_bool, lt_bool_scalar, lt_eq_bool, lt_eq_bool_scalar, neq_bool, neq_bool_scalar,
};
use arrow::compute::kernels::comparison::{
eq_dyn_bool_scalar, gt_dyn_bool_scalar, gt_eq_dyn_bool_scalar, lt_dyn_bool_scalar,
lt_eq_dyn_bool_scalar, neq_dyn_bool_scalar,
};
use arrow::compute::kernels::comparison::{
eq_dyn_scalar, gt_dyn_scalar, gt_eq_dyn_scalar, lt_dyn_scalar, lt_eq_dyn_scalar,
neq_dyn_scalar,
};
use arrow::compute::kernels::comparison::{
eq_dyn_utf8_scalar, gt_dyn_utf8_scalar, gt_eq_dyn_utf8_scalar, lt_dyn_utf8_scalar,
lt_eq_dyn_utf8_scalar, neq_dyn_utf8_scalar,
};
use arrow::compute::kernels::comparison::{
eq_scalar, gt_eq_scalar, gt_scalar, lt_eq_scalar, lt_scalar, neq_scalar,
};
Expand Down Expand Up @@ -430,6 +443,23 @@ macro_rules! compute_utf8_op_scalar {
}};
}

/// Invoke a dyn utf8 compute kernel on a data array and a scalar value.
///
/// * `$LEFT` - the array operand, passed to the kernel as-is.
/// * `$RIGHT` - an `Option`; its payload is passed to the kernel by reference.
/// * `$OP` - the operator prefix (for example `lt`), expanded to the kernel
///   name `<$OP>_dyn_utf8_scalar`.
///
/// Evaluates to `Ok(Arc::new(..))` around the kernel output, or to a
/// `DataFusionError::Internal` when `$RIGHT` is `None`. A kernel error is
/// propagated out of the enclosing function by the `?` operator.
macro_rules! compute_utf8_op_dyn_scalar {
    ($LEFT:expr, $RIGHT:expr, $OP:ident) => {{
        // generate the scalar function name, such as lt_dyn_utf8_scalar, from the
        // $OP parameter (which could have a value of lt) and the suffix
        // _dyn_utf8_scalar
        if let Some(string_value) = $RIGHT {
            Ok(Arc::new(paste::expr! {[<$OP _dyn_utf8_scalar>]}(
                $LEFT,
                &string_value,
            )?))
        } else {
            // Name this macro (not its non-dyn sibling) so the failure can be
            // traced to the right code path.
            Err(DataFusionError::Internal(format!(
                "compute_utf8_op_dyn_scalar for '{}' failed with literal 'none' value",
                stringify!($OP),
            )))
        }
    }};
}

/// Invoke a compute kernel on a boolean data array and a scalar value
macro_rules! compute_bool_op_scalar {
($LEFT:expr, $RIGHT:expr, $OP:ident, $DT:ident) => {{
Expand All @@ -447,6 +477,25 @@ macro_rules! compute_bool_op_scalar {
}};
}

/// Invoke a dyn boolean compute kernel on a data array and a scalar value.
///
/// * `$LEFT` - the array operand, passed to the kernel as-is.
/// * `$RIGHT` - an `Option`; its payload is passed to the kernel by value.
/// * `$OP` - the operator prefix (for example `lt`), expanded to the kernel
///   name `<$OP>_dyn_bool_scalar`.
///
/// Evaluates to `Ok(Arc::new(..))` around the kernel output, or to a
/// `DataFusionError::Internal` when `$RIGHT` is `None`. A kernel error is
/// propagated out of the enclosing function by the `?` operator.
macro_rules! compute_bool_op_dyn_scalar {
    ($LEFT:expr, $RIGHT:expr, $OP:ident) => {{
        // generate the scalar function name, such as lt_dyn_bool_scalar, from the
        // $OP parameter (which could have a value of lt) and the suffix
        // _dyn_bool_scalar
        if let Some(b) = $RIGHT {
            Ok(Arc::new(paste::expr! {[<$OP _dyn_bool_scalar>]}(
                $LEFT,
                b,
            )?))
        } else {
            // Report this macro's own name; the message previously pointed at
            // the utf8 macro it was copied from.
            Err(DataFusionError::Internal(format!(
                "compute_bool_op_dyn_scalar for '{}' failed with literal 'none' value",
                stringify!($OP),
            )))
        }
    }};
}

/// Invoke a bool compute kernel on array(s)
macro_rules! compute_bool_op {
// invoke binary operator
Expand Down Expand Up @@ -475,7 +524,6 @@ macro_rules! compute_bool_op {
/// LEFT is array, RIGHT is scalar value
macro_rules! compute_op_scalar {
($LEFT:expr, $RIGHT:expr, $OP:ident, $DT:ident) => {{
use std::convert::TryInto;
let ll = $LEFT
.as_any()
.downcast_ref::<$DT>()
Expand All @@ -489,6 +537,26 @@ macro_rules! compute_op_scalar {
}};
}

/// Invoke a dyn compute kernel on a data array and a scalar value.
/// LEFT is Primitive or Dictionary array of numeric values, RIGHT is scalar value
///
/// * `$LEFT` - the array operand, passed to the kernel as-is.
/// * `$RIGHT` - an `Option`; its payload is passed to the kernel by value.
/// * `$OP` - the operator prefix (for example `lt`), expanded to the kernel
///   name `<$OP>_dyn_scalar`.
///
/// Evaluates to `Ok(Arc::new(..))` around the kernel output, or to a
/// `DataFusionError::Internal` when `$RIGHT` is `None`. A kernel error is
/// propagated out of the enclosing function by the `?` operator.
macro_rules! compute_op_dyn_scalar {
    ($LEFT:expr, $RIGHT:expr, $OP:ident) => {{
        // generate the scalar function name, such as lt_dyn_scalar, from the $OP
        // parameter (which could have a value of lt) and the suffix _dyn_scalar
        if let Some(value) = $RIGHT {
            Ok(Arc::new(paste::expr! {[<$OP _dyn_scalar>]}(
                $LEFT,
                value,
            )?))
        } else {
            // Report this macro's own name; the message previously pointed at
            // the utf8 macro it was copied from.
            Err(DataFusionError::Internal(format!(
                "compute_op_dyn_scalar for '{}' failed with literal 'none' value",
                stringify!($OP),
            )))
        }
    }};
}

/// Invoke a compute kernel on array(s)
macro_rules! compute_op {
// invoke binary operator
Expand Down Expand Up @@ -879,26 +947,64 @@ impl PhysicalExpr for BinaryExpr {
}
}

/// Apply the operator `$OP` between the array `$LEFT` and the scalar literal
/// `$RIGHT`, choosing the compute macro from the `ScalarValue` variant.
///
/// Boolean, Utf8/LargeUtf8 and integer literals are routed to the `*_dyn_scalar`
/// macros; decimal, float, date and timestamp literals are routed to the
/// array-typed `compute_decimal_op_scalar!` / `compute_op_scalar!` macros.
/// Any other variant yields a `DataFusionError::Internal`.
///
/// The macro evaluates to `Some(Result<Arc<dyn Array>>)`.
#[macro_export]
macro_rules! binary_array_op_dyn_scalar {
    ($LEFT:expr, $RIGHT:expr, $OP:ident) => {{
        let outcome: Result<Arc<dyn Array>> = match $RIGHT {
            // Integer literals: dyn numeric kernels.
            ScalarValue::Int8(n) => compute_op_dyn_scalar!($LEFT, n, $OP),
            ScalarValue::Int16(n) => compute_op_dyn_scalar!($LEFT, n, $OP),
            ScalarValue::Int32(n) => compute_op_dyn_scalar!($LEFT, n, $OP),
            ScalarValue::Int64(n) => compute_op_dyn_scalar!($LEFT, n, $OP),
            ScalarValue::UInt8(n) => compute_op_dyn_scalar!($LEFT, n, $OP),
            ScalarValue::UInt16(n) => compute_op_dyn_scalar!($LEFT, n, $OP),
            ScalarValue::UInt32(n) => compute_op_dyn_scalar!($LEFT, n, $OP),
            ScalarValue::UInt64(n) => compute_op_dyn_scalar!($LEFT, n, $OP),

            // Boolean and string literals: dedicated dyn kernels.
            ScalarValue::Boolean(flag) => compute_bool_op_dyn_scalar!($LEFT, flag, $OP),
            ScalarValue::Utf8(text) => compute_utf8_op_dyn_scalar!($LEFT, text, $OP),
            ScalarValue::LargeUtf8(text) => compute_utf8_op_dyn_scalar!($LEFT, text, $OP),

            // Remaining literals: kernels that take the concrete array type.
            ScalarValue::Decimal128(..) => {
                compute_decimal_op_scalar!($LEFT, $RIGHT, $OP, DecimalArray)
            }
            ScalarValue::Float32(_) => {
                compute_op_scalar!($LEFT, $RIGHT, $OP, Float32Array)
            }
            ScalarValue::Float64(_) => {
                compute_op_scalar!($LEFT, $RIGHT, $OP, Float64Array)
            }
            ScalarValue::Date32(_) => {
                compute_op_scalar!($LEFT, $RIGHT, $OP, Date32Array)
            }
            ScalarValue::Date64(_) => {
                compute_op_scalar!($LEFT, $RIGHT, $OP, Date64Array)
            }
            ScalarValue::TimestampSecond(..) => {
                compute_op_scalar!($LEFT, $RIGHT, $OP, TimestampSecondArray)
            }
            ScalarValue::TimestampMillisecond(..) => {
                compute_op_scalar!($LEFT, $RIGHT, $OP, TimestampMillisecondArray)
            }
            ScalarValue::TimestampMicrosecond(..) => {
                compute_op_scalar!($LEFT, $RIGHT, $OP, TimestampMicrosecondArray)
            }
            ScalarValue::TimestampNanosecond(..) => {
                compute_op_scalar!($LEFT, $RIGHT, $OP, TimestampNanosecondArray)
            }

            // Everything else is unsupported on this path.
            unsupported => Err(DataFusionError::Internal(format!(
                "Data type {:?} not supported for scalar operation '{}' on dyn array",
                unsupported,
                stringify!($OP)
            ))),
        };
        Some(outcome)
    }};
}

impl BinaryExpr {
/// Evaluate the expression if the left input is an array and
/// right is literal - use scalar operations
fn evaluate_array_scalar(
&self,
array: &ArrayRef,
array: &dyn Array,
scalar: &ScalarValue,
) -> Result<Option<Result<ArrayRef>>> {
let scalar_result = match &self.op {
Operator::Lt => binary_array_op_scalar!(array, scalar.clone(), lt),
Operator::Lt => {
binary_array_op_dyn_scalar!(array, scalar.clone(), lt)
}
Operator::LtEq => {
binary_array_op_scalar!(array, scalar.clone(), lt_eq)
binary_array_op_dyn_scalar!(array, scalar.clone(), lt_eq)
}
Operator::Gt => {
binary_array_op_dyn_scalar!(array, scalar.clone(), gt)
}
Operator::Gt => binary_array_op_scalar!(array, scalar.clone(), gt),
Operator::GtEq => {
binary_array_op_scalar!(array, scalar.clone(), gt_eq)
binary_array_op_dyn_scalar!(array, scalar.clone(), gt_eq)
}
Operator::Eq => {
binary_array_op_dyn_scalar!(array, scalar.clone(), eq)
}
Operator::Eq => binary_array_op_scalar!(array, scalar.clone(), eq),
Operator::NotEq => {
binary_array_op_scalar!(array, scalar.clone(), neq)
binary_array_op_dyn_scalar!(array, scalar.clone(), neq)
}
Operator::Like => {
binary_string_array_op_scalar!(array, scalar.clone(), like)
Expand Down

0 comments on commit 3494e9c

Please sign in to comment.