Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable parquet page level skipping (page index pruning) by default #5099

Merged
merged 4 commits into from
May 12, 2023

Conversation

alamb
Copy link
Contributor

@alamb alamb commented Jan 28, 2023

Draft until:

Which issue does this PR close?

Closes #4085

Related to #6287

Rationale for this change

We have several performance features that help querying parquet that are not enabled by default. This not only decreases our test coverage of those features, it also means users of DataFusion do not see the benefits of using them.

This feature is awesome and we should have it on as it improves performance -- see https://arrow.apache.org/blog/2022/12/26/querying-parquet-with-millisecond-latency/

What changes are included in this PR?

Changes:

  1. Enable by default
  2. Update tests

Are these changes tested?

Covered by existing tests

Are there any user-facing changes?

Hopefully faster queries on parquet

benchmark results:

See below

@github-actions github-actions bot added core Core DataFusion crate documentation Improvements or additions to documentation optimizer Optimizer rules sqllogictest SQL Logic Tests (.slt) labels Jan 30, 2023
@github-actions github-actions bot added sqllogictest SQL Logic Tests (.slt) and removed optimizer Optimizer rules documentation Improvements or additions to documentation sqllogictest SQL Logic Tests (.slt) labels Feb 13, 2023
@alamb
Copy link
Contributor Author

alamb commented Feb 15, 2023

All that is needed now for this PR I think is some benchmark results

@alamb alamb force-pushed the alamb/enable_page_pruning branch from 294e121 to c8bda4a Compare March 20, 2023 15:04
@alamb
Copy link
Contributor Author

alamb commented Mar 20, 2023

TLDR looks like this feature makes Q7 and Q16 slower on TPCH benchmarks

I think we need to review this more

To test I used

datafusion: alamb/enable_page_pruning
datafusion2: main as of  26e1b20ea3362ea62cb713004a0636b8af6a16d7

And ran the tpch queries or both SF1 and SF10 (1GB and 10GB against parquet datasets) on a google cloud machine:

cargo run --release --bin tpch -- benchmark datafusion --iterations 5 --path ~/tpch_data/parquet_data_SF1 --format parquet -o ~/enable_page_index

My results are as follows

alamb@aal-dev:~/arrow-datafusion3/benchmarks$ ./compare.py ~/main_1GB/tpch-summary--1679329989.json ~/enable_page_index_1GB/tpch-summary--1679328275.json
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃ /home/alamb… ┃ /home/alamb… ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ Q1           │    1709.64ms │    1695.27ms │     no change │
│ Q2           │     490.80ms │     472.05ms │     no change │
│ Q3           │     560.96ms │     556.39ms │     no change │
│ Q4           │     221.62ms │     212.50ms │     no change │
│ Q5           │     749.65ms │     749.12ms │     no change │
│ Q6           │     458.11ms │     452.70ms │     no change │
│ Q7           │    1184.62ms │    1297.19ms │  1.10x slower │
│ Q8           │     707.43ms │     728.24ms │     no change │
│ Q9           │    1195.69ms │    1198.06ms │     no change │
│ Q10          │     776.29ms │     833.59ms │  1.07x slower │
│ Q11          │     381.73ms │     392.42ms │     no change │
│ Q12          │     329.34ms │     343.47ms │     no change │
│ Q13          │    1371.40ms │    1339.00ms │     no change │
│ Q14          │     443.23ms │     454.51ms │     no change │
│ Q15          │     448.54ms │     464.96ms │     no change │
│ Q16          │     278.15ms │     318.71ms │  1.15x slower │
│ Q17          │    6150.47ms │    5874.44ms │     no change │
│ Q18          │    3574.89ms │    3929.19ms │  1.10x slower │
│ Q19          │     792.59ms │     775.01ms │     no change │
│ Q20          │    1720.97ms │    1851.68ms │  1.08x slower │
│ Q21          │    1726.90ms │    1864.49ms │  1.08x slower │
│ Q22          │     525.99ms │     198.84ms │ +2.65x faster │
└──────────────┴──────────────┴──────────────┴───────────────┘
alamb@aal-dev:~/arrow-datafusion3/benchmarks$ ./compare.py ~/main_10GB/tpch-summary--1679330119.json  ~/enable_page_index_10GB/tpch-summary--1679328405.json
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃ /home/alamb… ┃ /home/alamb… ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ Q1           │   16252.56ms │   16031.82ms │     no change │
│ Q2           │    3994.56ms │    4353.75ms │  1.09x slower │
│ Q3           │    5572.06ms │    5620.27ms │     no change │
│ Q4           │    2144.14ms │    2194.67ms │     no change │
│ Q5           │    7796.93ms │    7646.74ms │     no change │
│ Q6           │    4382.32ms │    4327.16ms │     no change │
│ Q7           │   18702.50ms │   19922.74ms │  1.07x slower │
│ Q8           │    7383.74ms │    7616.21ms │     no change │
│ Q9           │   13855.17ms │   14408.42ms │     no change │
│ Q10          │    7446.05ms │    8030.00ms │  1.08x slower │
│ Q11          │    3414.81ms │    3850.34ms │  1.13x slower │
│ Q12          │    3027.16ms │    3085.89ms │     no change │
│ Q13          │   18859.06ms │   18627.02ms │     no change │
│ Q14          │    4157.91ms │    4140.22ms │     no change │
│ Q15          │    5293.05ms │    5369.17ms │     no change │
│ Q16          │    6512.42ms │    3011.58ms │ +2.16x faster │
│ Q17          │   86253.33ms │   76036.06ms │ +1.13x faster │
│ Q18          │   45101.99ms │   49717.76ms │  1.10x slower │
│ Q19          │    7323.15ms │    7409.85ms │     no change │
│ Q20          │   19902.39ms │   20965.94ms │  1.05x slower │
│ Q21          │   22040.06ms │   23184.84ms │  1.05x slower │
│ Q22          │    2011.87ms │    2143.62ms │  1.07x slower │
└──────────────┴──────────────┴──────────────┴───────────────┘

