Skip to content

Commit

Permalink
rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove committed Sep 23, 2019
1 parent 8087271 commit b95bcde
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 217 deletions.
345 changes: 130 additions & 215 deletions rust/datafusion/src/execution/physical_plan/expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,8 @@ use std::sync::Arc;

use crate::error::{ExecutionError, Result};
use crate::execution::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr};
use crate::logicalplan::{ScalarValue, Operator};
use arrow::array::{
ArrayRef, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array,
UInt16Array, UInt32Array, UInt64Array, UInt8Array,
};
use arrow::array::{
Float32Builder, Float64Builder, Int16Builder, Int32Builder, Int64Builder,
Int8Builder, UInt16Builder, UInt32Builder, UInt64Builder, UInt8Builder,
};

use arrow::compute;
use arrow::array::{ArrayRef, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, UInt16Array, UInt32Array, UInt64Array, UInt8Array, UInt32Builder};
use arrow::compute::kernels::comparison::{lt, lt_eq, gt, gt_eq, eq, neq};
use crate::execution::physical_plan::{
Accumulator, AggregateExpr, PhysicalExpr, PhysicalExprRef,
};
use crate::logicalplan::{Operator, ScalarValue};

use arrow::array::{
ArrayRef, BooleanArray, Float32Array, Float64Array, Int16Array, Int32Array,
Int64Array, Int8Array, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
Expand All @@ -48,6 +33,7 @@ use arrow::array::{
Float32Builder, Float64Builder, Int16Builder, Int32Builder, Int64Builder,
Int8Builder, UInt16Builder, UInt32Builder, UInt64Builder, UInt8Builder,
};

use arrow::compute::kernels::boolean::{and, or};
use arrow::compute::kernels::cast::cast;
use arrow::compute::kernels::comparison::{eq, gt, gt_eq, lt, lt_eq, neq};
Expand Down Expand Up @@ -291,6 +277,134 @@ pub fn lit(value: ScalarValue) -> Arc<dyn PhysicalExpr> {
Arc::new(Literal::new(value))
}

/// Invoke a compute kernel on a pair of arrays
macro_rules! compute_op {
($LEFT:expr, $RIGHT:expr, $OP:ident, $DT:ident) => {{
let ll = $LEFT.as_any().downcast_ref::<$DT>().unwrap();
let rr = $RIGHT.as_any().downcast_ref::<$DT>().unwrap();
Ok(Arc::new($OP(&ll, &rr)?))
}};
}

/// Invoke a compute kernel on a pair of arrays
macro_rules! compute_op_2 {
($LEFT:expr, $RIGHT:expr, $OP:ident) => {{
match $LEFT.data_type() {
DataType::Int8 => compute_op!($LEFT, $RIGHT, $OP, Int8Array),
DataType::Int16 => compute_op!($LEFT, $RIGHT, $OP, Int16Array),
DataType::Int32 => compute_op!($LEFT, $RIGHT, $OP, Int32Array),
DataType::Int64 => compute_op!($LEFT, $RIGHT, $OP, Int64Array),
DataType::UInt8 => compute_op!($LEFT, $RIGHT, $OP, UInt8Array),
DataType::UInt16 => compute_op!($LEFT, $RIGHT, $OP, UInt16Array),
DataType::UInt32 => compute_op!($LEFT, $RIGHT, $OP, UInt32Array),
DataType::UInt64 => compute_op!($LEFT, $RIGHT, $OP, UInt64Array),
DataType::Float32 => compute_op!($LEFT, $RIGHT, $OP, Float32Array),
DataType::Float64 => compute_op!($LEFT, $RIGHT, $OP, Float64Array),
other => Err(ExecutionError::General(format!(
"Unsupported data type {:?}",
other
))),
}
}};
}

/// Invoke a boolean kernel on a pair of arrays
macro_rules! boolean_op {
($LEFT:expr, $RIGHT:expr, $OP:ident) => {{
let ll = $LEFT.as_any().downcast_ref::<BooleanArray>().unwrap();
let rr = $RIGHT.as_any().downcast_ref::<BooleanArray>().unwrap();
Ok(Arc::new($OP(&ll, &rr)?))
}};
}
/// Binary expression
pub struct BinaryExpr {
left: Arc<dyn PhysicalExpr>,
op: Operator,
right: Arc<dyn PhysicalExpr>,
}

impl BinaryExpr {
/// Create new binary expression
pub fn new(
left: Arc<dyn PhysicalExpr>,
op: Operator,
right: Arc<dyn PhysicalExpr>,
) -> Self {
Self { left, op, right }
}
}

impl PhysicalExpr for BinaryExpr {
fn name(&self) -> String {
format!("{:?}", self.op)
}

fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
self.left.data_type(input_schema)
}

fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {
let left = self.left.evaluate(batch)?;
let right = self.right.evaluate(batch)?;
if left.data_type() != right.data_type() {
return Err(ExecutionError::General(format!(
"Cannot evaluate binary expression {:?} with types {:?} and {:?}",
self.op,
left.data_type(),
right.data_type()
)));
}
match &self.op {
Operator::Lt => compute_op_2!(left, right, lt),
Operator::LtEq => compute_op_2!(left, right, lt_eq),
Operator::Gt => compute_op_2!(left, right, gt),
Operator::GtEq => compute_op_2!(left, right, gt_eq),
Operator::Eq => compute_op_2!(left, right, eq),
Operator::NotEq => compute_op_2!(left, right, neq),
Operator::And => boolean_op!(left, right, and),
Operator::Or => boolean_op!(left, right, or),
_ => Err(ExecutionError::General("Unsupported operator".to_string())),
}
}
}

