Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add unhandled hook to PruningPredicate #12606

Closed
wants to merge 7 commits into from

Conversation

adriangb
Copy link
Contributor

I have a secondary index with min/max stats columns that is compatible with PruningPredicate's rewrites.

I now want to add an index for point lookups (I plan on implementing it as a column with distinct array values, but that's a bit of an implementation detail).

The point is that when PruningPredicate encounters this column (for which there are no stats, and which it doesn't recognize because I only pass in Fields for which there are stats) it currently returns true such that a_column_with_stats = 123 and a_point_lookup_column = 'abc' becomes a_column_with_stats_min <= 123 and a_column_with_stats_max >= 123 and true (ignoring nulls, maybe simplifying other bits) but I want it to become a_column_with_stats_min <= 123 and a_column_with_stats_max >= 123 and a_point_lookup_column @> '{abc}'::text[] or something like that.

I don't think it's reasonable to add APIs to DataFusion for this specific case since it depends on implementation details outside of DataFusion's control, but I also can't easily work around it on my end (I'd have to re-implement all of PruningPredicate). So I'm hoping that adding this hook is acceptable 😄

@github-actions github-actions bot added the core Core DataFusion crate label Sep 24, 2024
@adriangb
Copy link
Contributor Author

cc @alamb would appreciate a review!

Comment on lines +483 to +487
pub trait UnhandledPredicateHook {
/// Called when a predicate can not be handled by DataFusion's transformation rules
/// or is referencing a column that is not in the schema.
fn handle(&self, expr: &Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr>;
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be a closure but I had issues with lifetimes, etc. Having the trait also gives it a useful name 😄

The other API questions are:

  • Should this be mutable? I think implementers can just use interior mutability if needed.
  • Should this make it easier to say "use the existing expression"? I don't think that's a common case, and the current APIs use &Arc<dyn PhysicalExpr> -> Arc<dyn PhysicalExpr> as well. Plus it's as easy as a Clone on an Arc.

@alamb
Copy link
Contributor

alamb commented Sep 25, 2024

I now want to add an index for point lookups (I plan on implementing it as a column with distinct array values, but that's a bit of an implementation detail).

The point is that when PruningPredicate encounters this column (for which there are no stats, and which it doesn't recognize because I only pass in Fields for which there are stats) it currently returns true such that a_column_with_stats = 123 and a_point_lookup_column = 'abc' becomes a_column_with_stats_min <= 123 and a_column_with_stats_max >= 123 and true (ignoring nulls, maybe simplifying other bits) but I want it to become a_column_with_stats_min <= 123 and a_column_with_stats_max >= 123 and a_point_lookup_column @> '{abc}'::text[] or something like that.

Perhaps you can rewrite the predicate before passing it to the parquet exec or the PruningPredicate? I don't fully understand what a_point_lookup_column @> '{abc}'::text[] means but it seems like you could easily do that rewrite / substitution before PruningPredicate.

I don't understand the benefit that is obtained by doing the rewrite during the pruning predicate rewrite 🤔

@alamb
Copy link
Contributor

alamb commented Sep 25, 2024

It might also be good to look at https://docs.rs/datafusion/latest/datafusion/physical_optimizer/pruning/struct.PruningPredicate.html#method.literal_guarantees which you might be able to use to apply you index

@adriangb
Copy link
Contributor Author

The issue is that PruningPredicate discards (by returning true) any predicates it doesn't itself know how to rewrite. So if I do the rewrite before calling PruningPredicate then that rewrite is lost.

@adriangb
Copy link
Contributor Author

I don't fully understand what a_point_lookup_column @> '{abc}'::text[]

Basically I want to take the predicate a_point_lookup_column = 'abc' and transform that into a filter in my index.
I've chosen to store this is a_point_lookup_column_distinct text[] for the case of a UTF8 column called a_point_lookup_column. This column is then stored alongside other stats columns so you end up with something like (file_path text, row_group int, a_stats_column_min double, a_stats_column_max double, a_stats_column_null_count int, a_point_lookup_column_distinct text[]).

@adriangb
Copy link
Contributor Author

It might also be good to look at https://docs.rs/datafusion/latest/datafusion/physical_optimizer/pruning/struct.PruningPredicate.html#method.literal_guarantees which you might be able to use to apply you index

I admit I'm still a bit confused about LiteralGuarantees but it seems to me that to use them I'd have to have all of the data in memory.
The whole point of this is that I can store a relatively large amount of data (say 1M 16 character strings) and rip through filtering them by letting the system storing them (in my case a Postgres database with a GIN index on the array column) give me back just the row groups that matched the predicate, without ever moving all of that data over the wire to build LiteralGuarantees from it.

@adriangb
Copy link
Contributor Author

adriangb commented Sep 26, 2024

I'll add that I've been using this (as in this change + an actual implementation that uses it) in production for a couple days now and it works amazingly. It's taken some queries from >3s to <1s (from downloading all of a column for all of time to a <100ms lookup in a Postgres index).

@alamb
Copy link
Contributor

alamb commented Sep 27, 2024

Basically I want to take the predicate a_point_lookup_column = 'abc' and transform that into a filter in my index.
I've chosen to store this is a_point_lookup_column_distinct text[] for the case of a UTF8 column called a_point_lookup_column. This column is then stored alongside other stats columns so you end up with something like (file_path text, row_group int, a_stats_column_min double, a_stats_column_max double, a_stats_column_null_count int, a_point_lookup_column_distinct text[]).

I see -- what I am not understanding is why you need to do this rewrite as part of the PruningPredicate logic (which is already complicated). WHy can't you do the rewrite/transformation before passing the predicate to PruningPredicate ?

@adriangb
Copy link
Contributor Author

adriangb commented Sep 27, 2024

Here's an example:

use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema};
use datafusion::{common::DFSchema, physical_optimizer::pruning::PruningPredicate, prelude::*};

fn main() {
    let ctx = SessionContext::new();
    let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Int32, true)]));
    let df_schema = DFSchema::try_from(schema.clone()).unwrap();

    // An expression that PruningPredicate doesn't understand becomes `true`
    let expr = ctx.parse_sql_expr("col = ANY([1, 2])", &df_schema).unwrap();
    println!("expr: {:?}", expr);
    let phys_expr = ctx.create_physical_expr(expr, &df_schema).unwrap();
    println!("phys_expr: {:?}", phys_expr);
    let pruning = PruningPredicate::try_new(phys_expr, schema.clone()).unwrap();
    let pruning_expr = pruning.predicate_expr().clone();
    println!("pruning_expr: {:?}", pruning_expr);
    // pruning_expr: Literal { value: Boolean(true) }

    // An expression referencing columns that don't have statistics collected (i.e. aren't int the schema)
    // causes an Err
    let expr = ctx.parse_sql_expr("other = 1", &df_schema).unwrap();
    println!("expr: {:?}", expr);
    let phys_expr = ctx.create_physical_expr(expr, &df_schema).unwrap();
    println!("phys_expr: {:?}", phys_expr);
    PruningPredicate::try_new(phys_expr, schema.clone()).unwrap();
    // SchemaError(FieldNotFound { field: Column { relation: None, name: "other" }, valid_fields: [Column { relation: None, name: "col" }] }, Some(""))
}

If I do the rewrite before PruningPredicate then I end up with just true.

@alamb
Copy link
Contributor

alamb commented Sep 28, 2024

If I do the rewrite before PruningPredicate then I end up with just true.

I don't see a rewrite in your example. I would have expected that you wrote something that substituted other = 1 with what you wanted

What rewrite are you doing in your actual production system

This sounds quite similar to #7869 maybe

@adriangb
Copy link
Contributor Author

The point is the rewrite I want to do is col = 'a' or col in ('a', 'b') into col_distinct = ANY(['a']) or col_distinct = ANY(['a', 'b']) respectively (or maybe exists (select * from col__distinct where col = ANY(['a', 'b']) and stats.row_group_id = col__distinct .row_group_id), I'm still playing around with real world data and plans to see what is best; point is it's an arbitrarily complex expression that I want left alone). As per above if I do that before PruningPredicate it won't preserve my transformation and will instead convert it to true.

@alamb
Copy link
Contributor

alamb commented Oct 1, 2024

I am sorry I don't quite follow your example

I found the PruningPredicate logic very tricky (when you could absolutely be sure no rows will match needs to be 100% precise), especially in the context of tristate logic.

The pruning predicate logic has to treat "I don't know if the predicate could pass (typically NULL in evaluation)" differently than if the predicate actually evaluated to NULL on the actual row

@alamb
Copy link
Contributor

alamb commented Oct 3, 2024

I think to follow up here, my conclusion is the current PR as it is implemented now likely has only one user but makes the overall rewrite logic harder to follow (and it is already complicated enough)

As I understand it, the real usecase here is to not to actually use the PruningPredicate directly to prune values, but instead to use the same rewrite logic to turn the predicate into a query to run elsewhere (postgres as I understand)

I suggest either:

  1. Refactoring the rewrite logic out of PruningPredicate somehow (so that `PruningPredicate can call it, as well as a custom rewrite)
  2. Take / fork the rewrite logic and adjust it as needed locally

Hopefully that makes sense

@adriangb
Copy link
Contributor Author

adriangb commented Oct 4, 2024

Makes sense, thank you Andrew.

If I wanted to go with option (1) I would still need some way to control what the rewrite does when faced with expressions it does not recognize. Would you in that case accept something along the lines of the hook proposed in this PR? If not I think I'll have to go with (2), not a big deal but I would rather not have to vendor the code.

@alamb
Copy link
Contributor

alamb commented Oct 4, 2024

Would you in that case accept something along the lines of the hook proposed in this PR? If not I think I'll have to go with (2), not a big deal but I would rather not have to vendor the code.

Yeah, I would expect there to be some sort of hook in the rewrite logic (with a default implementation to replace with true)

I think if done right, it could be quite elegant and better separate out the expression rewrite from the rest of the pruning predicate code

@alamb alamb marked this pull request as draft October 6, 2024 11:33
@alamb
Copy link
Contributor

alamb commented Oct 6, 2024

Marking as draft as I think this PR is no longer waiting on review and I am trying to clear the review backlog

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Core DataFusion crate
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants