Skip to content

Commit

Permalink
Merge pull request #767 from materialsproject/bugfix/pipeline_memory_…
Browse files Browse the repository at this point in the history
…fixes

Query pipeline out of memory fix
  • Loading branch information
Jason Munro authored Jan 23, 2023
2 parents f499b24 + c2369db commit 0ef72f2
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 68 deletions.
24 changes: 2 additions & 22 deletions src/maggma/api/resource/post_resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from maggma.api.models import Meta, Response
from maggma.api.query_operator import PaginationQuery, QueryOperator, SparseFieldsQuery
from maggma.api.resource import Resource
from maggma.api.resource.utils import attach_query_ops
from maggma.api.resource.utils import attach_query_ops, generate_query_pipeline
from maggma.api.utils import STORE_PARAMS, merge_queries
from maggma.core import Store
from maggma.stores import S3Store
Expand Down Expand Up @@ -111,28 +111,8 @@ def search(**queries: Dict[str, STORE_PARAMS]) -> Dict:
if isinstance(self.store, S3Store):
data = list(self.store.query(**query)) # type: ignore
else:
pipeline = [
{"$match": query["criteria"]},
]

sort_dict = {"$sort": {}} # type: dict

if query.get("sort", False):
sort_dict["$sort"].update(query["sort"])

sort_dict["$sort"].update({self.store.key: 1}) # Ensures sort by key is last in dict

projection_dict = {"$project": {"_id": 0}} # Do not return _id by default

if query.get("properties", False):
projection_dict["$project"].update({p: 1 for p in query["properties"]})

pipeline.append(sort_dict)
pipeline.append(projection_dict)
pipeline.append({"$skip": query["skip"] if "skip" in query else 0})

if query.get("limit", False):
pipeline.append({"$limit": query["limit"]})
pipeline = generate_query_pipeline(query, self.store)

data = list(
self.store._collection.aggregate(
Expand Down
25 changes: 2 additions & 23 deletions src/maggma/api/resource/read_resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from maggma.api.models import Response as ResponseModel
from maggma.api.query_operator import PaginationQuery, QueryOperator, SparseFieldsQuery
from maggma.api.resource import Resource, HintScheme, HeaderProcessor
from maggma.api.resource.utils import attach_query_ops
from maggma.api.resource.utils import attach_query_ops, generate_query_pipeline
from maggma.api.utils import STORE_PARAMS, merge_queries, serialization_helper
from maggma.core import Store
from maggma.stores import MongoStore, S3Store
Expand Down Expand Up @@ -237,28 +237,7 @@ def search(**queries: Dict[str, STORE_PARAMS]) -> Union[Dict, Response]:
data = list(self.store.query(**query))
else:

pipeline = [
{"$match": query["criteria"]},
]

sort_dict = {"$sort": {}} # type: dict

if query.get("sort", False):
sort_dict["$sort"].update(query["sort"])

sort_dict["$sort"].update({self.store.key: 1}) # Ensures sort by key is last in dict

projection_dict = {"$project": {"_id": 0}} # Do not return _id by default

if query.get("properties", False):
projection_dict["$project"].update({p: 1 for p in query["properties"]})

pipeline.append(sort_dict)
pipeline.append(projection_dict)
pipeline.append({"$skip": query["skip"] if "skip" in query else 0})

if query.get("limit", False):
pipeline.append({"$limit": query["limit"]})
pipeline = generate_query_pipeline(query, self.store)

data = list(
self.store._collection.aggregate(
Expand Down
25 changes: 2 additions & 23 deletions src/maggma/api/resource/submission.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from maggma.api.models import Meta, Response
from maggma.api.query_operator import QueryOperator, SubmissionQuery
from maggma.api.resource import Resource
from maggma.api.resource.utils import attach_query_ops
from maggma.api.resource.utils import attach_query_ops, generate_query_pipeline
from maggma.api.utils import STORE_PARAMS, merge_queries
from maggma.core import Store
from maggma.stores import S3Store
Expand Down Expand Up @@ -214,28 +214,7 @@ def search(**queries: STORE_PARAMS):
data = list(self.store.query(**query)) # type: ignore
else:

pipeline = [
{"$match": query["criteria"]},
]

sort_dict = {"$sort": {}} # type: dict

if query.get("sort", False):
sort_dict["$sort"].update(query["sort"])

sort_dict["$sort"].update({self.store.key: 1}) # Ensures sort by key is last in dict

projection_dict = {"$project": {"_id": 0}} # Do not return _id by default

if query.get("properties", False):
projection_dict["$project"].update({p: 1 for p in query["properties"]})

pipeline.append(sort_dict)
pipeline.append(projection_dict)
pipeline.append({"$skip": query["skip"] if "skip" in query else 0})

if query.get("limit", False):
pipeline.append({"$limit": query["limit"]})
pipeline = generate_query_pipeline(query, self.store)

data = list(
self.store._collection.aggregate(
Expand Down
36 changes: 36 additions & 0 deletions src/maggma/api/resource/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from maggma.api.query_operator import QueryOperator
from maggma.api.utils import STORE_PARAMS, attach_signature
from maggma.core.store import Store


def attach_query_ops(
Expand All @@ -26,3 +27,38 @@ def attach_query_ops(
defaults={f"dep{i}": Depends(dep.query) for i, dep in enumerate(query_ops)},
)
return function


def generate_query_pipeline(query: dict, store: "Store") -> list:
    """
    Generate the generic aggregation pipeline used in GET endpoint queries.

    Stages are emitted in this order: ``$match``, an optional pre-sort
    ``$project``, ``$sort``, ``$project``, ``$skip`` and an optional ``$limit``.

    Args:
        query: Query parameters. ``"criteria"`` is required; ``"sort"``,
            ``"properties"``, ``"skip"`` and ``"limit"`` are optional.
            The dictionary and its values are not modified.
        store: Store containing endpoint data. Only its ``key`` attribute is read.

    Returns:
        List of aggregation pipeline stage dictionaries.

    Raises:
        KeyError: If ``query`` has no ``"criteria"`` entry.
    """
    pipeline = [{"$match": query["criteria"]}]

    # Copy so the caller's sort specification is never mutated.
    sort_spec = dict(query.get("sort") or {})
    # Sort by key last as a tie-break, but keep any direction the caller
    # explicitly requested for the key instead of forcing it to ascending.
    sort_spec.setdefault(store.key, 1)

    projection = {"_id": 0}  # Do not return _id by default
    properties = query.get("properties") or []
    projection.update({p: 1 for p in properties})

    if properties:
        # Trim documents before sorting so less data is held during the sort,
        # while keeping every field the sort needs. Without properties there is
        # nothing to trim: adding an inclusion here would drop all other fields.
        pre_sort = dict(projection)
        for field in sort_spec:
            included = [path for path, flag in pre_sort.items() if flag]
            # Skip fields already covered by (or covering) an included path;
            # projecting both a path and its parent is a path collision.
            if not any(_paths_overlap(field, path) for path in included):
                pre_sort[field] = 1
        pipeline.append({"$project": pre_sort})

    pipeline.append({"$sort": sort_spec})
    pipeline.append({"$project": projection})
    pipeline.append({"$skip": query.get("skip", 0)})

    if query.get("limit", False):
        pipeline.append({"$limit": query["limit"]})

    return pipeline


def _paths_overlap(first: str, second: str) -> bool:
    """Return True if two dotted field paths are equal or one is a parent of the other."""
    return (
        first == second
        or first.startswith(second + ".")
        or second.startswith(first + ".")
    )

0 comments on commit 0ef72f2

Please sign in to comment.