From a363e68c1958809654ad8779880b8640eb595698 Mon Sep 17 00:00:00 2001 From: shyamd Date: Wed, 14 Mar 2018 13:53:45 -0700 Subject: [PATCH 01/11] only create new connecting if force_reset=True --- maggma/advanced_stores.py | 4 ++-- maggma/stores.py | 45 ++++++++++++++++++++------------------- 2 files changed, 25 insertions(+), 24 deletions(-) diff --git a/maggma/advanced_stores.py b/maggma/advanced_stores.py index 6a5b423c5..701337606 100644 --- a/maggma/advanced_stores.py +++ b/maggma/advanced_stores.py @@ -138,8 +138,8 @@ def close(self): def collection(self): return self.store.collection - def connect(self): - self.store.connect() + def connect(self, force_reset=False): + self.store.connect(force_reset=force_reset) def lazy_substitute(d, aliases): diff --git a/maggma/stores.py b/maggma/stores.py index 11139dca7..6460543a8 100644 --- a/maggma/stores.py +++ b/maggma/stores.py @@ -46,7 +46,7 @@ def collection(self): pass @abstractmethod - def connect(self): + def connect(self, force_reset=False): pass @abstractmethod @@ -270,12 +270,13 @@ def __init__(self, database, collection_name, host="localhost", port=27017, self.kwargs = kwargs super(MongoStore, self).__init__(**kwargs) - def connect(self): - conn = MongoClient(self.host, self.port) - db = conn[self.database] - if self.username is not "": - db.authenticate(self.username, self.password) - self._collection = db[self.collection_name] + def connect(self, force_reset=False): + if not self._collection or force_reset: + conn = MongoClient(self.host, self.port) + db = conn[self.database] + if self.username is not "": + db.authenticate(self.username, self.password) + self._collection = db[self.collection_name] def __hash__(self): return hash((self.database, self.collection_name, self.lu_field)) @@ -343,9 +344,8 @@ def __init__(self, name="memory_db", **kwargs): self.kwargs = kwargs super(MemoryStore, self).__init__(**kwargs) - def connect(self): - # Ensure we only connect once - if not self._collection: + def connect(self, force_reset=False): + if not self._collection or force_reset: self._collection = mongomock.MongoClient().db[self.name] def __hash__(self): @@ -380,8 +380,8 @@ def __init__(self, paths, **kwargs): self.kwargs = kwargs super(JSONStore, self).__init__("collection", **kwargs) - def connect(self): - super(JSONStore, self).connect() + def connect(self, force_reset=False): + super(JSONStore, self).connect(force_reset=force_reset) for path in self.paths: with zopen(path) as f: data = f.read() @@ -407,8 +407,8 @@ def __init__(self, dt, **kwargs): self.kwargs = kwargs super(DatetimeStore, self).__init__("date", **kwargs) - def connect(self): - super(DatetimeStore, self).connect() + def connect(self, force_reset=False): + super(DatetimeStore, self).connect(force_reset) self.collection.insert_one({self.lu_field: self.__dt}) @@ -434,15 +434,16 @@ def __init__(self, database, collection_name, host="localhost", port=27017, super(GridFSStore, self).__init__(**kwargs) - def connect(self): + def connect(self, force_reset=False): conn = MongoClient(self.host, self.port) - db = conn[self.database] - if self.username is not "": - db.authenticate(self.username, self.password) - - self._collection = gridfs.GridFS(db, self.collection_name) - self._files_collection = db["{}.files".format(self.collection_name)] - self._chunks_collection = db["{}.chunks".format(self.collection_name)] + if not self._collection or force_reset: + db = conn[self.database] + if self.username is not "": + db.authenticate(self.username, self.password) + + self._collection = 
gridfs.GridFS(db, self.collection_name) + self._files_collection = db["{}.files".format(self.collection_name)] + self._chunks_collection = db["{}.chunks".format(self.collection_name)] @property def collection(self): From 493fddf412f81982f38225943b37afd8f5e07460 Mon Sep 17 00:00:00 2001 From: shyamd Date: Wed, 14 Mar 2018 13:54:29 -0700 Subject: [PATCH 02/11] Add from_collection for MongoStore --- maggma/stores.py | 14 +++++++++ maggma/tests/test_stores.py | 59 ++++++++++++++++++++++--------------- 2 files changed, 49 insertions(+), 24 deletions(-) diff --git a/maggma/stores.py b/maggma/stores.py index 6460543a8..394c38132 100644 --- a/maggma/stores.py +++ b/maggma/stores.py @@ -331,6 +331,20 @@ def groupby(self, keys, properties=None, criteria=None, return self.collection.aggregate(pipeline, allowDiskUse=allow_disk_use) + @classmethod + def from_collection(cls, collection, **kwargs): + """ + Generates a MongoStore from a pymongo collection object + This is not a fully safe operation as it gives dummy information to the MongoStore + As a result, this will not serialize and can not reset its connection + """ + # TODO: How do we make this safer? + coll_name = collection.name + db_name = collection.database.name + + store = cls(db_name,coll_name,**kwargs) + store._collection = collection + return store class MemoryStore(Mongolike, Store): """ diff --git a/maggma/tests/test_stores.py b/maggma/tests/test_stores.py index 29e65d440..9c098931a 100644 --- a/maggma/tests/test_stores.py +++ b/maggma/tests/test_stores.py @@ -12,14 +12,11 @@ from maggma.stores import * module_dir = os.path.join(os.path.dirname(os.path.abspath(__file__))) -db_dir = os.path.abspath(os.path.join( - module_dir, "..", "..", "test_files", "settings_files")) -test_dir = os.path.abspath(os.path.join( - module_dir, "..", "..", "test_files", "test_set")) +db_dir = os.path.abspath(os.path.join(module_dir, "..", "..", "test_files", "settings_files")) +test_dir = os.path.abspath(os.path.join(module_dir, "..", "..", "test_files", "test_set")) class TestMongoStore(unittest.TestCase): - def setUp(self): self.mongostore = MongoStore("maggma_test", "test") self.mongostore.connect() @@ -28,8 +25,7 @@ def test_connect(self): mongostore = MongoStore("maggma_test", "test") self.assertEqual(mongostore.collection, None) mongostore.connect() - self.assertIsInstance(mongostore.collection, - pymongo.collection.Collection) + self.assertIsInstance(mongostore.collection, pymongo.collection.Collection) def test_query(self): self.mongostore.collection.insert({"a": 1, "b": 2, "c": 3}) @@ -53,28 +49,39 @@ def test_distinct(self): self.assertEqual(len(self.mongostore.distinct(["d", "e"], {"a": 4})), 3) all_exist = self.mongostore.distinct(["a", "b"], all_exist=True) self.assertEqual(len(all_exist), 1) - all_exist2 = self.mongostore.distinct( - ["a", "e"], all_exist=True, criteria={"d": 6}) + all_exist2 = self.mongostore.distinct(["a", "e"], all_exist=True, criteria={"d": 6}) self.assertEqual(len(all_exist2), 1) def test_update(self): self.mongostore.update([{"e": 6, "d": 4}], key="e") - self.assertEqual(self.mongostore.query( - criteria={"d": {"$exists": 1}}, properties=["d"])[0]["d"], 4) + self.assertEqual(self.mongostore.query(criteria={"d": {"$exists": 1}}, properties=["d"])[0]["d"], 4) self.mongostore.update([{"e": 7, "d": 8, "f": 9}], key=["d", "f"]) - self.assertEqual(self.mongostore.query_one( - criteria={"d": 8, "f": 9}, properties=["e"])["e"], 7) + self.assertEqual(self.mongostore.query_one(criteria={"d": 8, "f": 9}, properties=["e"])["e"], 7) 
self.mongostore.update([{"e": 11, "d": 8, "f": 9}], key=["d", "f"]) - self.assertEqual(self.mongostore.query_one( - criteria={"d": 8, "f": 9}, properties=["e"])["e"], 11) + self.assertEqual(self.mongostore.query_one(criteria={"d": 8, "f": 9}, properties=["e"])["e"], 11) def test_groupby(self): self.mongostore.collection.drop() - self.mongostore.update([{"e": 7, "d": 9, "f": 9}, - {"e": 7, "d": 9, "f": 10}, - {"e": 8, "d": 9, "f": 11}, - {"e": 9, "d": 10, "f": 12}], key="f") + self.mongostore.update( + [{ + "e": 7, + "d": 9, + "f": 9 + }, { + "e": 7, + "d": 9, + "f": 10 + }, { + "e": 8, + "d": 9, + "f": 11 + }, { + "e": 9, + "d": 10, + "f": 12 + }], + key="f") data = list(self.mongostore.groupby("d")) self.assertEqual(len(data), 2) grouped_by_9 = [g['docs'] for g in data if g['_id']['d'] == 9][0] @@ -89,28 +96,33 @@ def test_from_db_file(self): ms = MongoStore.from_db_file(os.path.join(db_dir, "db.json")) self.assertEqual(ms.collection_name, "tmp") + def test_from_collection(self): + ms = MongoStore.from_db_file(os.path.join(db_dir, "db.json")) + ms.connect() + + other_ms = MongoStore.from_collection(ms._collection) + self.assertEqual(ms.collection_name, other_ms.collection_name) + self.assertEqual(ms.database, other_ms.database) + def tearDown(self): if self.mongostore.collection: self.mongostore.collection.drop() class TestMemoryStore(unittest.TestCase): - def setUp(self): self.memstore = MemoryStore() def test(self): self.assertEqual(self.memstore.collection, None) self.memstore.connect() - self.assertIsInstance(self.memstore.collection, - mongomock.collection.Collection) + self.assertIsInstance(self.memstore.collection, mongomock.collection.Collection) def test_groupby(self): self.assertRaises(NotImplementedError, self.memstore.groupby, "a") class TestJsonStore(unittest.TestCase): - def test(self): files = [] for f in ["a.json", "b.json"]: @@ -126,7 +138,6 @@ def test(self): class TestGridFSStore(unittest.TestCase): - def setUp(self): self.gStore = GridFSStore("maggma_test", "test", key="task_id") self.gStore.connect() From 1da2678ccea59e59c1efae59bee8b86390745da8 Mon Sep 17 00:00:00 2001 From: shyamd Date: Wed, 14 Mar 2018 16:31:39 -0700 Subject: [PATCH 03/11] Move util methods to utils --- maggma/utils.py | 35 ++++++++++++++++++++++++++++++++++- 1 file changed, 34 insertions(+), 1 deletion(-) diff --git a/maggma/utils.py b/maggma/utils.py index e4baa340b..1f4c5643a 100644 --- a/maggma/utils.py +++ b/maggma/utils.py @@ -4,7 +4,9 @@ """ import itertools from datetime import datetime, timedelta - +from pydash.utilities import to_path +from pydash.objects import set_, get, has +from pydash.objects import unset as _unset def dt_to_isoformat_ceil_ms(dt): """Helper to account for Mongo storing datetimes with only ms precision.""" @@ -113,3 +115,34 @@ def get_mpi(): size = 0 return comm, rank, size + +def lazy_substitute(d, aliases): + """ + Simple top level substitute that doesn't dive into mongo like strings + """ + for alias, key in aliases.items(): + if key in d: + d[alias] = d[key] + del d[key] + + +def substitute(d, aliases): + """ + Substitutes keys in dictionary + Accepts multilevel mongo like keys + """ + for alias, key in aliases.items(): + if has(d, key): + set_(d, alias, get(d, key)) + unset(d, key) + + +def unset(d, key): + """ + Unsets a key + """ + _unset(d, key) + path = to_path(key) + for i in reversed(range(1, len(path))): + if len(get(d, path[:i])) == 0: + unset(d, path[:i]) From 4208a3bd6ed6eed2af6f7200fb42e66737c1595c Mon Sep 17 00:00:00 2001 From: shyamd Date: Wed, 14 
Mar 2018 16:31:47 -0700 Subject: [PATCH 04/11] remove unused sort options --- maggma/stores.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/maggma/stores.py b/maggma/stores.py index 394c38132..61817790b 100644 --- a/maggma/stores.py +++ b/maggma/stores.py @@ -476,7 +476,7 @@ def query(self, properties=None, criteria=None, **kwargs): against key-value pairs **kwargs (kwargs): further kwargs to Collection.find """ - for f in self.collection.find(filter=criteria, **kwargs).sort('uploadDate', pymongo.DESCENDING): + for f in self.collection.find(filter=criteria, **kwargs): data = f.read() try: json_dict = json.loads(data) @@ -484,7 +484,7 @@ def query(self, properties=None, criteria=None, **kwargs): except: yield data - def query_one(self, properties=None, criteria=None, sort=(('uploadDate', pymongo.DESCENDING),), **kwargs): + def query_one(self, properties=None, criteria=None, **kwargs): """ Function that gets a single document from GridFS. This store ignores all property projections as its designed for whole @@ -573,4 +573,4 @@ def update(self, docs, update_lu=True, key=None): self.collection.put(data, **search_doc) def close(self): - self.collection.database.client.close() + self.collection.database.client.close() \ No newline at end of file From c7d073b14f53d61a589c026625b121f580f63d0b Mon Sep 17 00:00:00 2001 From: shyamd Date: Wed, 14 Mar 2018 17:34:04 -0700 Subject: [PATCH 05/11] Initial commit of an indexed S3 Store --- maggma/advanced_stores.py | 235 ++++++++++++++++++++++++++++++++------ 1 file changed, 200 insertions(+), 35 deletions(-) diff --git a/maggma/advanced_stores.py b/maggma/advanced_stores.py index 701337606..fa54472aa 100644 --- a/maggma/advanced_stores.py +++ b/maggma/advanced_stores.py @@ -2,13 +2,15 @@ """ Advanced Stores for behavior outside normal access patterns """ -from maggma.stores import Store, MongoStore -from pydash.objects import set_, get, has -from pydash.utilities import to_path -import pydash.objects +import os import hvac import json -import os +import boto3 +import botocore +from datetime import datetime +from maggma.stores import Store, MongoStore +from maggma.utils import lazy_substitute, substitute, unset +from pymongo import DESCENDING class VaultStore(MongoStore): @@ -16,6 +18,7 @@ class VaultStore(MongoStore): Extends MongoStore to read credentials out of Vault server and uses these values to initialize MongoStore instance """ + def __init__(self, collection_name, vault_secret_path): """ collection (string): name of mongo collection @@ -58,12 +61,7 @@ def __init__(self, collection_name, vault_secret_path): username = db_creds.get("username", "") password = db_creds.get("password", "") - super(VaultStore, self).__init__(database, - collection_name, - host, - port, - username, - password) + super(VaultStore, self).__init__(database, collection_name, host, port, username, password) class AliasingStore(Store): @@ -142,33 +140,200 @@ def connect(self, force_reset=False): self.store.connect(force_reset=force_reset) -def lazy_substitute(d, aliases): +class AmazonS3Store(Store): """ - Simple top level substitute that doesn't dive into mongo like strings + GridFS like storage using Amazon S3 and a regular store for indexing + Assumes Amazon AWS key and secret key are set in environment or default config file """ - for alias, key in aliases.items(): - if key in d: - d[alias] = d[key] - del d[key] + def __init__(self, index, bucket, **kwargs): + """ + Initializes an S3 Store + Args: + index (Store): a store to use to index the S3 
Bucket + bucket (str) : name of the bucket + """ + self.index = index + self.bucket = bucket + self.s3 = None + self.s3_bucket = None + # Force the key to be the same as the index + kwargs["key"] = index.key + super(AmazonS3Store, self).__init__(**kwargs) -def substitute(d, aliases): - """ - Substitutes keys in dictionary - Accepts multilevel mongo like keys - """ - for alias, key in aliases.items(): - if has(d, key): - set_(d, alias, get(d, key)) - unset(d, key) + @property + def collection(self): + return self.index + def connect(self, force_reset=False): + self.index.connect(force_reset=force_reset) + if not self.s3: + self.s3 = boto3.resource("s3") + # TODO: Provide configuration variable to create bucket if not present + if self.bucket not in self.s3.list_buckets(): + raise Exception("Bucket not present on AWS: {}".format(self.bucket)) + self.s3_bucket = s3.Bucket(self.bucket) -def unset(d, key): - """ - Unsets a key - """ - pydash.objects.unset(d, key) - path = to_path(key) - for i in reversed(range(1, len(path))): - if len(get(d, path[:i])) == 0: - unset(d, path[:i]) + def close(self): + self.index.close() + + @property + def collection(self): + # For now returns the index collection since that is what we would "search" on + return self.index + + def query(self, properties=None, criteria=None, **kwargs): + """ + Function that gets data from Amazon S3. This store ignores all + property projections as its designed for whole document access + + Args: + properties (list or dict): This will be ignored by the S3 + Store + criteria (dict): filter for query, matches documents + against key-value pairs + **kwargs (kwargs): further kwargs to Collection.find + """ + for f in self.index.find(filter=criteria, **kwargs): + try: + data = self.s3_bucket.Object(f[self.key]).get() + except botocore.exceptions.ClientError as e: + # If a client error is thrown, then check that it was a 404 error. + # If it was a 404 error, then the object does not exist. + error_code = int(e.response['Error']['Code']) + if error_code == 404: + self.logger.error("Could not find S3 object {}".format(f[self.key])) + break + + if f.get("compression", "") is "zlib": + data = zlib.decompress(data) + + yield json.loads(data) + + def query_one(self, properties=None, criteria=None, **kwargs): + """ + Function that gets a single document from Amazon S3. This store + ignores all property projections as its designed for whole + document access + + Args: + properties (list or dict): This will be ignored by the S3 + Store + criteria (dict): filter for query, matches documents + against key-value pairs + **kwargs (kwargs): further kwargs to Collection.find + """ + f = self.index.find_one(filter=criteria, **kwargs) + if f: + try: + data = self.s3_bucket.Object(f[self.key]).get() + except botocore.exceptions.ClientError as e: + # If a client error is thrown, then check that it was a 404 error. + # If it was a 404 error, then the object does not exist. + error_code = int(e.response['Error']['Code']) + if error_code == 404: + self.logger.error("Could not find S3 object {}".format(f[self.key])) + return None + + if f.get("compression", "") is "zlib": + data = zlib.decompress(data) + + return json.loads(data) + else: + return None + + def distinct(self, key, criteria=None, all_exist=False, **kwargs): + """ + Function get to get all distinct values of a certain key in the + GridFS Store. 
This searches the .files collection for this data + + Args: + key (mongolike key or list of mongolike keys): key or keys + for which to find distinct values or sets of values. + criteria (filter criteria): criteria for filter + all_exist (bool): whether to ensure all keys in list exist + in each document, defaults to False + **kwargs (kwargs): kwargs corresponding to collection.distinct + """ + # Index is a store so it should have its own distinct function + return self.index.distinct(key, filter=criteria, **kwargs) + + def ensure_index(self, key, unique=False): + """ + Wrapper for pymongo.Collection.ensure_index for the files collection + """ + return self.index.ensure_index(key, unique=unique, background=True) + + def update(self, docs, update_lu=True, key=None, compress=False): + """ + Function to update associated MongoStore collection. + + Args: + docs ([dict]): list of documents + key ([str] or str): keys to use to build search doc + compress (bool): compress the document or not + """ + now = datetime.now() + search_docs = [] + for d in docs: + search_doc = {} + if isinstance(key, list): + search_doc = {k: d[k] for k in key} + elif key: + search_doc = {key: d[key]} + + # Always include our main key + search_doc = {self.key: d[self.key]} + + # Remove MongoDB _id from search + if "_id" in search_doc: + del search_doc["_id"] + + # Add a timestamp + if update_lu: + search_doc[self.lu_key] = now + d[self.lu_key] = now + + data = json.dumps(jsanitize(d)).encode() + + # Compress with zlib if chosen + if compress: + search_doc["compression"] = "zlib" + data = zlib.compress(data) + + self.s3_bucket.put_object(Key=d[self.key], Body=data, Metadata=search_doc) + search_docs.append(search_doc) + + # Use store's update to remove key clashes + self.index.update(search_docs) + + @property + def last_updated(self): + self.index.last_updated + + def lu_filter(self, targets): + """Creates a MongoDB filter for new documents. + + By "new", we mean documents in this Store that were last updated later + than any document in targets. 
+ + Args: + targets (list): A list of Stores + + """ + self.index.last_updated(targets) + + def __hash__(self): + return hash((self.index.__hash__, self.bucket)) + + def rebuild_index_from_s3_data(self): + """ + Rebuilds the index Store from the data in S3 + Relies on the index document being stores as the metadata for the file + """ + index_docs = [] + for file in self.s3_bucket.objects.all(): + # TODO: Transform the data back from strings and remove AWS S3 specific keys + index_docs.append(file.metadata) + + self.index.update(index_docs) From 7ddbd8cead91188330f74e6e1e3b197df582aad3 Mon Sep 17 00:00:00 2001 From: shyamd Date: Thu, 15 Mar 2018 07:14:40 -0700 Subject: [PATCH 06/11] Add boto3 as optional req --- requirements-optional.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/requirements-optional.txt b/requirements-optional.txt index 68033521c..b78a1acfe 100644 --- a/requirements-optional.txt +++ b/requirements-optional.txt @@ -1,3 +1,4 @@ nose==1.3.4 mpi4py==3.0.0 numpy==1.14.0 +boto3==1.6.9 \ No newline at end of file From 06cba7b46ce568132e18c28ea0c0fb9739226ccc Mon Sep 17 00:00:00 2001 From: shyamd Date: Thu, 29 Mar 2018 11:59:45 -0700 Subject: [PATCH 07/11] Fix dumb bugs --- maggma/advanced_stores.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/maggma/advanced_stores.py b/maggma/advanced_stores.py index fa54472aa..1519997c2 100644 --- a/maggma/advanced_stores.py +++ b/maggma/advanced_stores.py @@ -10,7 +10,6 @@ from datetime import datetime from maggma.stores import Store, MongoStore from maggma.utils import lazy_substitute, substitute, unset -from pymongo import DESCENDING class VaultStore(MongoStore): @@ -161,10 +160,6 @@ def __init__(self, index, bucket, **kwargs): kwargs["key"] = index.key super(AmazonS3Store, self).__init__(**kwargs) - @property - def collection(self): - return self.index - def connect(self, force_reset=False): self.index.connect(force_reset=force_reset) if not self.s3: @@ -309,7 +304,7 @@ def update(self, docs, update_lu=True, key=None, compress=False): @property def last_updated(self): - self.index.last_updated + return self.index.last_updated def lu_filter(self, targets): """Creates a MongoDB filter for new documents. 
@@ -321,7 +316,7 @@ def lu_filter(self, targets): targets (list): A list of Stores """ - self.index.last_updated(targets) + self.index.lu_filter(targets) def __hash__(self): return hash((self.index.__hash__, self.bucket)) From 8d381c82aa823d79cf51f4e78c1b2e7054bfd01b Mon Sep 17 00:00:00 2001 From: shyamd Date: Thu, 29 Mar 2018 16:49:02 -0700 Subject: [PATCH 08/11] Bug fixes and tests --- maggma/advanced_stores.py | 19 ++++++------ maggma/tests/test_advanced_stores.py | 43 +++++++++++++++++++++++++++- 2 files changed, 53 insertions(+), 9 deletions(-) diff --git a/maggma/advanced_stores.py b/maggma/advanced_stores.py index 1519997c2..7570cc781 100644 --- a/maggma/advanced_stores.py +++ b/maggma/advanced_stores.py @@ -7,9 +7,11 @@ import json import boto3 import botocore +import zlib from datetime import datetime from maggma.stores import Store, MongoStore -from maggma.utils import lazy_substitute, substitute, unset +from maggma.utils import lazy_substitute, substitute +from monty.json import jsanitize class VaultStore(MongoStore): @@ -167,7 +169,7 @@ def connect(self, force_reset=False): # TODO: Provide configuration variable to create bucket if not present if self.bucket not in self.s3.list_buckets(): raise Exception("Bucket not present on AWS: {}".format(self.bucket)) - self.s3_bucket = s3.Bucket(self.bucket) + self.s3_bucket = self.s3.Bucket(self.bucket) def close(self): self.index.close() @@ -189,7 +191,7 @@ def query(self, properties=None, criteria=None, **kwargs): against key-value pairs **kwargs (kwargs): further kwargs to Collection.find """ - for f in self.index.find(filter=criteria, **kwargs): + for f in self.index.query(criteria=criteria, **kwargs): try: data = self.s3_bucket.Object(f[self.key]).get() except botocore.exceptions.ClientError as e: @@ -218,7 +220,7 @@ def query_one(self, properties=None, criteria=None, **kwargs): against key-value pairs **kwargs (kwargs): further kwargs to Collection.find """ - f = self.index.find_one(filter=criteria, **kwargs) + f = self.index.query_one(criteria=criteria, **kwargs) if f: try: data = self.s3_bucket.Object(f[self.key]).get() @@ -271,14 +273,15 @@ def update(self, docs, update_lu=True, key=None, compress=False): now = datetime.now() search_docs = [] for d in docs: - search_doc = {} if isinstance(key, list): search_doc = {k: d[k] for k in key} elif key: search_doc = {key: d[key]} + else: + search_doc = {} # Always include our main key - search_doc = {self.key: d[self.key]} + search_doc[self.key] = d[self.key] # Remove MongoDB _id from search if "_id" in search_doc: @@ -286,8 +289,8 @@ def update(self, docs, update_lu=True, key=None, compress=False): # Add a timestamp if update_lu: - search_doc[self.lu_key] = now - d[self.lu_key] = now + search_doc[self.lu_field] = now + d[self.lu_field] = now data = json.dumps(jsanitize(d)).encode() diff --git a/maggma/tests/test_advanced_stores.py b/maggma/tests/test_advanced_stores.py index 259e95d11..750941421 100644 --- a/maggma/tests/test_advanced_stores.py +++ b/maggma/tests/test_advanced_stores.py @@ -4,10 +4,11 @@ """ import os import unittest -from unittest.mock import patch +from unittest.mock import patch, MagicMock from maggma.stores import MemoryStore, MongoStore from maggma.advanced_stores import * +import zlib module_dir = os.path.join(os.path.dirname(os.path.abspath(__file__))) @@ -72,6 +73,46 @@ def test_vault_missing_env(self): self._create_vault_store() +class TestS3Store(unittest.TestCase): + + def setUp(self): + self.index = MemoryStore("index'") + with patch("boto3.resource") as 
mock_resource: + mock_resource.return_value = MagicMock() + mock_resource("s3").list_buckets.return_value = ["bucket1","bucket2"] + self.s3store = AmazonS3Store(self.index,"bucket1") + self.s3store.connect() + + def test_qeuery_one(self): + self.s3store.s3_bucket.Object.return_value = MagicMock() + self.s3store.s3_bucket.Object().get.return_value = '{"task_id": "mp-1", "data": "asd"}' + self.index.update([{"task_id":"mp-1"}]) + self.assertEqual(self.s3store.query_one(criteria={"task_id": "mp-2"}),None) + self.assertEqual(self.s3store.query_one(criteria={"task_id": "mp-1"})["data"],"asd") + + self.s3store.s3_bucket.Object().get.return_value = zlib.compress('{"task_id": "mp-3", "data": "sdf"}'.encode()) + self.index.update([{"task_id":"mp-3", "compression": "zlib"}]) + self.assertEqual(self.s3store.query_one(criteria={"task_id": "mp-3"})["data"],"sdf") + + def test_update(self): + + self.s3store.update([{"task_id": "mp-1", "data": "asd"}]) + self.assertEqual(self.s3store.s3_bucket.put_object.call_count,1) + called_kwargs = self.s3store.s3_bucket.put_object.call_args[1] + self.assertEqual(self.s3store.s3_bucket.put_object.call_count,1) + self.assertEqual(called_kwargs["Key"], "mp-1") + self.assertTrue(len(called_kwargs["Body"]) > 0) + self.assertEqual(called_kwargs["Metadata"]["task_id"], "mp-1") + + def test_update_compression(self): + self.s3store.update([{"task_id": "mp-1", "data": "asd"}],compress=True) + self.assertEqual(self.s3store.s3_bucket.put_object.call_count,1) + called_kwargs = self.s3store.s3_bucket.put_object.call_args[1] + self.assertEqual(self.s3store.s3_bucket.put_object.call_count,1) + self.assertEqual(called_kwargs["Key"], "mp-1") + self.assertTrue(len(called_kwargs["Body"]) > 0) + self.assertEqual(called_kwargs["Metadata"]["task_id"], "mp-1") + self.assertEqual(called_kwargs["Metadata"]["compression"], "zlib") class TestAliasingStore(unittest.TestCase): From 1a567e26038cb0015b36dad3857db2c14db38abd Mon Sep 17 00:00:00 2001 From: shyamd Date: Sat, 31 Mar 2018 08:39:24 -0700 Subject: [PATCH 09/11] Updated distinct and groupby to work for all basic stores --- maggma/stores.py | 296 +++++++++++++++++++++++++----------- maggma/tests/test_stores.py | 31 +++- 2 files changed, 233 insertions(+), 94 deletions(-) diff --git a/maggma/stores.py b/maggma/stores.py index 61817790b..780b8b406 100644 --- a/maggma/stores.py +++ b/maggma/stores.py @@ -8,10 +8,11 @@ from datetime import datetime import json - import mongomock import pymongo import gridfs +from itertools import groupby +from operator import itemgetter from pymongo import MongoClient, DESCENDING from pydash import identity @@ -43,43 +44,87 @@ def __init__(self, key="task_id", lu_field='last_updated', lu_type="datetime"): @property @abstractmethod def collection(self): + """ + Returns a handle to the pymongo collection object + Not guaranteed to exist in the future + """ pass @abstractmethod def connect(self, force_reset=False): + """ + Connect to the source data + """ pass @abstractmethod def close(self): + """ + Closes any connections + """ pass @abstractmethod def query(self, properties=None, criteria=None, **kwargs): + """ + Queries the Store for a set of properties + """ pass @abstractmethod def query_one(self, properties=None, criteria=None, **kwargs): + """ + Get one property from the store + """ pass @abstractmethod def distinct(self, key, criteria=None, **kwargs): + """ + Get all distinct values for a key + """ pass @abstractmethod - def update(self, docs, update_lu=True, key=None): + def update(self, docs, 
update_lu=True, key=None, **kwargs): + """ + Update docs into the store + """ pass @abstractmethod - def ensure_index(self, key, unique=False): + def ensure_index(self, key, unique=False, **kwargs): + """ + Ensure index gets assigned + """ + pass + + @abstractmethod + def groupby(self, keys, properties=None, criteria=None, **kwargs): + """ + Simple grouping function that will group documents + by keys. + + Args: + keys (list or string): fields to group documents + properties (list): properties to return in grouped documents + criteria (dict): filter for documents to group + + Returns: + command cursor corresponding to grouped documents + + elements of the command cursor have the structure: + {'_id': {"KEY_1": value_1, "KEY_2": value_2 ..., + 'docs': [list_of_documents corresponding to key values]} + + """ pass @property def last_updated(self): - doc = next(self.query(properties=[self.lu_field]).sort( - [(self.lu_field, pymongo.DESCENDING)]).limit(1), None) + doc = next(self.query(properties=[self.lu_field]).sort([(self.lu_field, pymongo.DESCENDING)]).limit(1), None) # Handle when collection has docs but `NoneType` lu_field. - return (self.lu_func[0](doc[self.lu_field]) if (doc and doc[self.lu_field]) - else datetime.min) + return (self.lu_func[0](doc[self.lu_field]) if (doc and doc[self.lu_field]) else datetime.min) def lu_filter(self, targets): """Creates a MongoDB filter for new documents. @@ -104,7 +149,7 @@ def __ne__(self, other): return not self == other def __hash__(self): - return hash((self.lu_field,)) + return hash((self.lu_field, )) def __getstate__(self): return self.as_dict() @@ -140,8 +185,7 @@ def query(self, properties=None, criteria=None, **kwargs): """ if isinstance(properties, list): properties = {p: 1 for p in properties} - return self.collection.find(filter=criteria, projection=properties, - **kwargs) + return self.collection.find(filter=criteria, projection=properties, **kwargs) def query_one(self, properties=None, criteria=None, **kwargs): """ @@ -158,51 +202,17 @@ def query_one(self, properties=None, criteria=None, **kwargs): """ if isinstance(properties, list): properties = {p: 1 for p in properties} - return self.collection.find_one(filter=criteria, projection=properties, - **kwargs) + return self.collection.find_one(filter=criteria, projection=properties, **kwargs) - def distinct(self, key, criteria=None, all_exist=False, **kwargs): - """ - Function get to get all distinct values of a certain key in - a mongolike store. May take a single key or a list of keys - - Args: - key (mongolike key or list of mongolike keys): key or keys - for which to find distinct values or sets of values. 
- criteria (filter criteria): criteria for filter - all_exist (bool): whether to ensure all keys in list exist - in each document, defaults to False - **kwargs (kwargs): kwargs corresponding to collection.distinct - """ - if isinstance(key, list): - agg_pipeline = [{"$match": criteria}] if criteria else [] - if all_exist: - agg_pipeline.append( - {"$match": {k: {"$exists": True} for k in key}}) - # use string ints as keys and replace later to avoid bug - # where periods can't be in group keys, then reconstruct after - group_op = {"$group": { - "_id": {str(n): "${}".format(k) for n, k in enumerate(key)}}} - agg_pipeline.append(group_op) - results = [r['_id'] - for r in self.collection.aggregate(agg_pipeline)] - for result in results: - for n in list(result.keys()): - result[key[int(n)]] = result.pop(n) - - # Return as document as partial matches are included - return results - - else: - return self.collection.distinct(key, filter=criteria, **kwargs) - - def ensure_index(self, key, unique=False): + def ensure_index(self, key, unique=False, **kwargs): """ Wrapper for pymongo.Collection.ensure_index """ - return self.collection.create_index(key, unique=unique, background=True) + if "background" not in kwargs: + kwargs["background"] = True + return self.collection.create_index(key, unique=unique, **kwargs) - def update(self, docs, update_lu=True, key=None): + def update(self, docs, update_lu=True, key=None, **kwargs): """ Function to update associated MongoStore collection. @@ -227,7 +237,6 @@ def update(self, docs, update_lu=True, key=None): self.logger.error('Document failed to validate: {}'.format(d)) if validates: - search_doc = {} if isinstance(key, list): search_doc = {k: d[k] for k in key} elif key: @@ -240,6 +249,33 @@ def update(self, docs, update_lu=True, key=None): bulk.execute() + def distinct(self, key, criteria=None, all_exist=False, **kwargs): + """ + Function get to get all distinct values of a certain key in + a mongolike store. May take a single key or a list of keys + + Args: + key (mongolike key or list of mongolike keys): key or keys + for which to find distinct values or sets of values. + criteria (filter criteria): criteria for filter + all_exist (bool): whether to ensure all keys in list exist + in each document, defaults to False + **kwargs (kwargs): kwargs corresponding to collection.distinct + """ + if isinstance(key, list): + criteria = criteria if criteria else {} + # Update to ensure keys are there + if all_exist: + criteria.update({k: {"$exists": True} for k in key if k not in criteria}) + + results = [] + for d in self.groupby(key, properties=key, criteria=criteria): + results.append(d["_id"]) + return results + + else: + return self.collection.distinct(key, filter=criteria, **kwargs) + def close(self): self.collection.database.client.close() @@ -249,8 +285,7 @@ class MongoStore(Mongolike, Store): A Store that connects to a Mongo collection """ - def __init__(self, database, collection_name, host="localhost", port=27017, - username="", password="", **kwargs): + def __init__(self, database, collection_name, host="localhost", port=27017, username="", password="", **kwargs): """ Args: database (str): database name @@ -293,8 +328,7 @@ def from_db_file(cls, filename): kwargs.pop("aliases", None) return cls(**kwargs) - def groupby(self, keys, properties=None, criteria=None, - allow_disk_use=True): + def groupby(self, keys, properties=None, criteria=None, allow_disk_use=True, **kwargs): """ Simple grouping function that will group documents by keys. 
@@ -324,10 +358,7 @@ def groupby(self, keys, properties=None, criteria=None, keys = [keys] group_id = {key: "${}".format(key) for key in keys} - pipeline.append({"$group": {"_id": group_id, - "docs": {"$push": "$$ROOT"} - } - }) + pipeline.append({"$group": {"_id": group_id, "docs": {"$push": "$$ROOT"}}}) return self.collection.aggregate(pipeline, allowDiskUse=allow_disk_use) @@ -338,14 +369,15 @@ def from_collection(cls, collection, **kwargs): This is not a fully safe operation as it gives dummy information to the MongoStore As a result, this will not serialize and can not reset its connection """ - # TODO: How do we make this safer? + # TODO: How do we make this safer? coll_name = collection.name db_name = collection.database.name - store = cls(db_name,coll_name,**kwargs) + store = cls(db_name, coll_name, **kwargs) store._collection = collection return store + class MemoryStore(Mongolike, Store): """ An in-memory Store that functions similarly @@ -365,17 +397,72 @@ def connect(self, force_reset=False): def __hash__(self): return hash((self.name, self.lu_field)) - def groupby(self, keys, properties=None, criteria=None, - allow_disk_use=True): + def groupby(self, keys, properties=None, criteria=None, **kwargs): + """ + Simple grouping function that will group documents + by keys. + + Args: + keys (list or string): fields to group documents + properties (list): properties to return in grouped documents + criteria (dict): filter for documents to group + allow_disk_use (bool): whether to allow disk use in aggregation + + Returns: + command cursor corresponding to grouped documents + + elements of the command cursor have the structure: + {'_id': {"KEY_1": value_1, "KEY_2": value_2 ..., + 'docs': [list_of_documents corresponding to key values]} + """ - This is a placeholder for groupby for memorystores, - current version of mongomock do not allow for standard - aggregation methods, so this method currently raises - a NotImplementedError + keys = keys if isinstance(keys, list) else [keys] + + input_data = list(self.query(properties=keys, criteria=criteria)) + + if len(keys) > 1: + grouper = itemgetter(*keys) + for key, grp in groupby(sorted(input_data, key=grouper), grouper): + temp_dict = {"_id": zip(keys, key), "docs": list(grp)} + yield temp_dict + else: + grouper = itemgetter(*keys) + for key, grp in groupby(sorted(input_data, key=grouper), grouper): + temp_dict = {"_id": {keys[0]: key}, "docs": list(grp)} + yield temp_dict + + def update(self, docs, update_lu=True, key=None, **kwargs): """ - raise NotImplementedError("groupby not available for {}" - "due to mongomock incompatibility".format( - self.__class__)) + Function to update associated MongoStore collection. 
+ + Args: + docs: list of documents + """ + + for d in docs: + + d = jsanitize(d, allow_bson=True) + + # document-level validation is optional + validates = True + if self.validator: + validates = self.validator.is_valid(d) + if not validates: + if self.validator.strict: + raise ValueError('Document failed to validate: {}'.format(d)) + else: + self.logger.error('Document failed to validate: {}'.format(d)) + + if validates: + if isinstance(key, list): + search_doc = {k: d[k] for k in key} + elif key: + search_doc = {key: d[key]} + else: + search_doc = {self.key: d[self.key]} + if update_lu: + d[self.lu_field] = datetime.utcnow() + self.collection.insert_one(d) class JSONStore(MemoryStore): @@ -401,8 +488,7 @@ def connect(self, force_reset=False): data = f.read() data = data.decode() if isinstance(data, bytes) else data objects = json.loads(data) - objects = [objects] if not isinstance( - objects, list) else objects + objects = [objects] if not isinstance(objects, list) else objects self.collection.insert_many(objects) def __hash__(self): @@ -431,8 +517,7 @@ class GridFSStore(Store): A Store for GrdiFS backend. Provides a common access method consistent with other stores """ - def __init__(self, database, collection_name, host="localhost", port=27017, - username="", password="", **kwargs): + def __init__(self, database, collection_name, host="localhost", port=27017, username="", password="", **kwargs): self.database = database self.collection_name = collection_name @@ -510,8 +595,8 @@ def query_one(self, properties=None, criteria=None, **kwargs): def distinct(self, key, criteria=None, all_exist=False, **kwargs): """ - Function get to get all distinct values of a certain key in the - GridFS Store. This searches the .files collection for this data + Function get to get all distinct values of a certain key in + a mongolike store. May take a single key or a list of keys Args: key (mongolike key or list of mongolike keys): key or keys @@ -522,27 +607,53 @@ def distinct(self, key, criteria=None, all_exist=False, **kwargs): **kwargs (kwargs): kwargs corresponding to collection.distinct """ if isinstance(key, list): - agg_pipeline = [{"$match": criteria}] if criteria else [] + criteria = criteria if criteria else {} + # Update to ensure keys are there if all_exist: - agg_pipeline.append( - {"$match": {k: {"$exists": True} for k in key}}) - # use string ints as keys and replace later to avoid bug - # where periods can't be in group keys, then reconstruct after - group_op = {"$group": { - "_id": {str(n): "${}".format(k) for n, k in enumerate(key)}}} - agg_pipeline.append(group_op) - results = [r['_id'] - for r in self._files_collection.aggregate(agg_pipeline)] - for result in results: - for n in list(result.keys()): - result[key[int(n)]] = result.pop(n) - - # Return as document as partial matches are included + criteria.update({k: {"$exists": True} for k in key if k not in criteria}) + + results = [] + for d in self.groupby(key, properties=key, criteria=criteria): + results.append(d["_id"]) return results else: return self._files_collection.distinct(key, filter=criteria, **kwargs) + def groupby(self, keys, properties=None, criteria=None, allow_disk_use=True, **kwargs): + """ + Simple grouping function that will group documents + by keys. 
+ + Args: + keys (list or string): fields to group documents + properties (list): properties to return in grouped documents + criteria (dict): filter for documents to group + allow_disk_use (bool): whether to allow disk use in aggregation + + Returns: + command cursor corresponding to grouped documents + + elements of the command cursor have the structure: + {'_id': {"KEY_1": value_1, "KEY_2": value_2 ..., + 'docs': [list_of_documents corresponding to key values]} + + """ + pipeline = [] + if criteria is not None: + pipeline.append({"$match": criteria}) + + if properties is not None: + pipeline.append({"$project": {p: 1 for p in properties}}) + + if isinstance(keys, str): + keys = [keys] + + group_id = {key: "${}".format(key) for key in keys} + pipeline.append({"$group": {"_id": group_id, "docs": {"$push": "$$ROOT"}}}) + + return self.collection.aggregate(pipeline, allowDiskUse=allow_disk_use) + def ensure_index(self, key, unique=False): """ Wrapper for pymongo.Collection.ensure_index for the files collection @@ -556,7 +667,6 @@ def update(self, docs, update_lu=True, key=None): Args: docs: list of documents """ - for d in docs: search_doc = {} if isinstance(key, list): @@ -573,4 +683,4 @@ def update(self, docs, update_lu=True, key=None): self.collection.put(data, **search_doc) def close(self): - self.collection.database.client.close() \ No newline at end of file + self.collection.database.client.close() diff --git a/maggma/tests/test_stores.py b/maggma/tests/test_stores.py index 9c098931a..61b714eee 100644 --- a/maggma/tests/test_stores.py +++ b/maggma/tests/test_stores.py @@ -113,13 +113,42 @@ class TestMemoryStore(unittest.TestCase): def setUp(self): self.memstore = MemoryStore() + def test(self): self.assertEqual(self.memstore.collection, None) self.memstore.connect() self.assertIsInstance(self.memstore.collection, mongomock.collection.Collection) def test_groupby(self): - self.assertRaises(NotImplementedError, self.memstore.groupby, "a") + self.memstore.connect() + self.memstore.update( + [{ + "e": 7, + "d": 9, + "f": 9 + }, { + "e": 7, + "d": 9, + "f": 10 + }, { + "e": 8, + "d": 9, + "f": 11 + }, { + "e": 9, + "d": 10, + "f": 12 + }], + key="f") + data = list(self.memstore.groupby("d")) + self.assertEqual(len(data), 2) + grouped_by_9 = [g['docs'] for g in data if g['_id']['d'] == 9][0] + self.assertEqual(len(grouped_by_9), 3) + grouped_by_10 = [g['docs'] for g in data if g['_id']['d'] == 10][0] + self.assertEqual(len(grouped_by_10), 1) + + data = list(self.memstore.groupby(["e", "d"])) + self.assertEqual(len(data), 3) class TestJsonStore(unittest.TestCase): From 6508aab54e52b6e4c41944043da97e11f9c66b1d Mon Sep 17 00:00:00 2001 From: shyamd Date: Sat, 31 Mar 2018 08:51:35 -0700 Subject: [PATCH 10/11] fix groupby and distinct for advanced stores --- maggma/advanced_stores.py | 58 +++++++++++++++++++++++++++++++++++---- 1 file changed, 53 insertions(+), 5 deletions(-) diff --git a/maggma/advanced_stores.py b/maggma/advanced_stores.py index 7570cc781..117812c42 100644 --- a/maggma/advanced_stores.py +++ b/maggma/advanced_stores.py @@ -76,7 +76,9 @@ def __init__(self, store, aliases, **kwargs): aliases (dict): dict of aliases of the form external key: internal key """ self.store = store + # Given an external key tells what the internal key is self.aliases = aliases + # Given the internal key tells us what the external key is self.reverse_aliases = {v: k for k, v in aliases.items()} self.kwargs = kwargs @@ -108,11 +110,36 @@ def query_one(self, properties=None, criteria=None, **kwargs): 
return d def distinct(self, key, criteria=None, **kwargs): - if key in self.aliases: - key = self.aliases[key] + if isinstance(key, list): + criteria = criteria if criteria else {} + # Update to ensure keys are there + if all_exist: + criteria.update({k: {"$exists": True} for k in key if k not in criteria}) + + results = [] + for d in self.groupby(key, properties=key, criteria=criteria): + results.append(d["_id"]) + return results + + else: + criteria = criteria if criteria else {} + lazy_substitute(criteria, self.reverse_aliases) + key = self.aliases[key] if key in self.aliases else key + return self.collection.distinct(key, filter=criteria, **kwargs) + + def groupby(self, keys, properties=None, criteria=None, **kwargs): + # Convert to a list + keys = keys if isinstance(keys, list) else [keys] + + # Make the aliasing transformations on keys + keys = [self.aliases[k] if k in self.aliases else k for k in keys] + + # Update criteria and properties based on aliases criteria = criteria if criteria else {} - lazy_substitute(criteria, self.aliases) - return self.store.distinct(key, criteria, **kwargs) + substitute(properties, self.reverse_aliases) + lazy_substitute(criteria, self.reverse_aliases) + + return self.store.groupby(keys=keys, properties=properties, criteria=criteria, **kwargs) def update(self, docs, update_lu=True, key=None): key = key if key else self.key @@ -242,7 +269,7 @@ def query_one(self, properties=None, criteria=None, **kwargs): def distinct(self, key, criteria=None, all_exist=False, **kwargs): """ Function get to get all distinct values of a certain key in the - GridFS Store. This searches the .files collection for this data + AmazonS3 Store. This searches the index collection for this data Args: key (mongolike key or list of mongolike keys): key or keys @@ -255,6 +282,27 @@ def distinct(self, key, criteria=None, all_exist=False, **kwargs): # Index is a store so it should have its own distinct function return self.index.distinct(key, filter=criteria, **kwargs) + def groupby(self, keys, properties=None, criteria=None, **kwargs): + """ + Simple grouping function that will group documents + by keys. 
Only searches the index collection + + Args: + keys (list or string): fields to group documents + properties (list): properties to return in grouped documents + criteria (dict): filter for documents to group + allow_disk_use (bool): whether to allow disk use in aggregation + + Returns: + command cursor corresponding to grouped documents + + elements of the command cursor have the structure: + {'_id': {"KEY_1": value_1, "KEY_2": value_2 ..., + 'docs': [list_of_documents corresponding to key values]} + + """ + self.index.groupby(keys, properties, criteria, **kwargs) + def ensure_index(self, key, unique=False): """ Wrapper for pymongo.Collection.ensure_index for the files collection From 0456b009f12b96d57851ab3b0fd795771eeba64a Mon Sep 17 00:00:00 2001 From: shyamd Date: Sat, 31 Mar 2018 09:03:53 -0700 Subject: [PATCH 11/11] Codacy Clean up --- examples/runner_sample.py | 12 +++++++----- maggma/advanced_stores.py | 4 ++-- maggma/runner.py | 4 +++- 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/examples/runner_sample.py b/examples/runner_sample.py index 017746b1f..a259b3f24 100755 --- a/examples/runner_sample.py +++ b/examples/runner_sample.py @@ -1,8 +1,10 @@ -# Usage: -# with multiprocessing: -# python runner_sample.py -# with mpi(need mpi4py pacakge): -# mpiexec -n 5 python runner_sample.py +""" + Usage: + with multiprocessing: + python runner_sample.py + with mpi(need mpi4py pacakge): + mpiexec -n 5 python runner_sample.py +""" from maggma.stores import MemoryStore from maggma.builder import Builder diff --git a/maggma/advanced_stores.py b/maggma/advanced_stores.py index 117812c42..e523bd013 100644 --- a/maggma/advanced_stores.py +++ b/maggma/advanced_stores.py @@ -152,10 +152,10 @@ def update(self, docs, update_lu=True, key=None): self.store.update(docs, update_lu=update_lu, key=key) - def ensure_index(self, key, unique=False): + def ensure_index(self, key, unique=False, **kwargs): if key in self.aliases: key = self.aliases - return self.store.ensure_index(key, unique) + return self.store.ensure_index(key, unique, **kwargs) def close(self): self.store.close() diff --git a/maggma/runner.py b/maggma/runner.py index 4150ea66f..835dd411b 100644 --- a/maggma/runner.py +++ b/maggma/runner.py @@ -7,7 +7,6 @@ import abc import logging -import time from collections import defaultdict, deque from threading import Thread, Condition, BoundedSemaphore from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor @@ -225,6 +224,9 @@ def update_targets(self): class MultiprocProcessor(BaseProcessor): + """ + Processor to run builders using python multiprocessing + """ def __init__(self, builders, num_workers=None): # multiprocessing only if mpi is not used, no mixing self.num_workers = num_workers