@Ted-Jiang
Copy link
Member

I will try to fix this week 😄 hope find the bottleneck.

@alamb
Copy link
Contributor Author

alamb commented May 8, 2023

That would be amazing @Ted-Jiang - thank you so much. I would love to help out too -- this one has been on my list for a long long time.

@Ted-Jiang
Copy link
Member

Ted-Jiang commented May 9, 2023

I test in my local M1, seems there is no regression in q16 (which page index prune none data). Set DATAFUSION_EXECUTION_PARQUET_ENABLE_PAGE_INDEX=true target/release/datafusion-cli
But i found

without page Index
  ParquetExec: file_groups={10 groups: [[Users/yangjiang/tpch-parquet/partsupp/part-0.parquet:0..3110285], [Users/yangjiang/tpch-parquet/partsupp/part-0.parquet:3110285..6220570], [Users/yangjiang/tpch-parquet/partsupp/part-0.parquet:6220570..9330855], [Users/yangjiang/tpch-parquet/partsupp/part-0.parquet:9330855..12441140], [Users/yangjiang/tpch-parquet/partsupp/part-0.parquet:12441140..15551425], [Users/yangjiang/tpch-parquet/partsupp/part-0.parquet:15551425..18661710], [Users/yangjiang/tpch-parquet/partsupp/part-0.parquet:18661710..21771995], [Users/yangjiang/tpch-parquet/partsupp/part-0.parquet:21771995..24882280], [Users/yangjiang/tpch-parquet/partsupp/part-0.parquet:24882280..27992565], [Users/yangjiang/tpch-parquet/partsupp/part-0.parquet:27992565..31102845]]}, projection=[ps_partkey, ps_suppkey], metrics=[output_rows=800000, elapsed_compute=10ns, spill_count=0, spilled_bytes=0, mem_used=0, bytes_scanned=2657280, row_groups_pruned=0, page_index_rows_filtered=0, predicate_evaluation_errors=0, num_predicate_creation_errors=0, pushdown_rows_filtered=0, time_elapsed_scanning_until_data=25.893623ms, page_index_eval_time=20ns, time_elapsed_opening=117.212334ms, pushdown_eval_time=20ns, time_elapsed_scanning_total=250.615706ms, time_elapsed_processing=128.193753ms]           
   