/// CAST expression casts an expression to a specific data type
pub struct CastExpr {
expr: Arc<dyn PhysicalExpr>,
data_type: DataType,
}

impl CastExpr {
/// Create a CAST expression
pub fn new(expr: Arc<dyn PhysicalExpr>, data_type: DataType) -> Self {
Self { expr, data_type }
}
}

impl PhysicalExpr for CastExpr {
fn name(&self) -> String {
"CAST".to_string()
}

fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
Ok(self.data_type.clone())
}

fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {
let value = self.expr.evaluate(batch)?;
Ok(cast(&value, &self.data_type)?)
}
}

/// Create a binary expression
pub fn binary(
l: Arc<dyn PhysicalExpr>,
op: Operator,
r: Arc<dyn PhysicalExpr>,
) -> Arc<dyn PhysicalExpr> {
Arc::new(BinaryExpr::new(l, op, r))
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -415,202 +529,3 @@ mod tests {
accum.get_value()
}
}

/// Represents a literal value
pub struct Literal {
value: ScalarValue,
}

impl Literal {
/// Create a literal value expression
pub fn new(value: ScalarValue) -> Self {
Self { value }
}
}

macro_rules! build_literal_array {
($BATCH:ident, $BUILDER:ident, $VALUE:expr) => {{
let mut builder = $BUILDER::new($BATCH.num_rows());
for _ in 0..$BATCH.num_rows() {
builder.append_value($VALUE)?;
}
Ok(Arc::new(builder.finish()))
}};
}

impl PhysicalExpr for Literal {
fn name(&self) -> String {
"lit".to_string()
}

fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
Ok(self.value.get_datatype())
}

fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {
match &self.value {
ScalarValue::Int8(value) => build_literal_array!(batch, Int8Builder, *value),
ScalarValue::Int16(value) => {
build_literal_array!(batch, Int16Builder, *value)
}
ScalarValue::Int32(value) => {
build_literal_array!(batch, Int32Builder, *value)
}
ScalarValue::Int64(value) => {
build_literal_array!(batch, Int64Builder, *value)
}
ScalarValue::UInt8(value) => {
build_literal_array!(batch, UInt8Builder, *value)
}
ScalarValue::UInt16(value) => {
build_literal_array!(batch, UInt16Builder, *value)
}
ScalarValue::UInt32(value) => {
build_literal_array!(batch, UInt32Builder, *value)
}
ScalarValue::UInt64(value) => {
build_literal_array!(batch, UInt64Builder, *value)
}
ScalarValue::Float32(value) => {
build_literal_array!(batch, Float32Builder, *value)
}
ScalarValue::Float64(value) => {
build_literal_array!(batch, Float64Builder, *value)
}
other => Err(ExecutionError::General(format!(
"Unsupported literal type {:?}",
other
))),
}
}
}

/// Invoke a compute kernel on a pair of arrays
macro_rules! compute_op {
($LEFT:expr, $RIGHT:expr, $OP:ident, $DT:ident) => {{
let ll = $LEFT.as_any().downcast_ref::<$DT>().unwrap();
let rr = $RIGHT.as_any().downcast_ref::<$DT>().unwrap();
Ok(Arc::new($OP(&ll, &rr)?))
}};
}

