diff --git a/src/maggma/api/resource/post_resource.py b/src/maggma/api/resource/post_resource.py index 6cf846959..4b816a24b 100644 --- a/src/maggma/api/resource/post_resource.py +++ b/src/maggma/api/resource/post_resource.py @@ -9,7 +9,7 @@ from maggma.api.models import Meta, Response from maggma.api.query_operator import PaginationQuery, QueryOperator, SparseFieldsQuery from maggma.api.resource import Resource -from maggma.api.resource.utils import attach_query_ops +from maggma.api.resource.utils import attach_query_ops, generate_query_pipeline from maggma.api.utils import STORE_PARAMS, merge_queries from maggma.core import Store from maggma.stores import S3Store @@ -111,28 +111,8 @@ def search(**queries: Dict[str, STORE_PARAMS]) -> Dict: if isinstance(self.store, S3Store): data = list(self.store.query(**query)) # type: ignore else: - pipeline = [ - {"$match": query["criteria"]}, - ] - sort_dict = {"$sort": {}} # type: dict - - if query.get("sort", False): - sort_dict["$sort"].update(query["sort"]) - - sort_dict["$sort"].update({self.store.key: 1}) # Ensures sort by key is last in dict - - projection_dict = {"$project": {"_id": 0}} # Do not return _id by default - - if query.get("properties", False): - projection_dict["$project"].update({p: 1 for p in query["properties"]}) - - pipeline.append(sort_dict) - pipeline.append(projection_dict) - pipeline.append({"$skip": query["skip"] if "skip" in query else 0}) - - if query.get("limit", False): - pipeline.append({"$limit": query["limit"]}) + pipeline = generate_query_pipeline(query, self.store) data = list( self.store._collection.aggregate( diff --git a/src/maggma/api/resource/read_resource.py b/src/maggma/api/resource/read_resource.py index bcfce380a..8d88f6cb5 100644 --- a/src/maggma/api/resource/read_resource.py +++ b/src/maggma/api/resource/read_resource.py @@ -11,7 +11,7 @@ from maggma.api.models import Response as ResponseModel from maggma.api.query_operator import PaginationQuery, QueryOperator, SparseFieldsQuery from maggma.api.resource import Resource, HintScheme, HeaderProcessor -from maggma.api.resource.utils import attach_query_ops +from maggma.api.resource.utils import attach_query_ops, generate_query_pipeline from maggma.api.utils import STORE_PARAMS, merge_queries, serialization_helper from maggma.core import Store from maggma.stores import MongoStore, S3Store @@ -237,28 +237,7 @@ def search(**queries: Dict[str, STORE_PARAMS]) -> Union[Dict, Response]: data = list(self.store.query(**query)) else: - pipeline = [ - {"$match": query["criteria"]}, - ] - - sort_dict = {"$sort": {}} # type: dict - - if query.get("sort", False): - sort_dict["$sort"].update(query["sort"]) - - sort_dict["$sort"].update({self.store.key: 1}) # Ensures sort by key is last in dict - - projection_dict = {"$project": {"_id": 0}} # Do not return _id by default - - if query.get("properties", False): - projection_dict["$project"].update({p: 1 for p in query["properties"]}) - - pipeline.append(sort_dict) - pipeline.append(projection_dict) - pipeline.append({"$skip": query["skip"] if "skip" in query else 0}) - - if query.get("limit", False): - pipeline.append({"$limit": query["limit"]}) + pipeline = generate_query_pipeline(query, self.store) data = list( self.store._collection.aggregate( diff --git a/src/maggma/api/resource/submission.py b/src/maggma/api/resource/submission.py index 4531aaac1..61454a3c9 100644 --- a/src/maggma/api/resource/submission.py +++ b/src/maggma/api/resource/submission.py @@ -12,7 +12,7 @@ from maggma.api.models import Meta, Response from maggma.api.query_operator import QueryOperator, SubmissionQuery from maggma.api.resource import Resource -from maggma.api.resource.utils import attach_query_ops +from maggma.api.resource.utils import attach_query_ops, generate_query_pipeline from maggma.api.utils import STORE_PARAMS, merge_queries from maggma.core import Store from maggma.stores import S3Store @@ -214,28 +214,7 @@ def search(**queries: STORE_PARAMS): data = list(self.store.query(**query)) # type: ignore else: - pipeline = [ - {"$match": query["criteria"]}, - ] - - sort_dict = {"$sort": {}} # type: dict - - if query.get("sort", False): - sort_dict["$sort"].update(query["sort"]) - - sort_dict["$sort"].update({self.store.key: 1}) # Ensures sort by key is last in dict - - projection_dict = {"$project": {"_id": 0}} # Do not return _id by default - - if query.get("properties", False): - projection_dict["$project"].update({p: 1 for p in query["properties"]}) - - pipeline.append(sort_dict) - pipeline.append(projection_dict) - pipeline.append({"$skip": query["skip"] if "skip" in query else 0}) - - if query.get("limit", False): - pipeline.append({"$limit": query["limit"]}) + pipeline = generate_query_pipeline(query, self.store) data = list( self.store._collection.aggregate( diff --git a/src/maggma/api/resource/utils.py b/src/maggma/api/resource/utils.py index 7921d85cb..9cbc11a11 100644 --- a/src/maggma/api/resource/utils.py +++ b/src/maggma/api/resource/utils.py @@ -4,6 +4,7 @@ from maggma.api.query_operator import QueryOperator from maggma.api.utils import STORE_PARAMS, attach_signature +from maggma.core.store import Store def attach_query_ops( @@ -26,3 +27,38 @@ def attach_query_ops( defaults={f"dep{i}": Depends(dep.query) for i, dep in enumerate(query_ops)}, ) return function + + +def generate_query_pipeline(query: dict, store: Store): + """ + Generate the generic aggregation pipeline used in GET endpoint queries + + Args: + query: Query parameters + store: Store containing endpoint data + """ + pipeline = [ + {"$match": query["criteria"]}, + ] + + sort_dict = {"$sort": {}} # type: dict + + if query.get("sort", False): + sort_dict["$sort"].update(query["sort"]) + + sort_dict["$sort"].update({store.key: 1}) # Ensures sort by key is last in dict + + projection_dict = {"_id": 0} # Do not return _id by default + + if query.get("properties", False): + projection_dict.update({p: 1 for p in query["properties"]}) + + pipeline.append({"$project": {**projection_dict, store.key: 1}}) + pipeline.append(sort_dict) + pipeline.append({"$project": projection_dict}) + pipeline.append({"$skip": query["skip"] if "skip" in query else 0}) + + if query.get("limit", False): + pipeline.append({"$limit": query["limit"]}) + + return pipeline