Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support grouping aggregate function #10208

Conversation

JasonLi-cn
Copy link
Contributor

@JasonLi-cn JasonLi-cn commented Apr 24, 2024

Which issue does this PR close?

Closes #5647

Rationale for this change

Currently datafusion does not implement grouping function.

fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
not_impl_err!(
"physical plan is not yet implemented for GROUPING aggregate function"
)
}

What changes are included in this PR?

Complete the grouping function.

https://www.postgresql.org/docs/9.5/functions-aggregate.html
https://learn.microsoft.com/en-us/sql/t-sql/functions/grouping-transact-sql?view=sql-server-ver15

Are these changes tested?

Yes

Are there any user-facing changes?

Yes. Perhaps we need to include in the documentation instructions for the grouping function.
https://arrow.apache.org/datafusion/user-guide/sql/aggregate_functions.html

@github-actions github-actions bot added logical-expr Logical plan and expressions physical-expr Physical Expressions core Core DataFusion crate sqllogictest SQL Logic Tests (.slt) labels Apr 24, 2024
@JasonLi-cn
Copy link
Contributor Author

Related work:
#2477
#2486

@JasonLi-cn JasonLi-cn changed the title feat: support grouping aggregate function feat: support grouping aggregate function Apr 24, 2024
Copy link
Member

@waynexia waynexia left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @JasonLi-cn for this 👍

I noticed this from PG's document:

. The arguments to the GROUPING operation are not actually evaluated, but they must match exactly expressions given in the GROUP BY clause of the associated query level.

Do we need to do some verifications to make sure the param of GROUPING matches GROUP BY? Also I see the implementation of GroupingGroupsAccumulator assumes the input expr is column, but GROUP BY doesn't have such a constrain.

Comment on lines +323 to +325
AggregateFunction::Count | AggregateFunction::Grouping => {
Signature::variadic_any(Volatility::Immutable)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To my understanding, this is the key change in the user-faced behavior of this PR, supporting grouping() over multiple columns.

mask
}

impl GroupsAccumulator for GroupingGroupsAccumulator {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the comment of GroupsAccumulator https://docs.rs/datafusion/latest/datafusion/logical_expr/trait.GroupsAccumulator.html#notes-on-implementing-groupaccumulator:

All aggregates must first implement the simpler Accumulator trait, which handles state for a single group. Implementing GroupsAccumulator is optional and is harder to implement than Accumulator, but can be much faster for queries with many group values.

I suppose this grouping group accumulator can also follow this to implement Accumulator as well

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with you. I also wanted to realize Accumulator. However, I find that Accumulator cannot be implemented based on the current definition. When calling update_batch, we need to know the information of the current grouping set, so we need to add a parameter to update_batch. This is a big change, I need to get community's advice.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. Accumulator::update_batch doesn't pass group_indices: &[usize] in. It does not make sense to me if we maintain one in another way for this special grouping expr only for implementing Accumulator. Do you have any insight @alamb ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree having some special case simply for the grouping aggregate that forces changes on all other aggregates isn't ideal

When calling update_batch, we need to know the information of the current grouping set, so we need to add a parameter to update_batch

After reading https://www.postgresql.org/docs/9.5/functions-aggregate.html I see that grouping is basically a special case that only makes sense in the context of grouping set (it provides some context into the grouping set).

Given it is so special, I wonder if we could special case it somehow 🤔

One thing maybe we could do is to add another signature?

trait `GroupsAccumulator`  {
...
    /// Called with the information with what grouping set this batch belongs to.
    /// The default implementaiton calls `Self::update_batch` and ignores the grouping_set
    fn update_grouping_batch(
        &mut self,
        _values: &[ArrayRef],
        group_indices: &[usize],
        opt_filter: Option<&arrow_array::BooleanArray>,
        total_num_groups: usize,
        grouping_set: &[bool],
    ) -> Result<()> {
      self.update_batch(_values, group_indices, opt_filter, total_num_groups)
    }
...

And then we could make it clear in the documentation that the agregator calls update_group_batch but that most implementations can just implement update_batch

Comment on lines +85 to +89
fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
not_impl_err!(
"physical plan is not yet implemented for GROUPING aggregate function"
)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we implement Accumulator for GroupingGroupsAccumulator and then implement this method?

@JasonLi-cn
Copy link
Contributor Author

Thanks @JasonLi-cn for this 👍

I noticed this from PG's document:

. The arguments to the GROUPING operation are not actually evaluated, but they must match exactly expressions given in the GROUP BY clause of the associated query level.

Do we need to do some verifications to make sure the param of GROUPING matches GROUP BY? Also I see the implementation of GroupingGroupsAccumulator assumes the input expr is column, but GROUP BY doesn't have such a constrain.

Thanks @waynexia for your suggestion. I agree with you and I will make improvements according to your suggestions.

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this PR @JasonLi-cn and the review @waynexia

I was not familiar with the grouping function before. Fascinating. I left some thoughts

I apologize for the delay in review. I have been very bust lately

mask
}

impl GroupsAccumulator for GroupingGroupsAccumulator {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree having some special case simply for the grouping aggregate that forces changes on all other aggregates isn't ideal

When calling update_batch, we need to know the information of the current grouping set, so we need to add a parameter to update_batch

After reading https://www.postgresql.org/docs/9.5/functions-aggregate.html I see that grouping is basically a special case that only makes sense in the context of grouping set (it provides some context into the grouping set).

Given it is so special, I wonder if we could special case it somehow 🤔

One thing maybe we could do is to add another signature?

trait `GroupsAccumulator`  {
...
    /// Called with the information with what grouping set this batch belongs to.
    /// The default implementaiton calls `Self::update_batch` and ignores the grouping_set
    fn update_grouping_batch(
        &mut self,
        _values: &[ArrayRef],
        group_indices: &[usize],
        opt_filter: Option<&arrow_array::BooleanArray>,
        total_num_groups: usize,
        grouping_set: &[bool],
    ) -> Result<()> {
      self.update_batch(_values, group_indices, opt_filter, total_num_groups)
    }
...

And then we could make it clear in the documentation that the agregator calls update_group_batch but that most implementations can just implement update_batch

Copy link

Thank you for your contribution. Unfortunately, this pull request is stale because it has been open 60 days with no activity. Please remove the stale label or comment or this will be closed in 7 days.

@github-actions github-actions bot added the Stale PR has not had any activity for some time label Jul 21, 2024
@JasonLi-cn JasonLi-cn marked this pull request as draft July 21, 2024 06:34
@github-actions github-actions bot removed the Stale PR has not had any activity for some time label Jul 22, 2024
@alamb alamb closed this in #12704 Oct 16, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Core DataFusion crate logical-expr Logical plan and expressions physical-expr Physical Expressions sqllogictest SQL Logic Tests (.slt)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Support Grouping functions with Group By CUBE/ROLLUP/GROUPING SETS
3 participants