Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move conversion of FIRST/LAST Aggregate function to independent physical optimizer rule #9972

Closed
jayzhan211 opened this issue Apr 6, 2024 · 12 comments · Fixed by #10061
Closed
Labels
enhancement New feature or request

Comments

@jayzhan211
Copy link
Contributor

jayzhan211 commented Apr 6, 2024

Is your feature request related to a problem or challenge?

Parts of #8708

First / Last aggregate function has the method fn reverse_expr(&self) -> Option<Arc<dyn AggregateExpr>> that returns another new AggregateExpr. We can't support this method since AggregateExpr is in functions-aggregate (I plan to move it here in #9960, it doesn't work too if we keep it in physical-expr-common). I propose that we move AggregateUDFImpl to functions-aggregate.

The overall idea is that we move aggregation functions struct or trait including logical and physical to functions-aggregate. keep other common struct or trait in datafusion-expr and datafusion-physical-expr-common respectively.

Describe the solution you'd like

No response

Describe alternatives you've considered

No response

Additional context

No response

@jayzhan211 jayzhan211 added the enhancement New feature or request label Apr 6, 2024
@jayzhan211
Copy link
Contributor Author

@alamb The design differs from the scalar function, so I'm unsure if it makes sense.

@alamb
Copy link
Contributor

alamb commented Apr 6, 2024

First / Last aggregate function has the method fn reverse_expr(&self) -> Option<Arc> that returns another new AggregateExpr. We can't support this method since AggregateExpr is in functions-aggregate (I plan to move it here in #9960, it doesn't work too if we keep it in physical-expr-common). I propose that we move AggregateUDFImpl to functions-aggregate.

I don't understand why it wouldn't work if we kept AggregateExpr in physical-expr-common?

Since functions-aggregate depends on physical-expr-common then I think it would be reasonable for FirstValueUDAF that is defined functions-aggregate to return a LastValueUDAF (wrapped in an AggregateExpr) as the result of reverse_expr

I may be missing something though

@jayzhan211
Copy link
Contributor Author

jayzhan211 commented Apr 6, 2024

@alamb

To support reverse_expr for FirstValue, we need to implement it in impl AggregateExpr for AggregateFunctionExpr .
Since I think this method should be customizable for the user, so I think we should implement like

   // fun: AggregateUDF,
    fn reverse_expr(&self) -> Option<Arc<dyn AggregateExpr>> {
         self.fun.reverse_expr()
   }

Therefore, we need to have reverse_expr for AggregateUDF, and the only choice for us is self.inner.reverse_expr. That indicates that we need reverse_expr for AggregateUDFImpl.

To be able to return Option<Arc<dyn AggregateExpr>> for AggregateUDFImpl, we need to import AggregateExpr which stays in physical-expr-common.

I agree that AggregateExpr be kept in physical-expr-common now.
How about AggregateUDFImpl? It is weird for it to go physical-expr-common. We need another expression layer that is able to import physical-expr-common. If we move AggregateUDFImpl to aggregate-functions then the user need to pull out all builtin functions which is not good too.

Possible solution

  1. Find another way to do reverse_expr.
  2. Introduce another crate :). Given that there are also many kinds of things for the user, like type coercion, rewriting, and others. It is not a bad idea to introduce logical-expr-common or expr-common

@alamb
Copy link
Contributor

alamb commented Apr 6, 2024

That indicates that we need reverse_expr for AggregateUDFImpl.

I agree

Therefore, we need to have reverse_expr for AggregateUDF, and the only choice for us is self.inner.reverse_expr. That indicates that we need reverse_expr for AggregateUDFImpl.

I wonder if we could have the function return AggregateUDF instead of AggregateExpr ? Something like