With page index
  ParquetExec: file_groups={10 groups: [[Users/yangjiang/tpch-parquet/partsupp/part-0.parquet:0..3110285], [Users/yangjiang/tpch-parquet/partsupp/part-0.parquet:3110285..6220570], [Users/yangjiang/tpch-parquet/partsupp/part-0.parquet:6220570..9330855], [Users/yangjiang/tpch-parquet/partsupp/part-0.parquet:9330855..12441140], [Users/yangjiang/tpch-parquet/partsupp/part-0.parquet:12441140..15551425], [Users/yangjiang/tpch-parquet/partsupp/part-0.parquet:15551425..18661710], [Users/yangjiang/tpch-parquet/partsupp/part-0.parquet:18661710..21771995], [Users/yangjiang/tpch-parquet/partsupp/part-0.parquet:21771995..24882280], [Users/yangjiang/tpch-parquet/partsupp/part-0.parquet:24882280..27992565], [Users/yangjiang/tpch-parquet/partsupp/part-0.parquet:27992565..31102845]]}, projection=[ps_partkey, ps_suppkey], metrics=[output_rows=800000, elapsed_compute=10ns, spill_count=0, spilled_bytes=0, mem_used=0, row_groups_pruned=0, bytes_scanned=2903390, num_predicate_creation_errors=0, predicate_evaluation_errors=0, page_index_rows_filtered=0, pushdown_rows_filtered=0, time_elapsed_processing=133.898828ms, pushdown_eval_time=20ns, time_elapsed_scanning_until_data=17.617834ms, page_index_eval_time=20ns, time_elapsed_opening=88.243249ms, time_elapsed_scanning_total=238.237217ms]                                                                                                           |

Form the bytes_scanned you can see second one scan more bytes (reasonable with page Index).
So IMOP , you do the test in google cloud machine which means will have more latency when fetch bytes than local machine.

From the plan