/// Invoke a compute kernel on a pair of arrays
macro_rules! compute_op_2 {
($LEFT:expr, $RIGHT:expr, $OP:ident) => {{
match $LEFT.data_type() {
DataType::Int8 => compute_op!($LEFT, $RIGHT, $OP, Int8Array),
DataType::Int16 => compute_op!($LEFT, $RIGHT, $OP, Int16Array),
DataType::Int32 => compute_op!($LEFT, $RIGHT, $OP, Int32Array),
DataType::Int64 => compute_op!($LEFT, $RIGHT, $OP, Int64Array),
DataType::UInt8 => compute_op!($LEFT, $RIGHT, $OP, UInt8Array),
DataType::UInt16 => compute_op!($LEFT, $RIGHT, $OP, UInt16Array),
DataType::UInt32 => compute_op!($LEFT, $RIGHT, $OP, UInt32Array),
DataType::UInt64 => compute_op!($LEFT, $RIGHT, $OP, UInt64Array),
DataType::Float32 => compute_op!($LEFT, $RIGHT, $OP, Float32Array),
DataType::Float64 => compute_op!($LEFT, $RIGHT, $OP, Float64Array),
other => Err(ExecutionError::General(format!(
"Unsupported data type {:?}",
other
))),
}
}};
}

/// Invoke a boolean kernel on a pair of arrays
macro_rules! boolean_op {
($LEFT:expr, $RIGHT:expr, $OP:ident) => {{
let ll = $LEFT.as_any().downcast_ref::<BooleanArray>().unwrap();
let rr = $RIGHT.as_any().downcast_ref::<BooleanArray>().unwrap();
Ok(Arc::new($OP(&ll, &rr)?))
}};
}
/// Binary expression
pub struct BinaryExpr {
left: PhysicalExprRef,
op: Operator,
right: PhysicalExprRef,
}

impl BinaryExpr {
/// Create new binary expression
pub fn new(left: PhysicalExprRef, op: Operator, right: PhysicalExprRef) -> Self {
Self { left, op, right }
}
}

impl PhysicalExpr for BinaryExpr {
fn name(&self) -> String {
format!("{:?}", self.op)
}

fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
self.left.data_type(input_schema)
}

fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {
let left = self.left.evaluate(batch)?;
let right = self.right.evaluate(batch)?;
if left.data_type() != right.data_type() {
return Err(ExecutionError::General(format!(
"Cannot evaluate binary expression {:?} with types {:?} and {:?}",
self.op,
left.data_type(),
right.data_type()
)));
}
match &self.op {
Operator::Lt => compute_op_2!(left, right, lt),
Operator::LtEq => compute_op_2!(left, right, lt_eq),
Operator::Gt => compute_op_2!(left, right, gt),
Operator::GtEq => compute_op_2!(left, right, gt_eq),
Operator::Eq => compute_op_2!(left, right, eq),
Operator::NotEq => compute_op_2!(left, right, neq),
Operator::And => boolean_op!(left, right, and),
Operator::Or => boolean_op!(left, right, or),
_ => Err(ExecutionError::General("Unsupported operator".to_string())),
}
}
}

/// CAST expression casts an expression to a specific data type
pub struct CastExpr {
expr: PhysicalExprRef,
data_type: DataType,
}

impl CastExpr {
/// Create a CAST expression
pub fn new(expr: PhysicalExprRef, data_type: DataType) -> Self {
Self { expr, data_type }
}
}

impl PhysicalExpr for CastExpr {
fn name(&self) -> String {
"CAST".to_string()
}

fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
Ok(self.data_type.clone())
}

fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {
let value = self.expr.evaluate(batch)?;
Ok(cast(&value, &self.data_type)?)
}
}

/// Create a column expression
pub fn col(i: usize) -> PhysicalExprRef {
Arc::new(Column::new(i))
}

/// Create a literal expression
pub fn lit(value: ScalarValue) -> PhysicalExprRef {
Arc::new(Literal::new(value))
}

/// Create a binary expression
pub fn binary(l: PhysicalExprRef, op: Operator, r: PhysicalExprRef) -> PhysicalExprRef {
Arc::new(BinaryExpr::new(l, op, r))
}
2 changes: 0 additions & 2 deletions rust/datafusion/src/execution/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,6 @@ pub trait Accumulator {
}

pub mod common;
type PhysicalExprRef = Arc<dyn PhysicalExpr>;

pub mod csv;
pub mod datasource;
pub mod expressions;
Expand Down

0 comments on commit b95bcde

Please sign in to comment.