Add Object Hash to S3Store #427

Merged 5 commits on Apr 16, 2021

33 changes: 24 additions & 9 deletions src/maggma/stores/aws.py
@@ -8,6 +8,7 @@
import zlib
from concurrent.futures import wait
from concurrent.futures.thread import ThreadPoolExecutor
+from hashlib import sha1
from typing import Any, Dict, Iterator, List, Optional, Tuple, Union

import msgpack # type: ignore
@@ -41,6 +42,7 @@ def __init__(
sub_dir: str = None,
s3_workers: int = 1,
key: str = "fs_id",
+store_hash: bool = True,
searchable_fields: Optional[List[str]] = None,
**kwargs,
):
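
Not part of the diff, but for orientation: a minimal construction sketch showing the new flag. The bucket name and key below are assumptions that mirror the test fixture further down; store_hash defaults to True, so passing store_hash=False opts a store out of the new behaviour.

    # Hypothetical usage sketch (names assumed, S3 endpoint mocked or reachable).
    from maggma.stores import MemoryStore, S3Store

    index = MemoryStore("index", key="task_id")
    store = S3Store(index, "bucket1", key="task_id", store_hash=False)  # opt out of obj_hash
    store.connect()
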
@@ -60,6 +62,7 @@
endpoint_url: endpoint_url to allow interface to minio service
sub_dir: (optional) subdirectory of the s3 bucket to store the data
s3_workers: number of concurrent S3 puts to run
+store_hash: store the sha1 hash right before insertion to the database.
searchable_fields: fields to keep in the index store
"""
if boto3 is None:
@@ -75,6 +78,7 @@
self.s3_bucket = None # type: Any
self.s3_workers = s3_workers
self.searchable_fields = searchable_fields if searchable_fields else []
+self.store_hash = store_hash

# Force the key to be the same as the index
assert isinstance(
@@ -306,7 +310,7 @@ def update(
with ThreadPoolExecutor(max_workers=self.s3_workers) as pool:
fs = {
pool.submit(
-fn=self.write_doc_to_s3,
+self.write_doc_to_s3,
doc=itr_doc,
search_keys=key + additional_metadata + self.searchable_fields,
)
@@ -358,6 +362,8 @@ def write_doc_to_s3(self, doc: Dict, search_keys: List[str]):
if "_id" in search_doc:
del search_doc["_id"]

+# to make hashing more meaningful, make sure last updated field is removed
+lu_info = doc.pop(self.last_updated_field, None)
data = msgpack.packb(doc, default=monty_default)

if self.compress:
@@ -375,9 +381,14 @@ def write_doc_to_s3(self, doc: Dict, search_keys: List[str]):
Key=self.sub_dir + str(doc[self.key]), Body=data, Metadata=search_doc
)

-if self.last_updated_field in doc:
-search_doc[self.last_updated_field] = doc[self.last_updated_field]
+if lu_info is not None:
+search_doc[self.last_updated_field] = lu_info

+if self.store_hash:
+hasher = sha1()
+hasher.update(data)
+obj_hash = hasher.hexdigest()
+search_doc["obj_hash"] = obj_hash
return search_doc

def remove_docs(self, criteria: Dict, remove_s3_object: bool = False):
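
Since `data` is not reassigned between the put_object call and the new hashing block, the stored obj_hash is the SHA1 of the exact bytes uploaded as the object body (after optional compression). A minimal verification sketch, not part of this PR, with bucket and object names assumed and `store` reused from the construction sketch above:

    # Hypothetical check: the obj_hash kept in the index should equal the
    # SHA1 of the object body as it sits in S3.
    from hashlib import sha1

    import boto3

    body = boto3.resource("s3").Object("bucket1", "mp-4").get()["Body"].read()  # key = sub_dir + doc[store.key]
    index_doc = store.index.query_one({"task_id": "mp-4"})
    assert index_doc["obj_hash"] == sha1(body).hexdigest()
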
@@ -430,18 +441,22 @@ def newer_in(
def __hash__(self):
return hash((self.index.__hash__, self.bucket))

-def rebuild_index_from_s3_data(self):
+def rebuild_index_from_s3_data(self, **kwargs):
"""
Rebuilds the index Store from the data in S3
Relies on the index document being stored as the metadata for the file
This can help recover lost databases
"""
-index_docs = []
-for file in self.s3_bucket.objects.all():
-# TODO: Transform the data back from strings and remove AWS S3 specific keys
-index_docs.append(file.metadata)
+bucket = self.s3_bucket
+objects = bucket.objects.filter(Prefix=self.sub_dir)
+for obj in objects:
+key_ = self.sub_dir + obj.key
+data = self.s3_bucket.Object(key_).get()["Body"].read()

-self.index.update(index_docs)
+if self.compress:
+data = zlib.decompress(data)
+unpacked_data = msgpack.unpackb(data, raw=False)
+self.update(unpacked_data, **kwargs)

def rebuild_metadata_from_index(self, index_query: dict = None):
"""
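
The rewritten rebuild_index_from_s3_data now round-trips every object in the bucket (read, optionally decompress, msgpack-unpack, then self.update), so a rebuilt index gets its obj_hash entries back as well. A minimal recovery sketch under the same assumed names as above; the new test_rebuild_index below exercises this same path:

    # Hypothetical recovery flow: repopulate a lost index from the bucket contents.
    store.index.remove_docs({})          # simulate a lost/wiped index
    store.rebuild_index_from_s3_data()   # re-read every object, re-insert via update()
    assert store.index.query_one({"task_id": "mp-4"})["obj_hash"] is not None
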
24 changes: 20 additions & 4 deletions tests/stores/test_aws.py
@@ -26,8 +26,8 @@ def s3store():
conn = boto3.client("s3")
conn.create_bucket(Bucket="bucket1")

index = MemoryStore("index")
store = S3Store(index, "bucket1")
index = MemoryStore("index", key="task_id")
store = S3Store(index, "bucket1", key="task_id")
store.connect()

store.update(
@@ -154,8 +154,9 @@ def test_update(s3store):

s3store.compress = True
s3store.update([{"task_id": "mp-4", "data": "asd"}])
-assert s3store.index.query_one({"task_id": "mp-4"})["compression"] == "zlib"
-assert s3store.query_one({"task_id": "mp-4"}) is not None
+obj = s3store.index.query_one({"task_id": "mp-4"})
+assert obj["compression"] == "zlib"
+assert obj["obj_hash"] == "be74de5ac71f00ec9e96441a3c325b0592c07f4c"
assert s3store.query_one({"task_id": "mp-4"})["data"] == "asd"


@@ -167,6 +168,21 @@ def test_rebuild_meta_from_index(s3store):
assert s3_object.metadata["add_meta"] == "hello"


+def test_rebuild_index(s3store):
+s3store.update([{"task_id": "mp-2", "data": "asd"}])
+assert (
+s3store.index.query_one({"task_id": "mp-2"})["obj_hash"]
+== "a69fe0c2cca3a3384c2b1d2f476972704f179741"
+)
+s3store.index.remove_docs({})
+assert s3store.index.query_one({"task_id": "mp-2"}) is None
+s3store.rebuild_index_from_s3_data()
+assert (
+s3store.index.query_one({"task_id": "mp-2"})["obj_hash"]
+== "a69fe0c2cca3a3384c2b1d2f476972704f179741"
+)


def tests_msonable_read_write(s3store):
dd = s3store.as_dict()
s3store.update([{"task_id": "mp-2", "data": dd}])