Parallel S3 store writing #130

Merged 12 commits on Apr 11, 2020
1 change: 1 addition & 0 deletions requirements.txt
@@ -13,3 +13,4 @@ pynng==0.5.0
dnspython==1.16.0
uvicorn==0.11.3
sshtunnel==0.1.5
msgpack==0.5.6
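
The new msgpack dependency replaces JSON as the serialization format throughout this diff. Below is a minimal sketch (not part of the PR) of the round trip it enables, using the same monty hooks the changed files import; the sample document and field names are illustrative only:

from datetime import datetime

import msgpack
from monty.msgpack import default, object_hook

# hypothetical document; monty's default/object_hook handle values
# (datetimes, MSONable objects) that plain msgpack cannot serialize
doc = {"task_id": "mp-1", "last_updated": datetime.utcnow(), "data": [1, 2, 3]}

packed = msgpack.packb(doc, default=default)  # compact bytes, binary-safe
restored = msgpack.unpackb(packed, object_hook=object_hook, raw=False)

assert restored["task_id"] == "mp-1"
# datetimes survive via monty's hooks (MontyEncoder/MontyDecoder under the hood)
assert isinstance(restored["last_updated"], datetime)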
82 changes: 54 additions & 28 deletions src/maggma/stores/aws.py
@@ -3,15 +3,19 @@
Advanced Stores for connecting to AWS data
"""

import json
import zlib

from typing import Union, Optional, Dict, List, Iterator, Tuple, Any

from monty.json import jsanitize
from monty.dev import deprecated

from maggma.core import Store, Sort

from concurrent.futures import wait
from concurrent.futures.thread import ThreadPoolExecutor
import msgpack # type: ignore
from monty.msgpack import default as monty_default
from monty.msgpack import object_hook as monty_object_hook
from maggma.utils import grouper, to_isoformat_ceil_ms

try:
@@ -36,6 +40,7 @@ def __init__(
compress: bool = False,
endpoint_url: str = None,
sub_dir: str = None,
s3_workers: int = 4,
**kwargs,
):
"""
@@ -48,19 +53,23 @@
compress: compress files inserted into the store
endpoint_url: endpoint URL to allow interfacing with a MinIO service
sub_dir: (optional) subdirectory of the s3 bucket to store the data
s3_workers: number of concurrent S3 puts to run
"""
if boto3 is None:
raise RuntimeError("boto3 and botocore are required for S3Store")
self.index = index

self.bucket = bucket
self.s3_profile = s3_profile
self.compress = compress
self.endpoint_url = endpoint_url
self.sub_dir = sub_dir.strip("/") + "/" if sub_dir else ""
self.s3 = None # type: Any
self.s3_bucket = None # type: Any
self.s3_workers = s3_workers
# Force the key to be the same as the index
kwargs["key"] = str(index.key)

super(S3Store, self).__init__(**kwargs)

def name(self) -> str:
@@ -70,11 +79,10 @@ def name(self) -> str:
"""
return f"s3://{self.bucket}"

def connect(self, force_reset: bool = False):
def connect(self, *args, **kwargs): # lgtm[py/conflicting-attributes]
"""
Connect to the source data
"""
self.index.connect(force_reset=force_reset)

session = Session(profile_name=self.s3_profile)
resource = session.resource("s3", endpoint_url=self.endpoint_url)
@@ -85,6 +93,7 @@ def connect(self, *args, **kwargs):
raise Exception("Bucket not present on AWS: {}".format(self.bucket))

self.s3_bucket = resource.Bucket(self.bucket)
self.index.connect(*args, **kwargs)

def close(self):
"""
@@ -168,7 +177,7 @@ def query(

if doc.get("compression", "") == "zlib":
data = zlib.decompress(data)
yield json.loads(data)
yield msgpack.unpackb(data, object_hook=monty_object_hook, raw=False)

def distinct(
self, field: str, criteria: Optional[Dict] = None, all_exist: bool = False
@@ -241,44 +250,61 @@ def update(self, docs: Union[List[Dict], Dict], key: Union[List, str, None] = No
field is to be used
"""
search_docs = []
search_keys = []

if isinstance(key, list):
search_keys = key
elif key:
search_keys = [key]
else:
search_keys = [self.key]
for d in docs:
search_doc = {k: d[k] for k in search_keys}
search_doc[self.key] = d[self.key] # Ensure key is in metadata
if self.sub_dir != "":
search_doc["sub_dir"] = self.sub_dir
with ThreadPoolExecutor(max_workers=self.s3_workers) as pool:
fs = {
pool.submit(
fn=self.write_doc_to_s3, doc=itr_doc, search_keys=search_keys
)
for itr_doc in docs
}
fs, _ = wait(fs)
for sdoc in fs:
search_docs.append(sdoc.result())
# Use store's update to remove key clashes
self.index.update(search_docs)

# Remove MongoDB _id from search
if "_id" in search_doc:
del search_doc["_id"]
def write_doc_to_s3(self, doc: Dict, search_keys: List[str]):
"""
Write the data to s3 and return the metadata to be inserted into the index db

data = json.dumps(jsanitize(d)).encode()
Args:
doc: the document
search_keys: list of keys to pull from the docs and be inserted into the
index db
"""
search_doc = {k: doc[k] for k in search_keys}
search_doc[self.key] = doc[self.key] # Ensure key is in metadata
if self.sub_dir != "":
search_doc["sub_dir"] = self.sub_dir

# Compress with zlib if chosen
if self.compress:
search_doc["compression"] = "zlib"
data = zlib.compress(data)
# Remove MongoDB _id from search
if "_id" in search_doc:
del search_doc["_id"]

search_docs.append(search_doc.copy())
data = msgpack.packb(doc, default=monty_default)

if self.last_updated_field in search_doc:
search_doc[self.last_updated_field] = str(
to_isoformat_ceil_ms(search_doc[self.last_updated_field])
)
if self.compress:
# Compress with zlib if chosen
search_doc["compression"] = "zlib"
data = zlib.compress(data)

self.s3_bucket.put_object(
Key=self.sub_dir + str(d[self.key]), Body=data, Metadata=search_doc
if self.last_updated_field in search_doc:
search_doc[self.last_updated_field] = str(
to_isoformat_ceil_ms(search_doc[self.last_updated_field])
)

# Use store's update to remove key clashes
self.index.update(search_docs)
self.s3_bucket.put_object(
Key=self.sub_dir + str(doc[self.key]), Body=data, Metadata=search_doc
)

return search_doc

def remove_docs(self, criteria: Dict, remove_s3_object: bool = False):
"""
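
The core change in update() is the fan-out over a bounded thread pool: one task per document, wait for all of them, then collect the returned index metadata in a single pass. Here is a standalone sketch of that pattern using the same stdlib APIs; upload() is a hypothetical stand-in for write_doc_to_s3, not maggma code:

from concurrent.futures import wait
from concurrent.futures.thread import ThreadPoolExecutor

def upload(doc):
    # hypothetical stand-in for S3Store.write_doc_to_s3: do the put,
    # return the metadata row destined for the index store
    return {"task_id": doc["task_id"]}

docs = [{"task_id": f"mp-{i}"} for i in range(10)]

with ThreadPoolExecutor(max_workers=4) as pool:  # mirrors s3_workers=4
    futures = {pool.submit(upload, d) for d in docs}
    done, _ = wait(futures)                      # block until every put finishes
    search_docs = [f.result() for f in done]     # .result() re-raises worker errors

print(len(search_docs))  # 10: one bulk index.update() afterwards, not ten

Deferring self.index.update(search_docs) until after the pool drains keeps the index write as a single bulk call rather than one round-trip per document.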
9 changes: 6 additions & 3 deletions tests/stores/test_aws.py
@@ -1,5 +1,6 @@
import pytest
import json
import msgpack
from monty.msgpack import default, object_hook
import boto3
import zlib
from moto import mock_s3
@@ -28,12 +29,14 @@ def s3store():

check_doc = {"task_id": "mp-1", "data": "asd"}
store.index.update([{"task_id": "mp-1"}])
store.s3_bucket.put_object(Key="mp-1", Body=json.dumps(check_doc).encode())
store.s3_bucket.put_object(
Key="mp-1", Body=msgpack.packb(check_doc, default=default)
)

check_doc2 = {"task_id": "mp-3", "data": "sdf"}
store.index.update([{"task_id": "mp-3", "compression": "zlib"}])
store.s3_bucket.put_object(
Key="mp-3", Body=zlib.compress(json.dumps(check_doc2).encode())
Key="mp-3", Body=zlib.compress(msgpack.packb(check_doc2, default=default))
)

yield store
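
These fixtures encode the on-bucket format the store now reads back in query(): msgpack bytes, optionally zlib-compressed, with the compression choice recorded alongside the object so the read path knows how to decode. A small self-contained sketch of that encode/decode contract (the sample document and keys are illustrative):

import zlib

import msgpack

compress = True  # mirrors S3Store(compress=True)
doc = {"task_id": "mp-3", "data": "sdf"}

# write path: pack first, then optionally compress, recording the choice
body = msgpack.packb(doc)
metadata = {}
if compress:
    metadata["compression"] = "zlib"
    body = zlib.compress(body)

# read path, mirroring S3Store.query(): undo compression, then unpack
raw = zlib.decompress(body) if metadata.get("compression") == "zlib" else body
assert msgpack.unpackb(raw, raw=False) == doc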