Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Short term way to make AggregateStatistics still work when min/max is converted to udaf #11261

Merged
merged 2 commits into from
Jul 12, 2024

Conversation

Rachelint
Copy link
Contributor

@Rachelint Rachelint commented Jul 4, 2024

Which issue does this PR close?

Part of #11153

Rationale for this change

Now only hard code like if agg_expr.as_any().is::<expressions::Min>() to identify the min/max aggregate function in AggregateStatistics optimizer. It can't work after min/max is converted to udaf.

This pr is for solving problem stated above. The best way to solve is adding a function like output_from_stats into AggregateExpr and moving the logic from the optimizer to udaf. But unfortunately it is not so easy to find good way to do that...

For unblocking the udaf conversion about min/max, just impl a short term solution here.

What changes are included in this PR?

Support to identify min/max after they are converted to udaf in later.

Are these changes tested?

By exist tests.

Are there any user-facing changes?

No.

@github-actions github-actions bot added logical-expr Logical plan and expressions physical-expr Physical Expressions core Core DataFusion crate labels Jul 4, 2024
@Rachelint
Copy link
Contributor Author

Rachelint commented Jul 4, 2024

@alamb @edmondop
Hi, I have made a draft impl here...
I define a new function output_from_stats as a general function to get results directly from stats if possible.
Howerver, when I adapt it to AggregateUDFImpl, I found I can't get the physical exprs which are needed, see the original code in physical optimizer:

