Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Interval Arithmetic NegativeExpr Support #7804

Merged
merged 2 commits into from
Oct 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion datafusion/common/src/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1004,7 +1004,8 @@ impl ScalarValue {
| ScalarValue::Int16(None)
| ScalarValue::Int32(None)
| ScalarValue::Int64(None)
| ScalarValue::Float32(None) => Ok(self.clone()),
| ScalarValue::Float32(None)
| ScalarValue::Float64(None) => Ok(self.clone()),
ScalarValue::Float64(Some(v)) => Ok(ScalarValue::Float64(Some(-v))),
ScalarValue::Float32(Some(v)) => Ok(ScalarValue::Float32(Some(-v))),
ScalarValue::Int8(Some(v)) => Ok(ScalarValue::Int8(Some(-v))),
Expand Down
13 changes: 9 additions & 4 deletions datafusion/core/src/physical_optimizer/join_selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1173,6 +1173,7 @@ mod tests_statistical {

#[cfg(test)]
mod util_tests {
use arrow_schema::{DataType, Field, Schema};
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{BinaryExpr, Column, NegativeExpr};
use datafusion_physical_expr::intervals::utils::check_support;
Expand All @@ -1181,26 +1182,30 @@ mod util_tests {

#[test]
fn check_expr_supported() {
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Utf8, false),
]));
let supported_expr = Arc::new(BinaryExpr::new(
Arc::new(Column::new("a", 0)),
Operator::Plus,
Arc::new(Column::new("a", 0)),
)) as Arc<dyn PhysicalExpr>;
assert!(check_support(&supported_expr));
assert!(check_support(&supported_expr, &schema));
let supported_expr_2 = Arc::new(Column::new("a", 0)) as Arc<dyn PhysicalExpr>;
assert!(check_support(&supported_expr_2));
assert!(check_support(&supported_expr_2, &schema));
let unsupported_expr = Arc::new(BinaryExpr::new(
Arc::new(Column::new("a", 0)),
Operator::Or,
Arc::new(Column::new("a", 0)),
)) as Arc<dyn PhysicalExpr>;
assert!(!check_support(&unsupported_expr));
assert!(!check_support(&unsupported_expr, &schema));
let unsupported_expr_2 = Arc::new(BinaryExpr::new(
Arc::new(Column::new("a", 0)),
Operator::Or,
Arc::new(NegativeExpr::new(Arc::new(Column::new("a", 0)))),
)) as Arc<dyn PhysicalExpr>;
assert!(!check_support(&unsupported_expr_2));
assert!(!check_support(&unsupported_expr_2, &schema));
}
}

Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_optimizer/pipeline_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ pub fn check_finiteness_requirements(
/// [`Operator`]: datafusion_expr::Operator
fn is_prunable(join: &SymmetricHashJoinExec) -> bool {
join.filter().map_or(false, |filter| {
check_support(filter.expression())
check_support(filter.expression(), &join.schema())
&& filter
.schema()
.fields()
Expand Down
73 changes: 66 additions & 7 deletions datafusion/physical-expr/src/expressions/negative.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,25 @@

//! Negation (-) expression

use std::any::Any;
use std::hash::{Hash, Hasher};
use std::sync::Arc;

use crate::intervals::Interval;
use crate::physical_expr::down_cast_any_ref;
use crate::sort_properties::SortProperties;
use crate::PhysicalExpr;

use arrow::{
compute::kernels::numeric::neg_wrapping,
datatypes::{DataType, Schema},
record_batch::RecordBatch,
};

use datafusion_common::{internal_err, DataFusionError, Result};
use datafusion_expr::{
type_coercion::{is_interval, is_null, is_signed_numeric},
ColumnarValue,
};

use std::any::Any;
use std::hash::{Hash, Hasher};
use std::sync::Arc;

/// Negative expression
#[derive(Debug, Hash)]
pub struct NegativeExpr {
Expand Down Expand Up @@ -105,6 +104,30 @@ impl PhysicalExpr for NegativeExpr {
self.hash(&mut s);
}

/// Computes the interval of this [`NegativeExpr`] from the interval of its
/// single child (`children[0]`).
///
/// Negation mirrors the interval around zero: the child's upper bound,
/// negated, becomes the new lower bound, and the child's lower bound,
/// negated, becomes the new upper bound.
/// Ex: `(a, b] => [-b, -a)`
///
/// # Errors
/// Propagates any error returned while negating either bound.
///
/// # Panics
/// Panics if `children` is empty.
fn evaluate_bounds(&self, children: &[&Interval]) -> Result<Interval> {
    let child = children[0];
    // The endpoints trade places: the old upper bound becomes the new lower bound.
    let new_lower = child.upper.negate()?;
    let new_upper = child.lower.negate()?;
    Ok(Interval::new(new_lower, new_upper))
}

/// Narrows the child's interval using what is known about this
/// [`NegativeExpr`]'s own interval.
///
/// `interval` is the interval of the negation itself, and `children[0]` is
/// the currently known interval of its argument. Since the argument equals
/// the negation of this expression, `interval` is mirrored (bounds negated
/// and swapped) and intersected with the child's interval. The outcome of
/// that intersection is returned as the only element of the vector.
///
/// # Errors
/// Propagates any error from negating the bounds or from the intersection.
///
/// # Panics
/// Panics if `children` is empty.
fn propagate_constraints(
    &self,
    interval: &Interval,
    children: &[&Interval],
) -> Result<Vec<Option<Interval>>> {
    let current_child = children[0];
    // Mirror the parent's interval around zero to express it in terms of the child.
    let mirrored_lower = interval.upper.negate()?;
    let mirrored_upper = interval.lower.negate()?;
    let mirrored = Interval::new(mirrored_lower, mirrored_upper);

    let narrowed = current_child.intersect(mirrored)?;
    Ok(vec![narrowed])
}

/// The ordering of a [`NegativeExpr`] is simply the reverse of its child.
fn get_ordering(&self, children: &[SortProperties]) -> SortProperties {
-children[0]
Expand Down Expand Up @@ -144,7 +167,10 @@ pub fn negative(
#[cfg(test)]
mod tests {
use super::*;
use crate::expressions::col;
use crate::{
expressions::{col, Column},
intervals::Interval,
};
#[allow(unused_imports)]
use arrow::array::*;
use arrow::datatypes::*;
Expand Down Expand Up @@ -187,4 +213,37 @@ mod tests {
test_array_negative_op!(Float64, 23456.0f64, 12345.0f64);
Ok(())
}

/// Checks that `evaluate_bounds` negates both endpoints of the child
/// interval and swaps them, together with their boundary flags.
#[test]
fn test_evaluate_bounds() -> Result<()> {
    let expr = NegativeExpr {
        arg: Arc::new(Column::new("a", 0)),
    };
    // The flag pair is swapped between input and expectation because the
    // endpoints trade places when the interval is negated.
    let input = Interval::make(Some(-2), Some(1), (true, false));
    let expected = Interval::make(Some(-1), Some(2), (false, true));

    let actual = expr.evaluate_bounds(&[&input])?;
    assert_eq!(actual, expected);
    Ok(())
}

/// Checks that `propagate_constraints` shrinks the child interval to its
/// intersection with the negated interval of the `NegativeExpr`.
#[test]
fn test_propagate_constraints() -> Result<()> {
    let expr = NegativeExpr {
        arg: Arc::new(Column::new("a", 0)),
    };
    // Child is known to span -2..3, while the negation is known to span 0..4.
    let child_before = Interval::make(Some(-2), Some(3), (false, false));
    let parent = Interval::make(Some(0), Some(4), (true, false));
    // Mirroring the parent gives -4..0; intersecting with the child leaves -2..0.
    let expected = vec![Some(Interval::make(Some(-2), Some(0), (false, true)))];

    let actual = expr.propagate_constraints(&parent, &[&child_before])?;
    assert_eq!(actual, expected);
    Ok(())
}
}
20 changes: 14 additions & 6 deletions datafusion/physical-expr/src/intervals/interval_aritmetic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,20 @@

//! Interval arithmetic library

use std::borrow::Borrow;
use std::fmt;
use std::fmt::{Display, Formatter};
use std::ops::{AddAssign, SubAssign};

use crate::aggregate::min_max::{max, min};
use crate::intervals::rounding::{alter_fp_rounding_mode, next_down, next_up};

use arrow::compute::{cast_with_options, CastOptions};
use arrow::datatypes::DataType;
use arrow_array::ArrowNativeTypeOp;
use datafusion_common::{exec_err, internal_err, DataFusionError, Result, ScalarValue};
use datafusion_expr::type_coercion::binary::get_result_type;
use datafusion_expr::Operator;

use std::borrow::Borrow;
use std::fmt;
use std::fmt::{Display, Formatter};
use std::ops::{AddAssign, SubAssign};

/// This type represents a single endpoint of an [`Interval`]. An
/// endpoint can be open (does not include the endpoint) or closed
/// (includes the endpoint).
Expand Down Expand Up @@ -87,6 +86,15 @@ impl IntervalBound {
.map(|value| IntervalBound::new(value, self.open))
}

/// Returns a new bound with a negated value, if any, and the same open/closed.
/// For example negating `[5` would return `[-5`, or `-1)` would return `1)`
pub fn negate(&self) -> Result<IntervalBound> {
berkaysynnada marked this conversation as resolved.
Show resolved Hide resolved
self.value.arithmetic_negate().map(|value| IntervalBound {
value,
open: self.open,
})
}

/// This function adds the given `IntervalBound` to this `IntervalBound`.
/// The result is unbounded if either is; otherwise, their values are
/// added. The result is closed if both original bounds are closed, or open
Expand Down
39 changes: 27 additions & 12 deletions datafusion/physical-expr/src/intervals/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,17 @@

//! Utility functions for the interval arithmetic library

use std::sync::Arc;

use super::{Interval, IntervalBound};
use crate::{
expressions::{BinaryExpr, CastExpr, Column, Literal},
expressions::{BinaryExpr, CastExpr, Column, Literal, NegativeExpr},
PhysicalExpr,
};

use arrow_schema::DataType;
use arrow_schema::{DataType, SchemaRef};
use datafusion_common::{DataFusionError, Result, ScalarValue};
use datafusion_expr::Operator;

use std::sync::Arc;

const MDN_DAY_MASK: i128 = 0xFFFF_FFFF_0000_0000_0000_0000;
const MDN_NS_MASK: i128 = 0xFFFF_FFFF_FFFF_FFFF;
const DT_MS_MASK: i64 = 0xFFFF_FFFF;
Expand All @@ -37,16 +36,32 @@ const DT_MS_MASK: i64 = 0xFFFF_FFFF;
/// Currently, we do not support all [`PhysicalExpr`]s for interval calculations.
/// We do not support every type of [`Operator`]s either. Over time, this check
/// will relax as more types of `PhysicalExpr`s and `Operator`s are supported.
/// Currently, [`CastExpr`], [`BinaryExpr`], [`Column`] and [`Literal`] are supported.
pub fn check_support(expr: &Arc<dyn PhysicalExpr>) -> bool {
/// Currently, [`CastExpr`], [`NegativeExpr`], [`BinaryExpr`], [`Column`] and [`Literal`] are supported.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While reviewing this PR I noticed that, to add support for interval analysis to a PhysicalExpr, at least two places need to be changed: the impl PhysicalExpr and check_support. Besides making it somewhat harder to add interval analysis to new expression types, I also think it means that custom impls of PhysicalExpr can't be used in interval analysis.

What do you think about making it possible for custom PhysicalExprs to use interval analysis (likely by moving some part of this check into the impl PhysicalExpr)? I can file a follow on ticket if you like

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fn supports_interval_analysis(&self) -> bool {
        false
}

Adding such method in impl PhysicalExpr is what you're thinking? If it is so, I can quickly convert it to that form.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding such method in impl PhysicalExpr is what you're thinking? If it is so, I can quickly convert it to that form.

Yes, that would be one way. Another option might be to change the signature of evaluate_bounds to return an Option, so that multiple functions don't need to be kept in sync:

 fn evaluate_bounds(&self, _children: &[&Interval]) -> Result<Option<Interval>> {
        Ok(None)
    }

To be clear, I think we should merge this PR as is -- maybe with documentation updates -- and then make this change as a follow on PR. That way we will unblock #7793 faster

pub fn check_support(expr: &Arc<dyn PhysicalExpr>, schema: &SchemaRef) -> bool {
let expr_any = expr.as_any();
let expr_supported = if let Some(binary_expr) = expr_any.downcast_ref::<BinaryExpr>()
{
if let Some(binary_expr) = expr_any.downcast_ref::<BinaryExpr>() {
is_operator_supported(binary_expr.op())
&& check_support(binary_expr.left(), schema)
&& check_support(binary_expr.right(), schema)
} else if let Some(column) = expr_any.downcast_ref::<Column>() {
if let Ok(field) = schema.field_with_name(column.name()) {
is_datatype_supported(field.data_type())
} else {
return false;
}
} else if let Some(literal) = expr_any.downcast_ref::<Literal>() {
if let Ok(dt) = literal.data_type(schema) {
is_datatype_supported(&dt)
} else {
return false;
}
} else if let Some(cast) = expr_any.downcast_ref::<CastExpr>() {
check_support(cast.expr(), schema)
} else if let Some(negative) = expr_any.downcast_ref::<NegativeExpr>() {
check_support(negative.arg(), schema)
} else {
expr_any.is::<Column>() || expr_any.is::<Literal>() || expr_any.is::<CastExpr>()
};
expr_supported && expr.children().iter().all(check_support)
false
}
}

// This function returns the inverse operator of the given operator.
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ impl ExecutionPlan for FilterExec {
fn statistics(&self) -> Statistics {
let predicate = self.predicate();

if !check_support(predicate) {
if !check_support(predicate, &self.schema()) {
return Statistics::default();
}

Expand Down