
Enable multi-file partitioning in dask_cudf.read_parquet #8393

Merged · 9 commits merged into rapidsai:branch-21.08 from multi-file-parquet · Jul 21, 2021

Conversation

@rjzamora (Member) commented May 27, 2021

Depends on dask#7557

This PR updates dask_cudf.read_parquet to enable multi-file aggregation when the chunksize parameter is used. This change yields dramatic (>50x) read-time improvements when the dataset contains many small files, which is common for hive/directory-partitioned data.

Motivating Example

import dask.datasets
import dask_cudf

# `path` is the output directory for the partitioned dataset
dask.datasets.timeseries(
    start='2000-01-01',
    end='2000-02-28',
    freq='1s',
    partition_freq='1h',
    seed=42,
).to_parquet(
    path,
    engine="pyarrow",
    partition_on="name",
    write_index=False,
)

####
#### BEFORE THIS PR ####
####

# Without `chunksize`, or when using `chunksize` before this PR,
# the read produces 36,192 partitions
%time df_read = dask_cudf.read_parquet(path)
CPU times: user 1.17 s, sys: 501 ms, total: 1.67 s
Wall time: 1.67 s

# Without `chunksize`, or when using `chunksize` before this PR,
# the full read takes 5+ minutes
%time df_read.compute()
CPU times: user 5min 34s, sys: 37.1 s, total: 6min 11s
Wall time: 6min 12s

####
#### AFTER THIS PR ####
####

# Using `chunksize` WITH this PR results in 26 partitions
%time df_read = dask_cudf.read_parquet(path, chunksize="1GB", aggregate_files="name")
CPU times: user 1.03 s, sys: 119 ms, total: 1.15 s
Wall time: 1.15 s

# Using `chunksize` WITH this PR results in a ~95x speedup
%time df_read.compute()
CPU times: user 3.18 s, sys: 703 ms, total: 3.88 s
Wall time: 3.92 s

@github-actions github-actions bot added the Python Affects Python cuDF API. label May 27, 2021
codecov bot commented May 27, 2021

Codecov Report

❗ No coverage uploaded for pull request base (branch-21.08@67b7aac).
The diff coverage is n/a.

❗ Current head caf2b90 differs from pull request most recent head 08d12e2. Consider uploading reports for the commit 08d12e2 to get more accurate results.

@@               Coverage Diff               @@
##             branch-21.08    #8393   +/-   ##
===============================================
  Coverage                ?   10.60%           
===============================================
  Files                   ?      116           
  Lines                   ?    18606           
  Branches                ?        0           
===============================================
  Hits                    ?     1974           
  Misses                  ?    16632           
  Partials                ?        0           


@rjzamora rjzamora marked this pull request as ready for review July 15, 2021 20:09
@rjzamora rjzamora requested review from a team as code owners July 15, 2021 20:09
@rjzamora rjzamora requested review from devavret, davidwendt, charlesbluca and rgsl888prabhu and removed request for a team July 15, 2021 20:09
@github-actions github-actions bot added CMake CMake build issue conda Java Affects Java cuDF API. libcudf Affects libcudf (C++/CUDA) code. labels Jul 15, 2021
@rjzamora rjzamora removed CMake CMake build issue conda Java Affects Java cuDF API. labels Jul 15, 2021
@rjzamora rjzamora added 3 - Ready for Review Ready for review by team 4 - Needs Dask Reviewer non-breaking Non-breaking change and removed gpuCI libcudf Affects libcudf (C++/CUDA) code. labels Jul 15, 2021
@rjzamora rjzamora changed the base branch from branch-21.06 to branch-21.08 July 15, 2021 20:11
@rjzamora (Member Author)

@randerzander - dask#7557 was merged, and I just confirmed that this PR still enables (very performant) multi-file aggregation via the aggregate_files parameter in read_parquet :)

@rjzamora rjzamora added the improvement Improvement / enhancement to an existing function label Jul 15, 2021
@ajschmidt8 ajschmidt8 removed the request for review from a team July 16, 2021 13:39
@ajschmidt8 (Member)

Removing ops-codeowners from the required reviews since it doesn't seem there are any file changes that we're responsible for. Feel free to add us back if necessary.

@devavret (Contributor) left a comment

No need for tests?

@rjzamora (Member Author)

> No need for tests?

Good question, actually. If CI is using the Dask main branch, we can add an explicit test with aggregate_files= set. Otherwise, we cannot really test for the feature we are enabling. Perhaps a simple test with a version check is appropriate.
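
For illustration, such a version-gated test might look like the following (a minimal sketch; the exact minimum Dask version, the fixture setup, and the partition-count threshold are assumptions, not the test that was ultimately added):

import dask
import dask.datasets
import dask_cudf
import pytest
from packaging.version import parse as parse_version


@pytest.mark.skipif(
    parse_version(dask.__version__) < parse_version("2021.07.0"),
    reason="multi-file aggregation requires a newer Dask",
)
def test_read_parquet_aggregate_files(tmpdir):
    # Write a small hive-partitioned dataset with many tiny files
    # (~26 "name" directories, several files each).
    path = str(tmpdir)
    dask.datasets.timeseries(
        start="2000-01-01",
        end="2000-01-02",
        freq="600s",
        partition_freq="6h",
        seed=42,
    ).to_parquet(path, partition_on="name", write_index=False)

    # Read it back with multi-file aggregation enabled; the many
    # small files should collapse into far fewer partitions.
    ddf = dask_cudf.read_parquet(
        path, chunksize="1GB", aggregate_files="name"
    )
    assert ddf.npartitions < 100
    ddf.compute()  # sanity check that the aggregated read works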

@robertmaynard robertmaynard removed the request for review from a team July 16, 2021 16:51
@robertmaynard (Contributor)

Removing cmake-codeowners from the reviews since it doesn't seem this has any build-system impact.

@rjzamora (Member Author)

@devavret - Just added test coverage

else:
    (path, row_group, partition_keys) = piece
if not isinstance(pieces, list):
    pieces = [pieces]
Contributor:

Should this be?

Suggested change:
-    pieces = [pieces]
+    pieces = list(pieces)

in case pieces is a numpy array or tuple

@rjzamora (Member Author):

Sorry - I know this is a bit ugly :/

We can't use list(pieces), because pieces will be either a tuple, a string, a list of strings, or a list of tuples. For newer versions of Dask, this should always be a list already. For older versions, however, we want to convert a string into a list of strings, and a tuple into a list of tuples.
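
To make the branching concrete, the normalization being described amounts to something like this (a standalone sketch; the helper name is hypothetical, not the actual source):

def _normalize_pieces(pieces):
    # Newer Dask always passes a list (of path strings or
    # (path, row_group, partition_keys) tuples), so pass it through.
    if isinstance(pieces, list):
        return pieces
    # Older Dask may pass a bare path string or a bare tuple; wrap it
    # so downstream code can always iterate over a list of pieces.
    return [pieces]

Using list(pieces) instead would explode a tuple into its elements and a string into individual characters, which is why the wrapping form is required.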

@@ -40,42 +40,82 @@ def read_metadata(*args, **kwargs):

        return (new_meta, stats, parts, index)

    @classmethod
    def multi_support(cls):
Contributor:

Where is this being used?

@rjzamora (Member Author):

It is used upstream (in Dask) to check if we support pieces being passed in as a list in read_partition. So, it is an "opt-in" mechanism.
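
As a rough illustration of that opt-in pattern (a sketch of the upstream call site; the helper and its arguments are assumptions, not the exact Dask code):

import pandas as pd

def read_pieces(engine, fs, pieces, columns, index):
    # If the engine opted in via multi_support(), hand over the whole
    # list of aggregated pieces in a single read_partition call.
    if getattr(engine, "multi_support", lambda: False)():
        return engine.read_partition(fs, pieces, columns, index)
    # Otherwise, read one piece per call and concatenate the results.
    return pd.concat(
        [engine.read_partition(fs, p, columns, index) for p in pieces]
    )

Engines that do not define multi_support keep the old one-piece-per-call behavior, so the change is backward compatible.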

@rjzamora rjzamora removed the request for review from a team July 20, 2021 21:32
@quasiben (Member) left a comment

Thanks @rjzamora . This looks good and thanks for adding the tests.

Also, thank you to @rgsl888prabhu and @devavret for the review

@quasiben (Member)

@gpucibot merge

@rapids-bot rapids-bot bot merged commit 39220f9 into rapidsai:branch-21.08 Jul 21, 2021
@rjzamora rjzamora deleted the multi-file-parquet branch July 21, 2021 20:31
@vyasr vyasr added 4 - Needs Review Waiting for reviewer to review or respond and removed 4 - Needs Dask Reviewer labels Feb 23, 2024