Useability updates #19

Merged 11 commits on Mar 31, 2018
12 changes: 7 additions & 5 deletions examples/runner_sample.py
@@ -1,8 +1,10 @@
# Usage:
# with multiprocessing:
# python runner_sample.py
# with mpi(need mpi4py pacakge):
# mpiexec -n 5 python runner_sample.py
"""
Usage:
with multiprocessing:
python runner_sample.py
with mpi (needs the mpi4py package):
mpiexec -n 5 python runner_sample.py
"""

from maggma.stores import MemoryStore
from maggma.builder import Builder
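The body of runner_sample.py is collapsed in this view. As a rough, hedged sketch of how a script built on these imports is usually wired together (the CopyBuilder name, the sources/targets keyword arguments, and the sample data are illustrative assumptions, not the file's actual contents):

from maggma.stores import MemoryStore
from maggma.builder import Builder
from maggma.runner import Runner


class CopyBuilder(Builder):
    """Illustrative builder that copies documents from its source store to its target store."""

    def get_items(self):
        # Pull every document out of the first source store
        return self.sources[0].query(criteria={})

    def process_item(self, item):
        # Pass documents through unchanged; drop Mongo's _id so they can be re-inserted
        item.pop("_id", None)
        return item

    def update_targets(self, items):
        # Write the processed documents into the first target store
        self.targets[0].update(items)


if __name__ == "__main__":
    source = MemoryStore("source", key="task_id")
    target = MemoryStore("target", key="task_id")
    source.connect()
    source.update([{"task_id": i, "value": i ** 2} for i in range(10)])

    # Launched plainly this runs with multiprocessing; under mpiexec the runner switches to MPI
    Runner([CopyBuilder(sources=[source], targets=[target])]).run()

Run it with python runner_sample.py for multiprocessing, or mpiexec -n 5 python runner_sample.py for MPI, as the docstring above notes.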
297 changes: 254 additions & 43 deletions maggma/advanced_stores.py
@@ -2,20 +2,24 @@
"""
Advanced Stores for behavior outside normal access patterns
"""
from maggma.stores import Store, MongoStore
from pydash.objects import set_, get, has
from pydash.utilities import to_path
import pydash.objects
import os
import hvac
import json
import os
import boto3
import botocore
import zlib
from datetime import datetime
from maggma.stores import Store, MongoStore
from maggma.utils import lazy_substitute, substitute
from monty.json import jsanitize


class VaultStore(MongoStore):
"""
Extends MongoStore to read credentials out of Vault server
and uses these values to initialize MongoStore instance
"""

def __init__(self, collection_name, vault_secret_path):
"""
collection (string): name of mongo collection
@@ -58,12 +62,7 @@ def __init__(self, collection_name, vault_secret_path):
username = db_creds.get("username", "")
password = db_creds.get("password", "")

super(VaultStore, self).__init__(database,
collection_name,
host,
port,
username,
password)
super(VaultStore, self).__init__(database, collection_name, host, port, username, password)
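A hedged usage sketch for the class above: it assumes hvac picks up the Vault server address and token from the VAULT_ADDR and VAULT_TOKEN environment variables, and that the secret at the given path holds the db, host, port, username, and password fields read in __init__. The address, token, and path below are placeholders.

import os

os.environ["VAULT_ADDR"] = "https://vault.example.com:8200"  # placeholder server
os.environ["VAULT_TOKEN"] = "s.placeholder-token"            # placeholder token

store = VaultStore("tasks", "secret/testing/maggma")  # collection name, Vault secret path
store.connect()
print(store.query_one())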


class AliasingStore(Store):
@@ -77,7 +76,9 @@ def __init__(self, store, aliases, **kwargs):
aliases (dict): dict of aliases of the form external key: internal key
"""
self.store = store
# Given an external key tells what the internal key is
self.aliases = aliases
# Given the internal key tells us what the external key is
self.reverse_aliases = {v: k for k, v in aliases.items()}
self.kwargs = kwargs
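To illustrate the two mappings kept here (names and data below are made up): external callers use a flat key such as "energy" while the wrapped store keeps the value under a nested "output.energy" field.

from maggma.stores import MemoryStore

tasks = MemoryStore("tasks", key="task_id")
tasks.connect()
tasks.update([{"task_id": "mp-1", "output": {"energy": -1.5}}])

# aliases map external key -> internal key
aliased = AliasingStore(tasks, {"energy": "output.energy"})
aliased.connect()

# criteria written with the external key are translated before hitting the wrapped store
doc = aliased.query_one(criteria={"energy": {"$lt": 0}})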

@@ -109,11 +110,36 @@ def query_one(self, properties=None, criteria=None, **kwargs):
return d

def distinct(self, key, criteria=None, all_exist=False, **kwargs):
if isinstance(key, list):
criteria = criteria if criteria else {}
# Update to ensure keys are there
if all_exist:
criteria.update({k: {"$exists": True} for k in key if k not in criteria})

results = []
for d in self.groupby(key, properties=key, criteria=criteria):
results.append(d["_id"])
return results

else:
criteria = criteria if criteria else {}
lazy_substitute(criteria, self.reverse_aliases)
key = self.aliases[key] if key in self.aliases else key
return self.collection.distinct(key, filter=criteria, **kwargs)

def groupby(self, keys, properties=None, criteria=None, **kwargs):
# Convert to a list
keys = keys if isinstance(keys, list) else [keys]

# Make the aliasing transformations on keys
keys = [self.aliases[k] if k in self.aliases else k for k in keys]

# Update criteria and properties based on aliases
criteria = criteria if criteria else {}
substitute(properties, self.reverse_aliases)
lazy_substitute(criteria, self.reverse_aliases)

return self.store.groupby(keys=keys, properties=properties, criteria=criteria, **kwargs)

def update(self, docs, update_lu=True, key=None):
key = key if key else self.key
@@ -126,10 +152,10 @@ def update(self, docs, update_lu=True, key=None):

self.store.update(docs, update_lu=update_lu, key=key)

def ensure_index(self, key, unique=False):
def ensure_index(self, key, unique=False, **kwargs):
if key in self.aliases:
key = self.aliases[key]
return self.store.ensure_index(key, unique)
return self.store.ensure_index(key, unique, **kwargs)

def close(self):
self.store.close()
@@ -138,37 +164,222 @@ def close(self):
def collection(self):
return self.store.collection

def connect(self):
self.store.connect()
def connect(self, force_reset=False):
self.store.connect(force_reset=force_reset)


def lazy_substitute(d, aliases):
"""
Simple top level substitute that doesn't dive into mongo like strings
"""
for alias, key in aliases.items():
if key in d:
d[alias] = d[key]
del d[key]

def substitute(d, aliases):
"""
Substitutes keys in dictionary
Accepts multilevel mongo like keys
"""
for alias, key in aliases.items():
if has(d, key):
set_(d, alias, get(d, key))
unset(d, key)

def unset(d, key):
"""
Unsets a key
"""
pydash.objects.unset(d, key)
path = to_path(key)
for i in reversed(range(1, len(path))):
if len(get(d, path[:i])) == 0:
unset(d, path[:i])

class AmazonS3Store(Store):
"""
GridFS like storage using Amazon S3 and a regular store for indexing
Assumes Amazon AWS key and secret key are set in environment or default config file
"""

def __init__(self, index, bucket, **kwargs):
"""
Initializes an S3 Store
Args:
index (Store): a store to use to index the S3 Bucket
bucket (str): name of the bucket
"""
self.index = index
self.bucket = bucket
self.s3 = None
self.s3_bucket = None
# Force the key to be the same as the index
kwargs["key"] = index.key
super(AmazonS3Store, self).__init__(**kwargs)

def connect(self, force_reset=False):
self.index.connect(force_reset=force_reset)
if not self.s3:
self.s3 = boto3.resource("s3")
# TODO: Provide configuration variable to create bucket if not present
# Listing buckets is a client-level call, so check names through the resource's bucket collection
if self.bucket not in [b.name for b in self.s3.buckets.all()]:
raise Exception("Bucket not present on AWS: {}".format(self.bucket))
self.s3_bucket = self.s3.Bucket(self.bucket)

def close(self):
self.index.close()

@property
def collection(self):
# For now returns the index collection since that is what we would "search" on
return self.index
def query(self, properties=None, criteria=None, **kwargs):
"""
Function that gets data from Amazon S3. This store ignores all
property projections as it's designed for whole document access

Args:
properties (list or dict): This will be ignored by the S3
Store
criteria (dict): filter for query, matches documents
against key-value pairs
**kwargs (kwargs): further kwargs to Collection.find
"""
for f in self.index.query(criteria=criteria, **kwargs):
try:
# Read the object's body bytes from the boto3 response
data = self.s3_bucket.Object(f[self.key]).get()["Body"].read()
except botocore.exceptions.ClientError as e:
# If a client error is thrown, then check that it was a 404 error.
# If it was a 404 error, then the object does not exist.
error_code = int(e.response['Error']['Code'])
if error_code == 404:
self.logger.error("Could not find S3 object {}".format(f[self.key]))
break

if f.get("compression", "") is "zlib":
data = zlib.decompress(data)

yield json.loads(data)

def query_one(self, properties=None, criteria=None, **kwargs):
"""
Function that gets a single document from Amazon S3. This store
ignores all property projections as it's designed for whole
document access

Args:
properties (list or dict): This will be ignored by the S3
Store
criteria (dict): filter for query, matches documents
against key-value pairs
**kwargs (kwargs): further kwargs to Collection.find
"""
f = self.index.query_one(criteria=criteria, **kwargs)
if f:
try:
# Read the object's body bytes from the boto3 response
data = self.s3_bucket.Object(f[self.key]).get()["Body"].read()
except botocore.exceptions.ClientError as e:
# If a client error is thrown, then check that it was a 404 error.
# If it was a 404 error, then the object does not exist.
error_code = int(e.response['Error']['Code'])
if error_code == 404:
self.logger.error("Could not find S3 object {}".format(f[self.key]))
return None

if f.get("compression", "") is "zlib":
data = zlib.decompress(data)

return json.loads(data)
else:
return None

def distinct(self, key, criteria=None, all_exist=False, **kwargs):
"""
Function to get all distinct values of a certain key in the
AmazonS3 Store. This searches the index collection for this data

Args:
key (mongolike key or list of mongolike keys): key or keys
for which to find distinct values or sets of values.
criteria (filter criteria): criteria for filter
all_exist (bool): whether to ensure all keys in list exist
in each document, defaults to False
**kwargs (kwargs): kwargs corresponding to collection.distinct
"""
# Index is a store so it should have its own distinct function
return self.index.distinct(key, criteria=criteria, all_exist=all_exist, **kwargs)

def groupby(self, keys, properties=None, criteria=None, **kwargs):
"""
Simple grouping function that will group documents
by keys. Only searches the index collection

Args:
keys (list or string): fields to group documents
properties (list): properties to return in grouped documents
criteria (dict): filter for documents to group
allow_disk_use (bool): whether to allow disk use in aggregation

Returns:
command cursor corresponding to grouped documents

elements of the command cursor have the structure:
{'_id': {"KEY_1": value_1, "KEY_2": value_2 ...,
'docs': [list_of_documents corresponding to key values]}

"""
return self.index.groupby(keys, properties=properties, criteria=criteria, **kwargs)

def ensure_index(self, key, unique=False):
"""
Wrapper for pymongo.Collection.ensure_index for the files collection
"""
return self.index.ensure_index(key, unique=unique, background=True)

def update(self, docs, update_lu=True, key=None, compress=False):
"""
Function to update documents in the S3 bucket and the associated index store.

Args:
docs ([dict]): list of documents
update_lu (bool): whether to stamp the last-updated field on each document
key ([str] or str): keys to use to build search doc
compress (bool): compress the document or not
"""
now = datetime.now()
search_docs = []
for d in docs:
if isinstance(key, list):
search_doc = {k: d[k] for k in key}
elif key:
search_doc = {key: d[key]}
else:
search_doc = {}

# Always include our main key
search_doc[self.key] = d[self.key]

# Remove MongoDB _id from search
if "_id" in search_doc:
del search_doc["_id"]

# Add a timestamp
if update_lu:
search_doc[self.lu_field] = now
d[self.lu_field] = now

data = json.dumps(jsanitize(d)).encode()

# Compress with zlib if chosen
if compress:
search_doc["compression"] = "zlib"
data = zlib.compress(data)

self.s3_bucket.put_object(Key=d[self.key], Body=data, Metadata=search_doc)
search_docs.append(search_doc)

# Use store's update to remove key clashes
self.index.update(search_docs)

@property
def last_updated(self):
return self.index.last_updated

def lu_filter(self, targets):
"""Creates a MongoDB filter for new documents.

By "new", we mean documents in this Store that were last updated later
than any document in targets.

Args:
targets (list): A list of Stores

"""
return self.index.lu_filter(targets)

def __hash__(self):
return hash((hash(self.index), self.bucket))

def rebuild_index_from_s3_data(self):
"""
Rebuilds the index Store from the data in S3
Relies on the index document being stored as the metadata for the file
"""
index_docs = []
for file in self.s3_bucket.objects.all():
# TODO: Transform the data back from strings and remove AWS S3 specific keys
index_docs.append(file.metadata)

self.index.update(index_docs)
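A hedged end-to-end sketch of the new store, assuming boto3 can already find AWS credentials and that the named bucket exists (the bucket name and documents are placeholders); update_lu=False keeps the S3 metadata to plain strings:

from maggma.stores import MemoryStore

index = MemoryStore("s3_index", key="task_id")
s3store = AmazonS3Store(index, "my-maggma-bucket")
s3store.connect()

# The full document is JSON-serialized (optionally zlib-compressed) into the bucket,
# while only the small search document lands in the index store
s3store.update([{"task_id": "mp-1", "structure": {"natoms": 8}}], update_lu=False, compress=True)

doc = s3store.query_one(criteria={"task_id": "mp-1"})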
4 changes: 3 additions & 1 deletion maggma/runner.py
@@ -7,7 +7,6 @@

import abc
import logging
import time
from collections import defaultdict, deque
from threading import Thread, Condition, BoundedSemaphore
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
@@ -225,6 +224,9 @@ def update_targets(self):


class MultiprocProcessor(BaseProcessor):
"""
Processor to run builders using python multiprocessing
"""
def __init__(self, builders, num_workers=None):
# multiprocessing only if mpi is not used, no mixing
self.num_workers = num_workers