-
Notifications
You must be signed in to change notification settings - Fork 3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement aggregation pushdown in Pinot #6069
Conversation
Resolves #4140 |
6781317
to
cd77bf2
Compare
cd77bf2
to
9e82d36
Compare
presto-pinot/src/main/java/io/prestosql/pinot/PinotMetadata.java
Outdated
Show resolved
Hide resolved
presto-pinot/src/main/java/io/prestosql/pinot/query/DynamicTableBuilder.java
Outdated
Show resolved
Hide resolved
presto-pinot/src/main/java/io/prestosql/pinot/query/DynamicTableBuilder.java
Outdated
Show resolved
Hide resolved
presto-pinot/src/test/java/io/prestosql/pinot/PinotQueryRunner.java
Outdated
Show resolved
Hide resolved
presto-pinot/src/main/java/io/prestosql/pinot/PinotMetadata.java
Outdated
Show resolved
Hide resolved
@elonazoulay please rebase. See especially #6667 which changes JdbcTableHandle a bit and how agg pushdown is handled now. |
9e82d36
to
4088407
Compare
plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/DynamicTableBuilder.java
Outdated
Show resolved
Hide resolved
591c3b2
to
45ba425
Compare
@findepi Sounds good! I will be removing the DynamicTableBuilder in an upcoming pull request. |
plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/DynamicTableBuilder.java
Outdated
Show resolved
Hide resolved
plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotMetadata.java
Show resolved
Hide resolved
plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestMinimalFunctionality.java
Outdated
Show resolved
Hide resolved
9dc0553
to
754b5ab
Compare
plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotConfig.java
Outdated
Show resolved
Hide resolved
plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotSessionProperties.java
Outdated
Show resolved
Hide resolved
plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestAggregationPushdown.java
Outdated
Show resolved
Hide resolved
plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestMinimalFunctionality.java
Outdated
Show resolved
Hide resolved
.setCatalogSessionProperty("pinot", "enable_aggregation_pushdown", "true") | ||
.build(); | ||
String noGroupingsQueryTemplate = "SELECT COUNT(*), SUM(%1$s), AVG(%1$s), MIN(%1$s), MAX(%1$s) FROM " + TOPIC_AND_TABLE; | ||
assertQueryFails(format(noGroupingsQueryTemplate, "long_number"), "Segment query returned.*"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why this is expected to fail? please add explanation.
(same below)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Without aggregation pushdown enabled, the connector will query the servers directly. This is a way to test the limit for segment queries. A limit is enforced to avoid pinot servers crashing when too many large selects are run.
plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotIntegrationSmokeTest.java
Show resolved
Hide resolved
plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/DynamicTableBuilder.java
Outdated
Show resolved
Hide resolved
754b5ab
to
ec5f4af
Compare
As i understand, this currently depends on #7627. |
7adff3c
to
3515f68
Compare
0d90917
to
e9189eb
Compare
@elonazoulay let me know whet this is ready for review. Also. it seems that
-- some commits could be run in separate PRs, which could perhaps reduce review overhead? |
@findepi - this is ready, but to make it easier I will break it up into separate PR's, thanks! |
plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/PinotQueryRunner.java
Outdated
Show resolved
Hide resolved
plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotClient.java
Outdated
Show resolved
Hide resolved
plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotMetadata.java
Show resolved
Hide resolved
plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotMetadata.java
Outdated
Show resolved
Hide resolved
plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotMetadata.java
Outdated
Show resolved
Hide resolved
plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotMetadata.java
Outdated
Show resolved
Hide resolved
plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotMetadata.java
Outdated
Show resolved
Hide resolved
plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotMetadata.java
Outdated
Show resolved
Hide resolved
if (inputExpression.isEmpty() || (isCountFunction(aggregate) && !isCountStarFunction(aggregate))) { | ||
return Optional.empty(); | ||
} | ||
// Distinct aggregations other than distinctcounthll are not supported | ||
if (aggregate.isDistinct()) { | ||
return Optional.empty(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be handled for individual functions separately. What would it take to use AggregateFunctionRewriter
here?
Also, checking isDistinct
is not enough.
See io.trino.plugin.jdbc.expression.AggregateFunctionPatterns#basicAggregation
which matches only "basic" aggregation functions, without features that are both: easy to forget about and hard to implement in the remote system.
.map(PinotColumnHandle.class::cast) | ||
.map(PinotColumnHandle::getColumnName) | ||
.collect(toImmutableList()), | ||
tableHandle.getQuery().flatMap(DynamicTable::getFilter), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in DynamicTable
what are the semantics of filters, aggregations, sorting, etc?
Which is applied first, which is second, etc.?
Current ordering of fields suggest that aggregation comes before filter, so this line looks incorrect -- you are transposing pre-existing filter with an aggregation.
If this is "just" a matter of field ordering in DynamicTable, please update it and add commentary like here:
trino/presto-base-jdbc/src/main/java/io/prestosql/plugin/jdbc/JdbcTableHandle.java
Lines 47 to 56 in b52bcd7
private final TupleDomain<ColumnHandle> constraint; | |
// semantically aggregation is applied after constraint | |
private final Optional<List<List<JdbcColumnHandle>>> groupingSets; | |
// semantically limit is applied after aggregation | |
private final OptionalLong limit; | |
// columns of the relation described by this handle, after projections, aggregations, etc. | |
private final Optional<List<JdbcColumnHandle>> columns; |
When doing that, sure the ordering of fields matches that of constructor arguments, they are equally important!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed AggregationExpression and now the connector returns the exact pinot data type for aggregations. Also implemented approx_distinct pushdown.
a352f5e
to
b3d6360
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@elonazoulay I resolved the addressed comments. I think I have an alternative proposal that would work with #8578 and not require the copying. PTAL if what I'm saying makes sense and is feasible to implement.
There's another option to remove count_distinct pushdown (which is the only rule which needs the grouping columns in rewrite context) from this PR so that it works with #8578 and address count_distinct in a follow-up.
projections.add(new Variable(pinotColumnHandle.getColumnName(), pinotColumnHandle.getDataType())); | ||
resultAssignments.add(new Assignment(pinotColumnHandle.getColumnName(), pinotColumnHandle, pinotColumnHandle.getDataType())); | ||
} | ||
List<String> groupingColumns = getOnlyElement(groupingSets).stream() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@elonazoulay IIUC the grouping columns in rewrite context are only needed to verify that the grouping columns are part of the SELECT clause. If so, can we instead do that validation here after the rewrite is done?
Yes, the connector will end up doing work that gets discarded but that seems cleaner because even if we add grouping columns to RewriteContext I don't see an easy way to populate them for other connectors so we won't be able to provide a sane impl. which doesn't return null
for that method.
I think this is possible to do because here we have tableHandle available from which we can get the grouping columns in case of DynamicTable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, in a previous version of this pr it was done before the rewrite rules, I can add it back. Another idea: can RewriteContext.getGroupingColumns() return an Optional<List> as a default method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Was there some specific reason to change from the old implementation? Maybe I'm missing something. If it's feasible to do I'd like that.
As for the RewriteContext changes I'll defer to @martint (and Piotr once he's back). IMO if we have alternatives which are cleaner then I'd like to avoid the change. IMO the validation that grouping columns are part of select isn't exclusive to just count_distinct and makes sense to do before any rewrite.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good. Would another option be to pass the table handle into the rewrite context? I can get the grouping columns there also.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@hashhar - I extracted the check for grouping columns and reverted the changes for AggregateFunctionRewriter and RewriteContext. What do you think about adding TableHandle to the RewriteContext instead of this approach?
Just in case I saved the previous version.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like the newer version. We can revisit changing RewriteContext once we have other non-JDBC connectors that implement agg pushdown. We'll have better information at that time to decide the correct abstraction.
e0a146a
to
fdff656
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to go from my perspective % a question about behaviour of applyCountDistinct
.
plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotMetadata.java
Show resolved
Hide resolved
@elonazoulay I think you can rebase on top of #8578 and remove the copied classes where possible. |
Will do, that's great news!!! Thanks @hashhar ! |
07baf9d
to
53b968d
Compare
Reorder members and constructor parameters in order of pushdown application.
Add a config to set broker query limit. If the limit is set too high, pinot can oom: apache/pinot#7110
This is a prepatory commit for aggregation pushdown which sets values to null, otherwise pinot never returns null values.
53b968d
to
8723d41
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotBrokerPageSource.java
Show resolved
Hide resolved
|
Thanks a lot for working on this @elonazoulay . ❤️ |
Thank you! |
Resolves #4140