trait AggregateUDFImpl { 
...
   // fun: AggregateUDF,
    fn reverse_expr(&self) -> Option<AggregateUDF>;

And then we can reconstruct the AggregateExpr from there?

My thinking is that once we have ported all aggregates to AggregateUDF then this would be the only way to implement Aggregates

@jayzhan211
Copy link
Contributor Author

jayzhan211 commented Apr 7, 2024

Not AggregateUDF because it does not have ordering info. But I think we can convert to AggregateFunction which contains AggregateUDF and order_by, but there is a concern. The previous implementation converts first to last in physical expr directly. If we convert to AggregateFunction. There is an additional cost to converting expr from logical to physical back and forth. And, the reason for reversing expr is to gain more efficiency, the change here is contradicting.

How about we introduce a UDF version for AggregateExpr? With that, the user can define their conversion between physical-expr. Each function can have its own AggregateFunctionExpr.

@alamb
Copy link
Contributor

alamb commented Apr 7, 2024

Not AggregateUDF because it does not have ordering info.

Does it need the ordering info? Or could we keep the code that makes the new AggregateFunctionExpr in the physical optimizer?

Doesn't reverse simply need to needs to know what function to use when if the sort order is "reversed"?

impl AggregateUDFImpl { 
... 
  /// returns the "reverse" of this aggregate. Reverse means the function to use when the input ordering is 
  /// reversed. For example, `first_value` can be reversed to `last_value` if the ordering is reversed. 
  /// if the function is its own reverse, 
  fn reverse(&self) -> Option<Arc<Aggregate>>

This seems to mirror how the current AggregateExpr works:

https://github.com/alamb/arrow-datafusion/blob/a1ae15826245097e7c12d4f0ed3425b25af6c431/datafusion/physical-expr/src/aggregate/average.rs#L144-L146

@jayzhan211
Copy link
Contributor Author

jayzhan211 commented Apr 7, 2024

Not AggregateUDF because it does not have ordering info.

Does it need the ordering info? Or could we keep the code that makes the new AggregateFunctionExpr in the physical optimizer?

Doesn't reverse simply need to needs to know what function to use when if the sort order is "reversed"?

impl AggregateUDFImpl { 
... 
  /// returns the "reverse" of this aggregate. Reverse means the function to use when the input ordering is 
  /// reversed. For example, `first_value` can be reversed to `last_value` if the ordering is reversed. 
  /// if the function is its own reverse, 
  fn reverse(&self) -> Option<Arc<Aggregate>>

This seems to mirror how the current AggregateExpr works:

https://github.com/alamb/arrow-datafusion/blob/a1ae15826245097e7c12d4f0ed3425b25af6c431/datafusion/physical-expr/src/aggregate/average.rs#L144-L146

We create Last with reversed ordering.
First is equal to Last with reversed ordering.
Btw, why is reverse expr in Avg, Sum, MinMax, and Count just returning clone, what is the difference between returns None?

   // impl AggregateExpr for FirstValuePhysicalExpr 
    fn reverse_expr(&self) -> Option<Arc<dyn AggregateExpr>> {
        Some(Arc::new(self.clone().convert_to_last()))
    }

    pub fn convert_to_last(self) -> LastValuePhysicalExpr {
        let name = if self.name.starts_with("FIRST") {
            format!("LAST{}", &self.name[5..])
        } else {
            format!("LAST_VALUE({})", self.expr)
        };
        let FirstValuePhysicalExpr {
            expr,
            input_data_type,
            ordering_req,
            order_by_data_types,
            ..
        } = self;
        LastValuePhysicalExpr::new(
            expr,
            name,
            input_data_type,
            reverse_order_bys(&ordering_req),
            order_by_data_types,
        )
    }

could we keep the code that makes the new AggregateFunctionExpr in the physical optimizer

I think we need to move get_aggregate_exprs_requirement to physical optimize rule, then we can deal with first to last conversion inside the rule.

https://github.com/apache/arrow-datafusion/blob/215f30f74a12e91fd7dca0d30e37014c8c3493ed/datafusion/physical-plan/src/aggregates/mod.rs#L894-L900

@alamb
Copy link
Contributor

alamb commented Apr 8, 2024

Btw, why is reverse expr in Avg, Sum, MinMax, and Count just returning clone, what is the difference between returns None?

I don't know

I think we need to move get_aggregate_exprs_requirement to physical optimize rule, then we can deal with first to last conversion inside the rule.

Seems reasonable to me, but I didnt look carefully at the code

Thank you for pursuing this @jayzhan211

@jayzhan211 jayzhan211 changed the title Move AggregateUDFImpl to functions-aggregate Move get_aggregate_exprs_requirement to PhysicalOptimizerRule Apr 9, 2024
@jayzhan211
Copy link
Contributor Author

jayzhan211 commented Apr 10, 2024

@alamb

I planned to move out the code that computes required_input_ordering for AggregateExec to the optimizer. Because that is the part contains get_aggregate_exprs_requirement. I name the rule simplify_ordering.

The problem is that I need to ensure the rule simplify_ordering is applied somewhere after AggregateExec::try_new_with_schema and before required_input_ordering is required. Since many rules include AggregateExec::try_new_with_schema I end up inserting simplify_ordering after many rules. The worst is that the rule EnforceDistribution expects the simplify_ordering be applied between the rule.

        let adjusted = if top_down_join_key_reordering {
            // Run a top-down process to adjust input key ordering recursively
            let plan_requirements = PlanWithKeyRequirements::new_default(plan);
            let adjusted = plan_requirements
                .transform_down(&adjust_input_keys_ordering)
                .data()?;
            adjusted.plan
        } else {
            // Run a bottom-up process
            plan.transform_up(&|plan| {
                Ok(Transformed::yes(reorder_join_keys_to_inputs(plan)?))
            })
            .data()?
        };

