# Create cleanup mechanism #13
Thanks @mkarmona! What kind of test have you run? I know that if you try something like this:

```python
iterable = iter(stage)
next(iterable)
# then do nothing
```

anything that doesn't fully consume the stage's iterator will leave the background processes/threads hanging. I've thought about this, and the proper way to do it would be:

```python
try:
    # to_iterable code
finally:
    for queue in stage_input_queue.values():
        queue.done()
```

in `to_iterable`. If you wish to try to fix this I can help you along the way!
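To make that concrete, here is a minimal, self-contained sketch of the `try`/`finally` idea using a plain thread and queue instead of pypeln's internals (the `worker`, the `None` sentinel, and this simplified `to_iterable` are illustrative stand-ins, not pypeln's API):

```python
import queue
import threading

def worker(in_q, out_q):
    # toy worker: doubles numbers until it receives the None sentinel
    while True:
        item = in_q.get()
        if item is None:
            break
        out_q.put(item * 2)

def to_iterable(items):
    in_q, out_q = queue.Queue(), queue.Queue()
    thread = threading.Thread(target=worker, args=(in_q, out_q))
    thread.start()
    try:
        for item in items:
            in_q.put(item)
            yield out_q.get()
    finally:
        # runs even if the caller abandons the generator early,
        # so the worker is always told to shut down
        in_q.put(None)
        thread.join()

iterable = to_iterable(range(10))
next(iterable)    # consume a single element, then stop
iterable.close()  # GeneratorExit triggers the finally block; no thread hangs
```

The same `finally` also fires when the generator is garbage-collected after partial consumption, which is exactly the `iter(stage)` / `next(iterable)` case above.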
---

@cgarciae thanks! Basically I am doing this in the main section of my pipeline:

```python
logger.debug('create an iterable of handles from filenames %s', str(filenames))
in_handles = itertools.imap(from_source_for_reading, filenames)

logger.debug('create an iterable of lines from all file handles')
chained_handles = itertools.chain.from_iterable(
    itertools.ifilter(lambda e: e is not None, in_handles))

evs = more_itertools.take(first_n, chained_handles) \
    if first_n else chained_handles

logger.debug('load LUTs')
lookup_data = make_lookup_data(es_client, redis_client)

logger.info('declare pipeline to run')
write_evidences_on_start_f = functools.partial(
    write_evidences_on_start, enable_output_to_es, output_folder)
validate_evidence_on_start_f = functools.partial(
    process_evidence_on_start, lookup_data)

# here the pipeline definition
pl_stage = pr.map(process_evidence, evs, workers=num_workers, maxsize=10000,
                  on_start=validate_evidence_on_start_f,
                  on_done=process_evidence_on_done)
pl_stage = pr.map(write_evidences, pl_stage, workers=num_writers,
                  maxsize=10000, on_start=write_evidences_on_start_f)

logger.info('run evidence processing pipeline')
results = reduce_tuple_with_sum(pr.to_iterable(pl_stage))
logger.info('done evidence processing pipeline')

return results
```

So yes, I know it is lazily evaluated and I work with iterators as inputs and outputs. But if you …
---

@mkarmona I see. But my comment was actually about modifying Pypeline's `to_iterable`. I'll try to take a shot at it during the weekend, but if you have time, in the previous comment I left some ideas on how you can implement it if you want to create a PR.
---

@cgarciae let me see this weekend if I find the proper time slot to push this forward.
---

@mkarmona I just realized that during the implementation I create all Threads and Processes with `daemon = True`. Consider this example:

```python
from pypeln import process as pr
import time

def do_print(x):
    time.sleep(1)
    print(x)

stage = pr.map(do_print, range(1000), workers=5)

pr.run(stage)
```
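To illustrate what daemonic workers imply (plain `multiprocessing`, not pypeln code): a daemon child is killed as soon as the main process exits, so it can never block shutdown, but it also never gets a chance to clean up after itself:

```python
import multiprocessing as mp
import time

def loop():
    while True:
        time.sleep(1)

if __name__ == "__main__":
    p = mp.Process(target=loop)
    p.daemon = True   # daemonic, as pypeln's workers reportedly are
    p.start()
    time.sleep(0.1)
    # the main process exits here and the daemonic child is terminated
    # with it; with daemon left as False the program would never exit
```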
---

@cgarciae that was a really simple example. In my pipeline one step fills the queue while the second one is trying to cope with the messages at a slower pace, so when the queue fills up to its `maxsize` it keeps stuck there ad infinitum.
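The blocking behaviour itself is easy to reproduce with a bare bounded queue (illustrative only; the `maxsize=2` is arbitrary):

```python
import multiprocessing as mp

if __name__ == "__main__":
    q = mp.Queue(maxsize=2)
    q.put(1)
    q.put(2)
    # a third q.put() would block here until a consumer calls q.get();
    # if the consumer is slower (or gone), the producer stays stuck forever
    print(q.get())  # frees one slot
    q.put(3)        # now succeeds immediately
```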
---

@mkarmona I see. I think this can be solved by fixing the `__del__` method.
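A rough sketch of what a `__del__`-based cleanup could look like, purely as a guess at the shape (the `IterableQueue` name and the `None` sentinel are made up here; this is not pypeln's actual code):

```python
import queue

class IterableQueue(object):
    """Illustrative queue wrapper with a last-resort shutdown signal."""

    def __init__(self):
        self.q = queue.Queue()
        self.closed = False

    def done(self):
        # idempotent: tell whoever is blocked on the queue to stop
        if not self.closed:
            self.closed = True
            self.q.put(None)  # None acts as the shutdown sentinel

    def __del__(self):
        # if the consumer abandons this object without calling done(),
        # garbage collection still delivers the shutdown signal
        self.done()
```

One caveat with `__del__` is that it is not guaranteed to run promptly (or at all during interpreter shutdown), which is why the explicit `try`/`finally` discussed earlier is the more deterministic route.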
---

@mkarmona this was actually a quick fix. Can you upgrade via `pip` and try the new code?
---

@cgarciae cool, that was quick, thanks! Here is my attempt to test it with real code (the tracebacks of the two worker processes are interleaved):

```
^CTraceback (most recent call last):
  File "/usr/lib/python2.7/runpy.py", line 174, in _run_module_as_main
    "__main__", fname, loader, pkg_name)
  File "/usr/lib/python2.7/runpy.py", line 72, in _run_code
    exec code in run_globals
  File "/home/mkarmona/src/github/opent/data_pipeline_refactor/mrtarget/CommandLine.py", line 378, in <module>
    sys.exit(main())
  File "/home/mkarmona/src/github/opent/data_pipeline_refactor/mrtarget/CommandLine.py", line 316, in main
    num_writers=args.num_writers)
  File "mrtarget/modules/Evidences.py", line 357, in process_evidences_pipeline
    results = reduce_tuple_with_sum(pr.to_iterable(pl_stage))
  File "mrtarget/common/EvidencesHelpers.py", line 142, in reduce_tuple_with_sum
    return functools.reduce(lambda x, y: (x[0] + y[0], x[1] + y[1]), iterable, (0, 0))
  File "/home/mkarmona/.virtualenvs/mrtarget/local/lib/python2.7/site-packages/pypeln/process.py", line 848, in _to_iterable
    for x in input_queue:
  File "/home/mkarmona/.virtualenvs/mrtarget/local/lib/python2.7/site-packages/pypeln/process.py", line 233, in __iter__
    while not self.is_done():
  File "/home/mkarmona/.virtualenvs/mrtarget/local/lib/python2.7/site-packages/pypeln/process.py", line 258, in is_done
    return self.namespace.remaining == 0 and self.queue.empty()
KeyboardInterrupt
Process Process-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 267, in _bootstrap
Process Process-3:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 267, in _bootstrap
    self.run()
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
    self._target(*self._args, **self._kwargs)
  File "/home/mkarmona/.virtualenvs/mrtarget/local/lib/python2.7/site-packages/pypeln/process.py", line 341, in _map
  File "/home/mkarmona/.virtualenvs/mrtarget/local/lib/python2.7/site-packages/pypeln/process.py", line 341, in _map
    _run_task(f_task, params)
    _run_task(f_task, params)
  File "/home/mkarmona/.virtualenvs/mrtarget/local/lib/python2.7/site-packages/pypeln/process.py", line 326, in _run_task
  File "/home/mkarmona/.virtualenvs/mrtarget/local/lib/python2.7/site-packages/pypeln/process.py", line 326, in _run_task
    params.pipeline_namespace.error = True
    params.pipeline_namespace.error = True
  File "/usr/lib/python2.7/multiprocessing/managers.py", line 1028, in __setattr__
  File "/usr/lib/python2.7/multiprocessing/managers.py", line 1028, in __setattr__
    return callmethod('__setattr__', (key, value))
    return callmethod('__setattr__', (key, value))
  File "/usr/lib/python2.7/multiprocessing/managers.py", line 758, in _callmethod
  File "/usr/lib/python2.7/multiprocessing/managers.py", line 758, in _callmethod
    conn.send((self._id, methodname, args, kwds))
    conn.send((self._id, methodname, args, kwds))
IOError: [Errno 32] Broken pipe
IOError: [Errno 32] Broken pipe
2018-11-15 22:31:57,485 - mrtarget.common.EvidencesHelpers_31400 - DEBUG - closing files ./evidences-valid_259360b92bf743078e01b83b12fe4f89.json.gz ./evidences-invalid_e634a404df8447838ccc2ccc5ad05f4f.json.gz
2018-11-15 22:31:57,485 - mrtarget.common.EvidencesHelpers_31396 - DEBUG - closing files ./evidences-valid_bc4d6ac7f2364d9f959f823d8203c680.json.gz ./evidences-invalid_e753a055787a4910a77514be2665750d.json.gz
```
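For context, the `Broken pipe` errors at the bottom are consistent with the workers trying to set `pipeline_namespace.error = True` through a `multiprocessing` manager whose process had already died from the Ctrl-C. That failure mode can be reproduced in isolation (illustrative sketch, not pypeln code):

```python
import multiprocessing as mp

if __name__ == "__main__":
    manager = mp.Manager()
    ns = manager.Namespace()
    ns.error = False      # fine: the manager process is alive
    manager.shutdown()    # simulate the manager dying on Ctrl-C
    try:
        ns.error = True   # the proxy sends to a dead process
    except Exception as e:
        print("writing to a dead manager fails:", repr(e))
```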
---

@mkarmona thanks for the feedback!
---

@cgarciae I tried again and now I got a broken pipe. I found this on StackOverflow, which may be worth a look.
I will try to simplify my main function into a working example, though I am afraid it won't help anyway, as everything works if I put a limit on the number of lines to be processed or leave it to finish.