Grouping keys may be read by JDBC connector after aggregation pushdown even if not needed by upstream query execution #9021

Closed
losipiuk opened this issue Aug 28, 2021 · 10 comments

@losipiuk
Member

losipiuk commented Aug 28, 2021

Reproducible via PostgreSqlQueryRunner:

trino:tpch> explain select min(nationkey) from nation group by regionkey;
                                                                                    Query Plan
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Fragment 0 [SINGLE]
     Output layout: [_pfgnrtd]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     Output[_col0]
     │   Layout: [_pfgnrtd:bigint]
     │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: ?}
     │   _col0 := _pfgnrtd
     └─ RemoteSource[1]
            Layout: [_pfgnrtd:bigint]

 Fragment 1 [SOURCE]
     Output layout: [_pfgnrtd]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     TableScan[postgresql:Query[SELECT "regionkey", min("nationkey") AS "_pfgnrtd_0" FROM "tpch"."nation" GROUP BY "regionkey"] columns=[_pfgnrtd_0:bigint:int8], grouped = false]
         Layout: [_pfgnrtd:bigint]
         Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
         _pfgnrtd := _pfgnrtd_0:bigint:int8

See TODO in the code being added in #8562

@losipiuk losipiuk changed the title Ensure grouping keys are not read by JDBC connector after aggregation pushdown if not needed by upstream query execution Grouping keys may be read by JDBC connector after aggregation pushdown even if not needed by upstream query execution Aug 28, 2021
@findepi findepi added the enhancement New feature or request label Aug 29, 2021
@hashhar
Member

hashhar commented Aug 30, 2021

This seems to happen because of the following code:

    .addAll(groupingSetsAsJdbcColumnHandles.stream()
            .flatMap(List::stream)
            .distinct()
            .iterator())

We can try to be smarter about this and only add a projection for the grouping column if it's required (e.g. no aggregation function is present or the grouping column is part of an existing projection).

I'll try and send a pull-request for this.

@findepi
Member

findepi commented Aug 30, 2021

I'll try and send a pull-request for this.

Please base it on the work in #8562.

@hashhar hashhar self-assigned this Aug 30, 2021
@sergey-melnychuk sergey-melnychuk self-assigned this Sep 23, 2021
@hashhar hashhar removed their assignment Sep 24, 2021
@sergey-melnychuk

I didn't find any meaningful "heuristic" at the level of the DefaultJdbcMetadata::applyAggregation method for cutting off unnecessary columns so that they are not propagated/selected. I believe a more substantial change of behaviour is required in the PushAggregationIntoTableScan and TableScanNode areas of the planner/optimizer.

In the provided query (select min(nationkey) from nation group by regionkey;), regionkey is added to the assignments by the TableScanNode instance itself, so the current implementation (all columns used in aggregations are present) does make sense.

@findepi
Member

findepi commented Jan 17, 2022

Note:

     TableScan[postgresql:Query[SELECT "regionkey", min("nationkey") AS "_pfgnrtd_0" FROM "tpch"."nation" GROUP BY "regionkey"] columns=[_pfgnrtd_0:bigint:int8], grouped = false]

Query[...] shows the query as passed from the coordinator to the worker.
What is the final query sent to the remote database (here: PostgreSQL)?
Is it wrapped as

SELECT _pfgnrtd_0 FROM (SELECT "regionkey", min("nationkey") AS "_pfgnrtd_0" FROM "tpch"."nation" GROUP BY "regionkey")

?

If it is wrapped, there is no problem.

cc @grantatspothero @kasiafi

@grantatspothero
Contributor

grantatspothero commented Jan 18, 2022

@findepi Good point, I did not know workers further refine queries! Just for my learning, could you point me to where that happens?

Checking the PostgreSQL logs, the actual query being run is:

2022-01-18 16:14:18.996 UTC [138] LOG:  execute <unnamed>/C_1: SELECT "_pfgnrtd_0" FROM (SELECT "regionkey", min("nationkey") AS "_pfgnrtd_0" FROM "tpch"."nation" GROUP BY "regionkey") o

As you mentioned, it seems this issue is not actually an issue 🎉
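
The wrapping seen in the PostgreSQL log above can be sketched roughly as follows. This is only an illustration of the idea, not Trino's actual code: the class `WrappedQuerySketch` and its `quote` and `wrap` helpers are hypothetical names, and the subquery alias `o` is simply copied from the logged statement. The point is that the outer SELECT lists only the columns the scan actually needs, so the grouping key stays inside the subquery and is never returned to the caller.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch; none of these names exist in Trino.
class WrappedQuerySketch {
    // Quote an identifier with double quotes, doubling any embedded quote.
    static String quote(String identifier) {
        return "\"" + identifier.replace("\"", "\"\"") + "\"";
    }

    // Wrap the pushed-down query so that only the requested columns are selected.
    static String wrap(String innerQuery, List<String> requestedColumns) {
        String projection = requestedColumns.stream()
                .map(WrappedQuerySketch::quote)
                .collect(Collectors.joining(", "));
        return "SELECT " + projection + " FROM (" + innerQuery + ") o";
    }

    public static void main(String[] args) {
        String inner = "SELECT \"regionkey\", min(\"nationkey\") AS \"_pfgnrtd_0\" "
                + "FROM \"tpch\".\"nation\" GROUP BY \"regionkey\"";
        // Only the aggregate column is requested; "regionkey" stays in the subquery.
        System.out.println(wrap(inner, List.of("_pfgnrtd_0")));
    }
}
```

With the inner query and column list from this issue, the sketch produces the same shape of statement as the one in the log.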

@grantatspothero
Contributor

I found the place where the query is refined before being executed:
https://github.com/trinodb/trino/blob/master/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcRecordCursor.java#L110

@findepi
Member

findepi commented Jan 18, 2022

Closing as "Cannot reproduce".

@hashhar
Member

hashhar commented Jan 18, 2022

good point I did not know workers further refine queries!

Today I learnt as well.

@findepi Please correct me if I'm wrong. Also, does this mean that applyTopN and applyLimit happen on workers? (JdbcRecordCursor calls JdbcClient.buildSql -> prepareQuery -> applyQueryTransformations -> applyTopN/Limit).

@grantatspothero
Contributor

grantatspothero commented Jan 18, 2022

@hashhar applyLimit does not fire on the workers. When the planner rules that use applyLimit fire, they attach the limit information to the JdbcTableHandle. buildSql then fires on the workers and rewrites the query to include the limit.

Initial query: select * from (SELECT min(nationkey) as m FROM nation GROUP BY regionkey ) tmp limit 1

Right before buildSql (note that PushAggregationIntoTableScan and PushLimitIntoTableScan have already fired):
Query[SELECT "regionkey", min("nationkey") AS "_pfgnrtd_0" FROM "tpch"."nation" GROUP BY "regionkey"] limit=1 columns=[_pfgnrtd_0:bigint:int8]

Right after buildSql: SELECT "_pfgnrtd_0" FROM (SELECT "regionkey", min("nationkey") AS "_pfgnrtd_0" FROM "tpch"."nation" GROUP BY "regionkey") o LIMIT 1

@findepi
Member

findepi commented Jan 19, 2022

Exactly.

(JdbcRecordCursor calls JdbcClient.buildSql -> prepareQuery -> applyQueryTransformations -> applyTopN/Limit).

This runs on the worker.

ConnectorMetadata.apply{Limit,TopN} is called on the coordinator.
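
The split described in this exchange (planning records the limit on the handle, execution renders the SQL) might be sketched like this. Everything here is an assumption made for illustration: `LimitPushdownSketch`, `Handle`, `withLimit` and this `buildSql` are hypothetical stand-ins and do not mirror the real `JdbcTableHandle` or `JdbcClient` signatures.

```java
import java.util.List;
import java.util.OptionalLong;

// Hypothetical sketch; not Trino's JdbcTableHandle or JdbcClient.
class LimitPushdownSketch {
    // Immutable stand-in for a table handle that travels from planning to execution.
    static final class Handle {
        final String innerQuery;
        final List<String> columns;
        final OptionalLong limit;

        Handle(String innerQuery, List<String> columns, OptionalLong limit) {
            this.innerQuery = innerQuery;
            this.columns = columns;
            this.limit = limit;
        }
    }

    // Planning step (coordinator in the thread): only records the limit, builds no SQL.
    static Handle withLimit(Handle handle, long limit) {
        return new Handle(handle.innerQuery, handle.columns, OptionalLong.of(limit));
    }

    // Execution step (worker in the thread): renders the final SQL text from the handle.
    static String buildSql(Handle handle) {
        StringBuilder projection = new StringBuilder();
        for (String column : handle.columns) {
            if (projection.length() > 0) {
                projection.append(", ");
            }
            projection.append('"').append(column).append('"');
        }
        String sql = "SELECT " + projection + " FROM (" + handle.innerQuery + ") o";
        if (handle.limit.isPresent()) {
            sql += " LIMIT " + handle.limit.getAsLong();
        }
        return sql;
    }

    public static void main(String[] args) {
        Handle pushedDown = new Handle(
                "SELECT \"regionkey\", min(\"nationkey\") AS \"_pfgnrtd_0\" "
                        + "FROM \"tpch\".\"nation\" GROUP BY \"regionkey\"",
                List.of("_pfgnrtd_0"),
                OptionalLong.empty());
        System.out.println(buildSql(withLimit(pushedDown, 1)));
    }
}
```

Keeping the handle immutable and deferring SQL rendering to one place means each planner rule only adds a piece of state, and the final statement is assembled once from whatever the handle has accumulated.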

grantatspothero added a commit to grantatspothero/trino that referenced this issue Jan 19, 2022
The upstream issue: trinodb#9021 cannot be reproduced, see the issue for details
grantatspothero added a commit to grantatspothero/trino that referenced this issue Jan 27, 2022
The upstream issue: trinodb#9021 cannot be reproduced, see the issue for details
hashhar pushed a commit that referenced this issue Jan 28, 2022
The upstream issue: #9021 cannot be reproduced, see the issue for details
ranganathg pushed a commit to ranganathg/trino that referenced this issue Feb 7, 2022
The upstream issue: trinodb#9021 cannot be reproduced, see the issue for details