if casted_expr.expressions().len() == 1 {
// TODO optimize with exprs other than Column
if let Some(col_expr) = casted_expr.expressions()[0]
.as_any()
.downcast_ref::<expressions::Column>()
{
if let Precision::Exact(val) =
&col_stats[col_expr.index()].max_value

It is mainly due to AggregateUDFImpl is defined in expr crate, and the needed PhysicalExpr is defined in datafusion-physical-expr-common crate which will refer exps a dependency. And if we import PhysicalExpr in expr, that will lead to cyclic dependecies...

For solving it, I define a AggregateArgExprs trait in expr crate, and impl it for the Vec<Arc<dyn PhysicalExpr>>. It can work but we need to downcast it to &[Arc<dyn PhysicalExpr>] when using it in output_from_stats of the udaf(see the impl of count...). I am not sure if it is a good way to impl like this...

@alamb
Copy link
Contributor

alamb commented Jul 5, 2024

Thank you @Rachelint ❤️

It is mainly due to AggregateUDFImpl is defined in expr crate, and the needed PhysicalExpr is defined in datafusion-physical-expr-common crate which will refer exps a dependency. And if we import PhysicalExpr in expr, that will lead to cyclic dependecies...

This makes sense -- I agree the AggregateUDFImpl API should not be in terms of PhysicalExpr (for the dependency reasons you describe)

One way I could think to get around this would be to figure out the column statistics in the physical planner and pass it directly (rather than making the UDF figure them out). Something like

pub trait AggregateUDFImpl {
...
   /// If the output of this aggregate function can be determined 
   /// only from input statistics (e.g. COUNT(..) with 0 rows, is always 0)
   /// return that value
   ///
   /// # Arguments
   /// stats: Overall statistics for the input (including row count)
   /// arg_stats: Statistics, if known, for each input argument
    fn output_from_stats(
        &self,
        stats: &Statistics,
        arg_stats: &[Option<&ColumnStatistics>]
    ) -> Option<ScalarValue> {
        None
    }
...
}

ANother option might be to use the narrower API described on #11153

impl AggregateExpr {

 /// Return the value of the aggregate function, if known, given the number of input rows.
 /// 
 /// Return None if the value can not be determined solely from the input.
 /// 
 /// # Examples
 /// * The `COUNT` aggregate would return `Some(11)` given `num_rows = 11`
 /// * The `MIN` aggregate would return `Some(Null)` given `num_rows = 0
 /// * The `MIN` aggregate would return `None` given num_rows = 11
 fn output_from_rows(&self, num_rows: usize) -> Option<ScalarValue> { None }
...
}

Though I think your idea is better (pass in statistics in generla)

@edmondop
Copy link
Contributor

edmondop commented Jul 6, 2024

The draft implementation seems reasonable and all checks are passing 🎉

@Rachelint
Copy link
Contributor Author

Rachelint commented Jul 6, 2024

@alamb One thing I worry about the narrow api is that, it seems can't be used to support the original optimization of min/max?

value if value > 0 => {
let col_stats = &stats.column_statistics;
if let Some(casted_expr) =
agg_expr.as_any().downcast_ref::<expressions::Min>()
{
if casted_expr.expressions().len() == 1 {
// TODO optimize with exprs other than Column
if let Some(col_expr) = casted_expr.expressions()[0]
.as_any()
.downcast_ref::<expressions::Column>()
{
if let Precision::Exact(val) =
&col_stats[col_expr.index()].min_value
{
if !val.is_null() {
return Some((
val.clone(),
casted_expr.name().to_string(),
));
}
}
}
}
}
}
_ => {}
}

Maybe I misunderstand about it?

@alamb
Copy link
Contributor

alamb commented Jul 7, 2024

@alamb One thing I worry about the narrow api is that, it seems can't be used to support the original optimization of min/max?

Maybe I misunderstand about it?

No, sorry, you are correct, I was mistaken

@Rachelint
Copy link
Contributor Author

Rachelint commented Jul 9, 2024

@alamb One thing I worry about the narrow api is that, it seems can't be used to support the original optimization of min/max?
Maybe I misunderstand about it?

No, sorry, you are correct, I was mistaken

Ok, been a bit busy the past couple of days, continue to read the related codes and think a relatively good way to solve this today...
One simple way I could think to solve it temporarily for not blocking the progress of #10943
is that we just treat it a special case like count:

if agg_expr.fun().name() == "count" && !agg_expr.is_distinct() {

@alamb
Copy link
Contributor

alamb commented Jul 10, 2024

Ok, been a bit busy the past couple of days, continue to read the related codes and think a relatively good way to solve this today... One simple way I could think to solve it temporarily for not blocking the progress of #10943 is that we just treat it a special case like count:

if agg_expr.fun().name() == "count" && !agg_expr.is_distinct() {

no worries -- there is a lot going on these days.

I think your short term workaround sounds very clever to me 👍 I think it is fine to leave figuring out the right general API to add to AggregateUdfImpl as a future project (my rationale being that special casing "max" / "min" is no worse than hard coding the Min and Max types, and unblocks the conversion to AggregateUDF)

@Rachelint
Copy link
Contributor Author

Rachelint commented Jul 10, 2024

Ok, been a bit busy the past couple of days, continue to read the related codes and think a relatively good way to solve this today... One simple way I could think to solve it temporarily for not blocking the progress of #10943 is that we just treat it a special case like count:

if agg_expr.fun().name() == "count" && !agg_expr.is_distinct() {

no worries -- there is a lot going on these days.

I think your short term workaround sounds very clever to me 👍 I think it is fine to leave figuring out the right general API to add to AggregateUdfImpl as a future project (my rationale being that special casing "max" / "min" is no worse than hard coding the Min and Max types, and unblocks the conversion to AggregateUDF)

😄 Thanks, I just impl the short term workaround now.
But still worry about using the function name only to identify the built-in min/max/count in the optimizer.
Assume that user define a udaf with a same name to overwrite the built-in (seems we can do it?).
And the logic in user's min/max/count is totally different with the built-in.
However the optimizer is unable to know the min/max/count is not the built-in(optimizer just identify by name), and do the mistaken optmization in result.

@alamb
Copy link
Contributor

alamb commented Jul 10, 2024

😄 Thanks, I just impl the short term workaround now. But still worry about using the function name only to identify the built-in min/max/count in the optimizer. Assume that user define a udaf with a same name to overwrite the built-in (seems we can do it?). And the logic in user's min/max/count is totally different with the built-in. However the optimizer is unable to know the min/max/count is not the built-in(optimizer just identify by name), and do the mistaken optmization in result.

I agree -- it will not be correct. I think that is why we eventually need to move the logic out of the optimizer and into the AggregateUDFImpl itself.

@Rachelint Rachelint force-pushed the impl-a-general-get-results-from-stats branch from 33c7410 to 8f52701 Compare July 11, 2024 00:25
@github-actions github-actions bot removed logical-expr Logical plan and expressions physical-expr Physical Expressions labels Jul 11, 2024
@Rachelint
Copy link
Contributor Author

Rachelint commented Jul 11, 2024

😄 Thanks, I just impl the short term workaround now. But still worry about using the function name only to identify the built-in min/max/count in the optimizer. Assume that user define a udaf with a same name to overwrite the built-in (seems we can do it?). And the logic in user's min/max/count is totally different with the built-in. However the optimizer is unable to know the min/max/count is not the built-in(optimizer just identify by name), and do the mistaken optmization in result.

I agree -- it will not be correct. I think that is why we eventually need to move the logic out of the optimizer and into the AggregateUDFImpl itself.

🤔 Seems indeed necessary to do that.
😄 Have finished the short term solution here.
I originally want to avoid downcast_ref::<AggregateFunctionExpr>(), but found if we do that, we should add two functions is_distinct and function_name to AggregateExpr before. I am not sure if it is ok to add them... so I just impl with downcast_ref::<AggregateFunctionExpr>() now...

@Rachelint Rachelint changed the title Impl a general get results from stats Short term way to make AggregateStatistics still work when min/max is converted to udaf Jul 11, 2024
@Rachelint Rachelint marked this pull request as ready for review July 11, 2024 00:54
Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @Rachelint -- I think this is an improvement as it will upblock pulling out Min/Max aggregate functions, even thought here is more to do. I also happen to think this PR is easier to understand the intent (by moving is_min and is_max checks into functions).

Nice work

my only suggestion is to add comments with a link to the ticket that explains the rationale

@Rachelint
Copy link
Contributor Author

Rachelint commented Jul 12, 2024

Thanks @alamb for review, will continue trying to move them into AggregateUDFImpl, and comments have been added.

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks again @Rachelint -- 🚀 we are making progress

@alamb alamb merged commit 02335eb into apache:main Jul 12, 2024
23 checks passed
@Rachelint Rachelint deleted the impl-a-general-get-results-from-stats branch July 13, 2024 17:28
@edmondop
Copy link
Contributor

Thank you @Rachelint . I think I can resume #11013 unless @alamb you think there is more work we want to externalize/handle via separate PRs

@alamb
Copy link
Contributor

alamb commented Jul 15, 2024

Thank you @Rachelint . I think I can resume #11013 unless @alamb you think there is more work we want to externalize/handle via separate PRs

I am not sure -- I suggest going through all uses of Min and Max (specifically look for any downcast_ref::<Min> and downcast_ref::<Max> references and see if there are any other places that need to be removed

The goal of #11013 I think should also remove the built in Min/Max aggregate

@edmondop
Copy link
Contributor

Right ! It is looking good. The refactoring has broken an optimiser, will need to investigate

@edmondop
Copy link
Contributor

Makes sense. I am almost done on the other pr I need to understand why one optimisation was lost

findepi pushed a commit to findepi/datafusion that referenced this pull request Jul 16, 2024
…is converted to udaf (apache#11261)

* impl the short term solution.

* add todos.
xinlifoobar pushed a commit to xinlifoobar/datafusion that referenced this pull request Jul 17, 2024
…is converted to udaf (apache#11261)

* impl the short term solution.

* add todos.
xinlifoobar pushed a commit to xinlifoobar/datafusion that referenced this pull request Jul 18, 2024
…is converted to udaf (apache#11261)

* impl the short term solution.

* add todos.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Core DataFusion crate
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants