Skip to content

Commit

Permalink
Merge pull request #338 from materialsproject/fix-mongo-update-too-large
Browse files — browse the repository at this point in the history
Ensure MongoStore can safely continue updating when documents are too large
  • Loading branch information
shyamd authored Nov 19, 2020
2 parents 22dddba + 0a24712 commit b3ec833
Show / hide file tree
Showing 4 changed files with 44 additions and 7 deletions.
6 changes: 5 additions & 1 deletion src/maggma/core/drone.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,11 @@ def parent_directory(self) -> Path:
:return:
"""
paths = [doc.path.as_posix() for doc in self.documents]
return Path(os.path.commonprefix(paths))
parent_path = Path(os.path.commonprefix(paths))
if not parent_path.is_dir():
return parent_path.parent

return parent_path

def compute_state_hash(self) -> str:
"""
Expand Down
18 changes: 17 additions & 1 deletion src/maggma/stores/mongolike.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ def __init__(
username: str = "",
password: str = "",
ssh_tunnel: Optional[SSHTunnel] = None,
safe_update: bool = False,
**kwargs,
):
"""
Expand All @@ -50,6 +51,7 @@ def __init__(
port: TCP port to connect to
username: Username for the collection
password: Password to connect with
safe_update: fail gracefully on DocumentTooLarge errors on update
"""
self.database = database
self.collection_name = collection_name
Expand All @@ -58,6 +60,7 @@ def __init__(
self.username = username
self.password = password
self.ssh_tunnel = ssh_tunnel
self.safe_update = safe_update
self._collection = None # type: Any
self.kwargs = kwargs
super().__init__(**kwargs)
Expand Down Expand Up @@ -314,7 +317,20 @@ def update(self, docs: Union[List[Dict], Dict], key: Union[List, str, None] = No
requests.append(ReplaceOne(search_doc, d, upsert=True))

if len(requests) > 0:
self._collection.bulk_write(requests, ordered=False)
try:
self._collection.bulk_write(requests, ordered=False)
except (OperationFailure, DocumentTooLarge) as e:
if self.safe_update:
for req in requests:
req._filter
try:
self._collection.bulk_write([req], ordered=False)
except (OperationFailure, DocumentTooLarge):
self.logger.error(
f"Could not upload document for {req._filter} as it was too large for Mongo"
)
else:
raise e

def remove_docs(self, criteria: Dict):
"""
Expand Down
14 changes: 10 additions & 4 deletions tests/builders/test_simple_bib_drone.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,11 @@ def test_process_item(init_drone: SimpleBibDrone):
:return:
None
"""
list_record_id = init_drone.read(init_drone.path)
data = init_drone.process_item(list_record_id[0])
list_record_id = list(init_drone.read(init_drone.path))
text_record = next(
d for d in list_record_id if any("text" in f.name for f in d.documents)
)
data = init_drone.process_item(text_record)
assert "citations" in data
assert "text" in data
assert "record_key" in data
Expand Down Expand Up @@ -125,7 +128,10 @@ def test_compute_data(init_drone: SimpleBibDrone):
:return:
None
"""
list_record_id = init_drone.read(init_drone.path)
data = init_drone.process_item(list_record_id[0])
list_record_id = list(init_drone.read(init_drone.path))
text_record = next(
d for d in list_record_id if any("text" in f.name for f in d.documents)
)
data = init_drone.process_item(text_record)
assert "citations" in data
assert "text" in data
13 changes: 12 additions & 1 deletion tests/stores/test_mongolike.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import mongomock.collection
import pymongo.collection
from pymongo.errors import OperationFailure, DocumentTooLarge
import pytest

from maggma.core import StoreError
Expand Down Expand Up @@ -106,9 +107,19 @@ def test_mongostore_update(mongostore):
mongostore.validator = JSONSchemaValidator(schema=test_schema)
mongostore.update({"e": 100, "d": 3}, key="e")

# Non strict update
# Continue to update doc when validator is not set to strict mode
mongostore.update({"e": "abc", "d": 3}, key="e")

# ensure safe_update works to not throw DocumentTooLarge errors
large_doc = {f"mp-{i}": f"mp-{i}" for i in range(1000000)}
large_doc["e"] = 999
with pytest.raises((OperationFailure, DocumentTooLarge)):
mongostore.update([large_doc, {"e": 1001}], key="e")

mongostore.safe_update = True
mongostore.update([large_doc, {"e": 1001}], key="e")
assert mongostore.query_one({"e": 1001}) is not None


def test_mongostore_groupby(mongostore):
mongostore.update(
Expand Down

0 comments on commit b3ec833

Please sign in to comment.