Skip to content

Commit

Permalink
on_start, on_done, StageStatus
Browse files Browse the repository at this point in the history
  • Loading branch information
cgarciae committed Sep 27, 2018
1 parent 66d6246 commit 5dfa5a9
Show file tree
Hide file tree
Showing 3 changed files with 258 additions and 81 deletions.
208 changes: 159 additions & 49 deletions pypeln/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@ def _get_namespace():

class _Stage(utils.BaseStage):

def __init__(self, worker_constructor, workers, maxsize, target, args, dependencies):
def __init__(self, worker_constructor, workers, maxsize, on_start, on_done, target, args, dependencies):
self.worker_constructor = worker_constructor
self.workers = workers
self.maxsize = maxsize
self.on_start = on_start
self.on_done = on_done
self.target = target
self.args = args
self.dependencies = dependencies
Expand All @@ -62,6 +64,9 @@ def __repr__(self):
dependencies = len(self.dependencies),
)

class _StageParams(namedtuple("_StageParams",
["input_queue", "output_queues", "on_start", "on_done", "stage_namespace", "stage_lock"])):
pass

class _InputQueue(object):

Expand Down Expand Up @@ -116,30 +121,49 @@ def done(self):
# map
###########

def _map(f, input_queue, output_queues):
def _map(f, params):

for x in input_queue:
y = f(x)
output_queues.put(y)
args = params.on_start() if params.on_start is not None else None

if args is None:
args = ()

elif not isinstance(args, tuple):
args = (args,)

for x in params.input_queue:
y = f(x, *args)
params.output_queues.put(y)

params.output_queues.done()

if params.on_done is not None:
with params.stage_lock:
params.stage_namespace.active_workers -= 1

stage_status = utils.StageStatus(
namespace = params.stage_namespace,
lock = params.stage_lock,
)

output_queues.done()
params.on_done(stage_status, *args)


# @utils.maybe_partial(2)
def map(f, stage = utils.UNDEFINED, workers = 1, maxsize = 0):
def map(f, stage = utils.UNDEFINED, workers = 1, maxsize = 0, on_start = None, on_done = None):
"""
"""

if utils.is_undefined(stage):
return utils.Partial(lambda stage: map(f, stage, workers=workers, maxsize=maxsize))
return utils.Partial(lambda stage: map(f, stage, workers=workers, maxsize=maxsize, on_start=on_start, on_done=on_done))

stage = _to_stage(stage)

return _Stage(
worker_constructor = WORKER,
workers = workers,
maxsize = maxsize,
on_start = on_start,
on_done = on_done,
target = _map,
args = (f,),
dependencies = [stage],
Expand All @@ -149,27 +173,50 @@ def map(f, stage = utils.UNDEFINED, workers = 1, maxsize = 0):
# flat_map
###########

def _flat_map(f, input_queue, output_queues):

for x in input_queue:
def _flat_map(f, params):

args = params.on_start() if params.on_start is not None else None

if args is None:
args = ()

elif not isinstance(args, tuple):
args = (args,)

for x in params.input_queue:
for y in f(x):
output_queues.put(y)
params.output_queues.put(y)

output_queues.done()
params.output_queues.done()

if params.on_done is not None:
with params.stage_lock:
params.stage_namespace.active_workers -= 1

# @utils.maybe_partial(2)
def flat_map(f, stage = utils.UNDEFINED, workers = 1, maxsize = 0):
stage_status = utils.StageStatus(
namespace = params.stage_namespace,
lock = params.stage_lock,
)

params.on_done(stage_status, *args)


def flat_map(f, stage = utils.UNDEFINED, workers = 1, maxsize = 0, on_start = None, on_done = None):
"""
"""

if utils.is_undefined(stage):
return utils.Partial(lambda stage: flat_map(f, stage, workers=workers, maxsize=maxsize))
return utils.Partial(lambda stage: flat_map(f, stage, workers=workers, maxsize=maxsize, on_start=on_start, on_done=on_done))

stage = _to_stage(stage)

return _Stage(
worker_constructor = WORKER,
workers = workers,
maxsize = maxsize,
on_start = on_start,
on_done = on_done,
target = _flat_map,
args = (f,),
dependencies = [stage],
Expand All @@ -180,27 +227,49 @@ def flat_map(f, stage = utils.UNDEFINED, workers = 1, maxsize = 0):
# filter
###########

def _filter(f, input_queue, output_queues):
def _filter(f, params):

for x in input_queue:
if f(x):
output_queues.put(x)
args = params.on_start() if params.on_start is not None else None

output_queues.done()
if args is None:
args = ()

elif not isinstance(args, tuple):
args = (args,)

for x in params.input_queue:
if f(x, *args):
params.output_queues.put(x)

params.output_queues.done()

if params.on_done is not None:
with params.stage_lock:
params.stage_namespace.active_workers -= 1

stage_status = utils.StageStatus(
namespace = params.stage_namespace,
lock = params.stage_lock,
)

params.on_done(stage_status, *args)

# @utils.maybe_partial(2)
def filter(f, stage = utils.UNDEFINED, workers = 1, maxsize = 0):

def filter(f, stage = utils.UNDEFINED, workers = 1, maxsize = 0, on_start = None, on_done = None):
"""
"""

if utils.is_undefined(stage):
return utils.Partial(lambda stage: filter(f, stage, workers=workers, maxsize=maxsize))
return utils.Partial(lambda stage: filter(f, stage, workers=workers, maxsize=maxsize, on_start=on_start, on_done=on_done))

stage = _to_stage(stage)

return _Stage(
worker_constructor = WORKER,
workers = workers,
maxsize = maxsize,
on_start = on_start,
on_done = on_done,
target = _filter,
args = (f,),
dependencies = [stage],
Expand All @@ -211,26 +280,48 @@ def filter(f, stage = utils.UNDEFINED, workers = 1, maxsize = 0):
# each
###########

def _each(f, input_queue, output_queues):
def _each(f, params):

for x in input_queue:
f(x)
args = params.on_start() if params.on_start is not None else None

if args is None:
args = ()

elif not isinstance(args, tuple):
args = (args,)

for x in params.input_queue:
f(x, *args)

params.output_queues.done()

if params.on_done is not None:
with params.stage_lock:
params.stage_namespace.active_workers -= 1

stage_status = utils.StageStatus(
namespace = params.stage_namespace,
lock = params.stage_lock,
)

output_queues.done()
params.on_done(stage_status, *args)


# @utils.maybe_partial(2)
def each(f, stage = utils.UNDEFINED, workers = 1, maxsize = 0, run = True):
def each(f, stage = utils.UNDEFINED, workers = 1, maxsize = 0, on_start = None, on_done = None, run = False):
"""
"""

if utils.is_undefined(stage):
return utils.Partial(lambda stage: each(f, stage, workers=workers, maxsize=maxsize, run=run))
return utils.Partial(lambda stage: each(f, stage, workers=workers, maxsize=maxsize, on_start=on_start, on_done=on_done))

stage = _to_stage(stage)

stage = _Stage(
worker_constructor = WORKER,
workers = workers,
maxsize = maxsize,
on_start = on_start,
on_done = on_done,
target = _each,
args = (f,),
dependencies = [stage],
Expand All @@ -247,12 +338,12 @@ def each(f, stage = utils.UNDEFINED, workers = 1, maxsize = 0, run = True):
# concat
###########

def _concat(input_queue, output_queues):
def _concat(params):

for x in input_queue:
output_queues.put(x)
for x in params.input_queue:
params.output_queues.put(x)

output_queues.done()
params.output_queues.done()


def concat(stages, maxsize = 0):
Expand All @@ -263,6 +354,8 @@ def concat(stages, maxsize = 0):
worker_constructor = WORKER,
workers = 1,
maxsize = maxsize,
on_start = None,
on_done = None,
target = _concat,
args = tuple(),
dependencies = stages,
Expand Down Expand Up @@ -309,12 +402,12 @@ def _to_stage(obj):
# from_iterable
################

def _from_iterable(iterable, input_queue, output_queues):
def _from_iterable(iterable, params):

for x in iterable:
output_queues.put(x)
params.output_queues.put(x)

output_queues.done()
params.output_queues.done()

# @utils.maybe_partial(1)
def from_iterable(iterable = utils.UNDEFINED, worker_constructor = Thread):
Expand All @@ -326,6 +419,8 @@ def from_iterable(iterable = utils.UNDEFINED, worker_constructor = Thread):
worker_constructor = worker_constructor,
workers = 1,
maxsize = None,
on_start = None,
on_done = None,
target = _from_iterable,
args = (iterable,),
dependencies = [],
Expand Down Expand Up @@ -386,19 +481,34 @@ def _to_iterable(stage, maxsize):
)

stage_output_queues[stage] = _OutputQueues([ input_queue ])

processes = []
for _stage in stage_output_queues:

if _stage.on_done is not None:
stage_lock = Lock()
stage_namespace = _get_namespace()
stage_namespace.active_workers = _stage.workers
else:
stage_lock = None
stage_namespace = None

for _ in range(_stage.workers):

processes = [
_stage.worker_constructor(
target = _stage.target,
args = _stage.args,
kwargs = dict(
stage_params = _StageParams(
output_queues = stage_output_queues[_stage],
input_queue = stage_input_queue.get(_stage, None),
),
)
for _stage in stage_output_queues
for _ in range(_stage.workers)
]
on_start = _stage.on_start,
on_done = _stage.on_done,
stage_lock = stage_lock,
stage_namespace = stage_namespace
)
process = _stage.worker_constructor(
target = _stage.target,
args = _stage.args + (stage_params,)
)

processes.append(process)

for p in processes:
p.daemon = True
Expand Down
Loading

0 comments on commit 5dfa5a9

Please sign in to comment.