Add grouping_id function #10518
Conversation
🤘
@@ -576,7 +594,12 @@ private static ValueExtractFunction makeValueExtractFunction(
// Add aggregations.
final int resultRowAggregatorStart = query.getResultRowAggregatorStart();
for (int i = 0; i < entry.getValues().length; i++) {
resultRow.set(resultRowAggregatorStart + i, entry.getValues()[i]);
if (dimsToInclude != null && groupingAggregatorsBitSet.get(i)) {
It probably doesn't matter, but since this is a relatively hot path in the code, it might be worth measuring whether this if statement has any performance impact on groupBy queries that do not have a grouping aggregator. If it is actually visible, it might be worth pulling out a separate code path to handle the grouping sets case (see the sketch below) so that this does not negatively impact performance.
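A hedged, self-contained sketch of the kind of split suggested here; all names are illustrative stand-ins for the ones in the diff above, not Druid's actual code:

```java
import java.util.BitSet;

class AggregatorCopySketch
{
  // Illustrative split: queries without grouping aggregators keep the
  // original branch-free loop; only the grouping-sets path pays the
  // per-iteration BitSet check.
  static void copyAggregators(
      Object[] values,          // stand-in for entry.getValues() in the diff above
      Object[] resultRow,
      int resultRowAggregatorStart,
      BitSet groupingAggregatorsBitSet,
      boolean hasGroupingSets   // stand-in for the dimsToInclude != null check
  )
  {
    if (!hasGroupingSets) {
      // Fast path: identical to the pre-patch loop, no per-iteration branch.
      for (int i = 0; i < values.length; i++) {
        resultRow[resultRowAggregatorStart + i] = values[i];
      }
    } else {
      for (int i = 0; i < values.length; i++) {
        // In the real code the grouping aggregator slots would be
        // recomputed here for the current subtotal spec; null is a
        // placeholder for that value.
        resultRow[resultRowAggregatorStart + i] =
            groupingAggregatorsBitSet.get(i) ? null : values[i];
      }
    }
  }
}
```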
Right. This is one change I am a bit anxious about. Is there any existing benchmark I could use?
I think this `if` clause is probably fine. However, if I'm reading the code correctly, the new aggregators seem to do nothing and even their result is not in use, but those aggregators are still involved in hash aggregation. I'm more worried about this because it involves serializing/deserializing the aggregator values to/from off-heap memory, which is pretty expensive. Because `GroupingAggregatorFactory` is a special aggregator type which cannot be computed by regular aggregation, can we rewrite the query to not compute them in hash aggregation, but add the aggregation results as you do now?
> Right. This is one change I am a bit anxious about. Is there any existing benchmark I could use?
`GroupByBenchmark` will be the easiest place for such benchmarks, but it's probably not the best place because it benchmarks the query performance of historicals. I would suggest adding a new one for broker performance, but this comment is not a blocker. I'm OK with adding a new benchmark in `GroupByBenchmark` for now.
In the current Long*Aggregator implementations, there is no serialization/deserialization. Aggregations are a no-op and the `get` call just returns the value passed in the constructor. Am I missing any code path here?
I also wanted to limit the places where there is special handling for `GroupingAggregatorFactory`.
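As a minimal, self-contained sketch of the no-op behaviour described above (mirroring the usual Druid `Aggregator` methods rather than implementing the real interface; this is not the actual `LongConstantAggregator` source):

```java
// Sketch: aggregate() is a no-op and get() simply returns the value fixed
// at construction time, so nothing is read from or written to per-row state.
public class LongConstantAggregatorSketch
{
  private final long value;

  public LongConstantAggregatorSketch(long value)
  {
    this.value = value;
  }

  public void aggregate()
  {
    // no-op: the result is known before any row is seen
  }

  public Object get()
  {
    return value;
  }

  public long getLong()
  {
    return value;
  }

  public void close()
  {
    // nothing to release
  }
}
```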
I talked to @abhishekagarwal87 offline. My biggest concern is that the special handling for `GroupingAggregatorFactory` seems pretty magical, because the behaviour of the factory and its aggregators is different from others. What I suggested above is making it more special but less magical, because I think that's less confusing. @abhishekagarwal87's concern is mostly around the complexity of query rewriting, which requires adjusting the result row signature (because the result of the `grouping` function will be missing at certain points during a query after the rewrite). I think we can still handle this, but it could be fragile because we don't have a systematic way to handle result row signature changes during a query, and thus the logic to handle them would be ad hoc. I agree with this view, so the current structure seems reasonable, even though I still think that, ideally, we should not involve `GroupingAggregatorFactory` in hash aggregation. Maybe we can do that in the future once we have a better way to handle query rewriting and result row signature changes.
One more comment. I think the query should be rewritten before it is sent to historicals to not include the grouping aggregators.
Code changes LGTM. I left a couple of comments asking for Javadoc and inline comments explaining the magical behaviour.
LGTM. I left comments for minor typos. I'm OK with either fixing them in this PR or in a follow-up. @abhishekagarwal87 thanks for your patience!
if (dimIndex >= 0) {
dimsToIncludeBitSet.set(dimIndex - resultRowDimensionStart);
}
}

/**
nit: Javadoc comment format (`/**`) used on an inline comment
@jihoonson Thank you for the thorough review. I have fixed the typos.
@@ -337,6 +338,7 @@ Only the COUNT aggregation can accept DISTINCT.
|`LATEST(expr, maxBytesPerString)`|Like `LATEST(expr)`, but for strings. The `maxBytesPerString` parameter determines how much aggregation space to allocate per string. Strings longer than this limit will be truncated. This parameter should be set as low as possible, since high values will lead to wasted memory.|
|`ANY_VALUE(expr)`|Returns any value of `expr` including null. `expr` must be numeric. This aggregator can simplify and optimize the performance by returning the first encountered value (including null)|
|`ANY_VALUE(expr, maxBytesPerString)`|Like `ANY_VALUE(expr)`, but for strings. The `maxBytesPerString` parameter determines how much aggregation space to allocate per string. Strings longer than this limit will be truncated. This parameter should be set as low as possible, since high values will lead to wasted memory.|
|`GROUPING(expr, expr...)`|Returns a number to indicate which groupBy dimension is included in a row, when using `GROUPING SETS`. Refer to [additional documentation](aggregations.md#Grouping Aggregator) on how to infer this number.|
@abhishekagarwal87 I don't think this anchor will work (our doc generators don't include spaces in anchors). Could you please double-check it?
Good catch. Even GitHub doesn't render it correctly, though IntelliJ does. Fixed in PR #10654.
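For readers of this thread, here is a hedged, self-contained illustration of how the grouping number can be inferred, following standard SQL `GROUPING` semantics (not necessarily Druid's exact implementation): each argument contributes one bit, most significant first, set to 1 when that dimension is rolled up in the current grouping set.

```java
import java.util.List;
import java.util.Set;

class GroupingIdExample
{
  // Bit i (from the most significant end) is 1 when the i-th argument
  // is NOT part of the current grouping set.
  static long groupingId(List<String> arguments, Set<String> groupedDimensions)
  {
    long id = 0;
    for (String dim : arguments) {
      id = (id << 1) | (groupedDimensions.contains(dim) ? 0 : 1);
    }
    return id;
  }

  public static void main(String[] args)
  {
    // GROUPING(country, city) with GROUPING SETS ((country, city), (country), ()):
    System.out.println(groupingId(List.of("country", "city"), Set.of("country", "city"))); // 0
    System.out.println(groupingId(List.of("country", "city"), Set.of("country")));         // 1
    System.out.println(groupingId(List.of("country", "city"), Set.of()));                  // 3
  }
}
```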
@@ -99,7 +99,8 @@ total. Finally, GROUP BY CUBE computes a grouping set for each combination of gr
`GROUP BY CUBE (country, city)` is equivalent to `GROUP BY GROUPING SETS ( (country, city), (country), (city), () )`.
Grouping columns that do not apply to a particular row will contain `NULL`. For example, when computing
`GROUP BY GROUPING SETS ( (country, city), () )`, the grand total row corresponding to `()` will have `NULL` for the
"country" and "city" columns. Column may also be `NULL` if it was `NULL` in the data itself. To differentiate such rows
, you can use `GROUPING` aggregation.
@abhishekagarwal87 Extraneous whitespace before the `,` might interfere with rendering. (I'm not sure, but it looks suspicious.)
Re-worked this a bit in #10654
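To make the docs change above concrete, a hedged sketch (hypothetical result rows, not an actual Druid API) of how the grouping number separates a subtotal `NULL` from a `NULL` that was in the data:

```java
class GroupingNullExample
{
  // Hypothetical result row: country, city, and the GROUPING(country, city) value.
  record Row(String country, String city, long grouping) {}

  public static void main(String[] args)
  {
    // Both rows have NULL country and city, but they mean different things.
    Row grandTotal = new Row(null, null, 3); // 3 = 0b11: both dimensions rolled up
    Row nullData = new Row(null, null, 0);   // 0 = 0b00: grouped on both; data itself was NULL

    for (Row row : new Row[]{grandTotal, nullData}) {
      System.out.println(row.grouping() == 3 ? "grand total row" : "row whose input data was NULL");
    }
  }
}
```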
* First draft of grouping_id function
* Add more tests and documentation
* Add calcite tests
* Fix travis failures
* bit of a change
* Add documentation
* Fix typos
* typo fix
Description
This PR adds a grouping_id function in both the SQL and native query layers. The function is modeled as an aggregate function, similar to how it is done in Calcite. Unlike regular aggregate functions, this one takes the grouping dimensions as arguments.
There were different ways the aggregator result could have been materialized for a particular subtotal spec. I initially tried putting the logic in the merge function inside `GroupByBinaryFnV2`. That didn't work, though, since many input rows do not need merging if the grouping key is seen only once. So I have now added the logic as we iterate through base rows, modifying the dimensions and the grouping aggregator at the same time.
Key changed/added classes in this PR
* `RowBasedGrouperHelper`
* `GroupingSqlAggregator`
* `GroupingAggregatorFactory`