From 34c8dd6dbe7ce20211467769176518b35ee63107 Mon Sep 17 00:00:00 2001 From: Shyam Dwaraknath Date: Mon, 6 Apr 2020 11:37:45 -0700 Subject: [PATCH 01/11] fix async leave one out error --- src/maggma/cli/multiprocessing.py | 200 +++++++++++++++++++++--------- 1 file changed, 144 insertions(+), 56 deletions(-) diff --git a/src/maggma/cli/multiprocessing.py b/src/maggma/cli/multiprocessing.py index 91a6e389a..ffbd8f6df 100644 --- a/src/maggma/cli/multiprocessing.py +++ b/src/maggma/cli/multiprocessing.py @@ -3,8 +3,19 @@ from logging import getLogger from types import GeneratorType -from asyncio import BoundedSemaphore, get_running_loop, gather -from aioitertools import zip_longest +from asyncio import ( + BoundedSemaphore, + get_running_loop, + Queue, + create_task, + Condition, + wait, + FIRST_COMPLETED, + Event, + gather, +) + +from aioitertools import islice, enumerate from concurrent.futures import ProcessPoolExecutor from maggma.utils import primed from tqdm import tqdm @@ -13,75 +24,141 @@ logger = getLogger("MultiProcessor") -class ProcessItemsSemaphore(BoundedSemaphore): +class BackPressure: """ - Modified BoundedSemaphore to update a TQDM bar - for process_items + Wrapper for an iterator to provide + async access with backpressure """ - def __init__(self, total=None, *args, **kwargs): - self.tqdm = tqdm(total=total, desc="Process Items") - super().__init__(*args, **kwargs) + def __init__(self, iterator, n): + self.iterator = iter(iterator) + self.back_pressure = BoundedSemaphore(n) - def release(self): - self.tqdm.update(1) - super().release() + def __aiter__(self): + return self + async def __anext__(self): + await self.back_pressure.acquire() -def safe_dispatch(val): - func, item = val - try: - return func(item) - except Exception as e: - logger.error(e) - return None + try: + return next(self.iterator) + except StopIteration: + raise StopAsyncIteration + async def release(self, async_iterator): + """ + release iterator to pipeline the backpressure + """ + 
async for item in async_iterator: + try: + self.back_pressure.release() + except ValueError: + pass -class AsyncBackPressuredMap: + yield item + + +class AsyncMap: """ - Wrapper for an iterator to provide - async access with backpressure + Async iterator that maps a function to an async iterator + usign an executor and returns items as they are done """ - def __init__(self, iterator, func, max_run, executor, total=None): - self.iterator = iter(iterator) + def __init__(self, func, async_iterator, executor): + self.iterator = async_iterator self.func = func self.executor = executor - self.back_pressure = ProcessItemsSemaphore(value=max_run, total=total) + self.fill_task = create_task(self.get_from_iterator()) + + self.done_sentinel = object() + self.results = Queue() + self.tasks = {} + + async def process_and_release(self, idx): + future = self.tasks[idx] + try: + item = await future + self.results.put_nowait(item) + except Exception: + pass + finally: + self.tasks.pop(idx) + + async def get_from_iterator(self): + loop = get_running_loop() + async for idx, item in enumerate(self.iterator): + future = loop.run_in_executor( + self.executor, safe_dispatch, (self.func, item) + ) + + self.tasks[idx] = future + + loop.create_task(self.process_and_release(idx)) + + await gather(*self.tasks.values()) + self.results.put_nowait(self.done_sentinel) def __aiter__(self): return self async def __anext__(self): - await self.back_pressure.acquire() - loop = get_running_loop() + item = await self.results.get() - try: - item = next(self.iterator) - except StopIteration: + if item == self.done_sentinel: raise StopAsyncIteration + else: + return item - future = loop.run_in_executor(self.executor, safe_dispatch, (self.func, item)) - async def process_and_release(): - await future - self.back_pressure.release() - return future +async def atqdm(async_iterator, *args, **kwargs): + """ + Wrapper around tqdm for async generators + """ + _tqdm = tqdm(*args, **kwargs) + async for item in 
async_iterator: + _tqdm.update() + yield item - return process_and_release() + _tqdm.close() -async def grouper(iterable, n, fillvalue=None): +async def amap(func, async_iterator, executor): + """ + Maps a function onto an async iterable using executor + """ + loop = get_running_loop() + async for item in async_iterator: + yield loop.run_in_executor(executor, safe_dispatch, (func, item)) + + +async def grouper(async_iterator, n: int): """ Collect data into fixed-length chunks or blocks. + >>> list(grouper(3, 'ABCDEFG')) + [['A', 'B', 'C'], ['D', 'E', 'F'], ['G']] + + Updated from: + https://stackoverflow.com/questions/31164731/python-chunking-csv-file-multiproccessing/31170795#31170795 + + Modified for async """ - # grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx - args = [iterable] * n - iterator = zip_longest(*args, fillvalue=fillvalue) + chunk = [] + async for item in async_iterator: + chunk.append(item) + if len(chunk) >= n: + yield chunk + chunk.clear() + if chunk != []: + yield chunk - async for group in iterator: - group = [g for g in group if g is not None] - yield group + +def safe_dispatch(val): + func, item = val + try: + return func(item) + except Exception as e: + logger.error(e) + return None async def multi(builder, num_workers): @@ -107,15 +184,6 @@ async def multi(builder, num_workers): elif hasattr(cursor, "count"): total = cursor.count() - mapper = AsyncBackPressuredMap( - iterator=tqdm(cursor, desc="Get", total=total), - func=builder.process_item, - max_run=builder.chunk_size, - executor=executor, - total=total, - ) - update_items = tqdm(total=total, desc="Update Targets") - logger.info( f"Starting multiprocessing: {builder.__class__.__name__}", extra={ @@ -128,9 +196,29 @@ async def multi(builder, num_workers): } }, ) - async for chunk in grouper(mapper, builder.chunk_size, fillvalue=None): + + back_pressured_get = BackPressure( + iterator=tqdm(cursor, desc="Get", total=total), n=builder.chunk_size + ) + + processed_items = atqdm( + 
async_iterator=AsyncMap( + func=builder.process_item, + async_iterator=back_pressured_get, + executor=executor, + ), + total=total, + desc="Process Items", + ) + + back_pressure_relief = back_pressured_get.release(processed_items) + + update_items = tqdm(total=total, desc="Update Targets") + + async for chunk in grouper(back_pressure_relief, n=builder.chunk_size): + logger.info( - "Processing batch of {} items".format(builder.chunk_size), + "Processed batch of {} items".format(builder.chunk_size), extra={ "maggma": { "event": "UPDATE", @@ -141,9 +229,7 @@ async def multi(builder, num_workers): } }, ) - chunk = await gather(*chunk) - processed_chunk = [c.result() for c in chunk if c is not None] - processed_items = [item for item in processed_chunk if item is not None] + processed_items = [item for item in chunk if item is not None] builder.update_targets(processed_items) update_items.update(len(processed_items)) @@ -158,4 +244,6 @@ async def multi(builder, num_workers): } }, ) + + update_items.close() builder.finalize() From d17a9a65ae553ea17dc6ff22a11d45a565ed391b Mon Sep 17 00:00:00 2001 From: Shyam Dwaraknath Date: Mon, 6 Apr 2020 12:02:12 -0700 Subject: [PATCH 02/11] remove old amap --- src/maggma/cli/multiprocessing.py | 9 --------- 1 file changed, 9 deletions(-) diff --git a/src/maggma/cli/multiprocessing.py b/src/maggma/cli/multiprocessing.py index ffbd8f6df..ffa5cc5f9 100644 --- a/src/maggma/cli/multiprocessing.py +++ b/src/maggma/cli/multiprocessing.py @@ -122,15 +122,6 @@ async def atqdm(async_iterator, *args, **kwargs): _tqdm.close() -async def amap(func, async_iterator, executor): - """ - Maps a function onto an async iterable using executor - """ - loop = get_running_loop() - async for item in async_iterator: - yield loop.run_in_executor(executor, safe_dispatch, (func, item)) - - async def grouper(async_iterator, n: int): """ Collect data into fixed-length chunks or blocks. 
From b597df9dcbcbf8e2a2fb773b5dceaddb07c5e05e Mon Sep 17 00:00:00 2001 From: Shyam Dwaraknath Date: Mon, 6 Apr 2020 12:02:22 -0700 Subject: [PATCH 03/11] rename for unordered behavior --- src/maggma/cli/multiprocessing.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/maggma/cli/multiprocessing.py b/src/maggma/cli/multiprocessing.py index ffa5cc5f9..ffa2c0c25 100644 --- a/src/maggma/cli/multiprocessing.py +++ b/src/maggma/cli/multiprocessing.py @@ -58,10 +58,11 @@ async def release(self, async_iterator): yield item -class AsyncMap: +class AsyncUnorderedMap: """ Async iterator that maps a function to an async iterator usign an executor and returns items as they are done + This does not guarantee order """ def __init__(self, func, async_iterator, executor): @@ -193,7 +194,7 @@ async def multi(builder, num_workers): ) processed_items = atqdm( - async_iterator=AsyncMap( + async_iterator=AsyncUnorderedMap( func=builder.process_item, async_iterator=back_pressured_get, executor=executor, From 6943f5dec9748b8c90e0a0fddf9e2ccb96409880 Mon Sep 17 00:00:00 2001 From: Shyam Dwaraknath Date: Mon, 6 Apr 2020 12:14:35 -0700 Subject: [PATCH 04/11] update tests --- tests/cli/test_multiprocessing.py | 68 +++++++++++++++++++------------ 1 file changed, 43 insertions(+), 25 deletions(-) diff --git a/tests/cli/test_multiprocessing.py b/tests/cli/test_multiprocessing.py index ee93b2df5..68b8f7686 100644 --- a/tests/cli/test_multiprocessing.py +++ b/tests/cli/test_multiprocessing.py @@ -1,7 +1,12 @@ import pytest import time import asyncio -from maggma.cli.multiprocessing import AsyncBackPressuredMap, grouper +from maggma.cli.multiprocessing import ( + grouper, + BackPressure, + AsyncUnorderedMap, + safe_dispatch, +) from concurrent.futures import ThreadPoolExecutor @@ -14,9 +19,6 @@ async def arange(count): async for group in grouper(arange(100), n=10): assert len(group) == 10 - async for group in grouper(arange(9), n=10, fillvalue="s"): - assert len(group) 
== 10 - async for group in grouper(arange(9), n=10): assert len(group) == 9 @@ -26,30 +28,46 @@ def wait_and_return(x): return x * x -@pytest.mark.asyncio -async def test_backpressure_map(): +async def arange(n): + for num in range(n): + yield num - executor = ThreadPoolExecutor(1) - mapper = AsyncBackPressuredMap( - iterator=range(3), func=wait_and_return, max_run=2, executor=executor - ) - true_values = [x * x for x in range(3)] - async for finished_val in mapper: - finished_val = await finished_val - assert finished_val.result() == true_values.pop(0) +@pytest.mark.asyncio +async def test_backpressure(): - mapper = AsyncBackPressuredMap( - iterator=range(3), func=wait_and_return, max_run=2, executor=executor - ) + iterable = range(10) + backpressure = BackPressure(iterable, 2) # Put two items into the process queue - futures = [await mapper.__anext__(), await mapper.__anext__()] + asyncio.gather(backpressure.__anext__(), backpressure.__anext__()) + # Ensure back_pressure enabled - assert mapper.back_pressure.locked() - await asyncio.sleep(2) - # Ensure back_pressure enabled till data is dequeued from process_pipeline - assert mapper.back_pressure.locked() - # Dequeue futures and ensure back_pressure is gone - await asyncio.gather(*futures) - assert not mapper.back_pressure.locked() + assert backpressure.back_pressure.locked() + + releaser = backpressure.release(arange(10)) + await releaser.__anext__() + assert not backpressure.back_pressure.locked() + asyncio.gather(releaser.__anext__(), releaser.__anext__()) + assert not backpressure.back_pressure.locked() + + +@pytest.mark.asyncio +async def test_async_map(): + + executor = ThreadPoolExecutor(1) + amap = AsyncUnorderedMap(wait_and_return, arange(3), executor) + true_values = {x * x for x in range(3)} + + finished_vals = set() + async for finished_val in amap: + finished_vals.add(finished_val) + + assert finished_vals == true_values + + +def test_safe_dispatch(): + def bad_func(val): + raise ValueError("AAAH") + 
+ safe_dispatch((bad_func, "")) From 2e6989349c211670278f6b8ba05f5ca27a87e3b2 Mon Sep 17 00:00:00 2001 From: Shyam Dwaraknath Date: Mon, 6 Apr 2020 12:55:44 -0700 Subject: [PATCH 05/11] update tests --- tests/cli/test_multiprocessing.py | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/tests/cli/test_multiprocessing.py b/tests/cli/test_multiprocessing.py index 68b8f7686..286a38c7a 100644 --- a/tests/cli/test_multiprocessing.py +++ b/tests/cli/test_multiprocessing.py @@ -40,15 +40,26 @@ async def test_backpressure(): backpressure = BackPressure(iterable, 2) # Put two items into the process queue - asyncio.gather(backpressure.__anext__(), backpressure.__anext__()) + await backpressure.__anext__() + await backpressure.__anext__() # Ensure back_pressure enabled assert backpressure.back_pressure.locked() + # Release back pressure releaser = backpressure.release(arange(10)) await releaser.__anext__() assert not backpressure.back_pressure.locked() - asyncio.gather(releaser.__anext__(), releaser.__anext__()) + + # Ensure can keep releasing back pressure and won't error + await releaser.__anext__() + await releaser.__anext__() + + # Ensure stop iteration works + with pytest.raises(StopAsyncIteration): + for i in range(10): + await releaser.__anext__() + assert not backpressure.back_pressure.locked() From fcca9104f37ed6d0c61b2ed6a198eb53081d93c0 Mon Sep 17 00:00:00 2001 From: Shyam Dwaraknath Date: Mon, 6 Apr 2020 13:22:59 -0700 Subject: [PATCH 06/11] fix lgtm issues --- src/maggma/cli/multiprocessing.py | 2 +- src/maggma/stores/mongolike.py | 18 +++++++++++++----- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/src/maggma/cli/multiprocessing.py b/src/maggma/cli/multiprocessing.py index ffa2c0c25..15aff066b 100644 --- a/src/maggma/cli/multiprocessing.py +++ b/src/maggma/cli/multiprocessing.py @@ -15,7 +15,7 @@ gather, ) -from aioitertools import islice, enumerate +from aioitertools import enumerate from concurrent.futures 
import ProcessPoolExecutor from maggma.utils import primed from tqdm import tqdm diff --git a/src/maggma/stores/mongolike.py b/src/maggma/stores/mongolike.py index e634cfc49..2adbccb50 100644 --- a/src/maggma/stores/mongolike.py +++ b/src/maggma/stores/mongolike.py @@ -70,7 +70,9 @@ def name(self) -> str: """ return f"mongo://{self.host}/{self.database}/{self.collection_name}" - def connect(self, force_reset: bool = False, ssh_tunnel: SSHTunnelForwarder = None): + def connect( + self, force_reset: bool = False, ssh_tunnel: SSHTunnelForwarder = None + ): # lgtm[py/conflicting-attributes] """ Connect to the source data """ @@ -341,7 +343,7 @@ def __init__(self, uri: str, database: str, collection_name: str, **kwargs): self.collection_name = collection_name self.kwargs = kwargs self._collection = None - super(MongoStore, self).__init__(**kwargs) + super(self).__init__(**kwargs) @property def name(self) -> str: @@ -351,7 +353,9 @@ def name(self) -> str: # TODO: This is not very safe since it exposes the username/password info return self.uri - def connect(self, force_reset: bool = False, ssh_tunnel: SSHTunnelForwarder = None): + def connect( + self, force_reset: bool = False, ssh_tunnel: SSHTunnelForwarder = None + ): # lgtm[py/conflicting-attributes] """ Connect to the source data """ @@ -380,7 +384,9 @@ def __init__(self, collection_name: str = "memory_db", **kwargs): self.kwargs = kwargs super(MongoStore, self).__init__(**kwargs) # noqa - def connect(self, force_reset: bool = False, ssh_tunnel: SSHTunnelForwarder = None): + def connect( + self, force_reset: bool = False, ssh_tunnel: SSHTunnelForwarder = None + ): # lgtm[py/conflicting-attributes] """ Connect to the source data """ @@ -465,7 +471,9 @@ def __init__(self, paths: Union[str, List[str]], **kwargs): self.kwargs = kwargs super().__init__(collection_name="collection", **kwargs) - def connect(self, force_reset=False, ssh_tunnel=None): + def connect( + self, force_reset=False, ssh_tunnel=None + ): # 
lgtm[py/conflicting-attributes] """ Loads the files into the collection in memory """ From 0611441d71b8d109fe618e90b6457cd4a6470038 Mon Sep 17 00:00:00 2001 From: Shyam Dwaraknath Date: Mon, 6 Apr 2020 13:45:30 -0700 Subject: [PATCH 07/11] fix lint issues --- src/maggma/api/APIManager.py | 2 +- src/maggma/stores/mongolike.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/maggma/api/APIManager.py b/src/maggma/api/APIManager.py index f3f7d4197..bbf96fe13 100644 --- a/src/maggma/api/APIManager.py +++ b/src/maggma/api/APIManager.py @@ -54,7 +54,7 @@ def load(self, endpoint, prefix: str = "/"): class_name = endpoint.split(".")[-1] new_endpoint = dynamic_import(module_path, class_name) self.__setitem__(prefix, new_endpoint) - pass + elif isclass(endpoint) and issubclass(endpoint, Resource): self.__setitem__(prefix, endpoint) else: diff --git a/src/maggma/stores/mongolike.py b/src/maggma/stores/mongolike.py index 2adbccb50..dfd39e95c 100644 --- a/src/maggma/stores/mongolike.py +++ b/src/maggma/stores/mongolike.py @@ -343,7 +343,7 @@ def __init__(self, uri: str, database: str, collection_name: str, **kwargs): self.collection_name = collection_name self.kwargs = kwargs self._collection = None - super(self).__init__(**kwargs) + super().__init__(**kwargs) @property def name(self) -> str: From cdb897dd40e7cb44f344ebe217e2ac3e7d389409 Mon Sep 17 00:00:00 2001 From: Shyam Dwaraknath Date: Mon, 6 Apr 2020 13:48:10 -0700 Subject: [PATCH 08/11] unneeded hash function --- src/maggma/core/store.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/maggma/core/store.py b/src/maggma/core/store.py index d6efd5507..80d705468 100644 --- a/src/maggma/core/store.py +++ b/src/maggma/core/store.py @@ -337,9 +337,6 @@ def updated_keys(self, target, criteria=None): def __ne__(self, other): return not self == other - def __hash__(self): - return hash((self.last_updated_field,)) - def __getstate__(self): return self.as_dict() From 
35a6359e326a6c96cc234878ef69de9350f2134c Mon Sep 17 00:00:00 2001 From: Shyam Dwaraknath Date: Mon, 6 Apr 2020 13:49:03 -0700 Subject: [PATCH 09/11] more lgtm issues --- src/maggma/api/query_operator.py | 2 +- src/maggma/api/resource.py | 1 - src/maggma/stores/gridfs.py | 1 - 3 files changed, 1 insertion(+), 3 deletions(-) diff --git a/src/maggma/api/query_operator.py b/src/maggma/api/query_operator.py index 33dd88053..f72c1a54a 100644 --- a/src/maggma/api/query_operator.py +++ b/src/maggma/api/query_operator.py @@ -1,4 +1,4 @@ -from typing import List, Dict, Optional, Any, Type, Tuple, Mapping +from typing import List, Dict, Optional, Any, Mapping from pydantic import BaseModel from fastapi import Query from monty.json import MSONable diff --git a/src/maggma/api/resource.py b/src/maggma/api/resource.py index 6184af6c0..f0966b3d6 100644 --- a/src/maggma/api/resource.py +++ b/src/maggma/api/resource.py @@ -16,7 +16,6 @@ ) from fastapi import FastAPI, APIRouter, Path, HTTPException, Depends from maggma.api.models import Response, Meta -from starlette.responses import RedirectResponse class Resource(MSONable): diff --git a/src/maggma/stores/gridfs.py b/src/maggma/stores/gridfs.py index fe4314c8b..52867bc84 100644 --- a/src/maggma/stores/gridfs.py +++ b/src/maggma/stores/gridfs.py @@ -18,7 +18,6 @@ from pymongo import MongoClient from monty.json import jsanitize from monty.dev import deprecated -from maggma.utils import confirm_field_index from maggma.core import Store, Sort from maggma.stores import MongoStore From d6153dd12f7309d0cdd6360498523e5d40fe2022 Mon Sep 17 00:00:00 2001 From: Shyam Dwaraknath Date: Mon, 6 Apr 2020 13:55:41 -0700 Subject: [PATCH 10/11] have to skip parent in super() --- src/maggma/stores/mongolike.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/maggma/stores/mongolike.py b/src/maggma/stores/mongolike.py index dfd39e95c..87202c669 100644 --- a/src/maggma/stores/mongolike.py +++ b/src/maggma/stores/mongolike.py @@ 
-343,7 +343,7 @@ def __init__(self, uri: str, database: str, collection_name: str, **kwargs): self.collection_name = collection_name self.kwargs = kwargs self._collection = None - super().__init__(**kwargs) + super(MongoStore).__init__(**kwargs) @property def name(self) -> str: From aee3e5441f57aea31733672f73d7aae799521c87 Mon Sep 17 00:00:00 2001 From: Shyam Dwaraknath Date: Mon, 6 Apr 2020 14:00:06 -0700 Subject: [PATCH 11/11] back around to an lgtm error, just ignoring for now --- src/maggma/stores/mongolike.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/maggma/stores/mongolike.py b/src/maggma/stores/mongolike.py index 87202c669..41219bb67 100644 --- a/src/maggma/stores/mongolike.py +++ b/src/maggma/stores/mongolike.py @@ -343,7 +343,7 @@ def __init__(self, uri: str, database: str, collection_name: str, **kwargs): self.collection_name = collection_name self.kwargs = kwargs self._collection = None - super(MongoStore).__init__(**kwargs) + super(MongoStore, self).__init__(**kwargs) # lgtm @property def name(self) -> str: