
[FEAT] Cap parallelism on local parquet reader #3310

Merged · 10 commits · Dec 4, 2024
Conversation

@colin-ho (Contributor) commented Nov 18, 2024

Implement a dynamically parallel local streaming parquet reader.

Background

The current streaming local parquet reader, while fast and streaming, has some problems:

  • It reads and deserializes ALL row groups and ALL columns in parallel.
  • It does not respect downstream back-pressure: the crossbeam channels are bounded only by the maximum number of chunks, so the reader is free to fill them all up.

This leads to unnecessarily high memory usage, and it can starve downstream tasks of CPU.
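For intuition, here is a minimal, generic sketch (not Daft's actual reader) of how a tightly bounded channel produces back-pressure: the producer blocks as soon as the bound is reached, so in-flight memory stays capped no matter how slow the consumer is.

```python
import queue
import threading
import time

# A bounded channel exerts back-pressure only when producers must block
# on it. With a small bound, a fast producer stalls until the consumer
# catches up; with a large bound (e.g. total chunk count), it can fill
# memory long before the consumer starts.
ch = queue.Queue(maxsize=2)

def producer():
    for i in range(6):
        ch.put(i)  # blocks while 2 items are already in flight

t = threading.Thread(target=producer)
t.start()

time.sleep(0.2)  # simulate a consumer that is slow to start
in_flight = ch.qsize()  # capped at 2, regardless of producer speed

results = [ch.get() for _ in range(6)]
t.join()
```

The old reader's effective bound was the total number of chunks, so producers never blocked and memory ballooned.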

Solution

Instead of launching all tasks at once, we can cap the number of parallel deserialization tasks based on:

  • the number of CPUs
  • the number of columns
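As a rough illustration (the function names and the exact formula here are hypothetical, not the PR's actual code), the cap might be derived from CPUs and column count and applied through a fixed-size pool like this:

```python
import os
from concurrent.futures import ThreadPoolExecutor

def deserialization_parallelism(num_columns: int) -> int:
    # Hypothetical heuristic: wide tables already parallelize across
    # columns within a task, so fewer concurrent row-group tasks are
    # needed to keep the CPUs busy.
    num_cpus = os.cpu_count() or 1
    return max(1, num_cpus // max(1, num_columns))

def deserialize(row_group):
    # Stand-in for the real row-group decode step.
    return row_group

def read_row_groups(row_groups, num_columns):
    # Run row-group tasks through a pool sized by the cap, instead of
    # spawning one task per row group up front.
    cap = deserialization_parallelism(num_columns)
    with ThreadPoolExecutor(max_workers=cap) as pool:
        yield from pool.map(deserialize, row_groups)
```

With very many columns the cap bottoms out at 1, so at most one row-group task runs at a time.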

Results

The clearest benefit is the lower memory usage of streaming queries, for example:

```python
import daft

# Read the first partition of the TPC-H SF1 lineitem table.
next(daft.read_parquet("data/tpch-dbgen/1_0/1/parquet/lineitem").iter_partitions())
```

The new implementation peaks at roughly 300 MB, while the old goes over 1 GB.
[Screenshots: memory profiles, new vs. old implementation]

Another example, where we stream the entire file but consume it slowly:

```python
import time

import daft

for _ in daft.read_parquet("/Users/colinho/Desktop/Daft/z/daft_tpch_100g_32part_64RG.parquet").iter_partitions():
    time.sleep(0.1)
```

The new implementation peaks at roughly 1.2 GB, while the old goes over 3 GB.
[Screenshots: memory profiles, new vs. old implementation]

To verify performance parity, I also wrote benchmarks for parquet files with differing numbers of rows, columns, and row groups. The results show the new implementation is essentially on par, with only slight differences.
[Screenshots: benchmark results, new vs. old implementation]

Reading a TPC-H SF1 lineitem table, the results are essentially identical (~0.2 s).

@github-actions github-actions bot added the enhancement New feature or request label Nov 18, 2024

codspeed-hq bot commented Nov 18, 2024

CodSpeed Performance Report

Merging #3310 will improve performance by up to ×2.2

Comparing colin/dynamic-parquet (6fed790) with main (6d30e30)

Summary

⚡ 2 improvements
✅ 15 untouched benchmarks

Benchmarks breakdown

| Benchmark | main | colin/dynamic-parquet | Change |
| --- | --- | --- | --- |
| test_iter_rows_first_row[100 Small Files] | 340.5 ms | 212.2 ms | +60.44% |
| test_show[100 Small Files] | 34.9 ms | 15.6 ms | ×2.2 |


codecov bot commented Nov 18, 2024

Codecov Report

Attention: Patch coverage is 88.18182% with 26 lines in your changes missing coverage. Please review.

Project coverage is 77.34%. Comparing base (6d30e30) to head (6fed790).
Report is 1 commit behind head on main.

| Files with missing lines | Patch % | Lines |
| --- | --- | --- |
| src/daft-parquet/src/stream_reader.rs | 88.58% | 25 Missing ⚠️ |
| src/daft-parquet/src/read.rs | 0.00% | 1 Missing ⚠️ |
Additional details and impacted files

Impacted file tree graph

```
@@            Coverage Diff             @@
##             main    #3310      +/-   ##
==========================================
+ Coverage   77.00%   77.34%   +0.33%
==========================================
  Files         696      696
  Lines       86039    84849    -1190
==========================================
- Hits        66256    65628     -628
+ Misses      19783    19221     -562
```
| Files with missing lines | Coverage Δ | |
| --- | --- | --- |
| src/daft-parquet/src/read.rs | 75.22% <0.00%> | (-0.08%) ⬇️ |
| src/daft-parquet/src/stream_reader.rs | 89.85% <88.58%> | (+1.57%) ⬆️ |

... and 10 files with indirect coverage changes

@colin-ho colin-ho marked this pull request as ready for review November 18, 2024 15:48
Comment on lines 116 to 118
```rust
// Only increase permits if compute time is significantly higher than IO time,
// and waiting time is not too high.
if compute_ratio > Self::COMPUTE_THRESHOLD && wait_ratio < Self::WAIT_THRESHOLD {
```
@colin-ho (Contributor, Author) commented Nov 22, 2024

Some ideas to consider:

  • Maybe we don't need to consider IO time and just consider the wait time.
  • Can this semaphore become generic and used for other I/O code, as well as the local executor? Would be cool if we could dynamically adjust degree of operator parallelism as well.
  • Can we decrease the permit count in addition to increasing it?
  • Can we add memory pressure to the semaphore?
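A minimal Python sketch of the adaptive-permit idea in the quoted Rust snippet (the class name, threshold values, and ratio bookkeeping here are illustrative assumptions, not the PR's actual API):

```python
import threading

class DynamicSemaphore:
    # Illustrative sketch: start with few permits and grant more only
    # when tasks are compute-bound (high compute share of their time)
    # and not already queueing up behind the semaphore (low wait share).
    COMPUTE_THRESHOLD = 0.5  # hypothetical value
    WAIT_THRESHOLD = 0.1     # hypothetical value

    def __init__(self, initial_permits: int, max_permits: int):
        self._sem = threading.Semaphore(initial_permits)
        self._permits = initial_permits
        self._max = max_permits

    def acquire(self):
        self._sem.acquire()

    def release(self, compute_time: float, io_time: float, wait_time: float):
        total = (compute_time + io_time + wait_time) or 1.0
        compute_ratio = compute_time / total
        wait_ratio = wait_time / total
        # Only increase permits if compute time dominates IO time and
        # tasks are not already waiting long for a permit.
        if (compute_ratio > self.COMPUTE_THRESHOLD
                and wait_ratio < self.WAIT_THRESHOLD
                and self._permits < self._max):
            self._permits += 1
            self._sem.release()  # extra release grows the permit pool
        self._sem.release()  # return the permit this task held
```

A compute-dominated release grows the pool by one (up to the max), while an IO-dominated or wait-heavy release leaves it unchanged; shrinking the pool or reacting to memory pressure would extend `release` with a symmetric decrease path.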

@samster25 (Member) left a comment:

Forgot to submit my review from yesterday lol

Resolved review threads:

  • benchmarking/parquet/test_local.py
  • src/common/runtime/src/lib.rs
  • src/daft-parquet/src/semaphore.rs
  • src/daft-parquet/src/stream_reader.rs (×3)
@colin-ho (Contributor, Author) commented Dec 4, 2024

Tested this new implementation on TPCH SF1:
[Screenshots: TPC-H SF1 results]

@colin-ho colin-ho merged commit de4fe50 into main Dec 4, 2024
44 checks passed
@colin-ho colin-ho deleted the colin/dynamic-parquet branch December 4, 2024 17:30
@colin-ho colin-ho changed the title [FEAT] Dynamically parallel local parquet reader [FEAT] Cap parallelism on local parquet reader Dec 5, 2024
colin-ho added a commit that referenced this pull request Dec 6, 2024
Implement a parallelism cap on remote parquet tasks, and use compute
runtime instead of rayon (swordfish reads only). Follow on from
#3310 which implemented it for
local.

Benchmarks in comments below

---------

Co-authored-by: Colin Ho <[email protected]>
Co-authored-by: EC2 Default User <[email protected]>
Co-authored-by: Colin Ho <[email protected]>
Labels: enhancement (New feature or request)