From 3d9f3450a2dfae68e7e5dfc14dce7062490ae802 Mon Sep 17 00:00:00 2001 From: Attumm Date: Fri, 2 Aug 2024 01:15:41 +0200 Subject: [PATCH 01/12] Added start of decorators --- examples/example_decorator.py | 35 ++++++++++++++++++++++++++++++++++ examples/example_multi_func.py | 4 ++++ examples/example_produce.py | 4 ++++ meesee.py | 26 +++++++++++++++++++++++-- 4 files changed, 67 insertions(+), 2 deletions(-) create mode 100644 examples/example_decorator.py diff --git a/examples/example_decorator.py b/examples/example_decorator.py new file mode 100644 index 0000000..cb7ebb0 --- /dev/null +++ b/examples/example_decorator.py @@ -0,0 +1,35 @@ +import os +import sys + +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from meesee import startapp +from meesee import Meesee + +config = { + "namespace": "removeme", + "key": "tasks", + "redis_config": {}, + "maxsize": 100, + "timeout": 1, +} + + +@Meesee.worker() +def func_a(item, worker_id): + print('func: {}, worker_id: {}, item: {}'.format('func_a', worker_id, item)) + + +@Meesee.worker() +def func_b(item, worker_id): + print('func: {}, worker_id: {}, item: {}'.format('func_b', worker_id, item)) + + +@Meesee.worker() +def func_c(item, worker_id): + print('func: {}, worker_id: {}, item: {}'.format('func_c', worker_id, item)) + + +if __name__ == '__main__': + workers = int(sys.argv[sys.argv.index('-w') + 1]) if '-w' in sys.argv else 10 + Meesee.start_workers(workers=workers, config=config) diff --git a/examples/example_multi_func.py b/examples/example_multi_func.py index 6343716..8874f2a 100644 --- a/examples/example_multi_func.py +++ b/examples/example_multi_func.py @@ -1,4 +1,8 @@ +import os import sys + +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + from meesee import startapp config = { diff --git a/examples/example_produce.py b/examples/example_produce.py index 92b2617..63cf72e 100644 --- a/examples/example_produce.py +++ b/examples/example_produce.py @@ 
-1,4 +1,8 @@ +import os import sys + +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + from meesee import RedisQueue config = { diff --git a/meesee.py b/meesee.py index e7d4969..42c20df 100644 --- a/meesee.py +++ b/meesee.py @@ -15,6 +15,7 @@ class RedisQueue: + def __init__(self, namespace, key, redis_config, maxsize=None, timeout=None): # TCP check if connection is alive # redis_config.setdefault('socket_timeout', 30) @@ -88,6 +89,27 @@ def __len__(self): return self.r.llen(self.list_key) +class Meesee: + worker_funcs = [] + + @classmethod + def worker(cls): + def decorator(func): + cls.worker_funcs.append(func) + return func + return decorator + + @classmethod + def start_workers(cls, workers=10, config=config): + n_workers = len(cls.worker_funcs) + if n_workers == 0: + print("No workers have been assigned with a decorator") + if n_workers > workers: + print(f"Not enough workers, increasing the workers started with: {workers} we need atleast: {n_workers}") + workers = n_workers + startapp(cls.worker_funcs, workers=workers, config=config) + + class InitFail(Exception): pass @@ -143,8 +165,8 @@ def run_worker(func, func_kwargs, on_failure_func, config, worker_id, init_kwarg def startapp(func, func_kwargs={}, workers=10, config=config, on_failure_func=None, init_kwargs={}): with Pool(workers) as p: - args = ((func, func_kwargs, on_failure_func, config, worker_id, init_kwargs) - for worker_id in range(1, workers + 1)) + args = [(func, func_kwargs, on_failure_func, config, worker_id, init_kwargs) + for worker_id in range(1, workers + 1)] try: p.starmap(run_worker, args) except (KeyboardInterrupt, SystemExit): From c48120baaf526e50f733c830286048b0a4f981f0 Mon Sep 17 00:00:00 2001 From: Attumm Date: Fri, 2 Aug 2024 13:47:57 +0200 Subject: [PATCH 02/12] Changed removed the pylama check for imports after sys path update --- examples/example_decorator.py | 3 +-- examples/example_multi_func.py | 3 ++- examples/example_produce.py | 2 +- 3 files 
changed, 4 insertions(+), 4 deletions(-) diff --git a/examples/example_decorator.py b/examples/example_decorator.py index cb7ebb0..b83dec4 100644 --- a/examples/example_decorator.py +++ b/examples/example_decorator.py @@ -3,8 +3,7 @@ sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) -from meesee import startapp -from meesee import Meesee +from meesee import Meesee # noqa: E402 config = { "namespace": "removeme", diff --git a/examples/example_multi_func.py b/examples/example_multi_func.py index 8874f2a..23fa995 100644 --- a/examples/example_multi_func.py +++ b/examples/example_multi_func.py @@ -3,7 +3,8 @@ sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) -from meesee import startapp +from meesee import startapp # noqa: E402 + config = { "namespace": "removeme", diff --git a/examples/example_produce.py b/examples/example_produce.py index 63cf72e..2a0f963 100644 --- a/examples/example_produce.py +++ b/examples/example_produce.py @@ -3,7 +3,7 @@ sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) -from meesee import RedisQueue +from meesee import RedisQueue # noqa: E402 config = { "namespace": "removeme", From 62f71e7ddfa19b0972e8857267823e2da7b0c30f Mon Sep 17 00:00:00 2001 From: Attumm Date: Fri, 2 Aug 2024 13:49:37 +0200 Subject: [PATCH 03/12] Changed Revert to generator compression from list compression --- meesee.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/meesee.py b/meesee.py index 42c20df..d9785f8 100644 --- a/meesee.py +++ b/meesee.py @@ -165,8 +165,8 @@ def run_worker(func, func_kwargs, on_failure_func, config, worker_id, init_kwarg def startapp(func, func_kwargs={}, workers=10, config=config, on_failure_func=None, init_kwargs={}): with Pool(workers) as p: - args = [(func, func_kwargs, on_failure_func, config, worker_id, init_kwargs) - for worker_id in range(1, workers + 1)] + args = ((func, func_kwargs, on_failure_func, config, worker_id, init_kwargs) 
+ for worker_id in range(1, workers + 1)) try: p.starmap(run_worker, args) except (KeyboardInterrupt, SystemExit): From 14508f87b07bfe3bd0d021fbfaa24b221ff83d5f Mon Sep 17 00:00:00 2001 From: Attumm Date: Sat, 3 Aug 2024 01:36:04 +0200 Subject: [PATCH 04/12] Added python magic for decorators --- examples/example_decorator_magic.py | 50 ++++++++++++++++++++++++++ meesee.py | 56 ++++++++++++++++++++++++++--- 2 files changed, 101 insertions(+), 5 deletions(-) create mode 100644 examples/example_decorator_magic.py diff --git a/examples/example_decorator_magic.py b/examples/example_decorator_magic.py new file mode 100644 index 0000000..279f42d --- /dev/null +++ b/examples/example_decorator_magic.py @@ -0,0 +1,50 @@ +import os +import sys + +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from meesee import Meesee # noqa: E402 + + +config = { + "namespace": "removeme", + "key": "tasks", + "redis_config": {}, + "maxsize": 100, + "timeout": 1, +} + +box = Meesee(config) + + +@box.worker() +def foobar(item, worker_id): + print('func: foobar, worker_id: {}, item: {}'.format(worker_id, item)) + + +@box.worker() +def name_of_the_function(item, worker_id): + print('func: name_of_the_function, worker_id: {}, item: {}'.format(worker_id, item)) + + +@box.worker(queue="passed_name") +def passed_name_not_this_one(item, worker_id): + print('func: passed_name_not_this_one, worker_id: {}, item: {}'.format(worker_id, item)) + + +@box.produce(queue="passed_name") +def produce_some_items(amount): + yield from range(amount) + + +@box.produce() +def produce_to_foobar(items): + return items + + +if __name__ == '__main__': + workers = int(sys.argv[sys.argv.index('-w') + 1]) if '-w' in sys.argv else 10 + produce_some_items(10) + items = [{"name": f"name{i}"} for i in range(10)] + produce_to_foobar(items) + box.push_button(workers, wait=1) diff --git a/meesee.py b/meesee.py index d9785f8..aa2caa5 100644 --- a/meesee.py +++ b/meesee.py @@ -7,7 +7,7 @@ from 
multiprocessing import Pool config = { - "namespace": "removeme", + "namespace": "main", "key": "tasks", "redis_config": {}, "maxsize": 1000, @@ -90,12 +90,41 @@ def __len__(self): class Meesee: - worker_funcs = [] + worker_funcs = {} + + def __init__(self, workers=10, namespace="main", timeout=None, redis_config={}): + self.workers = workers + self.namespace = namespace + self.timeout = timeout + self.redis_config = redis_config @classmethod - def worker(cls): + def produce(cls, queue=None): def decorator(func): - cls.worker_funcs.append(func) + def wrapper(*args, **kwargs): + if queue: + config["key"] = queue + if "produce_to_" in func.__name__: + config["key"] = func.__name__[len("produce_to_"):] + redis_queue = RedisQueue(**config) + + for item in func(*args, **kwargs): + if isinstance(item, (list, dict)): + item = json.dumps(item) + redis_queue.send(item) + + return wrapper + return decorator + + @staticmethod + def parse_func_name(func): + return func.__name__ + + @classmethod + def worker(cls, queue=None): + def decorator(func): + parsed_name = queue if queue is not None else cls.parse_func_name(func) + cls.worker_funcs[parsed_name] = func return func return decorator @@ -107,7 +136,24 @@ def start_workers(cls, workers=10, config=config): if n_workers > workers: print(f"Not enough workers, increasing the workers started with: {workers} we need atleast: {n_workers}") workers = n_workers - startapp(cls.worker_funcs, workers=workers, config=config) + + startapp(list(cls.worker_funcs.values()), workers=workers, config=config) + + def push_button(self, workers=None, wait=None): + if workers is not None: + self.workers = workers + configs = [ + { + "key": queue, + "namespace": self.namespace, + "redis_config": self.redis_config, + } for queue in self.__class__.worker_funcs.keys() + ] + if self.timeout is not None or wait is not None: + for config in configs: + config["timeout"] = self.timeout or wait + + startapp(list(self.__class__.worker_funcs.values()), 
workers=self.workers, config=configs) class InitFail(Exception): From 185bdf671cae7a6acc1e631323d2c4d8f2577ab3 Mon Sep 17 00:00:00 2001 From: Attumm Date: Sat, 3 Aug 2024 12:12:56 +0200 Subject: [PATCH 05/12] Added producer_worker and changed config setup --- examples/example_decorator_magic.py | 13 +++++--- meesee.py | 52 ++++++++++++++++++++++++++--- 2 files changed, 57 insertions(+), 8 deletions(-) diff --git a/examples/example_decorator_magic.py b/examples/example_decorator_magic.py index 279f42d..56d2136 100644 --- a/examples/example_decorator_magic.py +++ b/examples/example_decorator_magic.py @@ -8,8 +8,7 @@ config = { "namespace": "removeme", - "key": "tasks", - "redis_config": {}, + "key": "tasks", "redis_config": {}, "maxsize": 100, "timeout": 1, } @@ -38,13 +37,19 @@ def produce_some_items(amount): @box.produce() -def produce_to_foobar(items): +def produce_to_foo(items): return items +@box.worker_producer(input_queue="foo", output_queue="foobar") +def foo(item, worker_id): + print(f"{worker_id} {item} foo pass it too foobar") + return [item,] + + if __name__ == '__main__': workers = int(sys.argv[sys.argv.index('-w') + 1]) if '-w' in sys.argv else 10 produce_some_items(10) items = [{"name": f"name{i}"} for i in range(10)] - produce_to_foobar(items) + produce_to_foo(items) box.push_button(workers, wait=1) diff --git a/meesee.py b/meesee.py index aa2caa5..069c075 100644 --- a/meesee.py +++ b/meesee.py @@ -6,6 +6,8 @@ from multiprocessing import Pool +from functools import wraps + config = { "namespace": "main", "key": "tasks", @@ -92,16 +94,57 @@ def __len__(self): class Meesee: worker_funcs = {} - def __init__(self, workers=10, namespace="main", timeout=None, redis_config={}): + def __init__(self, workers=10, namespace="main", timeout=None, queue="main", redis_config={}): self.workers = workers self.namespace = namespace self.timeout = timeout + self.queue = queue self.redis_config = redis_config - @classmethod - def produce(cls, queue=None): + def 
create_produce_config(self): + return { + "key": self.queue, + "namespace": self.namespace, + "redis_config": self.redis_config, + } + + def worker_producer(self, input_queue=None, output_queue=None): + def decorator(func): + @wraps(func) + def wrapper(*args, **kwargs): + # Producer logic + config = self.create_produce_config() + if output_queue: + config["key"] = output_queue + elif "produce_to_" in func.__name__: + config["key"] = func.__name__[len("produce_to_"):] + + redis_queue = RedisQueue(**config) + result = func(*args, **kwargs) + + if isinstance(result, (list, tuple)): + for item in result: + if isinstance(item, (list, dict)): + item = json.dumps(item) + redis_queue.send(item) + elif result is not None: + if isinstance(result, (list, dict)): + result = json.dumps(result) + redis_queue.send(result) + + return result + + # Worker registration + parsed_name = input_queue if input_queue is not None else self.parse_func_name(func) + self.worker_funcs[parsed_name] = wrapper + + return wrapper + return decorator + + def produce(self, queue=None): def decorator(func): def wrapper(*args, **kwargs): + config = self.create_produce_config() if queue: config["key"] = queue if "produce_to_" in func.__name__: @@ -185,7 +228,8 @@ def run_worker(func, func_kwargs, on_failure_func, config, worker_id, init_kwarg try: func_kwargs = init_add(func_kwargs, init_items, init_kwargs) r = RedisQueue(**config) # TODO rename r - sys.stdout.write('worker {worker_id} started\n'.format(worker_id=worker_id)) + sys.stdout.write('worker {worker_id} started. 
{func_name} listening to {queue} \n'.format( + worker_id=worker_id, func_name=func.__name__, queue=config["key"])) for key_name, item in r: _, item = func(item.decode('utf-8'), worker_id, **func_kwargs), None except InitFail: From 8aebbfe4d58146cf258a84e59672e4f343f0b129 Mon Sep 17 00:00:00 2001 From: Attumm Date: Sat, 3 Aug 2024 14:03:59 +0200 Subject: [PATCH 06/12] Added testing and signal for when workers stop due to timeout --- meesee.py | 6 +- meesee_methods_tests.py | 57 +++++++++++++++++ meesee_types_tests.py | 138 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 198 insertions(+), 3 deletions(-) create mode 100644 meesee_methods_tests.py create mode 100644 meesee_types_tests.py diff --git a/meesee.py b/meesee.py index 069c075..aea25b0 100644 --- a/meesee.py +++ b/meesee.py @@ -110,9 +110,10 @@ def create_produce_config(self): def worker_producer(self, input_queue=None, output_queue=None): def decorator(func): + @wraps(func) def wrapper(*args, **kwargs): - # Producer logic + config = self.create_produce_config() if output_queue: config["key"] = output_queue @@ -133,8 +134,6 @@ def wrapper(*args, **kwargs): redis_queue.send(result) return result - - # Worker registration parsed_name = input_queue if input_queue is not None else self.parse_func_name(func) self.worker_funcs[parsed_name] = wrapper @@ -250,6 +249,7 @@ def run_worker(func, func_kwargs, on_failure_func, config, worker_id, init_kwarg time.sleep(0.1) # Throttle restarting if config.get('timeout') is not None: + sys.stdout.write('timeout reached worker {worker_id} stopped\n'.format(worker_id=worker_id)) break diff --git a/meesee_methods_tests.py b/meesee_methods_tests.py new file mode 100644 index 0000000..57c8dce --- /dev/null +++ b/meesee_methods_tests.py @@ -0,0 +1,57 @@ +import unittest +from meesee import Meesee, config +import redis +import uuid + + +box = Meesee(workers=1, namespace="test1", timeout=2) + + +@box.worker() +def foobar(item, worker_id): + redis_client = 
redis.Redis(**config['redis_config']) + key = f"test1:result_test:foobar:{worker_id}:{uuid.uuid4()}" + redis_client.set(key, item) + + +class TestMeesee(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.box = box + cls.redis_client = redis.Redis(**config['redis_config']) + + def setUp(self): + self.clean_up_redis() + + def tearDown(self): + self.clean_up_redis() + + def clean_up_redis(self): + patterns = [ + "test1:result_test:foobar:*" + ] + for pattern in patterns: + for key in self.redis_client.scan_iter(pattern): + self.redis_client.delete(key) + + def test_produce_to_functionality(self): + expected = ["item1", "item2", "item3"] + + @self.box.produce() + def produce_to_foobar(items): + return items + + produce_to_foobar(expected) + self.box.push_button(workers=5, wait=3) + + # Collect results from Redis + results = [] + for key in self.redis_client.scan_iter("test1:result_test:foobar:*"): + value = self.redis_client.get(key).decode('utf-8') + results.append(value) + + self.assertEqual(sorted(results), sorted(expected)) + + +if __name__ == '__main__': + unittest.main() diff --git a/meesee_types_tests.py b/meesee_types_tests.py new file mode 100644 index 0000000..1d651be --- /dev/null +++ b/meesee_types_tests.py @@ -0,0 +1,138 @@ +import unittest +import redis +from meesee import Meesee, config + +box = Meesee(workers=5, namespace="test", timeout=2) + + +@box.worker(queue="strings") +def consume_strings(item, worker_id): + redis_client = redis.Redis(**config['redis_config']) + key = f"result_test:consume_strings:{worker_id}:{redis_client.incr('result_test:consume_strings:counter')}" + redis_client.set(key, item) + + +@box.worker(queue="integers") +def consume_integers(item, worker_id): + redis_client = redis.Redis(**config['redis_config']) + key = f"result_test:consume_integers:{worker_id}:{redis_client.incr('result_test:consume_integers:counter')}" + redis_client.set(key, str(item)) + + +@box.worker(queue="lists") +def consume_lists(item, 
worker_id): + redis_client = redis.Redis(**config['redis_config']) + key = f"result_test:consume_lists:{worker_id}:{redis_client.incr('result_test:consume_lists:counter')}" + redis_client.set(key, str(item)) + + +@box.worker(queue="dicts") +def consume_dicts(item, worker_id): + redis_client = redis.Redis(**config['redis_config']) + key = f"result_test:consume_dicts:{worker_id}:{redis_client.incr('result_test:consume_dicts:counter')}" + redis_client.set(key, str(item)) + + +class TestMeesee(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.box = box + cls.redis_client = redis.Redis(**config['redis_config']) + + def setUp(self): + self.clean_up_redis() + + def tearDown(self): + self.clean_up_redis() + + def clean_up_redis(self): + patterns = [ + "result_test:consume_dicts:*", + "result_test:consume_strings:*", + "result_test:consume_integers:*", + "result_test:consume_lists:*", + ] + for pattern in patterns: + for key in self.redis_client.scan_iter(pattern): + self.redis_client.delete(key) + + def test_produce_and_consume_strings(self): + expected = ["apple", "banana", "cherry"] + + @self.box.produce(queue="strings") + def produce_strings(): + return expected + + produce_strings() + self.box.push_button(workers=5, wait=1) + + results = [] + for key in self.redis_client.scan_iter("result_test:consume_strings:*"): + if key != b"result_test:consume_strings:counter": + value = self.redis_client.get(key).decode('utf-8') + results.append(value) + + self.assertEqual(sorted(results), sorted(expected)) + + def test_produce_and_consume_integers(self): + expected = [1, 2, 3, 4, 5] + + @self.box.produce(queue="integers") + def produce_integers(): + return expected + + produce_integers() + self.box.push_button(workers=5, wait=1) + + results = [] + for key in self.redis_client.scan_iter("result_test:consume_integers:*"): + if key != b"result_test:consume_integers:counter": + value = int(self.redis_client.get(key)) + results.append(value) + + 
self.assertEqual(sorted(results), sorted(expected)) + + def test_produce_and_consume_lists(self): + expected = [[1, 2], [3, 4], [5, 6]] + + @self.box.produce(queue="lists") + def produce_lists(): + return expected + + produce_lists() + self.box.push_button(workers=5, wait=1) + + results = [] + for key in self.redis_client.scan_iter("result_test:consume_lists:*"): + if key != b"result_test:consume_lists:counter": + value = eval(self.redis_client.get(key)) + results.append(value) + + sorted_results = sorted(results) + sorted_expected = sorted(expected) + self.assertEqual(sorted_results, sorted_expected) + + def test_produce_and_consume_dicts(self): + expected = [{"a": 1}, {"b": 2}, {"c": 3}] + + @self.box.produce(queue="dicts") + def produce_dicts(): + return expected + + produce_dicts() + self.box.push_button(workers=5, wait=1) + + results = [] + for key in self.redis_client.scan_iter("result_test:consume_dicts:*"): + if key != b"result_test:consume_dicts:counter": + value = self.redis_client.get(key) + results.append(eval(value)) # Convert string back to dict + + sorted_results = sorted(results, key=lambda x: list(x.keys())[0]) + sorted_expected = sorted(expected, key=lambda x: list(x.keys())[0]) + + self.assertEqual(sorted_results, sorted_expected) + + +if __name__ == '__main__': + unittest.main() From 05e4e1dc4efa7cddbf7f408c615a823f11484dce Mon Sep 17 00:00:00 2001 From: Attumm Date: Sat, 3 Aug 2024 15:12:13 +0200 Subject: [PATCH 07/12] Added test for worker producer --- meesee_methods_tests.py | 35 +++++++++++++++++++++++++++++++++-- 1 file changed, 33 insertions(+), 2 deletions(-) diff --git a/meesee_methods_tests.py b/meesee_methods_tests.py index 57c8dce..453012f 100644 --- a/meesee_methods_tests.py +++ b/meesee_methods_tests.py @@ -4,7 +4,15 @@ import uuid -box = Meesee(workers=1, namespace="test1", timeout=2) +box = Meesee(workers=2, namespace="test1", timeout=2) + + +@box.worker_producer(input_queue="foo", output_queue="foobar") +def foo(item, worker_id): 
+ redis_client = redis.Redis(**config['redis_config']) + key = f"test1:result_test:foo:{worker_id}:{uuid.uuid4()}" + redis_client.set(key, f"foo_processed_{item}") + return [f"foo_processed_{item}"] @box.worker() @@ -28,6 +36,7 @@ def tearDown(self): def clean_up_redis(self): patterns = [ + "test1:result_test:foo:*", "test1:result_test:foobar:*" ] for pattern in patterns: @@ -44,7 +53,6 @@ def produce_to_foobar(items): produce_to_foobar(expected) self.box.push_button(workers=5, wait=3) - # Collect results from Redis results = [] for key in self.redis_client.scan_iter("test1:result_test:foobar:*"): value = self.redis_client.get(key).decode('utf-8') @@ -52,6 +60,29 @@ def produce_to_foobar(items): self.assertEqual(sorted(results), sorted(expected)) + def test_worker_producer_functionality(self): + expected = ["item1", "item2", "item3"] + + @self.box.produce(queue="foo") + def produce_to_foo(items): + return items + + produce_to_foo(expected) + self.box.push_button(workers=10, wait=3) + + foo_results = [] + for key in self.redis_client.scan_iter("test1:result_test:foo:*"): + value = self.redis_client.get(key).decode('utf-8') + foo_results.append(value) + + foobar_results = [] + for key in self.redis_client.scan_iter("test1:result_test:foobar:*"): + value = self.redis_client.get(key).decode('utf-8') + foobar_results.append(value) + + self.assertEqual(sorted(foo_results), sorted([f"foo_processed_{item}" for item in expected])) + self.assertEqual(sorted(foobar_results), sorted([f"foo_processed_{item}" for item in expected])) + if __name__ == '__main__': unittest.main() From 36797a63d3081518e1fc4b459e0707020313b3c9 Mon Sep 17 00:00:00 2001 From: Attumm Date: Sat, 3 Aug 2024 15:25:25 +0200 Subject: [PATCH 08/12] Added updated ci --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index dea36bb..fac008c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -49,7 
+49,7 @@ jobs: - name: Run Unit Tests run: | - coverage run -m unittest discover + coverage run --omit="*test*" -m unittest discover -p "*test*.py" - name: Upload coverage reports to Codecov, send only once if: matrix.python-version == '3.12' From e0b8f32f4b96ae0de5f7fe0bcc3cbefa6e66872a Mon Sep 17 00:00:00 2001 From: Attumm Date: Sat, 3 Aug 2024 16:46:51 +0200 Subject: [PATCH 09/12] Added start_worker_tests.py --- start_worker_tests.py | 140 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 140 insertions(+) create mode 100644 start_worker_tests.py diff --git a/start_worker_tests.py b/start_worker_tests.py new file mode 100644 index 0000000..0f34a82 --- /dev/null +++ b/start_worker_tests.py @@ -0,0 +1,140 @@ +import unittest +import sys +import io +import time + +import meesee +from meesee import run_worker, InitFail, RedisQueue + +# Stub functions +def stub_setup_init_items(func_kwargs, init_kwargs): + return {name: func_kwargs[name] for name in init_kwargs.keys()} + +def stub_init_add(func_kwargs, init_items, init_kwargs): + for name, value in init_items.items(): + if callable(value): + func_kwargs[name] = value() + else: + func_kwargs[name] = value + return func_kwargs + +class StubRedisQueue: + def __init__(self, **config): + self.config = config + self.items = [] + self.state = "normal" + self.iteration_count = 0 + + def __iter__(self): + return self + + def __next__(self): + if self.state == "exception" and self.iteration_count > 0: + raise Exception("Simulated failure") + if self.iteration_count >= len(self.items): + raise StopIteration + item = self.items[self.iteration_count] + self.iteration_count += 1 + return ("key", item.encode('utf-8')) + + def first_inline_send(self, item): + self.items.append(item) + + +class TestRunWorker(unittest.TestCase): + def setUp(self): + self.func_kwargs = {"arg1": "value1", "arg2": "value2", "init_arg": lambda: "init_value"} + self.init_kwargs = {"init_arg": "init_value"} + self.config = {"key": "test_queue", 
"timeout": 0.1} # Short timeout for testing + self.original_stdout = sys.stdout + sys.stdout = io.StringIO() + + # Save original implementations + self.original_RedisQueue = meesee.RedisQueue + self.original_setup_init_items = meesee.setup_init_items + self.original_init_add = meesee.init_add + + # Apply patches + meesee.RedisQueue = StubRedisQueue + meesee.setup_init_items = stub_setup_init_items + meesee.init_add = stub_init_add + + def tearDown(self): + + sys.stdout = self.original_stdout + + # Restore original implementations + meesee.RedisQueue = self.original_RedisQueue + meesee.setup_init_items = self.original_setup_init_items + meesee.init_add = self.original_init_add + + def test_normal_execution(self): + def stub_func(item, worker_id, **kwargs): + return f"Processed: {item}" + + StubRedisQueue.items = ["item1", "item2"] + StubRedisQueue.state = "normal" + run_worker(stub_func, self.func_kwargs, None, self.config, 1, self.init_kwargs) + + output = sys.stdout.getvalue() + self.assertIn("worker 1 started", output) + self.assertIn("listening to test_queue", output) + self.assertIn("timeout reached worker 1 stopped", output) + + def test_init_fail(self): + def stub_func(item, worker_id, **kwargs): + return "This should not be reached" + + def failing_init(): + raise Exception("Init failure") + + self.func_kwargs["init_arg"] = failing_init + StubRedisQueue.items = ["item1"] + StubRedisQueue.state = "normal" + run_worker(stub_func, self.func_kwargs, None, self.config, 1, self.init_kwargs) + + output = sys.stdout.getvalue() + self.assertIn("worker 1 failed reason Init failure", output) + + def test_keyboard_interrupt(self): + def stub_func(item, worker_id, **kwargs): + raise KeyboardInterrupt() + + StubRedisQueue.items = ["item1"] + StubRedisQueue.state = "normal" + run_worker(stub_func, self.func_kwargs, None, self.config, 1, self.init_kwargs) + + output = sys.stdout.getvalue() + self.assertIn("worker 1 stopped", output) + + def test_general_exception(self): + 
def stub_func(item, worker_id, **kwargs): + return f"Processed: {item}" + + def stub_on_failure_func(item, e, r, worker_id): + sys.stdout.write(f"Failure handled for item: {item}\n") + + StubRedisQueue.items = ["item1", "item2"] + StubRedisQueue.state = "exception" + StubRedisQueue.iteration_count = 0 + run_worker(stub_func, self.func_kwargs, stub_on_failure_func, self.config, 1, self.init_kwargs) + + output = sys.stdout.getvalue() + self.assertIn("worker 1 started", output) + self.assertIn("stub_func listening to test_queue", output) + self.assertIn("timeout reached worker 1 stopped", output) + + def test_timeout(self): + def stub_func(item, worker_id, **kwargs): + time.sleep(0.2) # Sleep longer than the timeout + return f"Processed: {item}" + + StubRedisQueue.items = ["item1", "item2"] + StubRedisQueue.state = "timeout" + run_worker(stub_func, self.func_kwargs, None, self.config, 1, self.init_kwargs) + + output = sys.stdout.getvalue() + self.assertIn("timeout reached worker 1 stopped", output) + +if __name__ == '__main__': + unittest.main() From a04be385f41d107de61f3c0d0ef8e14b17c227ec Mon Sep 17 00:00:00 2001 From: Attumm Date: Sat, 10 Aug 2024 14:56:08 +0200 Subject: [PATCH 10/12] Pep8 unto thyself:) --- start_worker_tests.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/start_worker_tests.py b/start_worker_tests.py index 0f34a82..cbdadf9 100644 --- a/start_worker_tests.py +++ b/start_worker_tests.py @@ -4,12 +4,13 @@ import time import meesee -from meesee import run_worker, InitFail, RedisQueue +from meesee import run_worker + -# Stub functions def stub_setup_init_items(func_kwargs, init_kwargs): return {name: func_kwargs[name] for name in init_kwargs.keys()} + def stub_init_add(func_kwargs, init_items, init_kwargs): for name, value in init_items.items(): if callable(value): @@ -18,6 +19,7 @@ def stub_init_add(func_kwargs, init_items, init_kwargs): func_kwargs[name] = value return func_kwargs + class StubRedisQueue: def 
__init__(self, **config): self.config = config @@ -45,7 +47,7 @@ class TestRunWorker(unittest.TestCase): def setUp(self): self.func_kwargs = {"arg1": "value1", "arg2": "value2", "init_arg": lambda: "init_value"} self.init_kwargs = {"init_arg": "init_value"} - self.config = {"key": "test_queue", "timeout": 0.1} # Short timeout for testing + self.config = {"key": "test_queue", "timeout": 0.1} self.original_stdout = sys.stdout sys.stdout = io.StringIO() @@ -117,7 +119,9 @@ def stub_on_failure_func(item, e, r, worker_id): StubRedisQueue.items = ["item1", "item2"] StubRedisQueue.state = "exception" StubRedisQueue.iteration_count = 0 + self.config["timeout"] = 2 run_worker(stub_func, self.func_kwargs, stub_on_failure_func, self.config, 1, self.init_kwargs) + self.config["timeout"] = 0.1 output = sys.stdout.getvalue() self.assertIn("worker 1 started", output) @@ -126,7 +130,8 @@ def stub_on_failure_func(item, e, r, worker_id): def test_timeout(self): def stub_func(item, worker_id, **kwargs): - time.sleep(0.2) # Sleep longer than the timeout + # Sleep longer than the timeout + time.sleep(0.2) return f"Processed: {item}" StubRedisQueue.items = ["item1", "item2"] From 9a287514900e5b79b5d94b4792b13a99315e2b67 Mon Sep 17 00:00:00 2001 From: Attumm Date: Sat, 10 Aug 2024 14:57:23 +0200 Subject: [PATCH 11/12] Added test for multiple functions and configs --- start_worker_tests.py | 44 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/start_worker_tests.py b/start_worker_tests.py index cbdadf9..eb9909f 100644 --- a/start_worker_tests.py +++ b/start_worker_tests.py @@ -141,5 +141,49 @@ def stub_func(item, worker_id, **kwargs): output = sys.stdout.getvalue() self.assertIn("timeout reached worker 1 stopped", output) + def test_multiple_funcs_and_configs(self): + def func_a(item, worker_id, **kwargs): + return f"func_a processed {item} with worker {worker_id}" + + def func_b(item, worker_id, **kwargs): + return f"func_b processed {item} with 
worker {worker_id}" + + def func_c(item, worker_id, **kwargs): + return f"func_c processed {item} with worker {worker_id}" + + funcs = [func_a, func_b, func_c] + + config_a = {"key": "queue_a", "timeout": 0.1} + config_b = {"key": "queue_b", "timeout": 0.1} + config_c = {"key": "queue_c", "timeout": 0.1} + + configs = [config_a, config_b, config_c] + + def stub_on_failure_func(item, e, r, worker_id): + sys.stdout.write(f"Failure handled for item: {item} on worker {worker_id}\n") + + StubRedisQueue.items = ["item1", "item2"] + StubRedisQueue.state = "normal" + + # Reset stdout capture for each iteration + sys.stdout = io.StringIO() + + for worker_id in range(5): + run_worker(funcs, self.func_kwargs, stub_on_failure_func, configs, worker_id, self.init_kwargs) + + output = sys.stdout.getvalue() + expected_func = funcs[worker_id % len(funcs)].__name__ + expected_queue = configs[worker_id % len(configs)]["key"] + + self.assertIn(f"worker {worker_id} started", output) + self.assertIn(f"{expected_func} listening to {expected_queue}", output) + self.assertIn("timeout reached worker", output) + + if "item1" in output: + self.assertIn(f"{expected_func} processed item1 with worker {worker_id}", output) + if "item2" in output: + self.assertIn(f"{expected_func} processed item2 with worker {worker_id}", output) + + if __name__ == '__main__': unittest.main() From daad10520f2be3d210a45994d7c0b8199e9da8d1 Mon Sep 17 00:00:00 2001 From: Attumm Date: Sat, 10 Aug 2024 22:04:54 +0200 Subject: [PATCH 12/12] Added new section for decorators magic --- README.md | 109 ++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 106 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index a3dae16..e56b163 100644 --- a/README.md +++ b/README.md @@ -58,15 +58,118 @@ produce(10) Great, the placement of both scripts can be on any machine with connectivity to the redis instance. +## Example Usage + +Let's use Python to make writing workers and producers more fun. 
+Here's a simple example demonstrating how to use Meesee the pythonic way. + +```python +from meesee import Meesee + +box = Meesee() + +@box.worker() +def foobar(item, worker_id): + print('func: foobar, worker_id: {}, item: {}'.format(worker_id, item)) + +@box.produce() +def produce_to_foobar(items): + return items + +if __name__ == '__main__': + items = [{"name": f"name{i}"} for i in range(10)] + produce_to_foobar(items) + box.push_button(workers=5, wait=1) +``` + +This example demonstrates: +1. Creating a Meesee instance +2. Defining a worker function using the `@box.worker()` decorator +3. Defining a producer function using the `@box.produce()` decorator +4. Producing items to the queue +5. Starting workers to process the items + + + +Example output +```bash +worker 1 started. foobar listening to foobar +worker 2 started. foobar listening to foobar +worker 3 started. foobar listening to foobar +worker 4 started. foobar listening to foobar +func: foobar, worker_id: 1, item: {"name": "name0"} +func: foobar, worker_id: 1, item: {"name": "name1"} +worker 5 started. foobar listening to foobar +func: foobar, worker_id: 2, item: {"name": "name4"} +func: foobar, worker_id: 3, item: {"name": "name2"} +func: foobar, worker_id: 4, item: {"name": "name3"} +func: foobar, worker_id: 1, item: {"name": "name5"} +func: foobar, worker_id: 1, item: {"name": "name6"} +func: foobar, worker_id: 3, item: {"name": "name7"} +func: foobar, worker_id: 4, item: {"name": "name8"} +func: foobar, worker_id: 2, item: {"name": "name9"} +timeout reached worker 5 stopped +timeout reached worker 2 stopped +timeout reached worker 1 stopped +timeout reached worker 4 stopped +timeout reached worker 3 stopped +Clean shut down +``` + +This output shows: +- Workers starting and listening to the 'foobar' queue +- Items being processed by different workers +- Workers shutting down after the timeout is reached + +## Usage explained + + +Producers produce to workers, hence the name. 
They can either return an iterable of items or be generator functions that yield the items themselves. For instance: + +```python +@box.produce() +def produce(): + return [1, 2, 3] + +# or + +@box.produce() +def produce_yield(): + yield from [1, 2, 3] +``` + +We can control which queue they send messages to in two ways: + +1. Specify the queue in the decorator: +```python +@box.produce(queue="foobar") +def produce_yield(): + yield from [1, 2, 3] +``` +This will produce to the "foobar" queue. + +2. Use magic naming: +```python +@box.produce() +def produce_to_foobar(): + yield from [1, 2, 3] +``` +By naming our function `produce_to_foobar`, the function will also send the data to the "foobar" queue. + +Workers are special in that they are started as separate processes using multiprocessing. Here's an example that starts 5 workers. Since we only defined one worker function, all 5 workers will run that function: + +```python +box.push_button(workers=5, wait=1) +``` + +This will start 5 worker processes, each listening to the queue specified in the worker function. ### Installing -Create a virtualenv for your project. Install meesee: ``` -$ . /path/to/virtualenv/bin/activate -$ pip install meesee +$ pip install meesee ``` ### Prerequisites