
Support Top-K query optimization for ORDER BY <EXPR> [ASC|DESC] LIMIT n #3516

Closed
Dandandan opened this issue Sep 16, 2022 · 10 comments
Labels
enhancement New feature or request performance Make DataFusion faster

@Dandandan
Contributor

Dandandan commented Sep 16, 2022

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
As explained by @alamb in the user-defined plan example:

TopK Background:
A "Top K" node is a common query optimization which is used for
queries such as "find the top 3 customers by revenue". The
(simplified) SQL for such a query might be:

CREATE EXTERNAL TABLE sales(customer_id VARCHAR, revenue BIGINT)
 STORED AS CSV location 'tests/customer.csv';

SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3;

And a naive plan would be:

> explain SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3;
+--------------+----------------------------------------+
| plan_type    | plan                                   |
+--------------+----------------------------------------+
| logical_plan | Limit: 3                               |
|              |   Sort: #revenue DESC NULLS FIRST      |
|              |     Projection: #customer_id, #revenue |
|              |       TableScan: sales                 |
+--------------+----------------------------------------+

While this plan produces the correct answer, the careful reader will note that it fully sorts the input before discarding everything other than the top 3 elements. A full sort costs O(n log n), whereas a TopK operator only has to track the best k rows seen so far, which costs O(n log k).
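For contrast, a TopK-aware plan would collapse the Limit/Sort pair into a single operator. The rendering below is hypothetical (DataFusion does not print a TopK node like this); it only illustrates the intended shape:

| logical_plan | TopK: fetch=3, expr=[#revenue DESC NULLS FIRST]
|              |   Projection: #customer_id, #revenue
|              |     TableScan: sales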

In the ClickBench benchmarks (https://github.com/ClickHouse/ClickBench) there are many queries that might benefit from this optimization, but they are currently converted to a "naive" plan. See for example query 9:

9. EXPLAIN SELECT "RegionID", COUNT(DISTINCT "UserID") AS u FROM hits GROUP BY "RegionID" ORDER BY u DESC LIMIT 10;

-----------------------------------------


DataFusion CLI v12.0.0
0 rows in set. Query took 0.062 seconds.
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                                                          |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Limit: skip=0, fetch=10                                                                                                                                       |
|               |   Sort: #u DESC NULLS FIRST                                                                                                                                   |
|               |     Projection: #hits.RegionID, #COUNT(DISTINCT hits.UserID) AS u                                                                                             |
|               |       Projection: #group_alias_0 AS RegionID, #COUNT(alias1) AS COUNT(DISTINCT hits.UserID)                                                                   |
|               |         Aggregate: groupBy=[[#group_alias_0]], aggr=[[COUNT(#alias1)]]                                                                                        |
|               |           Aggregate: groupBy=[[#hits.RegionID AS group_alias_0, #hits.UserID AS alias1]], aggr=[[]]                                                           |
|               |             TableScan: hits projection=[RegionID, UserID]                                                                                                     |
| physical_plan | GlobalLimitExec: skip=0, fetch=10                                                                                                                             |
|               |   SortExec: [u@1 DESC]                                                                                                                                        |
|               |     CoalescePartitionsExec                                                                                                                                    |
|               |       ProjectionExec: expr=[RegionID@0 as RegionID, COUNT(DISTINCT hits.UserID)@1 as u]                                                                       |
|               |         ProjectionExec: expr=[group_alias_0@0 as RegionID, COUNT(alias1)@1 as COUNT(DISTINCT hits.UserID)]                                                    |
|               |           AggregateExec: mode=FinalPartitioned, gby=[group_alias_0@0 as group_alias_0], aggr=[COUNT(alias1)]                                                  |
|               |             CoalesceBatchesExec: target_batch_size=4096                                                                                                       |
|               |               RepartitionExec: partitioning=Hash([Column { name: "group_alias_0", index: 0 }], 16)                                                            |
|               |                 AggregateExec: mode=Partial, gby=[group_alias_0@0 as group_alias_0], aggr=[COUNT(alias1)]                                                     |
|               |                   AggregateExec: mode=FinalPartitioned, gby=[group_alias_0@0 as group_alias_0, alias1@1 as alias1], aggr=[]                                   |
|               |                     CoalesceBatchesExec: target_batch_size=4096                                                                                               |
|               |                       RepartitionExec: partitioning=Hash([Column { name: "group_alias_0", index: 0 }, Column { name: "alias1", index: 1 }], 16)               |
|               |                         AggregateExec: mode=Partial, gby=[RegionID@0 as group_alias_0, UserID@1 as alias1], aggr=[]                                           |
|               |                           RepartitionExec: partitioning=RoundRobinBatch(16)                                                                                   |
|               |                             ParquetExec: limit=None, partitions=[home/danielheres/Code/gdd/ClickBench/datafusion/hits.parquet], projection=[RegionID, UserID] |
|               |                                                                                                                                                               |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------+
2 rows in set. Query took 0.015 seconds.

Describe the solution you'd like
Implement this feature.

It basically means pushing the limit information down to SortPreservingMergeStream and keeping only the top N elements in the min-heap data structure.
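A minimal sketch of the heap side of this in Rust (hypothetical, standalone code: the TopK struct below is illustrative only, and the real implementation would operate on Arrow record batches and sort keys rather than plain integers). For ORDER BY x DESC LIMIT k, a min-heap of size k keeps the smallest retained value at the root, so deciding whether an incoming value belongs in the top k costs O(log k):

use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Keep only the k largest values seen so far (ORDER BY x DESC LIMIT k).
struct TopK {
    k: usize,
    // Reverse turns std's max-heap into a min-heap, so the smallest
    // retained value is always at the root.
    heap: BinaryHeap<Reverse<i64>>,
}

impl TopK {
    fn new(k: usize) -> Self {
        Self { k, heap: BinaryHeap::with_capacity(k + 1) }
    }

    fn insert(&mut self, value: i64) {
        if self.heap.len() < self.k {
            self.heap.push(Reverse(value));
        } else if let Some(&Reverse(min)) = self.heap.peek() {
            // Only displace the current minimum if the new value beats it.
            if value > min {
                self.heap.pop();
                self.heap.push(Reverse(value));
            }
        }
    }

    /// Drain the heap into descending order for final output.
    fn into_sorted_desc(self) -> Vec<i64> {
        let mut v: Vec<i64> = self.heap.into_iter().map(|Reverse(x)| x).collect();
        v.sort_unstable_by(|a, b| b.cmp(a));
        v
    }
}

fn main() {
    let mut topk = TopK::new(3);
    for revenue in [40, 10, 90, 70, 20, 80] {
        topk.insert(revenue);
    }
    assert_eq!(topk.into_sorted_desc(), vec![90, 80, 70]);
}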

Describe alternatives you've considered
n/a


@Dandandan Dandandan added enhancement New feature or request performance Make DataFusion faster labels Sep 16, 2022
@xudong963
Member

xudong963 commented Sep 16, 2022

A min-heap is a reasonable data structure for solving this issue.

@alamb
Contributor

alamb commented Sep 16, 2022

The TopK operator can also be parallelized in the same way as a multi-stage grouping: run TopK on each of the input DataFusion partitions, union the results together, and then run a final TopK on the combined plan.
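For illustration, such a two-stage plan might look like the following (TopKExec is a hypothetical operator name; no such operator exists yet):

GlobalLimitExec: skip=0, fetch=10
  TopKExec: k=10              <- final top 10 over the merged partial results
    CoalescePartitionsExec    <- unions all partitions into one
      TopKExec: k=10          <- per-partition top 10, runs in parallel
        ...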

@Dandandan
Contributor Author

Dandandan commented Sep 18, 2022

I think there are two items here:

  • parallelizing ORDER BY followed by LIMIT n
  • implementing a TopK operator

Currently a plan looks like the following:

| physical_plan | GlobalLimitExec: skip=0, fetch=10
|               |   SortExec: [COUNT(UInt8(1))@3 DESC]

I think we should be translating it to:

| physical_plan | GlobalLimitExec: skip=0, fetch=10
|               |   SortExec: [COUNT(UInt8(1))@3 DESC]
|               |     CoalescePartitionsExec

Already with the current implementation we should be able to rewrite it to the following (correct because the global top 10 must be contained in the union of the per-partition top 10s):

| physical_plan | GlobalLimitExec: skip=0, fetch=10
|               |   SortPreservingMergeExec
|               |     LocalLimitExec: skip=0, fetch=10
|               |       SortExec: [COUNT(UInt8(1))@3 DESC]

@alamb
Contributor

alamb commented Sep 18, 2022

Already with the current implementation we should be able to rewrite it to:

I agree this would help performance. However, it will become irrelevant if/when we implement a real TopK operator.

@Dandandan
Contributor Author

Already with the current implementation we should be able to rewrite it to:

I agree this would help performance. However, it will become irrelevant if/when we implement a real TopK operator.

I imagine a similar optimization is still useful with a TopK operator. The example TopK operator in the tests still works on only a single partition and can't run in parallel. I think you'll still need a "nested" TopK operator, similar to the above plan, to achieve the parallelization. So once there is a TopK operator, we can change it to utilize this.

Of course the TopK operator will use far less memory - but in terms of performance for simple queries that can be executed in memory, the rewrite should already provide a big benefit.

@alamb
Contributor

alamb commented Sep 18, 2022

@Dandandan -- I had not thought of the "parallel sorts with limit" as a good intermediate state -- you are exactly right and that makes perfect sense

@Dandandan
Contributor Author

Dandandan commented Sep 19, 2022

After playing with it a bit more, I think this issue boils down to pushing a limit down to SortPreservingMergeStream and using it there to keep only the top N items instead of producing the full sorted output. SortPreservingMergeStream already uses the heap data structure mentioned by @xudong963.
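A simplified sketch of that pushdown, assuming plain sorted vectors in place of the Arrow batch streams the real SortPreservingMergeStream merges (merge_with_fetch is a made-up name for illustration). The key point is the early break: once fetch rows have been emitted, the merge terminates without draining its inputs:

use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Merge several ascending-sorted inputs, stopping once `fetch` rows
/// have been produced.
fn merge_with_fetch(inputs: Vec<Vec<i64>>, fetch: usize) -> Vec<i64> {
    // Heap entries: (value, input index, offset within that input),
    // wrapped in Reverse so the smallest value is popped first.
    let mut heap = BinaryHeap::new();
    for (i, input) in inputs.iter().enumerate() {
        if let Some(&v) = input.first() {
            heap.push(Reverse((v, i, 0usize)));
        }
    }
    let mut out = Vec::with_capacity(fetch);
    while let Some(Reverse((v, i, off))) = heap.pop() {
        out.push(v);
        if out.len() == fetch {
            break; // the pushed-down limit lets us stop the merge early
        }
        // Advance the input we just consumed from.
        if let Some(&next) = inputs[i].get(off + 1) {
            heap.push(Reverse((next, i, off + 1)));
        }
    }
    out
}

fn main() {
    let inputs = vec![vec![1, 4, 9], vec![2, 3, 8], vec![5, 6, 7]];
    assert_eq!(merge_with_fetch(inputs, 4), vec![1, 2, 3, 4]);
}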

@Dandandan
Contributor Author

So it seems we mostly get the benefits of a TopK operator now by pushing the limit down to the individual operators.

There are a couple of possible follow-ups (will create some tickets for them and close this one):

  • Avoiding spilling to disk by reinserting the to-spill data into a memory buffer
  • Using the limit in SortPreservingMergeExec
  • Keeping memory under control by merging buffered batches once in a while (see the sketch below)
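A sketch of that last idea, using a hypothetical BufferedTopK struct over plain integers (the real operator would buffer Arrow record batches and compare on sort keys): let the buffer grow to 2*k rows, then sort it and truncate back to k, keeping memory bounded at O(k):

/// Buffer incoming values and periodically compact down to the top k,
/// so memory never exceeds roughly 2*k values.
struct BufferedTopK {
    k: usize,
    buffer: Vec<i64>,
}

impl BufferedTopK {
    fn new(k: usize) -> Self {
        Self { k, buffer: Vec::with_capacity(2 * k) }
    }

    fn push(&mut self, value: i64) {
        self.buffer.push(value);
        if self.buffer.len() >= 2 * self.k {
            self.compact();
        }
    }

    /// Sort descending and keep only the current top k.
    fn compact(&mut self) {
        self.buffer.sort_unstable_by(|a, b| b.cmp(a));
        self.buffer.truncate(self.k);
    }

    fn finish(mut self) -> Vec<i64> {
        self.compact();
        self.buffer
    }
}

fn main() {
    let mut topk = BufferedTopK::new(2);
    for v in [5, 1, 9, 3, 7, 2] {
        topk.push(v);
    }
    assert_eq!(topk.finish(), vec![9, 7]);
}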

@jychen7
Contributor

jychen7 commented Apr 14, 2023

There are a couple of followups possible (will create some tickets for them and close this one):
Using the limit in SortPreservingMergeExec

Is there an issue created for this follow-up? I didn't find one, so I created #6000. Feel free to close it if it's a duplicate.

Also, feel free to correct me if my understanding and expectation in #6000 are wrong.

@Dandandan
Contributor Author

I believe this is completed by the work above; we should track any follow-up items in separate issues.
