Skip to content

Commit

Permalink
Interval Arithmetic NegativeExpr Support (#7804)
Browse files Browse the repository at this point in the history
* Intervals can propagate over NegativeExpr's.

* Addressing doc string reviews
  • Loading branch information
berkaysynnada authored Oct 12, 2023
1 parent 1bfe740 commit 1f4442e
Show file tree
Hide file tree
Showing 7 changed files with 120 additions and 32 deletions.
3 changes: 2 additions & 1 deletion datafusion/common/src/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1004,7 +1004,8 @@ impl ScalarValue {
| ScalarValue::Int16(None)
| ScalarValue::Int32(None)
| ScalarValue::Int64(None)
| ScalarValue::Float32(None) => Ok(self.clone()),
| ScalarValue::Float32(None)
| ScalarValue::Float64(None) => Ok(self.clone()),
ScalarValue::Float64(Some(v)) => Ok(ScalarValue::Float64(Some(-v))),
ScalarValue::Float32(Some(v)) => Ok(ScalarValue::Float32(Some(-v))),
ScalarValue::Int8(Some(v)) => Ok(ScalarValue::Int8(Some(-v))),
Expand Down
13 changes: 9 additions & 4 deletions datafusion/core/src/physical_optimizer/join_selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1173,6 +1173,7 @@ mod tests_statistical {

#[cfg(test)]
mod util_tests {
use arrow_schema::{DataType, Field, Schema};
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{BinaryExpr, Column, NegativeExpr};
use datafusion_physical_expr::intervals::utils::check_support;
Expand All @@ -1181,26 +1182,30 @@ mod util_tests {

#[test]
fn check_expr_supported() {
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Utf8, false),
]));
let supported_expr = Arc::new(BinaryExpr::new(
Arc::new(Column::new("a", 0)),
Operator::Plus,
Arc::new(Column::new("a", 0)),
)) as Arc<dyn PhysicalExpr>;
assert!(check_support(&supported_expr));
assert!(check_support(&supported_expr, &schema));
let supported_expr_2 = Arc::new(Column::new("a", 0)) as Arc<dyn PhysicalExpr>;
assert!(check_support(&supported_expr_2));
assert!(check_support(&supported_expr_2, &schema));
let unsupported_expr = Arc::new(BinaryExpr::new(
Arc::new(Column::new("a", 0)),
Operator::Or,
Arc::new(Column::new("a", 0)),
)) as Arc<dyn PhysicalExpr>;
assert!(!check_support(&unsupported_expr));
assert!(!check_support(&unsupported_expr, &schema));
let unsupported_expr_2 = Arc::new(BinaryExpr::new(
Arc::new(Column::new("a", 0)),
Operator::Or,
Arc::new(NegativeExpr::new(Arc::new(Column::new("a", 0)))),
)) as Arc<dyn PhysicalExpr>;
assert!(!check_support(&unsupported_expr_2));
assert!(!check_support(&unsupported_expr_2, &schema));
}
}

Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_optimizer/pipeline_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ pub fn check_finiteness_requirements(
/// [`Operator`]: datafusion_expr::Operator
fn is_prunable(join: &SymmetricHashJoinExec) -> bool {
join.filter().map_or(false, |filter| {
check_support(filter.expression())
check_support(filter.expression(), &join.schema())
&& filter
.schema()
.fields()
Expand Down
73 changes: 66 additions & 7 deletions datafusion/physical-expr/src/expressions/negative.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,25 @@

//! Negation (-) expression

use std::any::Any;
use std::hash::{Hash, Hasher};
use std::sync::Arc;

use crate::intervals::Interval;
use crate::physical_expr::down_cast_any_ref;
use crate::sort_properties::SortProperties;
use crate::PhysicalExpr;

use arrow::{
compute::kernels::numeric::neg_wrapping,
datatypes::{DataType, Schema},
record_batch::RecordBatch,
};

use datafusion_common::{internal_err, DataFusionError, Result};
use datafusion_expr::{
type_coercion::{is_interval, is_null, is_signed_numeric},
ColumnarValue,
};

use std::any::Any;
use std::hash::{Hash, Hasher};
use std::sync::Arc;

/// Negative expression
#[derive(Debug, Hash)]
pub struct NegativeExpr {
Expand Down Expand Up @@ -105,6 +104,30 @@ impl PhysicalExpr for NegativeExpr {
self.hash(&mut s);
}

/// Given the child interval of a NegativeExpr, it calculates the NegativeExpr's interval.
/// It replaces the upper and lower bounds after multiplying them with -1.
/// Ex: `(a, b] => [-b, -a)``
fn evaluate_bounds(&self, children: &[&Interval]) -> Result<Interval> {
Ok(Interval::new(
children[0].upper.negate()?,
children[0].lower.negate()?,
))
}

/// Returns a new [`Interval`] of a NegativeExpr that has the existing `interval` given that
/// given the input interval is known to be `children`.
fn propagate_constraints(
&self,
interval: &Interval,
children: &[&Interval],
) -> Result<Vec<Option<Interval>>> {
let child_interval = children[0];
let negated_interval =
Interval::new(interval.upper.negate()?, interval.lower.negate()?);

Ok(vec![child_interval.intersect(negated_interval)?])
}

/// The ordering of a [`NegativeExpr`] is simply the reverse of its child.
fn get_ordering(&self, children: &[SortProperties]) -> SortProperties {
-children[0]
Expand Down Expand Up @@ -144,7 +167,10 @@ pub fn negative(
#[cfg(test)]
mod tests {
use super::*;
use crate::expressions::col;
use crate::{
expressions::{col, Column},
intervals::Interval,
};
#[allow(unused_imports)]
use arrow::array::*;
use arrow::datatypes::*;
Expand Down Expand Up @@ -187,4 +213,37 @@ mod tests {
test_array_negative_op!(Float64, 23456.0f64, 12345.0f64);
Ok(())
}

#[test]
fn test_evaluate_bounds() -> Result<()> {
let negative_expr = NegativeExpr {
arg: Arc::new(Column::new("a", 0)),
};
let child_interval = Interval::make(Some(-2), Some(1), (true, false));
let negative_expr_interval = Interval::make(Some(-1), Some(2), (false, true));
assert_eq!(
negative_expr.evaluate_bounds(&[&child_interval])?,
negative_expr_interval
);
Ok(())
}

#[test]
fn test_propagate_constraints() -> Result<()> {
let negative_expr = NegativeExpr {
arg: Arc::new(Column::new("a", 0)),
};
let original_child_interval = Interval::make(Some(-2), Some(3), (false, false));
let negative_expr_interval = Interval::make(Some(0), Some(4), (true, false));
let after_propagation =
vec![Some(Interval::make(Some(-2), Some(0), (false, true)))];
assert_eq!(
negative_expr.propagate_constraints(
&negative_expr_interval,
&[&original_child_interval]
)?,
after_propagation
);
Ok(())
}
}
20 changes: 14 additions & 6 deletions datafusion/physical-expr/src/intervals/interval_aritmetic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,20 @@

//! Interval arithmetic library

use std::borrow::Borrow;
use std::fmt;
use std::fmt::{Display, Formatter};
use std::ops::{AddAssign, SubAssign};

use crate::aggregate::min_max::{max, min};
use crate::intervals::rounding::{alter_fp_rounding_mode, next_down, next_up};

use arrow::compute::{cast_with_options, CastOptions};
use arrow::datatypes::DataType;
use arrow_array::ArrowNativeTypeOp;
use datafusion_common::{exec_err, internal_err, DataFusionError, Result, ScalarValue};
use datafusion_expr::type_coercion::binary::get_result_type;
use datafusion_expr::Operator;

use std::borrow::Borrow;
use std::fmt;
use std::fmt::{Display, Formatter};
use std::ops::{AddAssign, SubAssign};

/// This type represents a single endpoint of an [`Interval`]. An
/// endpoint can be open (does not include the endpoint) or closed
/// (includes the endpoint).
Expand Down Expand Up @@ -87,6 +86,15 @@ impl IntervalBound {
.map(|value| IntervalBound::new(value, self.open))
}

/// Returns a new bound with a negated value, if any, and the same open/closed.
/// For example negating `[5` would return `[-5`, or `-1)` would return `1)`
pub fn negate(&self) -> Result<IntervalBound> {
self.value.arithmetic_negate().map(|value| IntervalBound {
value,
open: self.open,
})
}

/// This function adds the given `IntervalBound` to this `IntervalBound`.
/// The result is unbounded if either is; otherwise, their values are
/// added. The result is closed if both original bounds are closed, or open
Expand Down
39 changes: 27 additions & 12 deletions datafusion/physical-expr/src/intervals/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,17 @@

//! Utility functions for the interval arithmetic library

use std::sync::Arc;

use super::{Interval, IntervalBound};
use crate::{
expressions::{BinaryExpr, CastExpr, Column, Literal},
expressions::{BinaryExpr, CastExpr, Column, Literal, NegativeExpr},
PhysicalExpr,
};

use arrow_schema::DataType;
use arrow_schema::{DataType, SchemaRef};
use datafusion_common::{DataFusionError, Result, ScalarValue};
use datafusion_expr::Operator;

use std::sync::Arc;

const MDN_DAY_MASK: i128 = 0xFFFF_FFFF_0000_0000_0000_0000;
const MDN_NS_MASK: i128 = 0xFFFF_FFFF_FFFF_FFFF;
const DT_MS_MASK: i64 = 0xFFFF_FFFF;
Expand All @@ -37,16 +36,32 @@ const DT_MS_MASK: i64 = 0xFFFF_FFFF;
/// Currently, we do not support all [`PhysicalExpr`]s for interval calculations.
/// We do not support every type of [`Operator`]s either. Over time, this check
/// will relax as more types of `PhysicalExpr`s and `Operator`s are supported.
/// Currently, [`CastExpr`], [`BinaryExpr`], [`Column`] and [`Literal`] are supported.
pub fn check_support(expr: &Arc<dyn PhysicalExpr>) -> bool {
/// Currently, [`CastExpr`], [`NegativeExpr`], [`BinaryExpr`], [`Column`] and [`Literal`] are supported.
pub fn check_support(expr: &Arc<dyn PhysicalExpr>, schema: &SchemaRef) -> bool {
let expr_any = expr.as_any();
let expr_supported = if let Some(binary_expr) = expr_any.downcast_ref::<BinaryExpr>()
{
if let Some(binary_expr) = expr_any.downcast_ref::<BinaryExpr>() {
is_operator_supported(binary_expr.op())
&& check_support(binary_expr.left(), schema)
&& check_support(binary_expr.right(), schema)
} else if let Some(column) = expr_any.downcast_ref::<Column>() {
if let Ok(field) = schema.field_with_name(column.name()) {
is_datatype_supported(field.data_type())
} else {
return false;
}
} else if let Some(literal) = expr_any.downcast_ref::<Literal>() {
if let Ok(dt) = literal.data_type(schema) {
is_datatype_supported(&dt)
} else {
return false;
}
} else if let Some(cast) = expr_any.downcast_ref::<CastExpr>() {
check_support(cast.expr(), schema)
} else if let Some(negative) = expr_any.downcast_ref::<NegativeExpr>() {
check_support(negative.arg(), schema)
} else {
expr_any.is::<Column>() || expr_any.is::<Literal>() || expr_any.is::<CastExpr>()
};
expr_supported && expr.children().iter().all(check_support)
false
}
}

// This function returns the inverse operator of the given operator.
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ impl ExecutionPlan for FilterExec {
fn statistics(&self) -> Statistics {
let predicate = self.predicate();

if !check_support(predicate) {
if !check_support(predicate, &self.schema()) {
return Statistics::default();
}

Expand Down

0 comments on commit 1f4442e

Please sign in to comment.