From 5dfa5a937dba8606ce114d5fa9007a01c93bcaa8 Mon Sep 17 00:00:00 2001 From: Cristian Garcia Date: Wed, 26 Sep 2018 20:38:49 -0500 Subject: [PATCH] on_start, on_done, StageStatus --- pypeln/process.py | 208 +++++++++++++++++++++++++++++++++++----------- pypeln/utils.py | 84 ++++++++++++------- tests/test_pr.py | 47 +++++++++++ 3 files changed, 258 insertions(+), 81 deletions(-) diff --git a/pypeln/process.py b/pypeln/process.py index 4cce0ee..83d321a 100644 --- a/pypeln/process.py +++ b/pypeln/process.py @@ -41,10 +41,12 @@ def _get_namespace(): class _Stage(utils.BaseStage): - def __init__(self, worker_constructor, workers, maxsize, target, args, dependencies): + def __init__(self, worker_constructor, workers, maxsize, on_start, on_done, target, args, dependencies): self.worker_constructor = worker_constructor self.workers = workers self.maxsize = maxsize + self.on_start = on_start + self.on_done = on_done self.target = target self.args = args self.dependencies = dependencies @@ -62,6 +64,9 @@ def __repr__(self): dependencies = len(self.dependencies), ) +class _StageParams(namedtuple("_StageParams", + ["input_queue", "output_queues", "on_start", "on_done", "stage_namespace", "stage_lock"])): + pass class _InputQueue(object): @@ -116,23 +121,40 @@ def done(self): # map ########### -def _map(f, input_queue, output_queues): +def _map(f, params): - for x in input_queue: - y = f(x) - output_queues.put(y) + args = params.on_start() if params.on_start is not None else None + + if args is None: + args = () + + elif not isinstance(args, tuple): + args = (args,) + + for x in params.input_queue: + y = f(x, *args) + params.output_queues.put(y) + params.output_queues.done() + + if params.on_done is not None: + with params.stage_lock: + params.stage_namespace.active_workers -= 1 + + stage_status = utils.StageStatus( + namespace = params.stage_namespace, + lock = params.stage_lock, + ) - output_queues.done() + params.on_done(stage_status, *args) -# @utils.maybe_partial(2) -def map(f, stage = utils.UNDEFINED, workers = 1, maxsize = 0): +def map(f, stage = utils.UNDEFINED, workers = 1, maxsize = 0, on_start = None, on_done = None): """ """ if utils.is_undefined(stage): - return utils.Partial(lambda stage: map(f, stage, workers=workers, maxsize=maxsize)) + return utils.Partial(lambda stage: map(f, stage, workers=workers, maxsize=maxsize, on_start=on_start, on_done=on_done)) stage = _to_stage(stage) @@ -140,6 +162,8 @@ def map(f, stage = utils.UNDEFINED, workers = 1, maxsize = 0): worker_constructor = WORKER, workers = workers, maxsize = maxsize, + on_start = on_start, + on_done = on_done, target = _map, args = (f,), dependencies = [stage], @@ -149,20 +173,41 @@ def map(f, stage = utils.UNDEFINED, workers = 1, maxsize = 0): # flat_map ########### -def _flat_map(f, input_queue, output_queues): - for x in input_queue: +def _flat_map(f, params): + + args = params.on_start() if params.on_start is not None else None + + if args is None: + args = () + + elif not isinstance(args, tuple): + args = (args,) + + for x in params.input_queue: for y in f(x): - output_queues.put(y) + params.output_queues.put(y) - output_queues.done() + params.output_queues.done() + if params.on_done is not None: + with params.stage_lock: + params.stage_namespace.active_workers -= 1 -# @utils.maybe_partial(2) -def flat_map(f, stage = utils.UNDEFINED, workers = 1, maxsize = 0): + stage_status = utils.StageStatus( + namespace = params.stage_namespace, + lock = params.stage_lock, + ) + + params.on_done(stage_status, *args) + + +def flat_map(f, stage = utils.UNDEFINED, workers = 1, maxsize = 0, on_start = None, on_done = None): + """ + """ if utils.is_undefined(stage): - return utils.Partial(lambda stage: flat_map(f, stage, workers=workers, maxsize=maxsize)) + return utils.Partial(lambda stage: flat_map(f, stage, workers=workers, maxsize=maxsize, on_start=on_start, on_done=on_done)) stage = _to_stage(stage) @@ -170,6 +215,8 @@ def flat_map(f, stage = utils.UNDEFINED, workers = 1, maxsize = 0): worker_constructor = WORKER, workers = workers, maxsize = maxsize, + on_start = on_start, + on_done = on_done, target = _flat_map, args = (f,), dependencies = [stage], @@ -180,27 +227,49 @@ def flat_map(f, stage = utils.UNDEFINED, workers = 1, maxsize = 0): # filter ########### -def _filter(f, input_queue, output_queues): +def _filter(f, params): - for x in input_queue: - if f(x): - output_queues.put(x) + args = params.on_start() if params.on_start is not None else None - output_queues.done() + if args is None: + args = () + + elif not isinstance(args, tuple): + args = (args,) + + for x in params.input_queue: + if f(x, *args): + params.output_queues.put(x) + + params.output_queues.done() + + if params.on_done is not None: + with params.stage_lock: + params.stage_namespace.active_workers -= 1 + + stage_status = utils.StageStatus( + namespace = params.stage_namespace, + lock = params.stage_lock, + ) + params.on_done(stage_status, *args) -# @utils.maybe_partial(2) -def filter(f, stage = utils.UNDEFINED, workers = 1, maxsize = 0): + +def filter(f, stage = utils.UNDEFINED, workers = 1, maxsize = 0, on_start = None, on_done = None): + """ + """ if utils.is_undefined(stage): - return utils.Partial(lambda stage: filter(f, stage, workers=workers, maxsize=maxsize)) + return utils.Partial(lambda stage: filter(f, stage, workers=workers, maxsize=maxsize, on_start=on_start, on_done=on_done)) stage = _to_stage(stage) - + return _Stage( worker_constructor = WORKER, workers = workers, maxsize = maxsize, + on_start = on_start, + on_done = on_done, target = _filter, args = (f,), dependencies = [stage], @@ -211,19 +280,39 @@ def filter(f, stage = utils.UNDEFINED, workers = 1, maxsize = 0): # each ########### -def _each(f, input_queue, output_queues): +def _each(f, params): - for x in input_queue: - f(x) + args = params.on_start() if params.on_start is not None else None + + if args is None: + args = () + + elif not isinstance(args, tuple): + args = (args,) + + for x in params.input_queue: + f(x, *args) + + params.output_queues.done() + + if params.on_done is not None: + with params.stage_lock: + params.stage_namespace.active_workers -= 1 + + stage_status = utils.StageStatus( + namespace = params.stage_namespace, + lock = params.stage_lock, + ) - output_queues.done() + params.on_done(stage_status, *args) -# @utils.maybe_partial(2) -def each(f, stage = utils.UNDEFINED, workers = 1, maxsize = 0, run = True): +def each(f, stage = utils.UNDEFINED, workers = 1, maxsize = 0, on_start = None, on_done = None, run = False): + """ + """ if utils.is_undefined(stage): - return utils.Partial(lambda stage: each(f, stage, workers=workers, maxsize=maxsize, run=run)) + return utils.Partial(lambda stage: each(f, stage, workers=workers, maxsize=maxsize, on_start=on_start, on_done=on_done)) stage = _to_stage(stage) @@ -231,6 +320,8 @@ def each(f, stage = utils.UNDEFINED, workers = 1, maxsize = 0, run = True): worker_constructor = WORKER, workers = workers, maxsize = maxsize, + on_start = on_start, + on_done = on_done, target = _each, args = (f,), dependencies = [stage], @@ -247,12 +338,12 @@ def each(f, stage = utils.UNDEFINED, workers = 1, maxsize = 0, run = True): # concat ########### -def _concat(input_queue, output_queues): +def _concat(params): - for x in input_queue: - output_queues.put(x) + for x in params.input_queue: + params.output_queues.put(x) - output_queues.done() + params.output_queues.done() def concat(stages, maxsize = 0): @@ -263,6 +354,8 @@ def concat(stages, maxsize = 0): worker_constructor = WORKER, workers = 1, maxsize = maxsize, + on_start = None, + on_done = None, target = _concat, args = tuple(), dependencies = stages, @@ -309,12 +402,12 @@ def _to_stage(obj): # from_iterable ################ -def _from_iterable(iterable, input_queue, output_queues): +def _from_iterable(iterable, params): for x in iterable: - output_queues.put(x) + params.output_queues.put(x) - output_queues.done() + params.output_queues.done() # @utils.maybe_partial(1) def from_iterable(iterable = utils.UNDEFINED, worker_constructor = Thread): @@ -326,6 +419,8 @@ def from_iterable(iterable = utils.UNDEFINED, worker_constructor = Thread): worker_constructor = worker_constructor, workers = 1, maxsize = None, + on_start = None, + on_done = None, target = _from_iterable, args = (iterable,), dependencies = [], @@ -386,19 +481,34 @@ def _to_iterable(stage, maxsize): ) stage_output_queues[stage] = _OutputQueues([ input_queue ]) + + processes = [] + for _stage in stage_output_queues: + + if _stage.on_done is not None: + stage_lock = Lock() + stage_namespace = _get_namespace() + stage_namespace.active_workers = _stage.workers + else: + stage_lock = None + stage_namespace = None + + for _ in range(_stage.workers): - processes = [ - _stage.worker_constructor( - target = _stage.target, - args = _stage.args, - kwargs = dict( + stage_params = _StageParams( output_queues = stage_output_queues[_stage], input_queue = stage_input_queue.get(_stage, None), - ), - ) - for _stage in stage_output_queues - for _ in range(_stage.workers) - ] + on_start = _stage.on_start, + on_done = _stage.on_done, + stage_lock = stage_lock, + stage_namespace = stage_namespace + ) + process = _stage.worker_constructor( + target = _stage.target, + args = _stage.args + (stage_params,) + ) + + processes.append(process) for p in processes: p.daemon = True diff --git a/pypeln/utils.py b/pypeln/utils.py index 666cf1f..0b7f845 100644 --- a/pypeln/utils.py +++ b/pypeln/utils.py @@ -2,23 +2,6 @@ import traceback from collections import namedtuple -try: - from wrapt import decorator as wrapt_decorator -except ImportError: - def wrapt_decorator(f): - - @functools.wraps(f) - def wrapper_f(g): - - @functools.wraps(g) - def wrapper_g(*args, **kwargs): - return f(g, None, args, kwargs) - - return wrapper_g - - return wrapper_f - - TIMEOUT = 0.0001 @@ -41,6 +24,29 @@ class BaseStage(object): def __or__(self, f): return f(self) +class StageStatus(object): + + def __init__(self, namespace, lock): + self._namespace = namespace + self._lock = lock + + @property + def done(self): + with self._lock: + return self._namespace.active_workers == 0 + + @property + def active_workers(self): + with self._lock: + return self._namespace.active_workers + + + def __str__(self): + return "StageStatus(done = {done}, active_workers = {active_workers})".format( + done = self.done, + active_workers = self.active_workers, + ) + class Namespace(object): @@ -82,21 +88,6 @@ def chunks(n, l): yield l[i:i + n] - - -def maybe_partial(n): - - @wrapt_decorator - def wrapper(wrapped_f, instance, args, kwargs): - - if len(args) < n: - return Partial(lambda s: wrapped_f(*(args + (s,)), **kwargs)) - else: - return wrapped_f(*args, **kwargs) - - return wrapper - - def print_error(f): @functools.wraps(f) @@ -108,3 +99,32 @@ def _lambda(*args, **kwargs): raise e return _lambda + + +# try: +# from wrapt import decorator as wrapt_decorator +# except ImportError: +# def wrapt_decorator(f): + +# @functools.wraps(f) +# def wrapper_f(g): + +# @functools.wraps(g) +# def wrapper_g(*args, **kwargs): +# return f(g, None, args, kwargs) + +# return wrapper_g + +# return wrapper_f + +# def maybe_partial(n): + +# @wrapt_decorator +# def wrapper(wrapped_f, instance, args, kwargs): + +# if len(args) < n: +# return Partial(lambda s: wrapped_f(*(args + (s,)), **kwargs)) +# else: +# return wrapped_f(*args, **kwargs) + +# return wrapper \ No newline at end of file diff --git a/tests/test_pr.py b/tests/test_pr.py index b72acba..b227d8b 100644 --- a/tests/test_pr.py +++ b/tests/test_pr.py @@ -77,6 +77,53 @@ def test_map_square(nums): assert nums_pl == nums_py +@hp.given(nums = st.lists(st.integers())) +@hp.settings(max_examples=MAX_EXAMPLES) +def test_map_square_event_start(nums): + + + nums_py = map(lambda x: x ** 2, nums) + nums_py = list(nums_py) + + + namespace = pr._get_namespace() + namespace.x = 0 + + def set_1(): + namespace.x = 1 + + nums_pl = pr.map(lambda x: x ** 2, nums, on_start=set_1) + nums_pl = list(nums_pl) + + assert nums_pl == nums_py + assert namespace.x == 1 + + +@hp.given(nums = st.lists(st.integers())) +@hp.settings(max_examples=MAX_EXAMPLES) +def test_map_square_event_end(nums): + + namespace = pr._get_namespace() + namespace.x = 0 + namespace.done = False + namespace.active_workers = -1 + + def set_1(): + namespace.x = 1 + + def set_2(stage_status): + namespace.x = 2 + namespace.active_workers = stage_status.active_workers + namespace.done = stage_status.done + + nums_pl = pr.map(lambda x: x ** 2, nums, workers=3, on_start=set_1, on_done=set_2) + nums_pl = list(nums_pl) + + assert namespace.x == 2 + assert namespace.done == True + assert namespace.active_workers == 0 + + @hp.given(nums = st.lists(st.integers())) @hp.settings(max_examples=MAX_EXAMPLES) def test_map_square_workers(nums):