diff --git a/src/maggma/stores/aws.py b/src/maggma/stores/aws.py
index 64e690675..976d5640a 100644
--- a/src/maggma/stores/aws.py
+++ b/src/maggma/stores/aws.py
@@ -8,6 +8,7 @@
 import zlib
 from concurrent.futures import wait
 from concurrent.futures.thread import ThreadPoolExecutor
+from hashlib import sha1
 from typing import Any, Dict, Iterator, List, Optional, Tuple, Union
 
 import msgpack  # type: ignore
@@ -41,6 +42,7 @@ def __init__(
         sub_dir: str = None,
         s3_workers: int = 1,
         key: str = "fs_id",
+        store_hash: bool = True,
         searchable_fields: Optional[List[str]] = None,
         **kwargs,
     ):
@@ -60,6 +62,7 @@ def __init__(
             endpoint_url: endpoint_url to allow interface to minio service
             sub_dir: (optional) subdirectory of the s3 bucket to store the data
             s3_workers: number of concurrent S3 puts to run
+            store_hash: store the sha1 hash right before insertion into the database.
             searchable_fields: fields to keep in the index store
         """
         if boto3 is None:
@@ -75,6 +78,7 @@ def __init__(
         self.s3_bucket = None  # type: Any
         self.s3_workers = s3_workers
         self.searchable_fields = searchable_fields if searchable_fields else []
+        self.store_hash = store_hash
 
         # Force the key to be the same as the index
         assert isinstance(
@@ -306,7 +310,7 @@ def update(
         with ThreadPoolExecutor(max_workers=self.s3_workers) as pool:
             fs = {
                 pool.submit(
-                    fn=self.write_doc_to_s3,
+                    self.write_doc_to_s3,
                     doc=itr_doc,
                     search_keys=key + additional_metadata + self.searchable_fields,
                 )
@@ -358,6 +362,8 @@ def write_doc_to_s3(self, doc: Dict, search_keys: List[str]):
         if "_id" in search_doc:
             del search_doc["_id"]
 
+        # to make hashing more meaningful, make sure the last_updated field is removed
+        lu_info = doc.pop(self.last_updated_field, None)
         data = msgpack.packb(doc, default=monty_default)
 
         if self.compress:
@@ -375,9 +381,14 @@ def write_doc_to_s3(self, doc: Dict, search_keys: List[str]):
             Key=self.sub_dir + str(doc[self.key]), Body=data, Metadata=search_doc
         )
 
-        if self.last_updated_field in doc:
-            search_doc[self.last_updated_field] = doc[self.last_updated_field]
+        if lu_info is not None:
+            search_doc[self.last_updated_field] = lu_info
 
+        if self.store_hash:
+            hasher = sha1()
+            hasher.update(data)
+            obj_hash = hasher.hexdigest()
+            search_doc["obj_hash"] = obj_hash
         return search_doc
 
     def remove_docs(self, criteria: Dict, remove_s3_object: bool = False):
@@ -430,18 +441,22 @@ def newer_in(
     def __hash__(self):
        return hash((self.index.__hash__, self.bucket))
 
-    def rebuild_index_from_s3_data(self):
+    def rebuild_index_from_s3_data(self, **kwargs):
         """
         Rebuilds the index Store from the data in S3
         Relies on the index document being stores as the metadata for the file
         This can help recover lost databases
         """
-        index_docs = []
-        for file in self.s3_bucket.objects.all():
-            # TODO: Transform the data back from strings and remove AWS S3 specific keys
-            index_docs.append(file.metadata)
+        bucket = self.s3_bucket
+        objects = bucket.objects.filter(Prefix=self.sub_dir)
+        for obj in objects:
+            key_ = self.sub_dir + obj.key
+            data = self.s3_bucket.Object(key_).get()["Body"].read()
 
-        self.index.update(index_docs)
+            if self.compress:
+                data = zlib.decompress(data)
+            unpacked_data = msgpack.unpackb(data, raw=False)
+            self.update(unpacked_data, **kwargs)
 
     def rebuild_metadata_from_index(self, index_query: dict = None):
         """
diff --git a/tests/stores/test_aws.py b/tests/stores/test_aws.py
index cc944f9e9..8b2ddea30 100644
--- a/tests/stores/test_aws.py
+++ b/tests/stores/test_aws.py
@@ -26,8 +26,8 @@ def s3store():
         conn = boto3.client("s3")
         conn.create_bucket(Bucket="bucket1")
 
-        index = MemoryStore("index")
-        store = S3Store(index, "bucket1")
+        index = MemoryStore("index", key="task_id")
+        store = S3Store(index, "bucket1", key="task_id")
         store.connect()
 
         store.update(
@@ -154,8 +154,9 @@ def test_update(s3store):
 
     s3store.compress = True
     s3store.update([{"task_id": "mp-4", "data": "asd"}])
-    assert s3store.index.query_one({"task_id": "mp-4"})["compression"] == "zlib"
-    assert s3store.query_one({"task_id": "mp-4"}) is not None
+    obj = s3store.index.query_one({"task_id": "mp-4"})
+    assert obj["compression"] == "zlib"
+    assert obj["obj_hash"] == "be74de5ac71f00ec9e96441a3c325b0592c07f4c"
     assert s3store.query_one({"task_id": "mp-4"})["data"] == "asd"
 
 
@@ -167,6 +168,21 @@ def test_rebuild_meta_from_index(s3store):
     assert s3_object.metadata["add_meta"] == "hello"
 
 
+def test_rebuild_index(s3store):
+    s3store.update([{"task_id": "mp-2", "data": "asd"}])
+    assert (
+        s3store.index.query_one({"task_id": "mp-2"})["obj_hash"]
+        == "a69fe0c2cca3a3384c2b1d2f476972704f179741"
+    )
+    s3store.index.remove_docs({})
+    assert s3store.index.query_one({"task_id": "mp-2"}) is None
+    s3store.rebuild_index_from_s3_data()
+    assert (
+        s3store.index.query_one({"task_id": "mp-2"})["obj_hash"]
+        == "a69fe0c2cca3a3384c2b1d2f476972704f179741"
+    )
+
+
 def tests_msonable_read_write(s3store):
     dd = s3store.as_dict()
     s3store.update([{"task_id": "mp-2", "data": dd}])
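A minimal usage sketch of the behavior this patch adds, mirroring the updated test fixture; the MemoryStore index, the "bucket1" bucket, and the "task_id" key are the tests' illustrative choices, and real use needs S3 credentials or moto's mock_s3 (this sketch is not part of the patch):

    from maggma.stores import MemoryStore, S3Store

    # store_hash=True is the new default: every write records a sha1 of the exact
    # bytes sent to S3 (after optional compression, with the last_updated field
    # stripped) in the index document under "obj_hash".
    index = MemoryStore("index", key="task_id")
    store = S3Store(index, "bucket1", key="task_id", store_hash=True)
    store.connect()

    store.update([{"task_id": "mp-1", "data": "asd"}])
    print(store.index.query_one({"task_id": "mp-1"})["obj_hash"])

    # If the index database is lost, it can now be rebuilt from the S3 objects
    # themselves rather than from their S3 metadata.
    index.remove_docs({})
    store.rebuild_index_from_s3_data()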