
Rewrite partial top-n node to LimitNode or LastNNode #18384

Open · wants to merge 2 commits into base: master

Conversation

@rice668 rice668 commented Jul 24, 2023

Description

If Trino knows that each file has been sorted, it can rewrite the partial TopN node to a LimitNode or a LastNNode (a new logical plan node introduced in this PR), which significantly improves query performance. Trino can use the sort_order_id field in the Iceberg manifest entry to determine whether a data file has been sorted.

Backgrounds and Core Idea

For ease of review, I need to describe the LastNOperator (a new operator introduced in this PR) and how the related code works. First, as a recap, let's take a quick look at how a Page is built by the OrcReader. I'm sure the Trino experts are more familiar with the implementation details, but the recap helps in understanding the LastNOperator implementation later. Trino's OrcReader advances row group by row group, obtains the current row group's input stream, and constructs Pages from that stream. Therefore, at least during the Page-generation phase, a Page is never generated across row groups; in other words, a Page does not come from two different row groups. Additionally, the batch parameter MAX_BATCH_SIZE is set to 8192 in the current codebase, so if the default row-group size is 10,000 rows, at least one Page is generated for each row group.
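As a small illustration of the batching arithmetic above (the numbers come from the text; the class and method names are mine, not Trino's):

```java
// Illustration only: with MAX_BATCH_SIZE = 8192 and a row-group size of
// 10,000 rows, each row group yields ceil(10000 / 8192) = 2 Pages, and no
// Page ever spans two row groups.
public class BatchSizing
{
    static int pagesPerRowGroup(int rowGroupSize, int maxBatchSize)
    {
        return (rowGroupSize + maxBatchSize - 1) / maxBatchSize; // ceiling division
    }

    public static void main(String[] args)
    {
        System.out.println(pagesPerRowGroup(10_000, 8192)); // prints 2
    }
}
```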

In this PR, when the OrcReader outputs a Page, it adds an additional column whose value is computed by packing the existing variables stripeSize and currentRowGroup into a single long. The following code snippet shows the logic for appending this last column to the Page. The code is part of a class called OrcRecordBackwardReader, which iterates over row groups in reverse order.

// Append a synthetic last column whose value packs the stripe size and the
// current row-group ordinal into one long (upper and lower 32 bits).
Page page = new Page(currentBatchSize, blocks);
long[] values = new long[page.getPositionCount()];
Arrays.fill(values, (long) stripeSize << 32 | currentRowGroup);
LongArrayBlock rowGroupId = new LongArrayBlock(page.getPositionCount(), Optional.empty(), values);
page = page.appendColumn(rowGroupId);
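For illustration, the packed id can be decoded back into its two halves. This is a sketch with assumed helper names, not code from the PR:

```java
// Illustrative sketch: encoding and decoding the packed 64-bit id used above.
// The upper 32 bits hold the stripe size, the lower 32 bits the row-group ordinal.
public class RowGroupIdCodec
{
    public static long encode(int stripeSize, int rowGroup)
    {
        return (long) stripeSize << 32 | (rowGroup & 0xFFFFFFFFL);
    }

    public static int stripeSize(long packed)
    {
        return (int) (packed >>> 32); // unsigned shift recovers the upper half
    }

    public static int rowGroup(long packed)
    {
        return (int) packed; // truncation recovers the lower half
    }

    public static void main(String[] args)
    {
        long packed = encode(10_000, 3);
        System.out.println(stripeSize(packed) + " " + rowGroup(packed)); // prints 10000 3
    }
}
```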

It should be noted that Trino performs additional optimization on the output Page when the physical operator pipeline includes ScanFilterAndProjectOperator or FilterAndProjectOperator instead of a plain TableScanOperator: Trino optimizes the Page by merging or splitting it, and this PR adapts to both scenarios.

If the LastNOperator read from the beginning of the file and retained only the data from the last few Pages at the end of the file, there could still be significant IO overhead from processing a lot of irrelevant data. Therefore, the LastNOperator expects its input Pages to be constructed starting from the last row group of the current split. Since this depends on the order in which the OrcReader outputs Pages, the PR introduces a class called OrcRecordBackwardReader to differentiate it from the existing OrcRecordReader.

Furthermore, the LastNOperator implementation takes performance into account. Each Page entering the operator carries a unique RowGroupId block, the last block of the Page, indicating which row group the Page comes from. Unlike the LimitOperator, which passes any input Page downstream until remainingLimit is no longer greater than 0, the LastNOperator accumulates Pages: it does not emit output until it has accumulated N elements, unless the total number of rows in the source data is less than N. The LastNOperator is therefore a blocking operator, like the OrderByOperator, and it can precisely compute the N elements needed, reducing both the memory used by the current worker process and the network pressure caused by sending data to the downstream operator.

Now, let's illustrate the general implementation approach of the LastNOperator with an example.

Consider the following data written sequentially in a column. For convenience, I use one row to represent it and assume the row-group size is 3. These numbers will be divided into three row groups. Since the data is written sequentially, the data between row groups follows the order: data in row group 3 is smaller than that in row group 2, and data in row group 2 is smaller than that in row group 1. Within each row group, the data is also sequential.

1,2,3 | 4,5,6 | 7,8,9

row group-3 [1,2,3]
row group-2 [4,5,6]
row group-1 [7,8,9]

The LastNOperator maintains a queue per row group, and each queue stores the Pages generated by that row group. The value of N is updated whenever a new row group is encountered, in order to calculate how many of the N elements still remain to be found in the new row group. For example, if we need the top 4 values, i.e. [9,8,7,6], then N starts at 4; once row group-1 has been processed, N is updated to 1, i.e. the remaining N is 1. At this point, only one more value needs to be obtained from row group-2. Since the data within each row group is sequential, all Pages in row group-2 must be read to find the Page containing the number 6. Next, when reading row group-3, N is updated again, and once N decreases to 0, no more data is needed. When the LastNOperator finally produces output, it regions the Pages to reduce memory and network pressure, ensuring that the data sent downstream contains only [9,8,7,6]; even without regioning the Pages at getOutput, the impact on memory and network would be small because this remaining data is tiny. In addition, the addInput implementation in LastNOperator fully considers, for each queue, the memory impact on the worker process.
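The selection logic above can be sketched as a small, self-contained model (names are mine; the real operator works on Pages and per-row-group queues, not int arrays):

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the LastN selection described above, not the PR's
// actual operator code. The backward reader hands us row groups starting
// from the end of the file; within each group, values are still in ascending
// write order, so the largest values sit at the end of each group.
public class LastNSketch
{
    public static List<Integer> lastN(List<int[]> rowGroupsFromFileEnd, int n)
    {
        List<Integer> result = new ArrayList<>();
        int remaining = n;
        for (int[] group : rowGroupsFromFileEnd) {
            // Take up to `remaining` values from the end of this group,
            // largest first, then update the remaining count (this models
            // the "N is updated for each new row group" step).
            for (int i = group.length - 1; i >= 0 && remaining > 0; i--) {
                result.add(group[i]);
                remaining--;
            }
            if (remaining == 0) {
                break; // no further row groups need to be read
            }
        }
        return result;
    }

    public static void main(String[] args)
    {
        // Groups as emitted by the backward reader: last row group first.
        List<int[]> groups = List.of(new int[] {7, 8, 9}, new int[] {4, 5, 6}, new int[] {1, 2, 3});
        System.out.println(lastN(groups, 4)); // prints [9, 8, 7, 6]
    }
}
```

With N = 4 this reproduces the worked example: all of [7,8,9] is taken, one value (6) is taken from the next group, and the third group is never touched.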

If this version of the code is successfully merged, we also plan to support multi-field sorting and the case where only some of the files are sorted, but this PR does not include those features.


Test

Testing was performed on real log queries retrieving the latest 200 records, which is a very common business requirement. Two query scenarios were tested, one ascending and one descending; notably, the performance of the two test cases was almost identical.
The brief test idea is as follows:

Write

The Iceberg table's sort field is defined as ORDER BY _timestamp DESC NULLS LAST, and the table is written using the Apache Spark engine.

Read

The query's sort order was tested in two cases: one exactly the same as the write order and one completely opposite.

ORDER BY _timestamp DESC NULLS LAST: It took 1.96 seconds, and the partial-top-n node was rewritten as LimitNode, corresponding to LimitOperator.

ORDER BY _timestamp ASC NULLS FIRST: It took 2.24 seconds, and the partial-top-n node was rewritten as LastNNode, corresponding to the new operator added by this PR, called LastNOperator.

The original Trino query takes 36.45 seconds, so the test results show a performance improvement of 16 to 18 times.

SELECT
  "cid",
  "errno",
  "host_deploy_env" "host.deploy_env",
  "host_name" "host.name",
  "host_region" "host.region",
  "host_zone" "host.zone",
  "log_file" "log.file",
  "log_level" "log.level",
  "log_msg" "log.msg",
  "log_observed_time" "log.observed_time",
  "log_offset" "log.offset",
  "msg",
  "pid",
  "service_name" "service.name",
  "tag",
  "version",
  "_consumer_hostname",
  "_id",
  "log_stuid" "log.stuid",
  "upstream_code",
  "upstream_addr",
  "req_type",
  "arg_trid",
  "_timestamp" "timestamp"
FROM iceberg_log.xxx_yyy_zzz
WHERE ((("msg" LIKE '%client%') AND ("msg" IS NOT NULL)) AND (_timestamp > 1705341600000) AND (_timestamp <= 1705342800000) AND (log_date >= '20240116') AND (log_date <= '20240116') AND (log_hour >= '02') AND (log_hour <= '02'))
ORDER BY _timestamp ASC NULLS FIRST
OFFSET 0 ROWS
LIMIT 200

Partition statistics

  • Size: 58.1GB
  • Record Counts: 1,083,459,817
  • Total File Counts: 236

Additional context and related issues

This is the first part of the work for issue #18139.

@cla-bot cla-bot bot added the cla-signed label Jul 24, 2023
@rice668 rice668 changed the title from "Push partial top to iceberg" to "[in progress] Push partial top to iceberg" Jul 24, 2023
rice668 commented Jul 24, 2023

This PR still has some details that need to be dealt with, but review can start. @marton-bod @findepi @alexjo2144 If convenient, would you mind reviewing this PR?

@github-actions github-actions bot added the iceberg Iceberg connector label Jul 24, 2023
@rice668 rice668 force-pushed the push_partial_top_to_iceberg branch from 7e257f5 to e097b3b Compare July 24, 2023 12:38
@rice668 rice668 changed the title from "[in progress] Push partial top to iceberg" to "Push partial top to iceberg" Jul 24, 2023
try (CloseableIterable<FileScanTask> fileScanTasks = table.newScan().filter(toIcebergExpression(effectivePredicate)).planFiles()) {
    for (FileScanTask scanTask : fileScanTasks) {
        DataFile file = scanTask.file();
        if (file.sortOrderId() == 0) {
@rice668 rice668 Jul 25, 2023

Even though the Iceberg DataFile defines a sort order, the file returned from planFiles does not have it; I suspect this is an Iceberg bug.

Member

Sorting is optional. It's legitimate for files to be sorted differently, or not be sorted at all.

However, there is a problem with table.newScan(). We should avoid it. We use it for stats today, and we know it's causing problems (takes too long), so we don't want to do this during planning.

Contributor Author

File sorting is indeed optional, but it is necessary for this PR to accelerate queries (we can support partially sorted files later). Since table.newScan() is not recommended, is there another way to find out during planning whether the files have been sorted?

And for query acceleration of top-N over large amounts of data, isn't the planning time negligible?

@rice668 rice668 changed the title Push partial top to iceberg Attempt to push partial top node to iceberg Jul 25, 2023
@raunaqmorarka (Member)

Have you looked at using the same approach as #6634 or #8171 to replace partial topN with limit ?

rice668 commented Jul 25, 2023

Hi @raunaqmorarka, I've done an initial review of the available information on rewriting partial TopN to Limit. It seems my first step would be to acquire the ConnectorTableProperties object within the getTableProperties method, and during this process construct a LocalProperty object holding the sorting information.

However, an issue arises because getTableProperties is also executed for basic queries such as 'select * from table limit x'. To discern whether an Iceberg table is sorted, planFiles is required, which is a heavy operation due to the need to traverse all records in the Avro manifest files, particularly when the table has many data files. This appears to differ from other connectors, such as Phoenix or Hive.

Furthermore, if the order the data was written in (for instance, ASC) contradicts the query direction (for instance, DESC), the current Trino version does not seem to provide support. Do you have any suggestions? I would greatly appreciate your insights.

@sopel39 sopel39 requested a review from findepi August 9, 2023 09:38
sopel39 commented Aug 9, 2023

@zhangminglei Thanks for the PR. Could you rebase? It also seems that the Iceberg tests are failing.

rice668 commented Oct 18, 2023

@sopel39 Sorry, I haven't looked at this issue for a long time. I think it is an Iceberg bug: apache/iceberg#8864. Hi @nastra, could you please help check this?

rice668 commented Oct 24, 2023

@findepi @sopel39 Just a friendly ping: the test failure was caused by an Iceberg bug, and I fixed it (apache/iceberg#8873). For now, I can only remove the end-to-end test (TestPushPartialTopNToIceberg.java in my PR). After the next Iceberg release, the test can be added back. I'd like to know what you two think about this.

@@ -2546,6 +2552,62 @@ private static IcebergColumnHandle createProjectedColumnHandle(IcebergColumnHand
Optional.empty());
}

@Override
public Optional<PartialSortApplicationResult<ConnectorTableHandle>> applyPartialSort(
Member

why not applyTopN?

@rice668 rice668 Oct 25, 2023

To be precise, this PR does not implement top-N pushdown, which would be implemented by database software based on JDBC client connections. This PR just rewrites the partial-top-n node to a LimitNode or LastNNode.

From

partial-top-n
  Project
    Filter
      TableScan

To

Limit/LastN
  Project
    Filter
      TableScan

@rice668 rice668 Jan 25, 2024

Sorting is optional. It's legitimate for files to be sorted differently, or not be sorted at all.

You are right. In addition, multiple versions of SortOrder are not handled yet: if a file's SortOrderId is inconsistent with the SortOrderId defined by the table, this optimization cannot be used.

originalTableHandle.withSort(allSorted),
trinoSortOrder.isAscending() == icebergSortOrder.isAscending(),
trinoSortOrder.isNullsFirst() == icebergSortOrder.isNullsFirst(),
allSorted));
Member

I don't think I understand how this works.
Let's assume I have a bunch of sorted files.
Each of these files may produce one or more splits.
How does the fact that the files are sorted help?

@rice668 rice668 Oct 25, 2023

This is a good and worthwhile question. If I understand you correctly: if all the files in the query range are already sorted and the selectivity of the filter field is very low, then one split per file is necessary. However, if the selectivity of the filter field is relatively high, one split per file may not provide enough concurrency, which can hurt query performance because almost all the data in the file must be read to find the required rows. In that case, we would rather split a file into multiple splits to improve query performance.

findepi commented Oct 24, 2023

If Trino knows that each file has been sorted, it can remove the Partial Sort and Partial TopN from the execution plan, which will significantly improve query performance.
....
Performance gains are significant with ASC and DESC ordering. Specifically, we see a more than 20x increase in performance with ASC and a 2x increase in performance with DESC when the limit is relatively large (e.g., ORDER BY x DESC LIMIT 10000).

cc @sopel39 @lukasz-stec

sopel39 commented Oct 30, 2023

If Trino knows that each file has been sorted, it can remove the Partial Sort and Partial TopN from the execution plan, which will significantly improve query performance.

I think it's a good idea. @findepi will you continue with review?


This pull request has gone a while without any activity. Tagging the Trino developer relations team: @bitsondatadev @colebow @mosabua

@github-actions github-actions bot added the stale label Jan 11, 2024
mosabua commented Jan 11, 2024

@zhangminglei please submit the new version as you suggested and also adapt by rebasing. Let us know how we can help after that.

rice668 commented Jan 12, 2024

Thanks @mosabua! The latest code has been merged into production within our company. It ensures that data written in one order (e.g. DESC) can be queried in either order (DESC or ASC) with almost identical query performance.

I will submit the latest code as soon as possible

@github-actions github-actions bot removed the stale label Jan 12, 2024
@rice668 rice668 changed the title from "Attempt to push partial top node to iceberg" to "Rewrite partial top-n node to LimitNode or LastNNode" Jan 17, 2024

cla-bot bot commented Jan 18, 2024

Thank you for your pull request and welcome to our community. We could not parse the GitHub identity of the following contributors: zhangminglei.
This is most likely caused by a git client misconfiguration; please make sure to:

  1. Check whether your git client is configured with an email to sign commits: git config --list | grep email
  2. If not, set it up using git config --global user.email [email protected]
  3. Make sure the git commit email is configured in your GitHub account settings; see https://github.com/settings/emails

@cla-bot cla-bot bot added the cla-signed label Jan 22, 2024
@rice668 rice668 force-pushed the push_partial_top_to_iceberg branch 2 times, most recently from ced3941 to 2b21616 Compare January 22, 2024 06:56
@rice668 rice668 marked this pull request as ready for review January 22, 2024 07:10
@rice668 rice668 requested a review from findepi January 22, 2024 07:22
@rice668 rice668 force-pushed the push_partial_top_to_iceberg branch 2 times, most recently from 847cd8a to b345b77 Compare January 24, 2024 04:24
rice668 commented Jan 24, 2024

Hello @mosabua @findepi @sopel39, I think the PR is ready for review. I still need to add some implementation notes to speed up the process, but that does not prevent the review from starting now. Thank you.

I have added the Backgrounds and Core Idea section to the PR description.

// Partition specs are {0: f1, 1: f1,f2, 2: f3}.
// If the query sort is `order by f2`, the filtered-out result (specIdNotMatching) is specId = 0, 2;
// later we use `planFiles` to decide whether the planned files have specId 0 or 2.
// If, for whatever reason, none of the files' spec ids is 0 or 2, then we hope the query can use our optimization.
table.specs().values().forEach(spec -> {
Member

if spec is no longer used, does it matter?

(can a historical spec make the feature not work?)

Contributor Author

It depends. The important thing is whether the generated plan files include this historical spec. If they do, the feature does not work.

* </pre>
*
*/
public class PushPartialTopNIntoTableScan
Contributor Author

Hi @martint, just a friendly ping. Could you take a look at this?

/**
* If the file is ordered, attempt to push down sort or topN to connector.
*/
default Optional<PartialSortApplicationResult<ConnectorTableHandle>> applyPartialSort(
Member

In Trino, "partial sort" would often indicate the distributed sort feature, but that is not the context in which this method is called (or could be called in the future). I'm not sure how this new SPI method is best framed.

cc @martint

rice668 commented Jan 30, 2024

Thanks @findepi for the review. I have modified the PR in a first-round commit based on your comments; please continue reviewing when you have time. Thank you.

mosabua commented Feb 2, 2024

Fyi @martint @findepi @osscm


This pull request has gone a while without any activity. Tagging the Trino developer relations team: @bitsondatadev @colebow @mosabua

@github-actions github-actions bot added the stale label Feb 28, 2024

Closing this pull request, as it has been stale for six weeks. Feel free to re-open at any time.

@github-actions github-actions bot closed this Mar 21, 2024
mosabua commented Mar 21, 2024

I think we should still pursue this so I am reopening

@mosabua mosabua reopened this Mar 21, 2024
@github-actions github-actions bot removed the stale label Mar 22, 2024

This pull request has gone a while without any activity. Tagging the Trino developer relations team: @bitsondatadev @colebow @mosabua

@github-actions github-actions bot added the stale label Apr 15, 2024
@github-actions github-actions bot removed the stale label Apr 19, 2024
@mosabua mosabua added the stale-ignore Use this label on PRs that should be ignored by the stale bot so they are not flagged or closed. label Apr 30, 2024
Labels: cla-signed · hive (Hive connector) · iceberg (Iceberg connector) · stale-ignore (ignored by the stale bot)
5 participants