Skip to content

Commit

Permalink
Added testing and signal for when workers stop due to timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
Attumm committed Aug 3, 2024
1 parent 185bdf6 commit 8aebbfe
Show file tree
Hide file tree
Showing 3 changed files with 198 additions and 3 deletions.
6 changes: 3 additions & 3 deletions meesee.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,10 @@ def create_produce_config(self):

def worker_producer(self, input_queue=None, output_queue=None):
def decorator(func):

@wraps(func)
def wrapper(*args, **kwargs):
# Producer logic

config = self.create_produce_config()
if output_queue:
config["key"] = output_queue
Expand All @@ -133,8 +134,6 @@ def wrapper(*args, **kwargs):
redis_queue.send(result)

return result

# Worker registration
parsed_name = input_queue if input_queue is not None else self.parse_func_name(func)
self.worker_funcs[parsed_name] = wrapper

Expand Down Expand Up @@ -250,6 +249,7 @@ def run_worker(func, func_kwargs, on_failure_func, config, worker_id, init_kwarg
time.sleep(0.1) # Throttle restarting

if config.get('timeout') is not None:
sys.stdout.write('timeout reached worker {worker_id} stopped\n'.format(worker_id=worker_id))
break


Expand Down
57 changes: 57 additions & 0 deletions meesee_methods_tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import unittest
from meesee import Meesee, config
import redis
import uuid


box = Meesee(workers=1, namespace="test1", timeout=2)


@box.worker()
def foobar(item, worker_id):
redis_client = redis.Redis(**config['redis_config'])
key = f"test1:result_test:foobar:{worker_id}:{uuid.uuid4()}"
redis_client.set(key, item)


class TestMeesee(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.box = box
cls.redis_client = redis.Redis(**config['redis_config'])

def setUp(self):
self.clean_up_redis()

def tearDown(self):
self.clean_up_redis()

def clean_up_redis(self):
patterns = [
"test1:result_test:foobar:*"
]
for pattern in patterns:
for key in self.redis_client.scan_iter(pattern):
self.redis_client.delete(key)

def test_produce_to_functionality(self):
expected = ["item1", "item2", "item3"]

@self.box.produce()
def produce_to_foobar(items):
return items

produce_to_foobar(expected)
self.box.push_button(workers=5, wait=3)

# Collect results from Redis
results = []
for key in self.redis_client.scan_iter("test1:result_test:foobar:*"):
value = self.redis_client.get(key).decode('utf-8')
results.append(value)

self.assertEqual(sorted(results), sorted(expected))


if __name__ == '__main__':
unittest.main()
138 changes: 138 additions & 0 deletions meesee_types_tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
import unittest
import redis
from meesee import Meesee, config

box = Meesee(workers=5, namespace="test", timeout=2)


@box.worker(queue="strings")
def consume_strings(item, worker_id):
redis_client = redis.Redis(**config['redis_config'])
key = f"result_test:consume_strings:{worker_id}:{redis_client.incr('result_test:consume_strings:counter')}"
redis_client.set(key, item)


@box.worker(queue="integers")
def consume_integers(item, worker_id):
redis_client = redis.Redis(**config['redis_config'])
key = f"result_test:consume_integers:{worker_id}:{redis_client.incr('result_test:consume_integers:counter')}"
redis_client.set(key, str(item))


@box.worker(queue="lists")
def consume_lists(item, worker_id):
redis_client = redis.Redis(**config['redis_config'])
key = f"result_test:consume_lists:{worker_id}:{redis_client.incr('result_test:consume_lists:counter')}"
redis_client.set(key, str(item))


@box.worker(queue="dicts")
def consume_dicts(item, worker_id):
redis_client = redis.Redis(**config['redis_config'])
key = f"result_test:consume_dicts:{worker_id}:{redis_client.incr('result_test:consume_dicts:counter')}"
redis_client.set(key, str(item))


class TestMeesee(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.box = box
cls.redis_client = redis.Redis(**config['redis_config'])

def setUp(self):
self.clean_up_redis()

def tearDown(self):
self.clean_up_redis()

def clean_up_redis(self):
patterns = [
"result_test:consume_dicts:*",
"result_test:consume_strings:*",
"result_test:consume_integers:*",
"result_test:consume_lists:*",
]
for pattern in patterns:
for key in self.redis_client.scan_iter(pattern):
self.redis_client.delete(key)

def test_produce_and_consume_strings(self):
expected = ["apple", "banana", "cherry"]

@self.box.produce(queue="strings")
def produce_strings():
return expected

produce_strings()
self.box.push_button(workers=5, wait=1)

results = []
for key in self.redis_client.scan_iter("result_test:consume_strings:*"):
if key != b"result_test:consume_strings:counter":
value = self.redis_client.get(key).decode('utf-8')
results.append(value)

self.assertEqual(sorted(results), sorted(expected))

def test_produce_and_consume_integers(self):
expected = [1, 2, 3, 4, 5]

@self.box.produce(queue="integers")
def produce_integers():
return expected

produce_integers()
self.box.push_button(workers=5, wait=1)

results = []
for key in self.redis_client.scan_iter("result_test:consume_integers:*"):
if key != b"result_test:consume_integers:counter":
value = int(self.redis_client.get(key))
results.append(value)

self.assertEqual(sorted(results), sorted(expected))

def test_produce_and_consume_lists(self):
expected = [[1, 2], [3, 4], [5, 6]]

@self.box.produce(queue="lists")
def produce_lists():
return expected

produce_lists()
self.box.push_button(workers=5, wait=1)

results = []
for key in self.redis_client.scan_iter("result_test:consume_lists:*"):
if key != b"result_test:consume_lists:counter":
value = eval(self.redis_client.get(key))
results.append(value)

sorted_results = sorted(results)
sorted_expected = sorted(expected)
self.assertEqual(sorted_results, sorted_expected)

def test_produce_and_consume_dicts(self):
expected = [{"a": 1}, {"b": 2}, {"c": 3}]

@self.box.produce(queue="dicts")
def produce_dicts():
return expected

produce_dicts()
self.box.push_button(workers=5, wait=1)

results = []
for key in self.redis_client.scan_iter("result_test:consume_dicts:*"):
if key != b"result_test:consume_dicts:counter":
value = self.redis_client.get(key)
results.append(eval(value)) # Convert string back to dict

sorted_results = sorted(results, key=lambda x: list(x.keys())[0])
sorted_expected = sorted(expected, key=lambda x: list(x.keys())[0])

self.assertEqual(sorted_results, sorted_expected)


if __name__ == '__main__':
unittest.main()

0 comments on commit 8aebbfe

Please sign in to comment.