From 35660591ce917ece78a0d970881ef0d8fd6b3894 Mon Sep 17 00:00:00 2001
From: Shyam Dwaraknath
Date: Mon, 20 Jan 2020 11:12:28 -0800
Subject: [PATCH] refactor group_builder to be many-to-one

---
 src/maggma/builders/group_builder.py | 112 ++++++++++++++-------------
 1 file changed, 59 insertions(+), 53 deletions(-)

diff --git a/src/maggma/builders/group_builder.py b/src/maggma/builders/group_builder.py
index b6294ba6b..abc902044 100644
--- a/src/maggma/builders/group_builder.py
+++ b/src/maggma/builders/group_builder.py
@@ -3,14 +3,14 @@
 Many-to-Many GroupBuilder
 """
 import traceback
-from abc import ABCMeta, abstractproperty
+from abc import ABCMeta, abstractmethod
 from time import time
 from datetime import datetime
+from maggma.core import Store
 from maggma.utils import grouper, Timeout
 from maggma.builders import MapBuilder
 from typing import Dict, List, Iterator, Iterable, Set, Tuple
 from pydash import get
-from itertools import chain
 
 
 class GroupBuilder(MapBuilder, metaclass=ABCMeta):
@@ -18,8 +18,16 @@ class GroupBuilder(MapBuilder, metaclass=ABCMeta):
     Group source docs and produces merged documents for each group
     Supports incremental building, where a source group gets (re)built only if
     it has a newer (by last_updated_field) doc than the corresponding (by key) target doc.
+
+    This is a Many-to-One Builder: each group of source docs is processed into a single target doc
     """
 
+    def __init__(
+        self, source: Store, target: Store, grouping_keys: List[str], **kwargs
+    ):
+        self.grouping_keys = grouping_keys
+        super().__init__(source=source, target=target, **kwargs)
+
     def ensure_indexes(self):
         """
         Ensures indicies on critical fields for GroupBuilder
@@ -50,12 +58,25 @@ def get_ids_to_process(self) -> Iterable:
 
         query = self.query or {}
 
-        processed_ids = set(self.target.distinct(f"{self.target.key}s", criteria=query))
+        distinct_from_target = list(
+            self.target.distinct(f"{self.source.key}s", criteria=query)
+        )
+        processed_ids = []
+        # Not always guaranteed that MongoDB will unpack the list so we
+        # have to make sure we do that
+        for d in distinct_from_target:
+            if isinstance(d, list):
+                processed_ids.extend(d)
+            else:
+                processed_ids.append(d)
+
         all_ids = set(self.source.distinct(self.source.key, criteria=query))
-        unprocessed_ids = all_ids - processed_ids
+        unprocessed_ids = all_ids - set(processed_ids)
 
         self.logger.info(f"Found {len(unprocessed_ids)} to process")
 
-        new_ids = self.source.newer_in(self.target, criteria=query, exhaustive=False)
+        new_ids = set(
+            self.source.newer_in(self.target, criteria=query, exhaustive=False)
+        )
 
         return list(new_ids | unprocessed_ids)
 
@@ -63,19 +84,24 @@ def get_groups_from_keys(self, keys) -> Set[Tuple]:
         """
         Get the groups by grouping_keys for these documents
         """
+        grouping_keys = self.grouping_keys
 
         groups: Set[Tuple] = set()
 
-        for chunked_keys in grouper(keys, self.chunk_size, None):
-            chunked_keys = list(filter(None.__ne__, chunked_keys))
-            docs = list(
-                self.source.query(
-                    criteria={self.source.key: {"$in": chunked_keys}},
-                    properties=grouping_keys,
+        for chunked_keys in grouper(keys, self.chunk_size):
+            chunked_keys = chunked_keys
+
+            docs = [
+                d[0]
+                for d in self.source.groupby(
+                    grouping_keys, criteria={self.source.key: {"$in": chunked_keys}}
                 )
+            ]
+
+            groups |= set(
+                tuple(get(d, prop, None) for prop in grouping_keys) for d in docs
             )
-            groups |= set((get(prop, d, None) for prop in grouping_keys) for d in docs)
 
         self.logger.info(f"Found {len(groups)} to process")
 
         return groups
 
@@ -99,67 +125,51 @@ def get_items(self):
 
         for group in groups:
             docs = list(
                 self.source.query(
-                    criteria=dict(zip(self.grouping_keys, group)), projection=projection
+                    criteria=dict(zip(self.grouping_keys, group)), properties=projection
                 )
             )
 
             yield docs
 
     def process_item(self, item: List[Dict]) -> Dict[Tuple, Dict]:
-        keys = (d[self.source.key] for d in item)
+        keys = list(d[self.source.key] for d in item)
+
+        self.logger.debug("Processing: {}".format(keys))
 
         time_start = time()
 
         try:
             with Timeout(seconds=self.timeout):
-                processed = self.grouped_unary_function(item)
-                for _, d in processed:
-                    d.update({"state": "successful"})
+                processed = self.unary_function(item)
+                processed.update({"state": "successful"})
         except Exception as e:
             self.logger.error(traceback.format_exc())
-            processed = {keys: {"error": str(e), "state": "failed"}}
+            processed = {"error": str(e), "state": "failed"}
 
         time_end = time()
 
-        for doc_keys, doc in processed.items():
-            last_updated = [item[k][self.source.last_updated_field] for k in doc_keys]
-            last_updated = [self.source._lu_func[0](lu) for lu in last_updated]
-
-            doc.update(
-                {
-                    f"{self.target.key}s": doc_keys,
-                    self.target.last_updated_field: max(last_updated),
-                    "_bt": datetime.utcnow(),
-                    self.target.key: doc_keys[0],
-                }
-            )
-
+        last_updated = [d[self.source.last_updated_field] for d in item]
+        last_updated = [self.source._lu_func[0](lu) for lu in last_updated]
 
-        if self.store_process_time:
-            for doc in processed.values():
-                doc["_process_time"] = time_end - time_start
-
-        return list(processed.values())
-
-    def update_targets(self, items: List[List[Dict]]):
-        """
-        Generic update targets for Map Builder
-        """
-        items = list(chain.from_iterable(items))
+        processed.update(
+            {
+                self.target.key: keys[0],
+                f"{self.source.key}s": keys,
+                self.target.last_updated_field: max(last_updated),
+                "_bt": datetime.utcnow(),
+            }
+        )
 
-        for item in items:
-            # Add the built time
-            if "_id" in item:
-                del item["_id"]
+        if self.store_process_time:
+            processed["_process_time"] = time_end - time_start
 
-        if len(items) > 0:
-            self.target.update(items)
+        return processed
 
-    def grouped_unary_function(self, items: List[Dict]) -> Dict[Tuple, Dict]:
+    @abstractmethod
+    def unary_function(self, items: List[Dict]) -> Dict:
         """
         Processing function for GroupBuilder
-
         Returns:
             Dictionary mapping:
                 tuple of source document keys that are in the grouped document
@@ -167,7 +177,3 @@
                 the grouped and processed document
         """
        pass
-
-    @abstractproperty
-    def grouping_keys(self) -> List[str]:
-        pass
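
For reviewers, a minimal usage sketch of the refactored many-to-one API follows; it is not part of this patch. The FormulaGroupBuilder class, the store names, and the field names (formula, task_id, energy) are hypothetical illustrations, and MemoryStore here is assumed to be maggma's in-memory store.

# Hypothetical sketch (not part of this patch) of a builder written
# against the refactored many-to-one GroupBuilder API.
from typing import Dict, List

from maggma.builders import GroupBuilder
from maggma.stores import MemoryStore


class FormulaGroupBuilder(GroupBuilder):
    """Merges all source docs that share the same "formula" into one doc."""

    def unary_function(self, items: List[Dict]) -> Dict:
        # `items` is one group of source docs; return the single merged doc.
        # process_item then stamps on the target key (the first source key),
        # the list of source keys under f"{source.key}s", last_updated,
        # and the "_bt" build-time field.
        return {
            "formula": items[0]["formula"],  # hypothetical grouping field
            "task_count": len(items),
            "energies": [d.get("energy") for d in items],
        }


tasks = MemoryStore("tasks", key="task_id", last_updated_field="last_updated")
materials = MemoryStore(
    "materials", key="material_id", last_updated_field="last_updated"
)

# grouping_keys is now a constructor argument instead of an abstract property
builder = FormulaGroupBuilder(source=tasks, target=materials, grouping_keys=["formula"])

Running such a builder should upsert exactly one target document per group, keyed by the first source key in the group, with the full list of source keys stored under "task_ids" in this sketch (f"{source.key}s" in general).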