+-------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type         | plan                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               |
+-------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Plan with Metrics | CoalescePartitionsExec, metrics=[output_rows=18314, elapsed_compute=25.292µs, spill_count=0, spilled_bytes=0, mem_used=0]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          |
|                   |   ProjectionExec: expr=[group_alias_0@0 as part.p_brand, group_alias_1@1 as part.p_type, group_alias_2@2 as part.p_size, COUNT(alias1)@3 as supplier_cnt], metrics=[output_rows=18314, elapsed_compute=29.417µs, spill_count=0, spilled_bytes=0, mem_used=0]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |
|                   |     AggregateExec: mode=FinalPartitioned, gby=[group_alias_0@0 as group_alias_0, group_alias_1@1 as group_alias_1, group_alias_2@2 as group_alias_2], aggr=[COUNT(alias1)], metrics=[output_rows=18314, elapsed_compute=170.210247ms, spill_count=0, spilled_bytes=0, mem_used=0]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |
|                   |       CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=85318, elapsed_compute=9.629845ms, spill_count=0, spilled_bytes=0, mem_used=0]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             |
|                   |         RepartitionExec: partitioning=Hash([Column { name: "group_alias_0", index: 0 }, Column { name: "group_alias_1", index: 1 }, Column { name: "group_alias_2", index: 2 }], 10), input_partitions=10, metrics=[fetch_time=4.378393626s, repart_time=56.449415ms, send_time=729.375µs]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         |
|                   |           AggregateExec: mode=Partial, gby=[group_alias_0@0 as group_alias_0, group_alias_1@1 as group_alias_1, group_alias_2@2 as group_alias_2], aggr=[COUNT(alias1)], metrics=[output_rows=85318, elapsed_compute=398.147584ms, spill_count=0, spilled_bytes=0, mem_used=0]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     |
|                   |             AggregateExec: mode=FinalPartitioned, gby=[group_alias_0@0 as group_alias_0, group_alias_1@1 as group_alias_1, group_alias_2@2 as group_alias_2, alias1@3 as alias1], aggr=[], metrics=[output_rows=118250, elapsed_compute=370.184626ms, spill_count=0, spilled_bytes=0, mem_used=0]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |
|                   |               CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=118250, elapsed_compute=14.044362ms, spill_count=0, spilled_bytes=0, mem_used=0]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   |
|                   |                 RepartitionExec: partitioning=Hash([Column { name: "group_alias_0", index: 0 }, Column { name: "group_alias_1", index: 1 }, Column { name: "group_alias_2", index: 2 }, Column { name: "alias1", index: 3 }], 10), input_partitions=10, metrics=[fetch_time=3.459734628s, repart_time=202.497167ms, send_time=106.304383ms]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        |
|                   |                   AggregateExec: mode=Partial, gby=[p_brand@1 as group_alias_0, p_type@2 as group_alias_1, p_size@3 as group_alias_2, ps_suppkey@0 as alias1], aggr=[], metrics=[output_rows=118250, elapsed_compute=387.699496ms, spill_count=0, spilled_bytes=0, mem_used=0]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     |
|                   |                     CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=118274, elapsed_compute=16.411µs, spill_count=0, spilled_bytes=0, mem_used=0]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                |
|                   |                       HashJoinExec: mode=Partitioned, join_type=LeftAnti, on=[(Column { name: "ps_suppkey", index: 0 }, Column { name: "s_suppkey", index: 0 })], metrics=[output_rows=118278, build_input_rows=118324, output_batches=13, input_rows=118278, input_batches=13, build_input_batches=20, build_mem_used=11183594, join_time=36.18104ms, build_time=71.978712ms]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     |
|                   |                         CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=118324, elapsed_compute=16.599522ms, spill_count=0, spilled_bytes=0, mem_used=0]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         |
|                   |                           RepartitionExec: partitioning=Hash([Column { name: "ps_suppkey", index: 0 }], 10), input_partitions=10, metrics=[fetch_time=2.856884712s, repart_time=44.825252ms, send_time=647.955µs]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |
|                   |                             ProjectionExec: expr=[ps_suppkey@1 as ps_suppkey, p_brand@3 as p_brand, p_type@4 as p_type, p_size@5 as p_size], metrics=[output_rows=118324, elapsed_compute=39.667µs, spill_count=0, spilled_bytes=0, mem_used=0]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    |
|                   |                               CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=118324, elapsed_compute=47.91µs, spill_count=0, spilled_bytes=0, mem_used=0]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |
|                   |                                 HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "ps_partkey", index: 0 }, Column { name: "p_partkey", index: 0 })], metrics=[output_rows=29581, build_input_rows=800000, output_batches=10, input_rows=29581, input_batches=10, build_input_batches=100, build_mem_used=56076192, join_time=84.478875ms, build_time=281.910325ms]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             |
|                   |                                   CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=800000, elapsed_compute=7.032531ms, spill_count=0, spilled_bytes=0, mem_used=0]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                |
|                   |                                     RepartitionExec: partitioning=Hash([Column { name: "ps_partkey", index: 0 }], 10), input_partitions=10, metrics=[fetch_time=213.551508ms, repart_time=111.779256ms, send_time=2.88029ms]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |
|                   |                                       ParquetExec: file_groups={10 groups: [[Users/yangjiang/tpch-parquet/partsupp/part-0.parquet:0..3110285], [Users/yangjiang/tpch-parquet/partsupp/part-0.parquet:3110285..6220570], [Users/yangjiang/tpch-parquet/partsupp/part-0.parquet:6220570..9330855], [Users/yangjiang/tpch-parquet/partsupp/part-0.parquet:9330855..12441140], [Users/yangjiang/tpch-parquet/partsupp/part-0.parquet:12441140..15551425], [Users/yangjiang/tpch-parquet/partsupp/part-0.parquet:15551425..18661710], [Users/yangjiang/tpch-parquet/partsupp/part-0.parquet:18661710..21771995], [Users/yangjiang/tpch-parquet/partsupp/part-0.parquet:21771995..24882280], [Users/yangjiang/tpch-parquet/partsupp/part-0.parquet:24882280..27992565], [Users/yangjiang/tpch-parquet/partsupp/part-0.parquet:27992565..31102845]]}, projection=[ps_partkey, ps_suppkey], metrics=[output_rows=800000, elapsed_compute=10ns, spill_count=0, spilled_bytes=0, mem_used=0, row_groups_pruned=0, bytes_scanned=2903390, num_predicate_creation_errors=0, predicate_evaluation_errors=0, page_index_rows_filtered=0, pushdown_rows_filtered=0, time_elapsed_processing=133.898828ms, pushdown_eval_time=20ns, time_elapsed_scanning_until_data=17.617834ms, page_index_eval_time=20ns, time_elapsed_opening=88.243249ms, time_elapsed_scanning_total=238.237217ms]                                                                                                           |
|                   |                                   CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=29581, elapsed_compute=4.735922ms, spill_count=0, spilled_bytes=0, mem_used=0]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 |
|                   |                                     RepartitionExec: partitioning=Hash([Column { name: "p_partkey", index: 0 }], 10), input_partitions=10, metrics=[fetch_time=1.742820835s, repart_time=362.862215ms, send_time=352.200595ms]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     |
|                   |                                       RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, metrics=[fetch_time=178.274085ms, repart_time=1ns, send_time=14.125µs]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                |
|                   |                                         CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=29581, elapsed_compute=2.310501ms, spill_count=0, spilled_bytes=0, mem_used=0]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           |
|                   |                                           FilterExec: p_brand@1 != Brand#45 AND p_type@2 NOT LIKE MEDIUM POLISHED% AND Use p_size@3 IN (SET) ([Literal { value: Int32(49) }, Literal { value: Int32(14) }, Literal { value: Int32(23) }, Literal { value: Int32(45) }, Literal { value: Int32(19) }, Literal { value: Int32(3) }, Literal { value: Int32(36) }, Literal { value: Int32(9) }]), metrics=[output_rows=29581, elapsed_compute=57.569834ms, spill_count=0, spilled_bytes=0, mem_used=0]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                |
|                   |                                             ParquetExec: file_groups={1 group: [[Users/yangjiang/tpch-parquet/part/part-0.parquet]]}, projection=[p_partkey, p_brand, p_type, p_size], predicate=p_brand@3 != Brand#45 AND p_type@4 NOT LIKE MEDIUM POLISHED% AND Use p_size@5 IN (SET) ([Literal { value: Int32(49) }, Literal { value: Int32(14) }, Literal { value: Int32(23) }, Literal { value: Int32(45) }, Literal { value: Int32(19) }, Literal { value: Int32(3) }, Literal { value: Int32(36) }, Literal { value: Int32(9) }]), pruning_predicate=(p_brand_min@0 != Brand#45 OR Brand#45 != p_brand_max@1) AND (p_size_min@2 <= 49 AND 49 <= p_size_max@3 OR p_size_min@2 <= 14 AND 14 <= p_size_max@3 OR p_size_min@2 <= 23 AND 23 <= p_size_max@3 OR p_size_min@2 <= 45 AND 45 <= p_size_max@3 OR p_size_min@2 <= 19 AND 19 <= p_size_max@3 OR p_size_min@2 <= 3 AND 3 <= p_size_max@3 OR p_size_min@2 <= 36 AND 36 <= p_size_max@3 OR p_size_min@2 <= 9 AND 9 <= p_size_max@3), metrics=[output_rows=200000, elapsed_compute=1ns, spill_count=0, spilled_bytes=0, mem_used=0, row_groups_pruned=0, bytes_scanned=935235, num_predicate_creation_errors=0, predicate_evaluation_errors=0, page_index_rows_filtered=0, pushdown_rows_filtered=0, time_elapsed_processing=114.579335ms, pushdown_eval_time=2ns, time_elapsed_scanning_until_data=20.304792ms, page_index_eval_time=184.001µs, time_elapsed_opening=3.196208ms, time_elapsed_scanning_total=179.528835ms] |
|                   |                         CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=4, elapsed_compute=16.377µs, spill_count=0, spilled_bytes=0, mem_used=0]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 |
|                   |                           RepartitionExec: partitioning=Hash([Column { name: "s_suppkey", index: 0 }], 10), input_partitions=10, metrics=[fetch_time=377.902418ms, repart_time=29.384µs, send_time=3.968µs]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        |
|                   |                             RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, metrics=[fetch_time=37.565626ms, repart_time=1ns, send_time=3.667µs]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            |
|                   |                               ProjectionExec: expr=[s_suppkey@0 as s_suppkey], metrics=[output_rows=4, elapsed_compute=8.25µs, spill_count=0, spilled_bytes=0, mem_used=0]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         |
|                   |                                 CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=4, elapsed_compute=43.582µs, spill_count=0, spilled_bytes=0, mem_used=0]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         |
|                   |                                   FilterExec: s_comment@1 LIKE %Customer%Complaints%, metrics=[output_rows=4, elapsed_compute=21.399292ms, spill_count=0, spilled_bytes=0, mem_used=0]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             |
|                   |                                     ParquetExec: file_groups={1 group: [[Users/yangjiang/tpch-parquet/supplier/part-0.parquet]]}, projection=[s_suppkey, s_comment], predicate=s_comment@6 LIKE %Customer%Complaints%, metrics=[output_rows=10000, elapsed_compute=1ns, spill_count=0, spilled_bytes=0, mem_used=0, row_groups_pruned=0, bytes_scanned=221440, num_predicate_creation_errors=0, predicate_evaluation_errors=0, page_index_rows_filtered=0, pushdown_rows_filtered=0, time_elapsed_processing=12.006207ms, pushdown_eval_time=2ns, time_elapsed_scanning_until_data=12.450667ms, page_index_eval_time=251ns, time_elapsed_opening=2.942125ms, time_elapsed_scanning_total=34.518458ms]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                              |
|                   |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    |
+-------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row in set. Query took 0.534 seconds.

