Updates to JointStore and MapBuilder #44

Merged: 37 commits, Oct 26, 2018
Commits
e56ef02  only update when items available (shyamd, Oct 8, 2018)
e0abed7  Add debug messages (shyamd, Oct 8, 2018)
f067525  directly save warts (shyamd, Oct 8, 2018)
62333e2  Skip criteria building (shyamd, Oct 8, 2018)
126b75a  include query in key query (shyamd, Oct 8, 2018)
491e79a  Ensure lu_funcs are used (shyamd, Oct 8, 2018)
5360169  yield individual cursors (shyamd, Oct 8, 2018)
a5d183d  Chunk keys MapBuilder (shyamd, Oct 8, 2018)
dcdb410  unravel docs to root level (shyamd, Oct 8, 2018)
c979aae  Clear cursor before checking warnings (shyamd, Oct 12, 2018)
657cae9  Add unites import (shyamd, Oct 12, 2018)
e95aebf  Ref kwargs by name (shyamd, Oct 12, 2018)
35b17b2  proper iterating through chunked keys (shyamd, Oct 12, 2018)
bed0f50  Remove no longer valid test (shyamd, Oct 12, 2018)
10b31a8  remove old functions. Replaced with pydash (shyamd, Oct 12, 2018)
3d94b17  Raise exception if not mongodb3.6 instead of client merge (shyamd, Oct 16, 2018)
e105ef4  add logger to stores (shyamd, Oct 16, 2018)
bed8ff7  Fix check for mongodb version (shyamd, Oct 17, 2018)
1192ce2  only unwind if not merging (shyamd, Oct 17, 2018)
eb41a2a  Use dict in rather then get with None default (shyamd, Oct 17, 2018)
b925797  tests for merge_at_root (shyamd, Oct 17, 2018)
3dc7f83  Change name of get_criteria to get_keys (shyamd, Oct 17, 2018)
0cafb2d  Codacy Cleanup (shyamd, Oct 17, 2018)
df7b502  Allow mrun a builder (shyamd, Oct 22, 2018)
73d15b3  grab chunk of data (shyamd, Oct 22, 2018)
dc9b529  Error only message (shyamd, Oct 22, 2018)
4763629  Try mongodb 3.6 in travis (shyamd, Oct 22, 2018)
3566252  Bump mongo patch version (dwinston, Oct 22, 2018)
4970c54  Temp print to test travis (shyamd, Oct 25, 2018)
6f689a2  Let travis use add-on mongodb (shyamd, Oct 25, 2018)
6294114  Try starting mongod manually (shyamd, Oct 25, 2018)
d55e6d5  sudo? (shyamd, Oct 25, 2018)
332b034  Direct call to mongod (shyamd, Oct 25, 2018)
e1d10b1  manually start mongodb this time (shyamd, Oct 26, 2018)
9b666f5  Stay in same dir (shyamd, Oct 26, 2018)
8ee5aa9  update port (shyamd, Oct 26, 2018)
4e5099f  Noauth for test server (shyamd, Oct 26, 2018)
16 changes: 10 additions & 6 deletions .travis.yml
@@ -1,7 +1,5 @@
 language: python
 cache: pip
-services: mongodb
-sudo: required
 python:
 - "3.6"
 install:
@@ -13,17 +11,23 @@ install:
 before_script:
 - python setup.py develop
 - cd $HOME
-- curl -O https://fastdl.mongodb.org/linux/mongodb-linux-x86_64-3.6.5.tgz
-- tar -zxvf mongodb-linux-x86_64-3.6.5.tgz
+- curl -O https://fastdl.mongodb.org/linux/mongodb-linux-x86_64-3.6.8.tgz
+- tar -zxvf mongodb-linux-x86_64-3.6.8.tgz
 - mkdir -p mongodbdata
 - touch mongodblog
 - |
-  mongodb-linux-x86_64-3.6.5/bin/mongod \
+  mongodb-linux-x86_64-3.6.8/bin/mongod \
   --port 27020 --dbpath mongodbdata --logpath mongodblog \
   --auth --bind_ip_all --fork
 - |
-  mongodb-linux-x86_64-3.6.5/bin/mongo 127.0.0.1:27020/admin --eval \
+  mongodb-linux-x86_64-3.6.8/bin/mongo 127.0.0.1:27020/admin --eval \
   'db.createUser({user:"mongoadmin",pwd:"mongoadminpass",roles:["root"]});'
+- mkdir -p localdbdata
+- touch localdblog
+- |
+  mongodb-linux-x86_64-3.6.8/bin/mongod \
+  --port 27017 --dbpath localdbdata --logpath localdblog \
+  --noauth --bind_ip_all --fork
 - cd -
 script:
 - mpiexec -n 2 python $PWD/maggma/tests/mpi_test.py
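For reference, a minimal pymongo sketch (not part of this PR) of how test code could reach the two servers the updated config starts: the authenticated instance on port 27020 and the plain local instance on port 27017. The database and collection names below are placeholders.

# Sketch only: credentials and ports come from the .travis.yml above;
# "maggma_test" / "items" are made-up names for illustration.
from pymongo import MongoClient

# Authenticated test server (started with --auth on port 27020)
auth_client = MongoClient("127.0.0.1", 27020,
                          username="mongoadmin",
                          password="mongoadminpass",
                          authSource="admin")
print(auth_client.server_info()["version"])  # should report 3.6.8 per the config above

# Local no-auth server (started with --noauth on port 27017)
local_client = MongoClient("127.0.0.1", 27017)
local_client.maggma_test.items.insert_one({"task_id": "t-1"})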
97 changes: 62 additions & 35 deletions maggma/advanced_stores.py
@@ -37,8 +37,8 @@ class MongograntStore(Mongolike, Store):
 
     mongogrant documentation: https://github.com/materialsproject/mongogrant
     """
-    def __init__(self, mongogrant_spec, collection_name,
-                 mgclient_config_path=None, **kwargs):
+
+    def __init__(self, mongogrant_spec, collection_name, mgclient_config_path=None, **kwargs):
         """
 
         Args:
@@ -54,7 +54,7 @@ def __init__(self, mongogrant_spec, collection_name,
         self.collection_name = collection_name
         self.mgclient_config_path = mgclient_config_path
         self._collection = None
-        if set(("username", "password","database", "host")) & set(kwargs):
+        if set(("username", "password", "database", "host")) & set(kwargs):
             raise StoreError("MongograntStore does not accept "
                              "username, password, database, or host "
                              "arguments. Use `mongogrant_spec`.")
@@ -75,8 +75,7 @@ def __hash__(self):
         return hash((self.mongogrant_spec, self.collection_name, self.lu_field))
 
     def groupby(self, keys, criteria=None, properties=None, **kwargs):
-        return MongoStore.groupby(
-            self, keys, criteria=None, properties=None, **kwargs)
+        return MongoStore.groupby(self, keys, criteria=None, properties=None, **kwargs)
 
 
 class VaultStore(MongoStore):
@@ -247,19 +246,19 @@ def __init__(self, store, sandbox, exclusive=False):
         self.store = store
         self.sandbox = sandbox
         self.exclusive = exclusive
-        super().__init__(key=self.store.key,
-                         lu_field=self.store.lu_field,
-                         lu_type=self.store.lu_type,
-                         validator=self.store.validator)
+        super().__init__(
+            key=self.store.key,
+            lu_field=self.store.lu_field,
+            lu_type=self.store.lu_type,
+            validator=self.store.validator)
 
     @property
     @lru_cache(maxsize=1)
     def sbx_criteria(self):
         if self.exclusive:
             return {"sbxn": self.sandbox}
         else:
-            return {"$or": [{"sbxn": {"$in": [self.sandbox]}},
-                            {"sbxn": {"$exists": False}}]}
+            return {"$or": [{"sbxn": {"$in": [self.sandbox]}}, {"sbxn": {"$exists": False}}]}
 
     def query(self, criteria=None, properties=None, **kwargs):
         criteria = dict(**criteria, **self.sbx_criteria) if criteria else self.sbx_criteria
@@ -315,8 +314,7 @@ def __init__(self, index, bucket, **kwargs):
             bucket (str) : name of the bucket
         """
         if not boto_import:
-            raise ValueError("boto not available, please install boto3 to "
-                             "use AmazonS3Store")
+            raise ValueError("boto not available, please install boto3 to " "use AmazonS3Store")
         self.index = index
         self.bucket = bucket
         self.s3 = None
@@ -523,8 +521,17 @@ def rebuild_index_from_s3_data(self):
 
 class JointStore(Store):
     """Store corresponding to multiple collections, uses lookup to join"""
-    def __init__(self, database, collection_names, host="localhost",
-                 port=27017, username="", password="", master=None, **kwargs):
+
+    def __init__(self,
+                 database,
+                 collection_names,
+                 host="localhost",
+                 port=27017,
+                 username="",
+                 password="",
+                 master=None,
+                 merge_at_root=False,
+                 **kwargs):
         self.database = database
         self.collection_names = collection_names
         self.host = host
@@ -533,6 +540,7 @@ def __init__(self, database, collection_names, host="localhost",
         self.password = password
         self._collection = None
         self.master = master or collection_names[0]
+        self.merge_at_root = merge_at_root
         self.kwargs = kwargs
         super(JointStore, self).__init__(**kwargs)
 
@@ -542,6 +550,7 @@ def connect(self, force_reset=False):
         if self.username is not "":
             db.authenticate(self.username, self.password)
         self._collection = db[self.master]
+        self._has_merge_objects = self._collection.database.client.server_info()["version"] > "3.6"
 
     def close(self):
         self.collection.database.client.close()
@@ -554,17 +563,11 @@ def collection(self):
     def nonmaster_names(self):
         return list(set(self.collection_names) - {self.master})
 
-    def query(self, criteria=None, properties=None, **kwargs):
-        pipeline = self._get_pipeline(criteria=criteria, properties=properties)
-        return self.collection.aggregate(pipeline, **kwargs)
-
     @property
     def last_updated(self):
         lus = []
         for cname in self.collection_names:
-            lu = MongoStore.from_collection(
-                self.collection.database[cname],
-                lu_field=self.lu_field).last_updated
+            lu = MongoStore.from_collection(self.collection.database[cname], lu_field=self.lu_field).last_updated
             lus.append(lu)
         return max(lus)
 
@@ -579,8 +582,7 @@ def distinct(self, key, criteria=None, all_exist=True, **kwargs):
         g_key = key if isinstance(key, list) else [key]
         if all_exist:
             criteria = criteria or {}
-            criteria.update({k: {"$exists": True} for k in g_key
-                             if k not in criteria})
+            criteria.update({k: {"$exists": True} for k in g_key if k not in criteria})
         cursor = self.groupby(g_key, criteria=criteria, **kwargs)
         if isinstance(key, list):
             return [d['_id'] for d in cursor]
@@ -605,16 +607,34 @@ def _get_pipeline(self, criteria=None, properties=None):
         for cname in self.collection_names:
             if cname is not self.master:
                 pipeline.append({
-                    "$lookup": {"from": cname, "localField": self.key,
-                                "foreignField": self.key, "as": cname}})
-                pipeline.append({
-                    "$unwind": {"path": "${}".format(cname),
-                                "preserveNullAndEmptyArrays": True}})
+                    "$lookup": {
+                        "from": cname,
+                        "localField": self.key,
+                        "foreignField": self.key,
+                        "as": cname
+                    }
+                })
+
+                if self.merge_at_root:
+                    if not self._has_merge_objects:
+                        raise Exception(
+                            "MongoDB server version too low to use $mergeObjects.")
+
+                    pipeline.append({
+                        "$replaceRoot": {
+                            "newRoot": {
+                                "$mergeObjects": [{
+                                    "$arrayElemAt": ["${}".format(cname), 0]
+                                }, "$$ROOT"]
+                            }
+                        }
+                    })
+                else:
+                    pipeline.append({"$unwind": {"path": "${}".format(cname), "preserveNullAndEmptyArrays": True}})
 
         # Do projection for max last_updated
         lu_max_fields = ["${}".format(self.lu_field)]
-        lu_max_fields.extend(["${}.{}".format(cname, self.lu_field)
-                              for cname in self.collection_names])
+        lu_max_fields.extend(["${}.{}".format(cname, self.lu_field) for cname in self.collection_names])
         lu_proj = {self.lu_field: {"$max": lu_max_fields}}
         pipeline.append({"$addFields": lu_proj})
 
@@ -624,19 +644,26 @@ def _get_pipeline(self, criteria=None, properties=None):
             properties = {k: 1 for k in properties}
         if properties:
             pipeline.append({"$project": properties})
+
         return pipeline
 
+    def query(self, criteria=None, properties=None, **kwargs):
+        pipeline = self._get_pipeline(criteria=criteria, properties=properties)
+        agg = self.collection.aggregate(pipeline, **kwargs)
+        return agg
+
     def groupby(self, keys, criteria=None, properties=None, **kwargs):
         pipeline = self._get_pipeline(criteria=criteria, properties=properties)
         if not isinstance(keys, list):
             keys = [keys]
         group_id = {}
         for key in keys:
             set_(group_id, key, "${}".format(key))
-        pipeline.append({"$group": {"_id": group_id,
-                                    "docs": {"$push": "$$ROOT"}}})
+        pipeline.append({"$group": {"_id": group_id, "docs": {"$push": "$$ROOT"}}})
 
-        return self.collection.aggregate(pipeline, **kwargs)
+        agg = self.collection.aggregate(pipeline, **kwargs)
+
+        return agg
 
     def query_one(self, criteria=None, properties=None, **kwargs):
         """
@@ -655,7 +682,7 @@ def query_one(self, criteria=None, properties=None, **kwargs):
         # pipeline.append({"$limit": 1})
         query = self.query(criteria=criteria, properties=properties, **kwargs)
         try:
-            doc = query.next()
+            doc = next(query)
             return doc
         except StopIteration:
             return None
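The main functional change in advanced_stores.py is the new merge_at_root option on JointStore. Below is a minimal usage sketch, not taken from the PR itself: the database and collection names are placeholders, and it assumes a MongoDB 3.6+ server so that $mergeObjects is available.

# Sketch only; "maggma_test", "tasks", and "materials" are placeholder names.
from maggma.advanced_stores import JointStore

store = JointStore("maggma_test",            # database
                   ["tasks", "materials"],   # master defaults to the first collection
                   port=27017,
                   merge_at_root=True)       # option added in this PR
store.connect()  # connect() now also records _has_merge_objects from server_info()

# With merge_at_root=True, _get_pipeline $lookups each non-master collection and
# folds the joined document into the root via $replaceRoot/$mergeObjects; with the
# default merge_at_root=False it $unwinds the join into a subdocument instead.
for doc in store.query(criteria={"task_id": {"$exists": True}}):
    print(doc.get("task_id"), doc.get(store.lu_field))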
3 changes: 2 additions & 1 deletion maggma/cli/mrun.py
@@ -48,7 +48,8 @@ def main():
         # This is a runner:
         root.info("Changing number of workers from default in input file")
         runner = Runner(objects.builders, args.num_workers, mpi=args.mpi)
-    else:
+    elif isinstance(objects, Builder):
+        runner = Runner([objects], args.num_workers, mpi=args.mpi)
         root.error("Couldn't properly read the builder file.")
 
     if not args.dry_run:
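The mrun change above lets the CLI accept a builder file that resolves to a single Builder, which it then wraps as Runner([objects], ...). The sketch below is illustration only: the import paths, store names, and the serialization convention mrun expects are assumptions, not shown in this diff.

# Hypothetical builder definition that could be handed to mrun; import paths
# may differ between maggma versions.
from maggma.stores import MongoStore
from maggma.builder import Builder          # maggma.builders in some versions
from monty.serialization import dumpfn

class CopyBuilder(Builder):
    """Copy documents from the first source store into the first target store."""

    def get_items(self):
        return self.sources[0].query()

    def process_item(self, item):
        item.pop("_id", None)  # let the target collection assign its own _id
        return item

    def update_targets(self, items):
        self.targets[0].update(items)

source = MongoStore("maggma_test", "tasks", port=27017)
target = MongoStore("maggma_test", "materials", port=27017)
builder = CopyBuilder([source], [target])

# Serialized builder file; per the elif branch added above, mrun would wrap
# this single Builder in a Runner and execute it.
dumpfn(builder, "copy_builder.json")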