refactor group_builder to be many-to-one
shyamd committed Jan 20, 2020
1 parent 7dd0928 commit 3566059
Showing 1 changed file with 59 additions and 53 deletions.
112 changes: 59 additions & 53 deletions src/maggma/builders/group_builder.py
@@ -3,23 +3,31 @@
Many-to-One GroupBuilder
"""
import traceback
from abc import ABCMeta, abstractproperty
from abc import ABCMeta, abstractmethod
from time import time
from datetime import datetime
from maggma.core import Store
from maggma.utils import grouper, Timeout
from maggma.builders import MapBuilder
from typing import Dict, List, Iterator, Iterable, Set, Tuple
from pydash import get
from itertools import chain


class GroupBuilder(MapBuilder, metaclass=ABCMeta):
"""
Groups source docs and produces one merged document per group.
Supports incremental building, where a source group gets (re)built only if
it has a newer (by last_updated_field) doc than the corresponding (by key) target doc.
This is a Many-to-One Builder
"""

def __init__(
self, source: Store, target: Store, grouping_keys: List[str], **kwargs
):
self.grouping_keys = grouping_keys
super().__init__(source=source, target=target, **kwargs)
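# Note: grouping_keys is now passed to the constructor; subclasses no longer
# need to define it as an abstract property.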

def ensure_indexes(self):
"""
Ensures indices on critical fields for GroupBuilder
@@ -50,32 +58,50 @@ def get_ids_to_process(self) -> Iterable:

query = self.query or {}

processed_ids = set(self.target.distinct(f"{self.target.key}s", criteria=query))
distinct_from_target = list(
self.target.distinct(f"{self.source.key}s", criteria=query)
)
processed_ids = []
# It is not always guaranteed that MongoDB will unpack the list, so we
# have to make sure we do that ourselves
for d in distinct_from_target:
if isinstance(d, list):
processed_ids.extend(d)
else:
processed_ids.append(d)
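# processed_ids ends up as a flat list of every source key that has already
# been built into some target document.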

all_ids = set(self.source.distinct(self.source.key, criteria=query))
unprocessed_ids = all_ids - processed_ids
unprocessed_ids = all_ids - set(processed_ids)
self.logger.info(f"Found {len(unprocessed_ids)} to process")

new_ids = self.source.newer_in(self.target, criteria=query, exhaustive=False)
new_ids = set(
self.source.newer_in(self.target, criteria=query, exhaustive=False)
)

return list(new_ids | unprocessed_ids)

def get_groups_from_keys(self, keys) -> Set[Tuple]:
"""
Get the groups by grouping_keys for these documents
"""

grouping_keys = self.grouping_keys

groups: Set[Tuple] = set()
for chunked_keys in grouper(keys, self.chunk_size, None):
chunked_keys = list(filter(None.__ne__, chunked_keys))

docs = list(
self.source.query(
criteria={self.source.key: {"$in": chunked_keys}},
properties=grouping_keys,
for chunked_keys in grouper(keys, self.chunk_size):
chunked_keys = [k for k in chunked_keys if k is not None]  # drop grouper's padding values

docs = [
d[0]
for d in self.source.groupby(
grouping_keys, criteria={self.source.key: {"$in": chunked_keys}}
)
]
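# groupby is expected to yield (grouping-values, documents) pairs here, so
# d[0] keeps one dict of grouping-key values per distinct group.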

groups |= set(
tuple(get(d, prop, None) for prop in grouping_keys) for d in docs
)
groups |= set(tuple(get(d, prop, None) for prop in grouping_keys) for d in docs)

self.logger.info(f"Found {len(groups)} to process")
return groups
@@ -99,75 +125,55 @@ def get_items(self):
for group in groups:
docs = list(
self.source.query(
criteria=dict(zip(self.grouping_keys, group)), projection=projection
criteria=dict(zip(self.grouping_keys, group)), properties=projection
)
)
yield docs

def process_item(self, item: List[Dict]) -> Dict[Tuple, Dict]:

keys = (d[self.source.key] for d in item)
keys = list(d[self.source.key] for d in item)

self.logger.debug("Processing: {}".format(keys))

time_start = time()

try:
with Timeout(seconds=self.timeout):
processed = self.grouped_unary_function(item)
for _, d in processed:
d.update({"state": "successful"})
processed = self.unary_function(item)
processed.update({"state": "successful"})
except Exception as e:
self.logger.error(traceback.format_exc())
processed = {keys: {"error": str(e), "state": "failed"}}
processed = {"error": str(e), "state": "failed"}

time_end = time()

for doc_keys, doc in processed.items():
last_updated = [item[k][self.source.last_updated_field] for k in doc_keys]
last_updated = [self.source._lu_func[0](lu) for lu in last_updated]

doc.update(
{
f"{self.target.key}s": doc_keys,
self.target.last_updated_field: max(last_updated),
"_bt": datetime.utcnow(),
self.target.key: doc_keys[0],
}
)
last_updated = [d[self.source.last_updated_field] for d in item]
last_updated = [self.source._lu_func[0](lu) for lu in last_updated]
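# _lu_func[0] parses each stored last_updated value so that max(last_updated)
# below can pick the most recent one for the merged doc.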

if self.store_process_time:
for doc in processed.values():
doc["_process_time"] = time_end - time_start

return list(processed.values())

def update_targets(self, items: List[List[Dict]]):
"""
Generic update targets for Map Builder
"""
items = list(chain.from_iterable(items))
processed.update(
{
self.target.key: keys[0],
f"{self.source.key}s": keys,
self.target.last_updated_field: max(last_updated),
"_bt": datetime.utcnow(),
}
)
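# The merged doc is keyed by the first source key, and every contributing
# source key is kept under the plural f"{self.source.key}s" field so that
# get_ids_to_process can skip groups that are already built.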

for item in items:
# Add the built time
if "_id" in item:
del item["_id"]
if self.store_process_time:
processed["_process_time"] = time_end - time_start

if len(items) > 0:
self.target.update(items)
return processed

def grouped_unary_function(self, items: List[Dict]) -> Dict[Tuple, Dict]:
@abstractmethod
def unary_function(self, items: List[Dict]) -> Dict:
"""
Processing function for GroupBuilder
Returns:
Dictionary mapping:
tuple of source document keys that are in the grouped document
to
the grouped and processed document
"""
pass

@abstractproperty
def grouping_keys(self) -> List[str]:
pass
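
For reference, a minimal sketch of how the refactored many-to-one builder might be subclassed after this commit. The AverageBuilder class, the MemoryStore backend, and the task_id/formula/value field names are illustrative assumptions rather than part of this change; only the constructor signature and the unary_function hook come from the diff above.

from typing import Dict, List

from maggma.builders.group_builder import GroupBuilder
from maggma.stores import MemoryStore  # assumed Store backend for this sketch


class AverageBuilder(GroupBuilder):
    """Merges each group of source docs sharing a formula into one target doc."""

    def unary_function(self, items: List[Dict]) -> Dict:
        # GroupBuilder.process_item adds the target key, the list of source
        # keys, last_updated, and _bt; the subclass returns only the merged payload.
        values = [doc.get("value", 0) for doc in items]
        return {"average": sum(values) / len(values), "n_docs": len(items)}


# Hypothetical stores: each source doc carries task_id, formula, and value fields.
source = MemoryStore("tasks", key="task_id")
target = MemoryStore("averages", key="task_id")
builder = AverageBuilder(source, target, grouping_keys=["formula"])

Running the builder (for example via builder.run()) should then write one merged document per distinct formula, keyed by the first task_id in each group and carrying all contributing task_ids under the plural "task_ids" field, which is what get_ids_to_process reads back to support incremental rebuilds.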