I found there is one place need improvement, which read ParquetExec: file_groups={10 groups: [[Users/yangjiang/tpch-parquet/partsup without filter pushdown still read the pageIndex bytes 🤣

So i want to improve this and retest this in cloud env, @alamb Is this reasonable 🤔 PATL


New Edit, i found even in cloud env, your test data file still on local disk 🤦 , but i think this still need improve

@alamb
Copy link
Contributor Author

alamb commented May 9, 2023

@Ted-Jiang so is it your analysis that the difference negligible? I updated this branch to the latest main and I'll rerun the benchmarks and see what they show

@alamb
Copy link
Contributor Author

alamb commented May 9, 2023

So I reran the tpch (against parquet at scale factor 1) benchmarks and things on this branch now look good (a 10% boost):

━━━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃      tpch ┃      tpch ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1     │ 3465.13ms │ 3433.85ms │     no change │
│ QQuery 2     │  438.03ms │  323.74ms │ +1.35x faster │
│ QQuery 3     │  384.57ms │  360.75ms │ +1.07x faster │
│ QQuery 4     │  199.64ms │  191.11ms │     no change │
│ QQuery 5     │  522.11ms │  495.44ms │ +1.05x faster │
│ QQuery 6     │  204.05ms │  196.06ms │     no change │
│ QQuery 7     │ 1060.32ms │  881.93ms │ +1.20x faster │
│ QQuery 8     │  560.98ms │  510.02ms │ +1.10x faster │
│ QQuery 9     │ 1009.24ms │  919.41ms │ +1.10x faster │
│ QQuery 10    │  581.56ms │  572.98ms │     no change │
│ QQuery 11    │  291.34ms │  279.62ms │     no change │
│ QQuery 12    │  325.47ms │  311.11ms │     no change │
│ QQuery 13    │ 1143.71ms │ 1034.17ms │ +1.11x faster │
│ QQuery 14    │  285.41ms │  267.80ms │ +1.07x faster │
│ QQuery 15    │  239.13ms │  238.92ms │     no change │
│ QQuery 16    │  279.67ms │  276.82ms │     no change │
│ QQuery 17    │ 3291.60ms │ 2737.73ms │ +1.20x faster │
│ QQuery 18    │ 3378.74ms │ 3005.48ms │ +1.12x faster │
│ QQuery 19    │  536.40ms │  498.52ms │ +1.08x faster │
│ QQuery 20    │ 1081.41ms │  989.04ms │ +1.09x faster │
│ QQuery 21    │ 1588.62ms │ 1447.42ms │ +1.10x faster │
│ QQuery 22    │  183.83ms │  167.69ms │ +1.10x faster │
└──────────────┴───────────┴───────────┴───────────────┘

@alamb alamb marked this pull request as ready for review May 9, 2023 23:53
@Ted-Jiang
Copy link
Member

@Ted-Jiang so is it your analysis that the difference negligible? I updated this branch to the latest main and I'll rerun the benchmarks and see what they show

Yes, it is negligible and still need a improvement of #6315

Copy link
Member

@Ted-Jiang Ted-Jiang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @alamb verify this 👍

@@ -49,7 +49,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus
| datafusion.execution.collect_statistics | false | Should DataFusion collect statistics after listing files |
| datafusion.execution.target_partitions | 0 | Number of partitions for query execution. Increasing partitions can increase concurrency. Defaults to the number of CPU cores on the system |
| datafusion.execution.time_zone | +00:00 | The default time zone Some functions, e.g. `EXTRACT(HOUR from SOME_TIME)`, shift the underlying datetime according to this time zone, and then extract the hour |
| datafusion.execution.parquet.enable_page_index | false | If true, uses parquet data page level metadata (Page Index) statistics to reduce the number of rows decoded. |
| datafusion.execution.parquet.enable_page_index | true | If true, uses parquet data page level metadata (Page Index) statistics to reduce the number of rows decoded. |
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think here we also skip IO when the whole page is skipped.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will refine the description as a follow on PR

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alamb
Copy link
Contributor Author

alamb commented May 12, 2023

I think it is time -- thanks a lot @Ted-Jiang !

@alamb alamb merged commit 482f32c into apache:main May 12, 2023
@Ted-Jiang
Copy link
Member

I think it is time -- thanks a lot @Ted-Jiang !

Thanks for your advancing events too 👍

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Core DataFusion crate sqllogictest SQL Logic Tests (.slt)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Enable parquet page level skipping (page index pruning) by default
2 participants