parallel s3 store writing #130

Merged
merged 12 commits on Apr 11, 2020

Conversation

jmmshn
Contributor

jmmshn commented Apr 8, 2020

  • Moved the writing to s3 into its own function and allowed the upload to be skipped in the update function. (Tested and I do get scaling on our current minio setup)

  • The user can set that flag and call the s3 writing function in process_items to do parallel writing

  • Also moved the s3_bucket initialization into the constructor. This seemed to affect speed too.

@munrojm is also testing this out now for the BS migration and he's seeing big improvements.
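
For reference, here is a minimal sketch of the layout described above, assuming a boto3 Bucket resource; the names (ParallelS3Store, write_doc_to_s3, skip_upload) are illustrative and do not match the actual code in src/maggma/stores/aws.py:

import json
import zlib

import boto3


class ParallelS3Store:
    """Illustrative sketch only; not the maggma S3 store API."""

    def __init__(self, bucket_name, key="task_id"):
        self.key = key
        # Initialize the bucket resource once in the constructor instead of on every update
        self.s3_bucket = boto3.resource("s3").Bucket(bucket_name)

    def write_doc_to_s3(self, doc):
        # Standalone writer so a builder can call it from process_item for parallel uploads
        body = zlib.compress(json.dumps(doc).encode())
        self.s3_bucket.put_object(Key=str(doc[self.key]), Body=body)

    def update(self, docs, skip_upload=False):
        # With skip_upload=True, assume the docs were already written during process_item
        if skip_upload:
            return
        for doc in docs:
            self.write_doc_to_s3(doc)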

@codecov

codecov bot commented Apr 8, 2020

Codecov Report

Merging #130 into master will decrease coverage by 0.61%.
The diff coverage is 93.33%.


@@            Coverage Diff             @@
##           master     #130      +/-   ##
==========================================
- Coverage   82.95%   82.33%   -0.62%     
==========================================
  Files          26       26              
  Lines        1883     1891       +8     
==========================================
- Hits         1562     1557       -5     
- Misses        321      334      +13     
Impacted Files Coverage Δ
src/maggma/stores/aws.py 77.86% <93.33%> (+1.43%) ⬆️
src/maggma/stores/mongolike.py 88.05% <0.00%> (-6.47%) ⬇️

Continue to review full report at Codecov.

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 6f70bc5...a453e46. Read the comment docs.

@shyamd
Contributor

shyamd commented Apr 10, 2020

Hmmm, so my thought was a little different, because this still requires that whoever is using the store actually knows it is an S3Store. That shouldn't be the case, and no builder should be written that way. Can you confirm whether the performance improvement is simply from running multiple puts at the same time, or whether it's compression or something else?

@jmmshn
Contributor Author

jmmshn commented Apr 10, 2020

So I do observe the proper scaling with parallel writing done like this:
mrun -n 4 is twice as fast as mrun -n 2, for example.

@jmmshn
Contributor Author

jmmshn commented Apr 10, 2020

I was thinking about adding async_update here, but that's just replicating what process_item is already doing.

@shyamd
Contributor

shyamd commented Apr 10, 2020

I understand that increasing the mrun count speeds this up, but is the primary speedup because multiple CPUs are compressing or because multiple S3 puts are running?

@jmmshn
Contributor Author

jmmshn commented Apr 10, 2020

So, doing everything else the same,
I get 20 seconds using this fix and 1 min 20 sec using the old way on 4 processors.

@shyamd
Contributor

shyamd commented Apr 10, 2020

Try this (see the sketch below for one way to do it):
1. Time the average total compression time for those 4 items.
2. Time the average total S3 put time for all 4 items.
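
One way to get that split, as a rough sketch: a hypothetical profile_item helper that mirrors the current jsanitize + json + zlib path and times each stage separately (not maggma code):

import json
import time
import zlib

from monty.json import jsanitize


def profile_item(s3_bucket, key, doc):
    # Rough split of where the time goes for one document: sanitize+compress vs. S3 put
    t0 = time.perf_counter()
    body = zlib.compress(json.dumps(jsanitize(doc)).encode())
    t1 = time.perf_counter()
    s3_bucket.put_object(Key=key, Body=body)
    t2 = time.perf_counter()
    return {"compress_s": t1 - t0, "put_s": t2 - t1}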

@mkhorton
Member

Has anyone tested the parallel scaling for mongo too? Is mrun -n 4 twice as fast as mrun -n 2?

@jmmshn
Contributor Author

jmmshn commented Apr 10, 2020

It takes about the same time to zip as to upload.

%%timeit
js_res = json.dumps(jsanitize(res)).encode()
zipped_res = zlib.compress(js_res)

566 ms ± 4.26 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%%timeit
test_s3.s3_bucket.put_object(Key='test/test', Body=zipped_res)

308 ms ± 20.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

@shyamd
Contributor

shyamd commented Apr 10, 2020

Those numbers don't look right. If that is the time cost of those operations it shouldn't be 20 seconds per item.

Are you sure you're using %%timeit right? I thought the command you were measuring has to be on the same line.

@shyamd
Contributor

shyamd commented Apr 10, 2020

Basically, I'm trying to determine what is causing the slowdown. If it is in fact compression, then multiprocessing will help. If it's not, and it's just IO, then there is a much more effective fix using thread pool executors:

from concurrent.futures import ThreadPoolExecutor

# max_workers is the maximum number of concurrent S3 puts; it should be a Store variable
pool = ThreadPoolExecutor(max_workers=16)

for key, doc in docs.items():
    # each put_object call runs in its own thread, so the uploads overlap
    pool.submit(test_s3.s3_bucket.put_object, Key=key, Body=doc)

# block until all queued uploads have finished
pool.shutdown(wait=True)

This can be done in update.

@jmmshn
Contributor Author

jmmshn commented Apr 10, 2020

The 20 seconds is the time to go through a larger list of around 30 items. The times above are for just one item.

@jmmshn
Contributor Author

jmmshn commented Apr 10, 2020

For a bigger object, it's

%%timeit
js_res = json.dumps(jsanitize(res)).encode()
zipped_res = zlib.compress(js_res)

1.24 s ± 10.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%%timeit
test_s3.s3_bucket.put_object(Key='test/test', Body=zipped_res)

595 ms ± 70.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

So it looks like compressing always takes about 2x as long as putting.

@shyamd
Contributor

shyamd commented Apr 10, 2020

Looks like the best solution might be to switch from JSON to pickle for the format in S3 and then use the thread pool executor in update. This keeps the interface the same.

(screenshot attached)

@shyamd
Contributor

shyamd commented Apr 10, 2020

Although using pickle does open potential security flaws due to malicious code. What do you think, @mkhorton? There is also ujson, but that still needs the jsanitize step.

@mkhorton
Member

Never, ever use pickle as an archival serialization format. Maybe try the msgpack format instead; it's supported by monty, and I think there are some optimized libraries out there.
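
For illustration, a round trip with msgpack might look like the sketch below, assuming monty's msgpack hooks (monty.msgpack.default and object_hook) are available for MSONable objects; the example document is made up:

import msgpack  # msgpack-python

from monty.msgpack import default, object_hook  # MSONable-aware hooks (assumed available)

doc = {"task_id": "mp-149", "energy": -1.23}  # made-up document

# dict -> compact binary, no jsanitize/json.dumps/zlib round trip needed
packed = msgpack.packb(doc, default=default)

# binary -> dict, reconstructing any MSONable objects along the way
unpacked = msgpack.unpackb(packed, object_hook=object_hook, raw=False)
assert unpacked == doc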

@mkhorton
Member

It's not just security; it's fragility too. The pickle format has broken, and does break, with updates to Python and updates to the classes it's serializing. It's not as bad as it used to be, but it's still not advised.

@mkhorton
Member

I wouldn't be surprised if it's the jsanitize function that's the bottleneck, rather than the compression per se.

@shyamd
Contributor

shyamd commented Apr 10, 2020

Yeah, msgpack seems to work well. jsanitize is in fact taking a lot of the processing time.

I'm very reluctant to allow IO in process_item, because the way process_item is treated to make multiprocessing and distributed processing work will break a lot of IO. It might not be an issue now, but it will be in one of the other builders, and that will be near impossible for anyone to debug.

@mkhorton
Member

mkhorton commented Apr 10, 2020

Another option is the native numpy formats too:

https://docs.scipy.org/doc/numpy-1.13.0/reference/generated/numpy.savez_compressed.html#numpy.savez_compressed

https://towardsdatascience.com/why-you-should-start-using-npy-file-more-often-df2a13cc0161

Though I think some versions of this use pickle under the hood so I'm not sure.

@shyamd
Contributor

shyamd commented Apr 10, 2020

Those don't work for Python dictionaries, though? This isn't meant for just one type of data; it needs to be general.

@mkhorton
Member

Oh OK, sorry, I was thinking of charge densities specifically.

@mkhorton
Member

So what exactly is jsanitize's role here? We control all of the data, so in principle this shouldn't be necessary(?) -- if it's not valid JSON, it seems correct that it should fail. Perhaps a better remedy is just to be stricter with the output of process_item()?

@shyamd
Contributor

shyamd commented Apr 10, 2020

The purpose is really to take care of edge cases such as numpy arrays.
Really, the place this should happen is in the builder, so the assumption in update should be that whatever docs it gets are JSON compatible. I still like msgpack over json + zlib.compress, though. It appears to produce pretty dense binary documents.

@jmmshn
Contributor Author

jmmshn commented Apr 10, 2020

Regardless of whether we're going to use pickle, here are the times I'm seeing when uploading with 4 MinIO instances on Spin, after putting the write into process_items:

2 proc 42 secs
4 proc 22 secs
8 proc 16 secs
16 proc 13 secs

The speed scales with the number of processes up to 4, then tails off.
I think this is good evidence that writing to MinIO can be parallelized.
I'm OK with it not being in process_item, but then can we add an async layer between process_item and update_targets and have it do nothing but pass the data through by default?

Then, if we want to modify the results of process_item asynchronously, like stripping the data away and adding an upload to S3, that can be done in this function?

@shyamd
Contributor

shyamd commented Apr 10, 2020

@jmmshn, the ThreadPool code executes the S3 puts asynchronously using multi-threading. You can use this directly in update, and it won't affect anything else at all.

@jmmshn
Contributor Author

jmmshn commented Apr 10, 2020

OK, sounds good. I'll do the ThreadPoolExecutor.

On the formatting issue, it seems that the json step takes the same amount of time as the zlib step.

%%timeit
js_res = json.dumps(jsanitize(res)).encode()

649 ms ± 15.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%%timeit
zipped_res = zlib.compress(js_res)

597 ms ± 2.37 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

I'll play with msgpack and pickle and maybe allow both in the AWS store.

@shyamd
Contributor

shyamd commented Apr 10, 2020

I'd say we drop both JSON and pickle and just use msgpack. That will make the conversion from a Python dict to binary take on the order of milliseconds, so it won't affect performance. Then it's just the multi-threading for the S3 puts to make it as fast as possible.
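
As a sketch of that combination (not the code that was merged), an update along these lines would msgpack-encode each document and overlap the S3 puts with a thread pool; the monty default hook and the key/max_workers arguments are assumptions:

from concurrent.futures import ThreadPoolExecutor

import msgpack
from monty.msgpack import default  # assumed MSONable-aware hook


def update(s3_bucket, docs, key="task_id", max_workers=16):
    # msgpack-encode each doc (cheap) and overlap the S3 puts (the slow, IO-bound part)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [
            pool.submit(
                s3_bucket.put_object,
                Key=str(doc[key]),
                Body=msgpack.packb(doc, default=default),
            )
            for doc in docs
        ]
    # Exiting the with-block waits for all uploads; raise any upload errors here
    for f in futures:
        f.result()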

@shyamd
Contributor

shyamd commented Apr 11, 2020

Do you see a performance difference between ProcessPool and ThreadPool? Typically a ThreadPool should do fine for IO, and it avoids a lot of the in-memory copying that a ProcessPool requires; that copying could cause problems with lots of large documents.

@lgtm-com

lgtm-com bot commented Apr 11, 2020

This pull request introduces 1 alert when merging a4e136f into 6f70bc5 - view on LGTM.com

new alerts:

  • 1 for Unused import

@lgtm-com

lgtm-com bot commented Apr 11, 2020

This pull request introduces 2 alerts when merging e8a6737 into 6f70bc5 - view on LGTM.com

new alerts:

  • 2 for Unused import

@jmmshn
Contributor Author

jmmshn commented Apr 11, 2020

No real difference, and it breaks some tests since mock_s3 doesn't like it when you pass that around.
Should be fixed now.

@shyamd
Contributor

shyamd commented Apr 11, 2020

Sounds good. Don't worry about the docs, I'll fix that. Just take off the WIP tag when you're ready.

jmmshn changed the title from "[WIP] parallel s3 store writing" to "parallel s3 store writing" on Apr 11, 2020
@jmmshn
Contributor Author

jmmshn commented Apr 11, 2020

Done

shyamd merged commit 38317cc into materialsproject:master on Apr 11, 2020