diff --git a/examples/runner_sample.py b/examples/runner_sample.py
index 017746b1f..a259b3f24 100755
--- a/examples/runner_sample.py
+++ b/examples/runner_sample.py
@@ -1,8 +1,10 @@
-# Usage:
-#     with multiprocessing:
-#         python runner_sample.py
-#     with mpi(need mpi4py pacakge):
-#         mpiexec -n 5 python runner_sample.py
+"""
+Usage:
+    with multiprocessing:
+        python runner_sample.py
+    with mpi (needs the mpi4py package):
+        mpiexec -n 5 python runner_sample.py
+"""
 from maggma.stores import MemoryStore
 from maggma.builder import Builder
diff --git a/maggma/advanced_stores.py b/maggma/advanced_stores.py
index 6a5b423c5..e523bd013 100644
--- a/maggma/advanced_stores.py
+++ b/maggma/advanced_stores.py
@@ -2,13 +2,16 @@
 """
 Advanced Stores for behavior outside normal access patterns
 """
-from maggma.stores import Store, MongoStore
-from pydash.objects import set_, get, has
-from pydash.utilities import to_path
-import pydash.objects
+import os
 import hvac
 import json
-import os
+import boto3
+import botocore
+import zlib
+from datetime import datetime
+from maggma.stores import Store, MongoStore
+from maggma.utils import lazy_substitute, substitute
+from monty.json import jsanitize
 
 
 class VaultStore(MongoStore):
@@ -16,6 +19,7 @@ class VaultStore(MongoStore):
     Extends MongoStore to read credentials out of Vault server and uses
     these values to initialize MongoStore instance
     """
+
     def __init__(self, collection_name, vault_secret_path):
         """
         collection (string): name of mongo collection
@@ -58,12 +62,7 @@ def __init__(self, collection_name, vault_secret_path):
         username = db_creds.get("username", "")
         password = db_creds.get("password", "")
 
-        super(VaultStore, self).__init__(database,
-                                         collection_name,
-                                         host,
-                                         port,
-                                         username,
-                                         password)
+        super(VaultStore, self).__init__(database, collection_name, host, port, username, password)
 
 
 class AliasingStore(Store):
@@ -77,7 +76,9 @@ def __init__(self, store, aliases, **kwargs):
             aliases (dict): dict of aliases of the form external key: internal key
         """
         self.store = store
+        # Given an external key, tells us what the internal key is
         self.aliases = aliases
+        # Given an internal key, tells us what the external key is
         self.reverse_aliases = {v: k for k, v in aliases.items()}
         self.kwargs = kwargs
 
@@ -109,11 +110,36 @@ def query_one(self, properties=None, criteria=None, **kwargs):
         return d
 
     def distinct(self, key, criteria=None, **kwargs):
-        if key in self.aliases:
-            key = self.aliases[key]
+        if isinstance(key, list):
+            criteria = criteria if criteria else {}
+            # Update criteria to ensure the keys are present
+            if kwargs.pop("all_exist", False):
+                criteria.update({k: {"$exists": True} for k in key if k not in criteria})
+
+            results = []
+            for d in self.groupby(key, properties=key, criteria=criteria):
+                results.append(d["_id"])
+            return results
+
+        else:
+            criteria = criteria if criteria else {}
+            lazy_substitute(criteria, self.reverse_aliases)
+            key = self.aliases[key] if key in self.aliases else key
+            return self.collection.distinct(key, filter=criteria, **kwargs)
+
+    def groupby(self, keys, properties=None, criteria=None, **kwargs):
+        # Convert to a list
+        keys = keys if isinstance(keys, list) else [keys]
+
+        # Make the aliasing transformations on keys
+        keys = [self.aliases[k] if k in self.aliases else k for k in keys]
+
+        # Update criteria and properties based on aliases
         criteria = criteria if criteria else {}
-        lazy_substitute(criteria, self.aliases)
-        return self.store.distinct(key, criteria, **kwargs)
+        substitute(properties, self.reverse_aliases)
+        lazy_substitute(criteria, self.reverse_aliases)
+
+        return self.store.groupby(keys=keys, properties=properties, criteria=criteria, **kwargs)
 
     def update(self, docs, update_lu=True, key=None):
         key = key if key else self.key
@@ -126,10 +152,10 @@ def update(self, docs, update_lu=True, key=None):
 
         self.store.update(docs, update_lu=update_lu, key=key)
 
-    def ensure_index(self, key, unique=False):
+    def ensure_index(self, key, unique=False, **kwargs):
         if key in self.aliases:
             key = self.aliases
-        return self.store.ensure_index(key, unique)
+        return self.store.ensure_index(key, unique, **kwargs)
 
     def close(self):
         self.store.close()
@@ -138,37 +164,222 @@ def close(self):
     def collection(self):
         return self.store.collection
 
-    def connect(self):
-        self.store.connect()
+    def connect(self, force_reset=False):
+        self.store.connect(force_reset=force_reset)
 
 
-def lazy_substitute(d, aliases):
+class AmazonS3Store(Store):
     """
-    Simple top level substitute that doesn't dive into mongo like strings
+    GridFS-like storage using Amazon S3 and a regular store for indexing
+    Assumes Amazon AWS key and secret key are set in environment or default config file
     """
-    for alias, key in aliases.items():
-        if key in d:
-            d[alias] = d[key]
-            del d[key]
 
+    def __init__(self, index, bucket, **kwargs):
+        """
+        Initializes an S3 Store
+
+        Args:
+            index (Store): a store to use to index the S3 Bucket
+            bucket (str): name of the bucket
+        """
+        self.index = index
+        self.bucket = bucket
+        self.s3 = None
+        self.s3_bucket = None
+        # Force the key to be the same as the index
+        kwargs["key"] = index.key
+        super(AmazonS3Store, self).__init__(**kwargs)
+
+    def connect(self, force_reset=False):
+        self.index.connect(force_reset=force_reset)
+        if not self.s3:
+            self.s3 = boto3.resource("s3")
+            # TODO: Provide configuration variable to create bucket if not present
+            if self.bucket not in self.s3.list_buckets():
+                raise Exception("Bucket not present on AWS: {}".format(self.bucket))
+            self.s3_bucket = self.s3.Bucket(self.bucket)
 
-def substitute(d, aliases):
-    """
-    Substitutes keys in dictionary
-    Accepts multilevel mongo like keys
-    """
-    for alias, key in aliases.items():
-        if has(d, key):
-            set_(d, alias, get(d, key))
-            unset(d, key)
+    def close(self):
+        self.index.close()
 
+    @property
+    def collection(self):
+        # For now returns the index collection since that is what we would "search" on
+        return self.index
 
-def unset(d, key):
-    """
-    Unsets a key
-    """
-    pydash.objects.unset(d, key)
-    path = to_path(key)
-    for i in reversed(range(1, len(path))):
-        if len(get(d, path[:i])) == 0:
-            unset(d, path[:i])
+    def query(self, properties=None, criteria=None, **kwargs):
+        """
+        Function that gets data from Amazon S3. This store ignores all
+        property projections as it's designed for whole document access
+
+        Args:
+            properties (list or dict): This will be ignored by the S3
+                Store
+            criteria (dict): filter for query, matches documents
+                against key-value pairs
+            **kwargs (kwargs): further kwargs to Collection.find
+        """
+        for f in self.index.query(criteria=criteria, **kwargs):
+            try:
+                data = self.s3_bucket.Object(f[self.key]).get()
+            except botocore.exceptions.ClientError as e:
+                # If a client error is thrown, then check that it was a 404 error.
+                # If it was a 404 error, then the object does not exist.
+                error_code = int(e.response['Error']['Code'])
+                if error_code == 404:
+                    self.logger.error("Could not find S3 object {}".format(f[self.key]))
+                    break
+
+            if f.get("compression", "") == "zlib":
+                data = zlib.decompress(data)
+
+            yield json.loads(data)
+
+    def query_one(self, properties=None, criteria=None, **kwargs):
+        """
+        Function that gets a single document from Amazon S3. This store
+        ignores all property projections as it's designed for whole
+        document access
+
+        Args:
+            properties (list or dict): This will be ignored by the S3
+                Store
+            criteria (dict): filter for query, matches documents
+                against key-value pairs
+            **kwargs (kwargs): further kwargs to Collection.find
+        """
+        f = self.index.query_one(criteria=criteria, **kwargs)
+        if f:
+            try:
+                data = self.s3_bucket.Object(f[self.key]).get()
+            except botocore.exceptions.ClientError as e:
+                # If a client error is thrown, then check that it was a 404 error.
+                # If it was a 404 error, then the object does not exist.
+                error_code = int(e.response['Error']['Code'])
+                if error_code == 404:
+                    self.logger.error("Could not find S3 object {}".format(f[self.key]))
+                    return None
+
+            if f.get("compression", "") == "zlib":
+                data = zlib.decompress(data)
+
+            return json.loads(data)
+        else:
+            return None
+
+    def distinct(self, key, criteria=None, all_exist=False, **kwargs):
+        """
+        Function to get all distinct values of a certain key in the
+        AmazonS3 Store. This searches the index collection for this data
+
+        Args:
+            key (mongolike key or list of mongolike keys): key or keys
+                for which to find distinct values or sets of values.
+            criteria (filter criteria): criteria for filter
+            all_exist (bool): whether to ensure all keys in list exist
+                in each document, defaults to False
+            **kwargs (kwargs): kwargs corresponding to collection.distinct
+        """
+        # Index is a store so it should have its own distinct function
+        return self.index.distinct(key, criteria=criteria, all_exist=all_exist, **kwargs)
+
+    def groupby(self, keys, properties=None, criteria=None, **kwargs):
+        """
+        Simple grouping function that will group documents
+        by keys. Only searches the index collection
+
+        Args:
+            keys (list or string): fields to group documents
+            properties (list): properties to return in grouped documents
+            criteria (dict): filter for documents to group
+            allow_disk_use (bool): whether to allow disk use in aggregation
+
+        Returns:
+            command cursor corresponding to grouped documents

+            elements of the command cursor have the structure:
+            {'_id': {"KEY_1": value_1, "KEY_2": value_2, ...},
+             'docs': [list of documents corresponding to key values]}
+
+        """
+        return self.index.groupby(keys, properties, criteria, **kwargs)
+
+    def ensure_index(self, key, unique=False):
+        """
+        Wrapper for ensure_index on the index store
+        """
+        return self.index.ensure_index(key, unique=unique, background=True)
+
+    def update(self, docs, update_lu=True, key=None, compress=False):
+        """
+        Function to update the associated index store and upload the documents to S3.
+
+        Args:
+            docs ([dict]): list of documents
+            key ([str] or str): keys to use to build search doc
+            compress (bool): compress the document or not
+        """
+        now = datetime.now()
+        search_docs = []
+        for d in docs:
+            if isinstance(key, list):
+                search_doc = {k: d[k] for k in key}
+            elif key:
+                search_doc = {key: d[key]}
+            else:
+                search_doc = {}
+
+            # Always include our main key
+            search_doc[self.key] = d[self.key]
+
+            # Remove MongoDB _id from search
+            if "_id" in search_doc:
+                del search_doc["_id"]
+
+            # Add a timestamp
+            if update_lu:
+                search_doc[self.lu_field] = now
+                d[self.lu_field] = now
+
+            data = json.dumps(jsanitize(d)).encode()
+
+            # Compress with zlib if chosen
+            if compress:
+                search_doc["compression"] = "zlib"
+                data = zlib.compress(data)
+
+            self.s3_bucket.put_object(Key=d[self.key], Body=data, Metadata=search_doc)
+            search_docs.append(search_doc)
+
+        # Use the index store's update to remove key clashes
+        self.index.update(search_docs)
+
+    @property
+    def last_updated(self):
+        return self.index.last_updated
+
+    def lu_filter(self, targets):
+        """Creates a MongoDB filter for new documents.
+
+        By "new", we mean documents in this Store that were last updated later
+        than any document in targets.
+
+        Args:
+            targets (list): A list of Stores
+
+        """
+        return self.index.lu_filter(targets)
+
+    def __hash__(self):
+        return hash((self.index.__hash__, self.bucket))
+
+    def rebuild_index_from_s3_data(self):
+        """
+        Rebuilds the index Store from the data in S3
+        Relies on the index document being stored as the metadata for the file
+        """
+        index_docs = []
+        for file in self.s3_bucket.objects.all():
+            # TODO: Transform the data back from strings and remove AWS S3 specific keys
+            index_docs.append(file.metadata)
+
+        self.index.update(index_docs)
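For reference, a minimal usage sketch of the new AmazonS3Store (not part of the diff; bucket name and field values are made up, AWS credentials are assumed to be configured in the environment, and this mirrors the mocked usage in the new tests rather than a tested end-to-end run):

    from maggma.stores import MemoryStore
    from maggma.advanced_stores import AmazonS3Store

    # The index store holds only the searchable metadata; the documents live in S3
    index = MemoryStore("index", key="task_id")
    store = AmazonS3Store(index, "my-maggma-bucket")
    store.connect()

    # Each document is serialized to JSON and uploaded under its key,
    # while the search doc (key, lu_field, compression flag) goes into the index
    store.update([{"task_id": "mp-1", "data": "some payload"}], compress=True)

    doc = store.query_one(criteria={"task_id": "mp-1"})

The store intentionally ignores property projections: filtering happens against the index store, and whole documents are fetched back from S3.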
diff --git a/maggma/runner.py b/maggma/runner.py
index 4150ea66f..835dd411b 100644
--- a/maggma/runner.py
+++ b/maggma/runner.py
@@ -7,7 +7,6 @@
 
 import abc
 import logging
-import time
 from collections import defaultdict, deque
 from threading import Thread, Condition, BoundedSemaphore
 from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
@@ -225,6 +224,9 @@ def update_targets(self):
 
 
 class MultiprocProcessor(BaseProcessor):
+    """
+    Processor to run builders using Python multiprocessing
+    """
     def __init__(self, builders, num_workers=None):
         # multiprocessing only if mpi is not used, no mixing
         self.num_workers = num_workers
diff --git a/maggma/stores.py b/maggma/stores.py
index 11139dca7..780b8b406 100644
--- a/maggma/stores.py
+++ b/maggma/stores.py
@@ -8,10 +8,11 @@
 
 from datetime import datetime
 import json
-
 import mongomock
 import pymongo
 import gridfs
+from itertools import groupby
+from operator import itemgetter
 from pymongo import MongoClient, DESCENDING
 from pydash import identity
@@ -43,43 +44,87 @@ def __init__(self, key="task_id", lu_field='last_updated', lu_type="datetime"):
     @property
     @abstractmethod
     def collection(self):
+        """
+        Returns a handle to the pymongo collection object
+        Not guaranteed to exist in the future
+        """
         pass
 
     @abstractmethod
-    def connect(self):
+    def connect(self, force_reset=False):
+        """
+        Connect to the source data
+        """
         pass
 
     @abstractmethod
    def close(self):
+        """
+        Closes any connections
+        """
         pass
 
     @abstractmethod
     def query(self, properties=None, criteria=None, **kwargs):
+        """
+        Queries the Store for a set of properties
+        """
         pass
 
     @abstractmethod
     def query_one(self, properties=None, criteria=None, **kwargs):
+        """
+        Queries the Store for a single document
+        """
         pass
 
     @abstractmethod
     def distinct(self, key, criteria=None, **kwargs):
+        """
+        Get all distinct values for a key
+        """
         pass
 
     @abstractmethod
-    def update(self, docs, update_lu=True, key=None):
+    def update(self, docs, update_lu=True, key=None, **kwargs):
+        """
+        Update docs into the store
+        """
         pass
 
     @abstractmethod
-    def ensure_index(self, key, unique=False):
+    def ensure_index(self, key, unique=False, **kwargs):
+        """
+        Ensure an index is built on the given key
+        """
+        pass
+
+    @abstractmethod
+    def groupby(self, keys, properties=None, criteria=None, **kwargs):
+        """
+        Simple grouping function that will group documents
+        by keys.
+
+        Args:
+            keys (list or string): fields to group documents
+            properties (list): properties to return in grouped documents
+            criteria (dict): filter for documents to group
+
+        Returns:
+            command cursor corresponding to grouped documents
+
+            elements of the command cursor have the structure:
+            {'_id': {"KEY_1": value_1, "KEY_2": value_2, ...},
+             'docs': [list of documents corresponding to key values]}
+
+        """
         pass
 
     @property
     def last_updated(self):
-        doc = next(self.query(properties=[self.lu_field]).sort(
-            [(self.lu_field, pymongo.DESCENDING)]).limit(1), None)
+        doc = next(self.query(properties=[self.lu_field]).sort([(self.lu_field, pymongo.DESCENDING)]).limit(1), None)
         # Handle when collection has docs but `NoneType` lu_field.
-        return (self.lu_func[0](doc[self.lu_field]) if (doc and doc[self.lu_field])
-                else datetime.min)
+        return (self.lu_func[0](doc[self.lu_field]) if (doc and doc[self.lu_field]) else datetime.min)
 
     def lu_filter(self, targets):
         """Creates a MongoDB filter for new documents.
@@ -104,7 +149,7 @@ def __ne__(self, other):
         return not self == other
 
     def __hash__(self):
-        return hash((self.lu_field,))
+        return hash((self.lu_field, ))
 
     def __getstate__(self):
         return self.as_dict()
@@ -140,8 +185,7 @@ def query(self, properties=None, criteria=None, **kwargs):
         """
         if isinstance(properties, list):
             properties = {p: 1 for p in properties}
-        return self.collection.find(filter=criteria, projection=properties,
-                                    **kwargs)
+        return self.collection.find(filter=criteria, projection=properties, **kwargs)
 
     def query_one(self, properties=None, criteria=None, **kwargs):
         """
@@ -158,51 +202,17 @@ def query_one(self, properties=None, criteria=None, **kwargs):
         """
         if isinstance(properties, list):
             properties = {p: 1 for p in properties}
-        return self.collection.find_one(filter=criteria, projection=properties,
-                                        **kwargs)
-
-    def distinct(self, key, criteria=None, all_exist=False, **kwargs):
-        """
-        Function get to get all distinct values of a certain key in
-        a mongolike store. May take a single key or a list of keys
+        return self.collection.find_one(filter=criteria, projection=properties, **kwargs)
 
-        Args:
-            key (mongolike key or list of mongolike keys): key or keys
-                for which to find distinct values or sets of values.
-            criteria (filter criteria): criteria for filter
-            all_exist (bool): whether to ensure all keys in list exist
-                in each document, defaults to False
-            **kwargs (kwargs): kwargs corresponding to collection.distinct
-        """
-        if isinstance(key, list):
-            agg_pipeline = [{"$match": criteria}] if criteria else []
-            if all_exist:
-                agg_pipeline.append(
-                    {"$match": {k: {"$exists": True} for k in key}})
-            # use string ints as keys and replace later to avoid bug
-            # where periods can't be in group keys, then reconstruct after
-            group_op = {"$group": {
-                "_id": {str(n): "${}".format(k) for n, k in enumerate(key)}}}
-            agg_pipeline.append(group_op)
-            results = [r['_id']
-                       for r in self.collection.aggregate(agg_pipeline)]
-            for result in results:
-                for n in list(result.keys()):
-                    result[key[int(n)]] = result.pop(n)
-
-            # Return as document as partial matches are included
-            return results
-
-        else:
-            return self.collection.distinct(key, filter=criteria, **kwargs)
-
-    def ensure_index(self, key, unique=False):
+    def ensure_index(self, key, unique=False, **kwargs):
         """
         Wrapper for pymongo.Collection.ensure_index
         """
-        return self.collection.create_index(key, unique=unique, background=True)
+        if "background" not in kwargs:
+            kwargs["background"] = True
+        return self.collection.create_index(key, unique=unique, **kwargs)
 
-    def update(self, docs, update_lu=True, key=None):
+    def update(self, docs, update_lu=True, key=None, **kwargs):
         """
         Function to update associated MongoStore collection.
 
@@ -227,7 +237,6 @@ def update(self, docs, update_lu=True, key=None):
                     self.logger.error('Document failed to validate: {}'.format(d))
 
             if validates:
-                search_doc = {}
                 if isinstance(key, list):
                     search_doc = {k: d[k] for k in key}
                 elif key:
@@ -240,6 +249,33 @@ def update(self, docs, update_lu=True, key=None):
 
         bulk.execute()
 
+    def distinct(self, key, criteria=None, all_exist=False, **kwargs):
+        """
+        Function to get all distinct values of a certain key in
+        a mongolike store. May take a single key or a list of keys
+
+        Args:
+            key (mongolike key or list of mongolike keys): key or keys
+                for which to find distinct values or sets of values.
+            criteria (filter criteria): criteria for filter
+            all_exist (bool): whether to ensure all keys in list exist
+                in each document, defaults to False
+            **kwargs (kwargs): kwargs corresponding to collection.distinct
+        """
+        if isinstance(key, list):
+            criteria = criteria if criteria else {}
+            # Update criteria to ensure the keys are present
+            if all_exist:
+                criteria.update({k: {"$exists": True} for k in key if k not in criteria})
+
+            results = []
+            for d in self.groupby(key, properties=key, criteria=criteria):
+                results.append(d["_id"])
+            return results
+
+        else:
+            return self.collection.distinct(key, filter=criteria, **kwargs)
+
     def close(self):
         self.collection.database.client.close()
 
@@ -249,8 +285,7 @@ class MongoStore(Mongolike, Store):
     A Store that connects to a Mongo collection
     """
 
-    def __init__(self, database, collection_name, host="localhost", port=27017,
-                 username="", password="", **kwargs):
+    def __init__(self, database, collection_name, host="localhost", port=27017, username="", password="", **kwargs):
         """
         Args:
             database (str): database name
@@ -270,12 +305,13 @@ def __init__(self, database, collection_name, host="localhost", port=27017,
         self.kwargs = kwargs
         super(MongoStore, self).__init__(**kwargs)
 
-    def connect(self):
-        conn = MongoClient(self.host, self.port)
-        db = conn[self.database]
-        if self.username is not "":
-            db.authenticate(self.username, self.password)
-        self._collection = db[self.collection_name]
+    def connect(self, force_reset=False):
+        if not self._collection or force_reset:
+            conn = MongoClient(self.host, self.port)
+            db = conn[self.database]
+            if self.username != "":
+                db.authenticate(self.username, self.password)
+            self._collection = db[self.collection_name]
 
     def __hash__(self):
         return hash((self.database, self.collection_name, self.lu_field))
@@ -292,8 +328,7 @@ def from_db_file(cls, filename):
         kwargs.pop("aliases", None)
         return cls(**kwargs)
 
-    def groupby(self, keys, properties=None, criteria=None,
-                allow_disk_use=True):
+    def groupby(self, keys, properties=None, criteria=None, allow_disk_use=True, **kwargs):
         """
         Simple grouping function that will group documents
         by keys.
@@ -323,13 +358,25 @@ def groupby(self, keys, properties=None, criteria=None,
             keys = [keys]
 
         group_id = {key: "${}".format(key) for key in keys}
-        pipeline.append({"$group": {"_id": group_id,
-                                    "docs": {"$push": "$$ROOT"}
-                                    }
-                         })
+        pipeline.append({"$group": {"_id": group_id, "docs": {"$push": "$$ROOT"}}})
 
         return self.collection.aggregate(pipeline, allowDiskUse=allow_disk_use)
 
+    @classmethod
+    def from_collection(cls, collection, **kwargs):
+        """
+        Generates a MongoStore from a pymongo collection object
+        This is not a fully safe operation as it gives dummy information to the MongoStore
+        As a result, this will not serialize and cannot reset its connection
+        """
+        # TODO: How do we make this safer?
+        coll_name = collection.name
+        db_name = collection.database.name
+
+        store = cls(db_name, coll_name, **kwargs)
+        store._collection = collection
+        return store
+
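To illustrate the two additions above (a sketch only; the database, collection, and field names are made up), a MongoStore can now be wrapped around an existing pymongo collection and its groupby result consumed like any aggregation cursor:

    from pymongo import MongoClient
    from maggma.stores import MongoStore

    coll = MongoClient()["maggma_test"]["tasks"]
    store = MongoStore.from_collection(coll, key="task_id")

    # Each group is a dict with an "_id" of key/value pairs plus the matching docs
    for group in store.groupby("state", criteria={"priority": {"$exists": True}}):
        print(group["_id"], len(group["docs"]))

As the docstring notes, a store built this way carries dummy connection information, so it cannot be serialized or reconnected with force_reset.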
 
 
 class MemoryStore(Mongolike, Store):
     """
@@ -343,25 +390,79 @@ def __init__(self, name="memory_db", **kwargs):
         self.kwargs = kwargs
         super(MemoryStore, self).__init__(**kwargs)
 
-    def connect(self):
-        # Ensure we only connect once
-        if not self._collection:
+    def connect(self, force_reset=False):
+        if not self._collection or force_reset:
             self._collection = mongomock.MongoClient().db[self.name]
 
     def __hash__(self):
         return hash((self.name, self.lu_field))
 
-    def groupby(self, keys, properties=None, criteria=None,
-                allow_disk_use=True):
+    def groupby(self, keys, properties=None, criteria=None, **kwargs):
         """
-        This is a placeholder for groupby for memorystores,
-        current version of mongomock do not allow for standard
-        aggregation methods, so this method currently raises
-        a NotImplementedError
+        Simple grouping function that will group documents
+        by keys.
+
+        Args:
+            keys (list or string): fields to group documents
+            properties (list): properties to return in grouped documents
+            criteria (dict): filter for documents to group
+            allow_disk_use (bool): whether to allow disk use in aggregation
+
+        Returns:
+            command cursor corresponding to grouped documents
+
+            elements of the command cursor have the structure:
+            {'_id': {"KEY_1": value_1, "KEY_2": value_2, ...},
+             'docs': [list of documents corresponding to key values]}
+
+        """
+        keys = keys if isinstance(keys, list) else [keys]
+
+        input_data = list(self.query(properties=keys, criteria=criteria))
+
+        if len(keys) > 1:
+            grouper = itemgetter(*keys)
+            for key, grp in groupby(sorted(input_data, key=grouper), grouper):
+                temp_dict = {"_id": dict(zip(keys, key)), "docs": list(grp)}
+                yield temp_dict
+        else:
+            grouper = itemgetter(*keys)
+            for key, grp in groupby(sorted(input_data, key=grouper), grouper):
+                temp_dict = {"_id": {keys[0]: key}, "docs": list(grp)}
+                yield temp_dict
+
+    def update(self, docs, update_lu=True, key=None, **kwargs):
+        """
+        Function to update the associated mongomock collection.
+
+        Args:
+            docs: list of documents
         """
-        raise NotImplementedError("groupby not available for {}"
-                                  "due to mongomock incompatibility".format(
-                                      self.__class__))
+
+        for d in docs:
+
+            d = jsanitize(d, allow_bson=True)
+
+            # document-level validation is optional
+            validates = True
+            if self.validator:
+                validates = self.validator.is_valid(d)
+                if not validates:
+                    if self.validator.strict:
+                        raise ValueError('Document failed to validate: {}'.format(d))
+                    else:
+                        self.logger.error('Document failed to validate: {}'.format(d))
+
+            if validates:
+                if isinstance(key, list):
+                    search_doc = {k: d[k] for k in key}
+                elif key:
+                    search_doc = {key: d[key]}
+                else:
+                    search_doc = {self.key: d[self.key]}
+                if update_lu:
+                    d[self.lu_field] = datetime.utcnow()
+                self.collection.replace_one(search_doc, d, upsert=True)
 
 
 class JSONStore(MemoryStore):
@@ -380,15 +481,14 @@ def __init__(self, paths, **kwargs):
         self.kwargs = kwargs
         super(JSONStore, self).__init__("collection", **kwargs)
 
-    def connect(self):
-        super(JSONStore, self).connect()
+    def connect(self, force_reset=False):
+        super(JSONStore, self).connect(force_reset=force_reset)
         for path in self.paths:
             with zopen(path) as f:
                 data = f.read()
                 data = data.decode() if isinstance(data, bytes) else data
                 objects = json.loads(data)
-                objects = [objects] if not isinstance(
-                    objects, list) else objects
+                objects = [objects] if not isinstance(objects, list) else objects
                 self.collection.insert_many(objects)
 
     def __hash__(self):
@@ -407,8 +507,8 @@ def __init__(self, dt, **kwargs):
         self.kwargs = kwargs
         super(DatetimeStore, self).__init__("date", **kwargs)
 
-    def connect(self):
-        super(DatetimeStore, self).connect()
+    def connect(self, force_reset=False):
+        super(DatetimeStore, self).connect(force_reset)
         self.collection.insert_one({self.lu_field: self.__dt})
 
 
@@ -417,8 +517,7 @@ class GridFSStore(Store):
     A Store for GrdiFS backend.
     Provides a common access method consistent with other stores
     """
-    def __init__(self, database, collection_name, host="localhost", port=27017,
-                 username="", password="", **kwargs):
+    def __init__(self, database, collection_name, host="localhost", port=27017, username="", password="", **kwargs):
 
         self.database = database
         self.collection_name = collection_name
@@ -434,15 +533,16 @@ def __init__(self, database, collection_name, host="localhost", port=27017,
 
         super(GridFSStore, self).__init__(**kwargs)
 
-    def connect(self):
+    def connect(self, force_reset=False):
         conn = MongoClient(self.host, self.port)
-        db = conn[self.database]
-        if self.username is not "":
-            db.authenticate(self.username, self.password)
+        if not self._collection or force_reset:
+            db = conn[self.database]
+            if self.username != "":
+                db.authenticate(self.username, self.password)
 
-        self._collection = gridfs.GridFS(db, self.collection_name)
-        self._files_collection = db["{}.files".format(self.collection_name)]
-        self._chunks_collection = db["{}.chunks".format(self.collection_name)]
+            self._collection = gridfs.GridFS(db, self.collection_name)
+            self._files_collection = db["{}.files".format(self.collection_name)]
+            self._chunks_collection = db["{}.chunks".format(self.collection_name)]
 
     @property
     def collection(self):
@@ -461,7 +561,7 @@ def query(self, properties=None, criteria=None, **kwargs):
             against key-value pairs
             **kwargs (kwargs): further kwargs to Collection.find
         """
-        for f in self.collection.find(filter=criteria, **kwargs).sort('uploadDate', pymongo.DESCENDING):
+        for f in self.collection.find(filter=criteria, **kwargs):
             data = f.read()
             try:
                 json_dict = json.loads(data)
@@ -469,7 +569,7 @@ def query(self, properties=None, criteria=None, **kwargs):
             except:
                 yield data
 
-    def query_one(self, properties=None, criteria=None, sort=(('uploadDate', pymongo.DESCENDING),), **kwargs):
+    def query_one(self, properties=None, criteria=None, **kwargs):
         """
         Function that gets a single document from GridFS. This store
         ignores all property projections as its designed for whole
@@ -495,8 +595,8 @@ def query_one(self, properties=None, criteria=None, sort=(('uploadDate', pymongo
 
     def distinct(self, key, criteria=None, all_exist=False, **kwargs):
         """
-        Function get to get all distinct values of a certain key in the
-        GridFS Store. This searches the .files collection for this data
+        Function to get all distinct values of a certain key in
+        a mongolike store. May take a single key or a list of keys
 
         Args:
             key (mongolike key or list of mongolike keys): key or keys
@@ -507,27 +607,53 @@ def distinct(self, key, criteria=None, all_exist=False, **kwargs):
             **kwargs (kwargs): kwargs corresponding to collection.distinct
         """
         if isinstance(key, list):
-            agg_pipeline = [{"$match": criteria}] if criteria else []
+            criteria = criteria if criteria else {}
+            # Update criteria to ensure the keys are present
             if all_exist:
-                agg_pipeline.append(
-                    {"$match": {k: {"$exists": True} for k in key}})
-            # use string ints as keys and replace later to avoid bug
-            # where periods can't be in group keys, then reconstruct after
-            group_op = {"$group": {
-                "_id": {str(n): "${}".format(k) for n, k in enumerate(key)}}}
-            agg_pipeline.append(group_op)
-            results = [r['_id']
-                       for r in self._files_collection.aggregate(agg_pipeline)]
-            for result in results:
-                for n in list(result.keys()):
-                    result[key[int(n)]] = result.pop(n)
-
-            # Return as document as partial matches are included
+                criteria.update({k: {"$exists": True} for k in key if k not in criteria})
+
+            results = []
+            for d in self.groupby(key, properties=key, criteria=criteria):
+                results.append(d["_id"])
             return results
 
         else:
             return self._files_collection.distinct(key, filter=criteria, **kwargs)
 
+    def groupby(self, keys, properties=None, criteria=None, allow_disk_use=True, **kwargs):
+        """
+        Simple grouping function that will group documents
+        by keys. Only searches the .files collection
+
+        Args:
+            keys (list or string): fields to group documents
+            properties (list): properties to return in grouped documents
+            criteria (dict): filter for documents to group
+            allow_disk_use (bool): whether to allow disk use in aggregation
+
+        Returns:
+            command cursor corresponding to grouped documents
+
+            elements of the command cursor have the structure:
+            {'_id': {"KEY_1": value_1, "KEY_2": value_2, ...},
+             'docs': [list of documents corresponding to key values]}
+
+        """
+        pipeline = []
+        if criteria is not None:
+            pipeline.append({"$match": criteria})
+
+        if properties is not None:
+            pipeline.append({"$project": {p: 1 for p in properties}})
+
+        if isinstance(keys, str):
+            keys = [keys]
+
+        group_id = {key: "${}".format(key) for key in keys}
+        pipeline.append({"$group": {"_id": group_id, "docs": {"$push": "$$ROOT"}}})
+
+        return self._files_collection.aggregate(pipeline, allowDiskUse=allow_disk_use)
+
     def ensure_index(self, key, unique=False):
         """
         Wrapper for pymongo.Collection.ensure_index for the files collection
@@ -541,7 +667,6 @@ def update(self, docs, update_lu=True, key=None):
 
         Args:
             docs: list of documents
         """
-
         for d in docs:
             search_doc = {}
             if isinstance(key, list):
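The multi-key distinct now delegates to groupby and returns the grouped "_id" dicts. A small sketch of what that looks like in practice with a MemoryStore (illustrative only; the field names are made up):

    store = MemoryStore("temp", key="task_id")
    store.connect()
    store.update([{"task_id": "mp-1", "a": 1, "b": 2},
                  {"task_id": "mp-2", "a": 1, "b": 3}], key="task_id")

    store.distinct("a")                         # -> [1]
    store.distinct(["a", "b"])                  # -> [{"a": 1, "b": 2}, {"a": 1, "b": 3}]
    store.distinct(["a", "c"], all_exist=True)  # -> [] since no document has both keys

A list-valued key therefore returns one dict per distinct combination of values rather than a flat list, which is the behavior the new tests below exercise.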
diff --git a/maggma/tests/test_advanced_stores.py b/maggma/tests/test_advanced_stores.py
index 259e95d11..750941421 100644
--- a/maggma/tests/test_advanced_stores.py
+++ b/maggma/tests/test_advanced_stores.py
@@ -4,10 +4,11 @@
 """
 import os
 import unittest
-from unittest.mock import patch
+from unittest.mock import patch, MagicMock
 
 from maggma.stores import MemoryStore, MongoStore
 from maggma.advanced_stores import *
+import zlib
 
 module_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)))
 
@@ -72,6 +73,46 @@ def test_vault_missing_env(self):
         self._create_vault_store()
 
 
+class TestS3Store(unittest.TestCase):
+
+    def setUp(self):
+        self.index = MemoryStore("index")
+        with patch("boto3.resource") as mock_resource:
+            mock_resource.return_value = MagicMock()
+            mock_resource("s3").list_buckets.return_value = ["bucket1", "bucket2"]
+            self.s3store = AmazonS3Store(self.index, "bucket1")
+            self.s3store.connect()
+
+    def test_query_one(self):
+        self.s3store.s3_bucket.Object.return_value = MagicMock()
+        self.s3store.s3_bucket.Object().get.return_value = '{"task_id": "mp-1", "data": "asd"}'
+        self.index.update([{"task_id": "mp-1"}])
+        self.assertEqual(self.s3store.query_one(criteria={"task_id": "mp-2"}), None)
+        self.assertEqual(self.s3store.query_one(criteria={"task_id": "mp-1"})["data"], "asd")
+
+        self.s3store.s3_bucket.Object().get.return_value = zlib.compress('{"task_id": "mp-3", "data": "sdf"}'.encode())
+        self.index.update([{"task_id": "mp-3", "compression": "zlib"}])
+        self.assertEqual(self.s3store.query_one(criteria={"task_id": "mp-3"})["data"], "sdf")
+
+    def test_update(self):
+        self.s3store.update([{"task_id": "mp-1", "data": "asd"}])
+        self.assertEqual(self.s3store.s3_bucket.put_object.call_count, 1)
+        called_kwargs = self.s3store.s3_bucket.put_object.call_args[1]
+        self.assertEqual(called_kwargs["Key"], "mp-1")
+        self.assertTrue(len(called_kwargs["Body"]) > 0)
+        self.assertEqual(called_kwargs["Metadata"]["task_id"], "mp-1")
+
+    def test_update_compression(self):
+        self.s3store.update([{"task_id": "mp-1", "data": "asd"}], compress=True)
+        self.assertEqual(self.s3store.s3_bucket.put_object.call_count, 1)
+        called_kwargs = self.s3store.s3_bucket.put_object.call_args[1]
+        self.assertEqual(called_kwargs["Key"], "mp-1")
+        self.assertTrue(len(called_kwargs["Body"]) > 0)
+        self.assertEqual(called_kwargs["Metadata"]["task_id"], "mp-1")
+        self.assertEqual(called_kwargs["Metadata"]["compression"], "zlib")
+
+
 class TestAliasingStore(unittest.TestCase):
diff --git a/maggma/tests/test_stores.py b/maggma/tests/test_stores.py
index 29e65d440..61b714eee 100644
--- a/maggma/tests/test_stores.py
+++ b/maggma/tests/test_stores.py
@@ -12,14 +12,11 @@
 from maggma.stores import *
 
 module_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)))
-db_dir = os.path.abspath(os.path.join(
-    module_dir, "..", "..", "test_files", "settings_files"))
-test_dir = os.path.abspath(os.path.join(
-    module_dir, "..", "..", "test_files", "test_set"))
+db_dir = os.path.abspath(os.path.join(module_dir, "..", "..", "test_files", "settings_files"))
+test_dir = os.path.abspath(os.path.join(module_dir, "..", "..", "test_files", "test_set"))
 
 
 class TestMongoStore(unittest.TestCase):
-
     def setUp(self):
         self.mongostore = MongoStore("maggma_test", "test")
         self.mongostore.connect()
@@ -28,8 +25,7 @@ def test_connect(self):
         mongostore = MongoStore("maggma_test", "test")
         self.assertEqual(mongostore.collection, None)
         mongostore.connect()
-        self.assertIsInstance(mongostore.collection,
-                              pymongo.collection.Collection)
+        self.assertIsInstance(mongostore.collection, pymongo.collection.Collection)
 
     def test_query(self):
         self.mongostore.collection.insert({"a": 1, "b": 2, "c": 3})
@@ -53,28 +49,39 @@ def test_distinct(self):
         self.assertEqual(len(self.mongostore.distinct(["d", "e"], {"a": 4})), 3)
         all_exist = self.mongostore.distinct(["a", "b"], all_exist=True)
         self.assertEqual(len(all_exist), 1)
-        all_exist2 = self.mongostore.distinct(
-            ["a", "e"], all_exist=True, criteria={"d": 6})
+        all_exist2 = self.mongostore.distinct(["a", "e"], all_exist=True, criteria={"d": 6})
         self.assertEqual(len(all_exist2), 1)
 
     def test_update(self):
         self.mongostore.update([{"e": 6, "d": 4}], key="e")
-        self.assertEqual(self.mongostore.query(
-            criteria={"d": {"$exists": 1}}, properties=["d"])[0]["d"], 4)
+        self.assertEqual(self.mongostore.query(criteria={"d": {"$exists": 1}}, properties=["d"])[0]["d"], 4)
 
         self.mongostore.update([{"e": 7, "d": 8, "f": 9}], key=["d", "f"])
-        self.assertEqual(self.mongostore.query_one(
-            criteria={"d": 8, "f": 9}, properties=["e"])["e"], 7)
+        self.assertEqual(self.mongostore.query_one(criteria={"d": 8, "f": 9}, properties=["e"])["e"], 7)
 
         self.mongostore.update([{"e": 11, "d": 8, "f": 9}], key=["d", "f"])
-        self.assertEqual(self.mongostore.query_one(
-            criteria={"d": 8, "f": 9}, properties=["e"])["e"], 11)
+        self.assertEqual(self.mongostore.query_one(criteria={"d": 8, "f": 9}, properties=["e"])["e"], 11)
 
     def test_groupby(self):
         self.mongostore.collection.drop()
-        self.mongostore.update([{"e": 7, "d": 9, "f": 9},
-                                {"e": 7, "d": 9, "f": 10},
-                                {"e": 8, "d": 9, "f": 11},
-                                {"e": 9, "d": 10, "f": 12}], key="f")
+        self.mongostore.update(
+            [{
+                "e": 7,
+                "d": 9,
+                "f": 9
+            }, {
+                "e": 7,
+                "d": 9,
+                "f": 10
+            }, {
+                "e": 8,
+                "d": 9,
+                "f": 11
+            }, {
+                "e": 9,
+                "d": 10,
+                "f": 12
+            }],
+            key="f")
         data = list(self.mongostore.groupby("d"))
         self.assertEqual(len(data), 2)
         grouped_by_9 = [g['docs'] for g in data if g['_id']['d'] == 9][0]
@@ -89,28 +96,62 @@ def test_from_db_file(self):
         ms = MongoStore.from_db_file(os.path.join(db_dir, "db.json"))
         self.assertEqual(ms.collection_name, "tmp")
 
+    def test_from_collection(self):
+        ms = MongoStore.from_db_file(os.path.join(db_dir, "db.json"))
+        ms.connect()
+
+        other_ms = MongoStore.from_collection(ms._collection)
+        self.assertEqual(ms.collection_name, other_ms.collection_name)
+        self.assertEqual(ms.database, other_ms.database)
+
     def tearDown(self):
         if self.mongostore.collection:
             self.mongostore.collection.drop()
 
 
 class TestMemoryStore(unittest.TestCase):
-
     def setUp(self):
         self.memstore = MemoryStore()
 
+
     def test(self):
         self.assertEqual(self.memstore.collection, None)
         self.memstore.connect()
-        self.assertIsInstance(self.memstore.collection,
-                              mongomock.collection.Collection)
+        self.assertIsInstance(self.memstore.collection, mongomock.collection.Collection)
 
     def test_groupby(self):
-        self.assertRaises(NotImplementedError, self.memstore.groupby, "a")
+        self.memstore.connect()
+        self.memstore.update(
+            [{
+                "e": 7,
+                "d": 9,
+                "f": 9
+            }, {
+                "e": 7,
+                "d": 9,
+                "f": 10
+            }, {
+                "e": 8,
+                "d": 9,
+                "f": 11
+            }, {
+                "e": 9,
+                "d": 10,
+                "f": 12
+            }],
+            key="f")
+        data = list(self.memstore.groupby("d"))
+        self.assertEqual(len(data), 2)
+        grouped_by_9 = [g['docs'] for g in data if g['_id']['d'] == 9][0]
+        self.assertEqual(len(grouped_by_9), 3)
+        grouped_by_10 = [g['docs'] for g in data if g['_id']['d'] == 10][0]
+        self.assertEqual(len(grouped_by_10), 1)
+        data = list(self.memstore.groupby(["e", "d"]))
+        self.assertEqual(len(data), 3)
 
 
-class TestJsonStore(unittest.TestCase):
 
+class TestJsonStore(unittest.TestCase):
     def test(self):
         files = []
         for f in ["a.json", "b.json"]:
@@ -126,7 +167,6 @@ def test(self):
 
 
 class TestGridFSStore(unittest.TestCase):
-
     def setUp(self):
         self.gStore = GridFSStore("maggma_test", "test", key="task_id")
         self.gStore.connect()
diff --git a/maggma/utils.py b/maggma/utils.py
index e4baa340b..1f4c5643a 100644
--- a/maggma/utils.py
+++ b/maggma/utils.py
@@ -4,7 +4,9 @@
 """
 import itertools
 from datetime import datetime, timedelta
-
+from pydash.utilities import to_path
+from pydash.objects import set_, get, has
+from pydash.objects import unset as _unset
 
 def dt_to_isoformat_ceil_ms(dt):
     """Helper to account for Mongo storing datetimes with only ms precision."""
@@ -113,3 +115,34 @@ def get_mpi():
         size = 0
 
     return comm, rank, size
+
+
+def lazy_substitute(d, aliases):
+    """
+    Simple top level substitute that doesn't dive into mongo like strings
+    """
+    for alias, key in aliases.items():
+        if key in d:
+            d[alias] = d[key]
+            del d[key]
+
+
+def substitute(d, aliases):
+    """
+    Substitutes keys in dictionary
+    Accepts multilevel mongo like keys
+    """
+    for alias, key in aliases.items():
+        if has(d, key):
+            set_(d, alias, get(d, key))
+            unset(d, key)
+
+
+def unset(d, key):
+    """
+    Unsets a key
+    """
+    _unset(d, key)
+    path = to_path(key)
+    for i in reversed(range(1, len(path))):
+        if len(get(d, path[:i])) == 0:
+            unset(d, path[:i])
diff --git a/requirements-optional.txt b/requirements-optional.txt
index 68033521c..b78a1acfe 100644
--- a/requirements-optional.txt
+++ b/requirements-optional.txt
@@ -1,3 +1,4 @@
 nose==1.3.4
 mpi4py==3.0.0
 numpy==1.14.0
+boto3==1.6.9
\ No newline at end of file
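A brief illustration of the alias helpers moved into maggma/utils.py above (values are made up; both helpers rename keys according to {new_name: old_name} mappings):

    from maggma.utils import lazy_substitute, substitute

    # lazy_substitute only renames top-level keys, so it is safe on query criteria
    criteria = {"energy": {"$lt": 0}}
    lazy_substitute(criteria, {"output.final_energy": "energy"})
    # criteria == {"output.final_energy": {"$lt": 0}}

    # substitute resolves dotted, multilevel keys and prunes emptied parent dicts
    doc = {"output": {"final_energy": -1.5}}
    substitute(doc, {"energy": "output.final_energy"})
    # doc == {"energy": -1.5}

This is why AliasingStore applies lazy_substitute to criteria (which may contain Mongo operators) and substitute to documents and property lists.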