
feat: Properly cache object-stores #14598

Merged 1 commit into main on Feb 20, 2024

Conversation

ritchie46 (Member) commented Feb 20, 2024

Might help #14384
Closes #14572

@kszlim can you check?

github-actions bot added the labels enhancement (New feature or an improvement of an existing feature), python (Related to Python Polars), and rust (Related to Rust Polars) on Feb 20, 2024
kszlim (Contributor) commented Feb 20, 2024

Will give it a try tomorrow

codecov bot commented Feb 20, 2024

Codecov Report

Attention: 1 line in your changes is missing coverage. Please review.

Comparison is base (7cf1045) 77.95% compared to head (0c86901) 77.95%.

Files                                              Patch %   Lines
crates/polars-io/src/cloud/object_store_setup.rs   93.33%    1 Missing ⚠️
Additional details and impacted files
@@           Coverage Diff           @@
##             main   #14598   +/-   ##
=======================================
  Coverage   77.95%   77.95%           
=======================================
  Files        1326     1326           
  Lines      173054   173061    +7     
  Branches     2448     2448           
=======================================
+ Hits       134896   134904    +8     
+ Misses      37714    37713    -1     
  Partials      444      444           


ritchie46 merged commit 6ff74b0 into main on Feb 20, 2024
21 checks passed
ritchie46 deleted the cache branch on February 20, 2024 at 15:27
thomasfrederikhoeck (Contributor) commented
@ritchie46 I might not 100% get how long-lived the object store is, but can't using the URL as the cache key leak credentials between queries? If one call is made with one set of credentials and another is made with a second set, the initial credentials will be reused since they match on URL.
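One way to address this concern is to include a hash of the storage configuration in the cache key, so that the same URL used with different credentials resolves to different cache entries. A minimal sketch of that idea, using illustrative stand-in types rather than the actual polars-io internals:

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::sync::{Arc, Mutex, OnceLock};

// Illustrative stand-ins; the real types live in polars-io's cloud module.
#[derive(Hash)]
struct CloudConfig {
    region: Option<String>,
    access_key_id: Option<String>,
    secret_access_key: Option<String>,
}

// Placeholder for an Arc<dyn ObjectStore> built from the config.
struct ObjectStoreHandle;

// Cache keyed by (url, hash of the cloud config) rather than the url alone,
// so the same bucket queried with different credentials gets its own entry.
fn cache() -> &'static Mutex<HashMap<(String, u64), Arc<ObjectStoreHandle>>> {
    static CACHE: OnceLock<Mutex<HashMap<(String, u64), Arc<ObjectStoreHandle>>>> = OnceLock::new();
    CACHE.get_or_init(|| Mutex::new(HashMap::new()))
}

fn get_or_build_store(url: &str, config: &CloudConfig) -> Arc<ObjectStoreHandle> {
    let mut hasher = DefaultHasher::new();
    config.hash(&mut hasher);
    let key = (url.to_string(), hasher.finish());

    cache()
        .lock()
        .unwrap()
        .entry(key)
        .or_insert_with(|| Arc::new(ObjectStoreHandle)) // real code would build the store here
        .clone()
}

fn main() {
    let creds_a = CloudConfig {
        region: Some("us-east-1".into()),
        access_key_id: Some("key-a".into()),
        secret_access_key: Some("secret-a".into()),
    };
    let creds_b = CloudConfig {
        region: Some("us-east-1".into()),
        access_key_id: Some("key-b".into()),
        secret_access_key: Some("secret-b".into()),
    };
    // Same URL, different credentials: two distinct cache entries.
    let a = get_or_build_store("s3://bucket/data", &creds_a);
    let b = get_or_build_store("s3://bucket/data", &creds_b);
    assert!(!Arc::ptr_eq(&a, &b));
}
```

With a key like this, a second query against the same bucket but with rotated credentials builds a fresh store instead of reusing the stale one.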

kszlim (Contributor) commented Feb 20, 2024

@ritchie46 still encountered the issue (though it seems like I'm running into it at a lower rate?).

ritchie46 (Member, Author) commented
Ok. Will wrap them with a LimitStore as well.

kszlim (Contributor) commented Feb 20, 2024

@ritchie46 when I say lower rate, I mean it happens maybe 10x-20x less frequently now. Seems like this might have largely fixed the issue. I think you likely can use a higher concurrency budget by default now?

The only way I can get it to trigger now is by using a much higher than default concurrency budget (which does seem to provide a fairly significant increase in network throughput for me).

ritchie46 (Member, Author) commented
Right, so on the default settings it doesn't happen? That's great to hear, as I could not really understand how a LimitStore would help: what we have implemented with the budgeting semaphore should have exactly the same effect. Thanks for testing it.

What is your network throughput per concurrency budget?
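For context on the LimitStore versus budgeting-semaphore point: both cap the number of in-flight object-store requests. A minimal sketch of the semaphore approach using tokio, assuming a fixed budget and a placeholder download body, not the actual polars implementation:

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

// A global budget of concurrent downloads; each request must hold a permit.
// The permit count stands in for whatever polars derives from its
// concurrency-budget setting.
async fn fetch_with_budget(
    budget: Arc<Semaphore>,
    url: String,
) -> Result<Vec<u8>, Box<dyn std::error::Error + Send + Sync>> {
    // Wait until one of the budgeted slots is free.
    let _permit = budget.acquire().await?;
    // ... perform the actual GET here; the permit is released on drop ...
    let _ = url;
    Ok(Vec::new())
}

#[tokio::main]
async fn main() {
    let budget = Arc::new(Semaphore::new(64)); // e.g. a fixed budget of 64
    let handles: Vec<_> = (0..1000)
        .map(|i| {
            let budget = budget.clone();
            tokio::spawn(fetch_with_budget(budget, format!("s3://bucket/file-{i}.parquet")))
        })
        .collect();
    for h in handles {
        let _ = h.await;
    }
}
```

object_store's LimitStore wraps a store and enforces the same bound on concurrent requests internally, which is why the two mechanisms would be expected to behave equivalently.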

kszlim (Contributor) commented Feb 21, 2024

Everything here was tested on a c6a.24xlarge on AWS in the same region as my S3 bucket (which is not an S3 Express One Zone bucket).

Two insights:
I'm using a prefetch size that's equal to the number of files I have (10k). I've found that it runs fastest for me with this (but it seems like I'm mostly CPU bound, so it only runs slightly faster).

Network utilization with default prefetch size (scale in MB/s):
[image]

Network utilization with 10k prefetch size and 1k concurrency budget (scale in GB/s):
[image]

So it seems like at the default setting, when it's actually prefetching, I get around 250-350 MB/s. With the budget set to 192 (2x my vCPUs) I get around 600 MB/s. When I set it to 800 it reaches about 1.4 GB/s (further increases don't make much of a difference). My theoretical bandwidth should be 4.6875 GB/s, so it doesn't seem to scale linearly.

Interesting thing to note: the query (which is just materializing a column with zero transformations applied) seems to run at roughly the same speed in all these cases. I believe the parallelism of my system isn't being fully utilized. In the 10k prefetch case, after I'm done downloading all batches, the network throughput drops from 1.4 GB/s to nearly 0 KB/s and then I spend a long time waiting (I'm guessing for the row groups/columns to be decoded).
Looks like this:
Network throughput (axis is in GB/s):
[image]

htop right after network throughput hits near zero (and there's maybe ~50 s of this before my LazyFrame collect completes):
[image]

I've tried both the row-group and the column parallelization strategy, and neither seems to pin all my cores.

The problem might be that I'm grabbing a single column and there are ~5 row groups per file (each row group is about 50k rows). In order to utilize more cores, maybe the decoding has to be even more coarse-grained? I'm curious how well sending the parquet bytes to an mpsc channel and then using rayon to do a parallel decode over larger chunks would perform (roughly as sketched below)?
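A rough sketch of that decoupling idea: one thread (standing in for the async prefetcher) pushes downloaded parquet buffers into an mpsc channel while rayon decodes whole buffers in parallel. The decode_parquet_bytes function is a hypothetical stand-in for a real parquet reader; this is not how polars is currently structured.

```rust
use std::sync::mpsc;
use std::thread;

use rayon::prelude::*;

// Stand-in for the real decoder (e.g. a parquet reader producing DataFrames).
fn decode_parquet_bytes(bytes: &[u8]) -> usize {
    bytes.len() // pretend "decoding" just measures the buffer
}

fn main() {
    let (tx, rx) = mpsc::channel::<Vec<u8>>();

    // Downloader: in reality this would be the prefetcher pushing completed
    // file (or row-group) buffers into the channel as they arrive.
    let downloader = thread::spawn(move || {
        for i in 0..100u8 {
            tx.send(vec![i; 1024]).unwrap();
        }
        // Dropping `tx` closes the channel and ends the iterator below.
    });

    // Decoder: drain the channel and decode buffers on the rayon pool,
    // one task per buffer, so decoding stays coarse-grained (per file).
    let decoded: Vec<usize> = rx
        .into_iter()
        .par_bridge()
        .map(|buf| decode_parquet_bytes(&buf))
        .collect();

    downloader.join().unwrap();
    println!("decoded {} buffers", decoded.len());
}
```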

ritchie46 (Member, Author) commented
Next week I will be able to make a setup to optimize this case. In the meantime I am pleased to hear we can actually achieve 1.4 GB/s. Doesn't sound bad.

Maybe we should set the default budget to 3x vCPU. Currently it is 15 I believe. For poor internet connections we must be conservative.

Ideally I'd make this dynamic and increase the budget as long as bandwidth improves.
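A sketch of what a 3x-vCPU default with an environment-variable override could look like; the variable name, multiplier, and floor here are assumptions for illustration, not what polars currently ships:

```rust
use std::env;
use std::thread;

// Illustrative only: derive the default concurrency budget as 3x the number
// of available threads, with a small floor for low-core machines, unless the
// user overrides it via an environment variable (name assumed here).
fn default_concurrency_budget() -> usize {
    const MIN_BUDGET: usize = 10;
    env::var("POLARS_CONCURRENCY_BUDGET")
        .ok()
        .and_then(|v| v.parse::<usize>().ok())
        .unwrap_or_else(|| {
            let threads = thread::available_parallelism().map(|n| n.get()).unwrap_or(1);
            std::cmp::max(3 * threads, MIN_BUDGET)
        })
}

fn main() {
    println!("concurrency budget: {}", default_concurrency_budget());
}
```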

kszlim (Contributor) commented Feb 21, 2024

.unwrap_or_else(|_| std::cmp::max(POOL.current_num_threads(), MAX_BUDGET_PER_REQUEST));

Seems like it's set to the max of your number of threads (so for me it was probably 96 by default) and 10?

Yeah, I think fully decoupling the downloading and the parquet parsing/reading could probably greatly improve utilization and speed. I'm guessing that because there are only 5 row groups per file and only 50k rows per row group (and I'm only reading one column), we're parallelizing at too fine-grained a level, since it seems like parallelization occurs at a per-file level too?

ritchie46 (Member, Author) commented
Yes, but the challenge is that we don't know that. It might be that you're reading one file with many row groups. For the rayon decoding it shouldn't really matter if tasks are small.

We also have the challenge of stopping early at a limit, so we cannot download everything at once; earlier files should have preference. Another challenge is that budgets on AWS can be much higher than on local machines.

I first want to see if we can reduce granularity on downloading when we have info about the download fingerprint, e.g. number of files, row groups, projection, etc.

agossard (Contributor) commented
Hi there. Is there a way to manually trigger (from Python) a clear-out of the object store cache? The way things are set up at my company, AWS S3 credentials are only valid for a set period of time (30-40 minutes). We have a process for periodically acquiring new credentials, but when we pass them in with the storage_options parameter to scan_parquet, we end up crashing with an S3 error… I'm guessing because of this cache.

So I guess one solution would be to provide the ability to manually clear the cache (if not already available); a rough sketch of that idea follows. Another could be to make the cache key based on a hash of everything in the storage_options parameter.
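Polars does not necessarily expose such a hook today; purely to illustrate the first suggestion, clearing a process-wide store cache could look like the following hypothetical sketch (the cache and its contents are placeholders, and the second suggestion corresponds to the options-hash keying sketched earlier in the thread):

```rust
use std::collections::HashMap;
use std::sync::{Mutex, OnceLock};

// Hypothetical process-wide cache of built object stores, keyed by URL.
// The String value stands in for an Arc<dyn ObjectStore>.
fn object_store_cache() -> &'static Mutex<HashMap<String, String>> {
    static CACHE: OnceLock<Mutex<HashMap<String, String>>> = OnceLock::new();
    CACHE.get_or_init(|| Mutex::new(HashMap::new()))
}

// Drop every cached store so the next scan rebuilds them with fresh credentials.
fn clear_object_store_cache() {
    object_store_cache().lock().unwrap().clear();
}

fn main() {
    object_store_cache()
        .lock()
        .unwrap()
        .insert("s3://bucket/data".to_string(), "store-with-old-creds".to_string());
    clear_object_store_cache();
    assert!(object_store_cache().lock().unwrap().is_empty());
}
```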

Labels: enhancement (New feature or an improvement of an existing feature), python (Related to Python Polars), rust (Related to Rust Polars)
Projects: None yet
Development
Successfully merging this pull request may close these issues: Python/S3 connection pooling
4 participants