      // Expect simply_ordering here before `ensure_distribution` is called.

        let distribution_context = DistributionContext::new_default(adjusted);
        // Distribution enforcement needs to be applied bottom-up.
        let distribution_context = distribution_context
            .transform_up(&|distribution_context| {
                ensure_distribution(distribution_context, config)
            })
            .data()?;
        Ok(distribution_context.plan)

I need to split this rule into two, so I can insert the simplify_ordering in between. And, I think there is something wrong 😢

The other approach is instead of having simplify_ordering as a single rule, have it like a function for each rule, but I think either way is hard to maintain, we still need to think about where to insert the rule.

Since the required_input_ordering is integrated deeply with AggregateExec creation, pulling it out as a rule ends up we need to ensure the rule is applied in the correct place, which seems like a bad design for me.

UPDATE:
I have no good idea currently, leave this note here and see if you or others have any good idea for it.
I comment out the first to last conversion only in get_aggregate_exprs_requirement and I can have one single simplify_ordering at the end without error. Maybe I can just move these two conversions instead of the whole get_aggregate_exprs_requirement to optimizer. 🤔

@alamb
Copy link
Contributor

alamb commented Apr 10, 2024

Maybe @mustafasrepo has some ideas to share about this too, as he has worked extensively with that code and authored a significant amount of it

@jayzhan211 jayzhan211 changed the title Move get_aggregate_exprs_requirement to PhysicalOptimizerRule Move conversion of FIRST/LAST Aggregate function to independent physical optimizer rule Apr 13, 2024
@mustafasrepo
Copy link
Contributor

Sorry for the late reply. Since I was in vacation, couldn't look here.

Btw, why is reverse expr in Avg, Sum, MinMax, and Count just returning clone, what is the difference between returns None?

As an example usecase: Consider the query

SELECT SUM(b), FIRST_VALUE(b ORDER BY c DESC)
FROM table
GROUP BY a

where table is already ordered by c ASC. In this case, by taking reverse of SUM(which is itself) and FIRST_VALUE we can convert query above to it equivalent form below

SELECT SUM(b), LAST_VALUE(b ORDER BY c ASC)
FROM table
GROUP BY a

to align ordering requirement with existing ordering. Returning None from fn reverser_expr() indicates that when input data is iterated in reverse order, the result generated wouldn't be same compared to existing version. However, for SUM, AVG etc. when input data is iterated in reverse order, the result is same. As an another counter example, consider query

SELECT ARRAY_AGG(b ORDER BY c DESC), FIRST_VALUE(b ORDER BY c DESC)
FROM table
GROUP BY a

where table is ordered by c ASC as before. There is no way to produce result of the ARRAY_AGG(b ORDER BY c DESC) with the ordering c ASCat the input. Hence, forARRAY_AGG, this implementation returns Noneto communicate this feature. In short, for order insensitive aggregators we should implementfn reverse_expr` by returning the clone of the existing aggregator, to communicate same result would be generated in reverse order (in any arbitrary permutation actually).

@jayzhan211
Copy link
Contributor Author

Sorry for the late reply. Since I was in vacation, couldn't look here.

Btw, why is reverse expr in Avg, Sum, MinMax, and Count just returning clone, what is the difference between returns None?

As an example usecase: Consider the query

SELECT SUM(b), FIRST_VALUE(b ORDER BY c DESC)
FROM table
GROUP BY a

where table is already ordered by c ASC. In this case, by taking reverse of SUM(which is itself) and FIRST_VALUE we can convert query above to it equivalent form below

SELECT SUM(b), LAST_VALUE(b ORDER BY c ASC)
FROM table
GROUP BY a

to align ordering requirement with existing ordering. Returning None from fn reverser_expr() indicates that when input data is iterated in reverse order, the result generated wouldn't be same compared to existing version. However, for SUM, AVG etc. when input data is iterated in reverse order, the result is same. As an another counter example, consider query

SELECT ARRAY_AGG(b ORDER BY c DESC), FIRST_VALUE(b ORDER BY c DESC)
FROM table
GROUP BY a

where table is ordered by c ASC as before. There is no way to produce result of the ARRAY_AGG(b ORDER BY c DESC) with the ordering c ASCat the input. Hence, forARRAY_AGG, this implementation returns Noneto communicate this feature. In short, for order insensitive aggregators we should implementfn reverse_expr` by returning the clone of the existing aggregator, to communicate same result would be generated in reverse order (in any arbitrary permutation actually).

I see. It makes sense to me.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
3 participants