Make PruningPredicate's rewrite public #12850
Merged
+188
−24
Changes from 1 commit
5 commits
9c49413 Make PruningPredicate's rewrite public (adriangb)
d249584 feedback (adriangb)
e8dba08 Improve documentation and add default to ConstantUnhandledPredicatehook (alamb)
e823c88 Merge pull request #3 from alamb/alamb/unhandled_hook (adriangb)
c438fd0 Update pruning.rs (adriangb)
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -478,6 +478,37 @@ pub struct PruningPredicate { | |||||
literal_guarantees: Vec<LiteralGuarantee>, | ||||||
} | ||||||
|
||||||
/// Hook to handle predicates that DataFusion can not handle, e.g. certain complex expressions | ||||||
/// or predicates that reference columns that are not in the schema. | ||||||
pub trait UnhandledPredicateHook { | ||||||
/// Called when a predicate can not be handled by DataFusion's transformation rules | ||||||
/// or is referencing a column that is not in the schema. | ||||||
fn handle(&self, expr: &Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr>; | ||||||
} | ||||||
|
||||||
#[derive(Debug, Clone)] | ||||||
struct ConstantUnhandledPredicateHook { | ||||||
default: Arc<dyn PhysicalExpr>, | ||||||
} | ||||||
|
||||||
impl ConstantUnhandledPredicateHook { | ||||||
fn new(default: Arc<dyn PhysicalExpr>) -> Self { | ||||||
Self { default } | ||||||
} | ||||||
} | ||||||
|
||||||
impl UnhandledPredicateHook for ConstantUnhandledPredicateHook { | ||||||
fn handle(&self, _expr: &Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr> { | ||||||
self.default.clone() | ||||||
} | ||||||
} | ||||||
|
||||||
fn default_unhandled_hook() -> Arc<dyn UnhandledPredicateHook> { | ||||||
Arc::new(ConstantUnhandledPredicateHook::new(Arc::new( | ||||||
phys_expr::Literal::new(ScalarValue::Boolean(Some(true))), | ||||||
))) | ||||||
} | ||||||
|
||||||
impl PruningPredicate { | ||||||
/// Try to create a new instance of [`PruningPredicate`] | ||||||
/// | ||||||
|
@@ -502,10 +533,16 @@ impl PruningPredicate { | |||||
/// See the struct level documentation on [`PruningPredicate`] for more | ||||||
/// details. | ||||||
pub fn try_new(expr: Arc<dyn PhysicalExpr>, schema: SchemaRef) -> Result<Self> { | ||||||
let unhandled_hook = default_unhandled_hook(); | ||||||
|
||||||
// build predicate expression once | ||||||
let mut required_columns = RequiredColumns::new(); | ||||||
let predicate_expr = | ||||||
build_predicate_expression(&expr, schema.as_ref(), &mut required_columns); | ||||||
let predicate_expr = build_predicate_expression( | ||||||
&expr, | ||||||
schema.as_ref(), | ||||||
&mut required_columns, | ||||||
&unhandled_hook, | ||||||
); | ||||||
|
||||||
let literal_guarantees = LiteralGuarantee::analyze(&expr); | ||||||
|
||||||
|
@@ -1316,23 +1353,43 @@ const MAX_LIST_VALUE_SIZE_REWRITE: usize = 20; | |||||
/// expression that will evaluate to FALSE if it can be determined no | ||||||
/// rows between the min/max values could pass the predicates. | ||||||
/// | ||||||
/// Any predicates that can not be translated will be passed to `unhandled_hook`. | ||||||
/// | ||||||
/// Returns the pruning predicate as an [`PhysicalExpr`] | ||||||
/// | ||||||
/// Notice: Does not handle [`phys_expr::InListExpr`] greater than 20, which will be rewritten to TRUE | ||||||
/// Notice: Does not handle [`phys_expr::InListExpr`] greater than 20, which will fall back to calling `unhandled_hook` | ||||||
pub fn rewrite_predicate_to_statistics_predicate( | ||||||
expr: &Arc<dyn PhysicalExpr>, | ||||||
schema: &Schema, | ||||||
unhandled_hook: Option<Arc<dyn UnhandledPredicateHook>>, | ||||||
) -> Arc<dyn PhysicalExpr> { | ||||||
let unhandled_hook = unhandled_hook.unwrap_or(default_unhandled_hook()); | ||||||
|
||||||
let mut required_columns = RequiredColumns::new(); | ||||||
|
||||||
build_predicate_expression(expr, schema, &mut required_columns, &unhandled_hook) | ||||||
} | ||||||
|
||||||
/// Translate logical filter expression into pruning predicate | ||||||
/// expression that will evaluate to FALSE if it can be determined no | ||||||
/// rows between the min/max values could pass the predicates. | ||||||
/// | ||||||
/// Any predicates that can not be translated will be passed to `unhandled_hook`. | ||||||
/// | ||||||
/// Returns the pruning predicate as an [`PhysicalExpr`] | ||||||
/// | ||||||
/// Notice: Does not handle [`phys_expr::InListExpr`] greater than 20, which will fall back to calling `unhandled_hook` | ||||||
fn build_predicate_expression( | ||||||
expr: &Arc<dyn PhysicalExpr>, | ||||||
schema: &Schema, | ||||||
required_columns: &mut RequiredColumns, | ||||||
unhandled_hook: &Arc<dyn UnhandledPredicateHook>, | ||||||
) -> Arc<dyn PhysicalExpr> { | ||||||
// Returned for unsupported expressions. Such expressions are | ||||||
// converted to TRUE. | ||||||
let unhandled = Arc::new(phys_expr::Literal::new(ScalarValue::Boolean(Some(true)))); | ||||||
|
||||||
// predicate expression can only be a binary expression | ||||||
let expr_any = expr.as_any(); | ||||||
if let Some(is_null) = expr_any.downcast_ref::<phys_expr::IsNullExpr>() { | ||||||
return build_is_null_column_expr(is_null.arg(), schema, required_columns, false) | ||||||
.unwrap_or(unhandled); | ||||||
.unwrap_or_else(|| unhandled_hook.handle(expr)); | ||||||
} | ||||||
if let Some(is_not_null) = expr_any.downcast_ref::<phys_expr::IsNotNullExpr>() { | ||||||
return build_is_null_column_expr( | ||||||
|
@@ -1341,19 +1398,19 @@ fn build_predicate_expression( | |||||
required_columns, | ||||||
true, | ||||||
) | ||||||
.unwrap_or(unhandled); | ||||||
.unwrap_or_else(|| unhandled_hook.handle(expr)); | ||||||
} | ||||||
if let Some(col) = expr_any.downcast_ref::<phys_expr::Column>() { | ||||||
return build_single_column_expr(col, schema, required_columns, false) | ||||||
.unwrap_or(unhandled); | ||||||
.unwrap_or_else(|| unhandled_hook.handle(expr)); | ||||||
} | ||||||
if let Some(not) = expr_any.downcast_ref::<phys_expr::NotExpr>() { | ||||||
// match !col (don't do so recursively) | ||||||
if let Some(col) = not.arg().as_any().downcast_ref::<phys_expr::Column>() { | ||||||
return build_single_column_expr(col, schema, required_columns, true) | ||||||
.unwrap_or(unhandled); | ||||||
.unwrap_or_else(|| unhandled_hook.handle(expr)); | ||||||
} else { | ||||||
return unhandled; | ||||||
return unhandled_hook.handle(expr); | ||||||
} | ||||||
} | ||||||
if let Some(in_list) = expr_any.downcast_ref::<phys_expr::InListExpr>() { | ||||||
|
@@ -1382,9 +1439,14 @@ fn build_predicate_expression( | |||||
}) | ||||||
.reduce(|a, b| Arc::new(phys_expr::BinaryExpr::new(a, re_op, b)) as _) | ||||||
.unwrap(); | ||||||
return build_predicate_expression(&change_expr, schema, required_columns); | ||||||
return build_predicate_expression( | ||||||
&change_expr, | ||||||
schema, | ||||||
required_columns, | ||||||
unhandled_hook, | ||||||
); | ||||||
} else { | ||||||
return unhandled; | ||||||
return unhandled_hook.handle(expr); | ||||||
} | ||||||
} | ||||||
|
||||||
|
@@ -1396,21 +1458,23 @@ fn build_predicate_expression( | |||||
bin_expr.right().clone(), | ||||||
) | ||||||
} else { | ||||||
return unhandled; | ||||||
return unhandled_hook.handle(expr); | ||||||
} | ||||||
}; | ||||||
|
||||||
if op == Operator::And || op == Operator::Or { | ||||||
let left_expr = build_predicate_expression(&left, schema, required_columns); | ||||||
let right_expr = build_predicate_expression(&right, schema, required_columns); | ||||||
let left_expr = | ||||||
build_predicate_expression(&left, schema, required_columns, unhandled_hook); | ||||||
let right_expr = | ||||||
build_predicate_expression(&right, schema, required_columns, unhandled_hook); | ||||||
// simplify boolean expression if applicable | ||||||
let expr = match (&left_expr, op, &right_expr) { | ||||||
(left, Operator::And, _) if is_always_true(left) => right_expr, | ||||||
(_, Operator::And, right) if is_always_true(right) => left_expr, | ||||||
(left, Operator::Or, right) | ||||||
if is_always_true(left) || is_always_true(right) => | ||||||
{ | ||||||
unhandled | ||||||
Arc::new(phys_expr::Literal::new(ScalarValue::Boolean(Some(true)))) | ||||||
} | ||||||
_ => Arc::new(phys_expr::BinaryExpr::new(left_expr, op, right_expr)), | ||||||
}; | ||||||
|
@@ -1423,12 +1487,11 @@ fn build_predicate_expression( | |||||
Ok(builder) => builder, | ||||||
// allow partial failure in predicate expression generation | ||||||
// this can still produce a useful predicate when multiple conditions are joined using AND | ||||||
Err(_) => { | ||||||
return unhandled; | ||||||
} | ||||||
Err(_) => return unhandled_hook.handle(expr), | ||||||
}; | ||||||
|
||||||
build_statistics_expr(&mut expr_builder).unwrap_or(unhandled) | ||||||
build_statistics_expr(&mut expr_builder) | ||||||
.unwrap_or_else(|_| unhandled_hook.handle(expr)) | ||||||
} | ||||||
|
||||||
fn build_statistics_expr( | ||||||
|
@@ -1582,6 +1645,8 @@ mod tests { | |||||
use arrow_array::UInt64Array; | ||||||
use datafusion_expr::expr::InList; | ||||||
use datafusion_expr::{cast, is_null, try_cast, Expr}; | ||||||
use datafusion_functions_nested::expr_fn::{array_has, make_array}; | ||||||
use datafusion_physical_expr::expressions as phys_expr; | ||||||
use datafusion_physical_expr::planner::logical2physical; | ||||||
|
||||||
#[derive(Debug, Default)] | ||||||
|
@@ -3397,6 +3462,75 @@ mod tests { | |||||
// TODO: add test for other case and op | ||||||
} | ||||||
|
||||||
#[test] | ||||||
fn test_rewrite_expr_to_prunable_custom_unhandled_hook() { | ||||||
struct CustomUnhandledHook; | ||||||
|
||||||
impl UnhandledPredicateHook for CustomUnhandledHook { | ||||||
/// This handles an arbitrary case of a column that doesn't exist in the schema | ||||||
/// by renaming it to yet another column that doesn't exist in the schema | ||||||
/// (the transformation is arbitrary, the point is that it can do whatever it wants) | ||||||
fn handle(&self, _expr: &Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr> { | ||||||
Arc::new(phys_expr::Literal::new(ScalarValue::Int32(Some(42)))) | ||||||
} | ||||||
} | ||||||
|
||||||
let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]); | ||||||
let schema_with_b = Schema::new(vec![ | ||||||
Field::new("a", DataType::Int32, true), | ||||||
Field::new("b", DataType::Int32, true), | ||||||
]); | ||||||
|
||||||
let transform_expr = |expr| { | ||||||
let expr = logical2physical(&expr, &schema_with_b); | ||||||
rewrite_predicate_to_statistics_predicate( | ||||||
&expr, | ||||||
&schema, | ||||||
Some(Arc::new(CustomUnhandledHook {})), | ||||||
) | ||||||
}; | ||||||
|
||||||
// transform an arbitrary valid expression that we know is handled | ||||||
let known_expression = col("a").eq(lit(ScalarValue::Int32(Some(12)))); | ||||||
let known_expression_transformed = rewrite_predicate_to_statistics_predicate( | ||||||
&logical2physical(&known_expression, &schema), | ||||||
&schema, | ||||||
None, | ||||||
); | ||||||
|
||||||
// an expression referencing an unknown column (that is not in the schema) gets passed to the hook | ||||||
let input = col("b").eq(lit(ScalarValue::Int32(Some(12)))); | ||||||
I think you can write literals more concisely like this if you wanted
Suggested change
|
||||||
let expected = logical2physical(&lit(42), &schema); | ||||||
let transformed = transform_expr(input.clone()); | ||||||
assert_eq!(transformed.to_string(), expected.to_string()); | ||||||
|
||||||
// more complex case with unknown column | ||||||
let input = known_expression.clone().and(input.clone()); | ||||||
let expected = phys_expr::BinaryExpr::new( | ||||||
known_expression_transformed.clone(), | ||||||
Operator::And, | ||||||
logical2physical(&lit(42), &schema), | ||||||
); | ||||||
let transformed = transform_expr(input.clone()); | ||||||
assert_eq!(transformed.to_string(), expected.to_string()); | ||||||
|
||||||
// an unknown expression gets passed to the hook | ||||||
let input = array_has(make_array(vec![lit(1)]), col("a")); | ||||||
let expected = logical2physical(&lit(42), &schema); | ||||||
let transformed = transform_expr(input.clone()); | ||||||
assert_eq!(transformed.to_string(), expected.to_string()); | ||||||
|
||||||
// more complex case with unknown expression | ||||||
let input = known_expression.and(input); | ||||||
let expected = phys_expr::BinaryExpr::new( | ||||||
known_expression_transformed.clone(), | ||||||
Operator::And, | ||||||
logical2physical(&lit(42), &schema), | ||||||
); | ||||||
let transformed = transform_expr(input.clone()); | ||||||
assert_eq!(transformed.to_string(), expected.to_string()); | ||||||
} | ||||||
|
||||||
#[test] | ||||||
fn test_rewrite_expr_to_prunable_error() { | ||||||
// cast string value to numeric value | ||||||
|
@@ -3886,6 +4020,7 @@ mod tests { | |||||
required_columns: &mut RequiredColumns, | ||||||
) -> Arc<dyn PhysicalExpr> { | ||||||
let expr = logical2physical(expr, schema); | ||||||
build_predicate_expression(&expr, schema, required_columns) | ||||||
let unhandled_hook = default_unhandled_hook(); | ||||||
build_predicate_expression(&expr, schema, required_columns, &unhandled_hook) | ||||||
} | ||||||
} |
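The fallback behaviour introduced by this diff can be illustrated without any DataFusion dependency. The following is a minimal sketch, not the PR's implementation: the `Expr` enum, the `rewrite` function, and the `AlwaysTrue` and `KeepOriginal` hooks are all hypothetical stand-ins for `Arc<dyn PhysicalExpr>`, `build_predicate_expression`, and implementations of `UnhandledPredicateHook`. It assumes only what the diff shows: predicates that cannot be translated are passed to the hook, the default hook returns TRUE, and an always-true side of an AND is dropped.

```rust
// Toy expression type standing in for `Arc<dyn PhysicalExpr>`.
// Hypothetical: this is not DataFusion's real type.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Lit(bool),
    // Row-level comparison `column = value`.
    ColEq(String, i64),
    // Statistics-level check "the min/max range of `column` may contain `value`".
    StatsContain(String, i64),
    And(Box<Expr>, Box<Expr>),
}

// Mirrors the shape of `UnhandledPredicateHook::handle` from the diff.
trait UnhandledHook {
    fn handle(&self, expr: &Expr) -> Expr;
}

// Same behaviour as the default hook in the diff: replace with TRUE,
// which means "cannot prune on this predicate".
struct AlwaysTrue;
impl UnhandledHook for AlwaysTrue {
    fn handle(&self, _expr: &Expr) -> Expr {
        Expr::Lit(true)
    }
}

// A custom hook (hypothetical) that keeps the original predicate so the
// caller can decide later what to do with it.
struct KeepOriginal;
impl UnhandledHook for KeepOriginal {
    fn handle(&self, expr: &Expr) -> Expr {
        expr.clone()
    }
}

// Rewrite a row-level predicate into a statistics-level predicate.
// Anything that cannot be translated is handed to `hook`.
fn rewrite(expr: &Expr, schema: &[&str], hook: &dyn UnhandledHook) -> Expr {
    match expr {
        // Known column: translate to a statistics check.
        Expr::ColEq(name, value) if schema.contains(&name.as_str()) => {
            Expr::StatsContain(name.clone(), *value)
        }
        Expr::Lit(b) => Expr::Lit(*b),
        Expr::And(left, right) => {
            let left = rewrite(left, schema, hook);
            let right = rewrite(right, schema, hook);
            match (left, right) {
                // `TRUE AND x` simplifies to `x`.
                (Expr::Lit(true), other) | (other, Expr::Lit(true)) => other,
                (left, right) => Expr::And(Box::new(left), Box::new(right)),
            }
        }
        // Unknown column or unsupported expression: defer to the hook.
        other => hook.handle(other),
    }
}
```

With a schema containing only `a`, rewriting `a = 12 AND b = 12` under `AlwaysTrue` collapses to the statistics check on `a` alone, while `KeepOriginal` leaves `b = 12` in place as the right-hand side of the AND.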
Since this is a new API I think it might be worth thinking a bit more about. What would you think about moving this logic into its own structure?
For example, instead of
Something like
Or with a rewrite hook
Yup that sounds good to me, I'll push that change tomorrow.
Actually just did it, it was easy.
My main question is if it should include the schema or not.
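To make the schema question concrete, here is one possible shape for a dedicated structure, written as a dependency-free sketch. Everything in it is an assumption for illustration: the names `Rewriter`, `with_unhandled_hook`, `ConstantHook`, and the toy `Expr` type are hypothetical and are not taken from the code snippets in this thread. In this variant the structure owns only the hook and the schema is passed per call, so one rewriter can be reused across schemas; storing the schema in the structure would be the alternative.

```rust
use std::sync::Arc;

// Toy expression type; a hypothetical stand-in for `Arc<dyn PhysicalExpr>`.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Lit(bool),
    Col(String),
}

trait UnhandledHook {
    fn handle(&self, expr: &Expr) -> Expr;
}

// Always returns the same replacement expression.
struct ConstantHook(Expr);

impl UnhandledHook for ConstantHook {
    fn handle(&self, _expr: &Expr) -> Expr {
        self.0.clone()
    }
}

// Hypothetical rewriter structure: holds the hook, not the schema.
struct Rewriter {
    hook: Arc<dyn UnhandledHook>,
}

impl Rewriter {
    // Default hook replaces unhandled predicates with TRUE.
    fn new() -> Self {
        Self {
            hook: Arc::new(ConstantHook(Expr::Lit(true))),
        }
    }

    // Builder-style setter for a custom hook.
    fn with_unhandled_hook(self, hook: Arc<dyn UnhandledHook>) -> Self {
        Self { hook }
    }

    // The schema is supplied per call in this variant.
    fn rewrite(&self, expr: &Expr, schema: &[&str]) -> Expr {
        match expr {
            Expr::Col(name) if schema.contains(&name.as_str()) => expr.clone(),
            Expr::Lit(_) => expr.clone(),
            // Column not in the schema: defer to the hook.
            other => self.hook.handle(other),
        }
    }
}
```

A caller that is happy with the default would write `Rewriter::new().rewrite(&expr, &schema)`, and a caller with special handling would chain `with_unhandled_hook(...)` before `rewrite`.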