Updates the worker process to handle its own timeout, removing the need for some work in the guard loop. Also removes the double duty of timer as both a timer and a status; status is now a dedicated enum.
stumpylog committed Aug 10, 2022
1 parent 9838d0d commit 0612a70
Showing 5 changed files with 116 additions and 31 deletions.
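Before the file diffs, a minimal standalone sketch of the pattern this commit adopts: instead of a shared float doing double duty as countdown timer and status flag, each worker publishes its state through a shared unsigned int (Value("I", ...)) holding a dedicated enum value. The names mirror the diff below, but the script itself is illustrative and not part of the commit:

import enum
from multiprocessing import Process, Value


class WorkerStatus(enum.IntEnum):
    IDLE = 1
    BUSY = 2


def work(status):
    # Take the lock so the parent never observes a half-written update
    with status.get_lock():
        status.value = WorkerStatus.BUSY.value
    # ... the actual task would run here ...
    with status.get_lock():
        status.value = WorkerStatus.IDLE.value


if __name__ == "__main__":
    status = Value("I", WorkerStatus.IDLE.value)  # "I" = unsigned int
    p = Process(target=work, args=(status,))
    p.start()
    p.join()
    print(WorkerStatus(status.value))  # WorkerStatus.IDLE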
58 changes: 38 additions & 20 deletions django_q/cluster.py
@@ -1,5 +1,6 @@
 # Standard
 import ast
+import enum
 import inspect
 import pydoc
 import signal
@@ -46,6 +47,15 @@
 from django_q.signals import post_execute, pre_execute
 from django_q.signing import BadSignature, SignedPackage
 from django_q.status import Stat, Status
+from django_q.timeouts import JobTimeoutException, UnixSignalDeathPenalty
+
+
+class WorkerStatus(enum.IntEnum):
+    IDLE = 1
+    BUSY = 2
+    RECYCLE = 3
+    TIMEOUT = 4
+    STARTING = 5
 
 
 class Cluster:
@@ -192,7 +202,7 @@ def spawn_process(self, target, *args) -> Process:
         p.daemon = True
         if target == worker:
             p.daemon = Conf.DAEMONIZE_WORKERS
-            p.timer = args[2]
+            p.status = args[2]
         self.pool.append(p)
         p.start()
         return p
@@ -202,7 +212,7 @@ def spawn_pusher(self) -> Process:
 
     def spawn_worker(self):
         self.spawn_process(
-            worker, self.task_queue, self.result_queue, Value("f", -1), self.timeout
+            worker, self.task_queue, self.result_queue, Value("I", WorkerStatus.IDLE.value), self.timeout
        )
 
    def spawn_monitor(self) -> Process:
@@ -225,11 +235,11 @@ def reincarnate(self, process):
         else:
             self.pool.remove(process)
             self.spawn_worker()
-            if process.timer.value == 0:
+            if process.status.value == WorkerStatus.TIMEOUT.value:
                 # only need to terminate on timeout, otherwise we risk destabilizing the queues
                 process.terminate()
                 logger.warning(_(f"reincarnated worker {process.name} after timeout"))
-            elif int(process.timer.value) == -2:
+            elif process.status.value == WorkerStatus.RECYCLE.value:
                 logger.info(_(f"recycled worker {process.name}"))
             else:
                 logger.error(_(f"reincarnated worker {process.name} after death"))
@@ -267,14 +277,11 @@ def guard(self):
         while not self.stop_event.is_set() or not counter:
             # Check Workers
             for p in self.pool:
-                with p.timer.get_lock():
+                with p.status.get_lock():
                     # Are you alive?
-                    if not p.is_alive() or p.timer.value == 0:
+                    if not p.is_alive() or p.status.value in {WorkerStatus.TIMEOUT.value, WorkerStatus.RECYCLE.value}:
                         self.reincarnate(p)
                         continue
-                    # Decrement timer if work is being done
-                    if p.timer.value > 0:
-                        p.timer.value -= cycle
             # Check Monitor
             if not self.monitor.is_alive():
                 self.reincarnate(self.monitor)
@@ -401,24 +408,27 @@ def monitor(result_queue: Queue, broker: Broker = None):
 
 
 def worker(
-    task_queue: Queue, result_queue: Queue, timer: Value, timeout: int = Conf.TIMEOUT
+    task_queue: Queue, result_queue: Queue, status: Value, timeout: int = Conf.TIMEOUT
 ):
     """
     Takes a task from the task queue, tries to execute it and puts the result back in the result queue
     :param timeout: number of seconds to wait for a worker to finish.
     :type task_queue: multiprocessing.Queue
     :type result_queue: multiprocessing.Queue
-    :type timer: multiprocessing.Value
+    :type status: multiprocessing.Value wrapping an unsigned int
     """
     name = current_process().name
     logger.info(_(f"{name} ready for work at {current_process().pid}"))
     task_count = 0
     if timeout is None:
-        timeout = -1
+        # If the timeout passed to signal.alarm is 0, no alarm
+        # will be scheduled
+        timeout = 0
     # Start reading the task queue
     for task in iter(task_queue.get, "STOP"):
         result = None
-        timer.value = -1  # Idle
+        # Got a task package, but have not yet started the work
+        status.value = WorkerStatus.STARTING.value
         task_count += 1
         # Get the function from the task
         logger.info(_(f'{name} processing [{task["name"]}]'))
@@ -427,30 +437,38 @@ def worker(
         if not callable(task["func"]):
             f = pydoc.locate(f)
         close_old_django_connections()
-        timer_value = task.pop("timeout", timeout)
+        timeout = task.pop("timeout", timeout)
         # signal execution
         pre_execute.send(sender="django_q", func=f, task=task)
         # execute the payload
-        timer.value = timer_value  # Busy
+        status.value = WorkerStatus.BUSY.value
         try:
-            res = f(*task["args"], **task["kwargs"])
-            result = (res, True)
+            with UnixSignalDeathPenalty(timeout=timeout):
+                res = f(*task["args"], **task["kwargs"])
+                result = (res, True)
         except Exception as e:
+            if isinstance(e, JobTimeoutException):
+                status.value = WorkerStatus.TIMEOUT.value
             result = (f"{e} : {traceback.format_exc()}", False)
             if error_reporter:
                 error_reporter.report()
             if task.get("sync", False):
                 raise
-        with timer.get_lock():
+        with status.get_lock():
             # Process result
             task["result"] = result[0]
             task["success"] = result[1]
             task["stopped"] = timezone.now()
             result_queue.put(task)
-            timer.value = -1  # Idle
+            # If the worker didn't time out, go back to idle;
+            # otherwise, break out of the loop
+            if status.value != WorkerStatus.TIMEOUT.value:
+                status.value = WorkerStatus.IDLE.value
+            else:
+                break
         # Recycle
         if task_count == Conf.RECYCLE or rss_check():
-            timer.value = -2  # Recycled
+            status.value = WorkerStatus.RECYCLE.value
             break
     logger.info(_(f"{name} stopped doing work"))

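The worker's self-timeout rests on the stdlib SIGALRM mechanism that the new UnixSignalDeathPenalty (added in django_q/timeouts.py below) wraps. A condensed sketch of that flow, assuming a Unix platform and the main thread; the helper names here are hypothetical, not the project's:

import signal


class JobTimeout(Exception):
    """Stand-in for JobTimeoutException."""


def _raise_timeout(signum, frame):
    raise JobTimeout("task exceeded its timeout")


def run_with_timeout(func, seconds):
    # Schedule SIGALRM after `seconds`; signal.alarm(0) schedules no alarm,
    # which is why the reworked worker maps timeout=None to 0.
    signal.signal(signal.SIGALRM, _raise_timeout)
    signal.alarm(seconds)
    try:
        return func(), True
    except JobTimeout as e:
        # At this point the worker records WorkerStatus.TIMEOUT and breaks
        return str(e), False
    finally:
        signal.alarm(0)  # always cancel a still-pending alarm
        signal.signal(signal.SIGALRM, signal.SIG_DFL)


if __name__ == "__main__":
    import time
    print(run_with_timeout(lambda: time.sleep(3) or "done", 1))
    # -> ('task exceeded its timeout', False)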
3 changes: 2 additions & 1 deletion django_q/tasks.py
@@ -15,6 +15,7 @@
 from django_q.queues import Queue
 from django_q.signals import pre_enqueue
 from django_q.signing import SignedPackage
+from django_q.cluster import WorkerStatus
 
 
 def async_task(func, *args, **kwargs):
@@ -762,7 +763,7 @@ def _sync(pack):
     task = SignedPackage.loads(pack)
     task_queue.put(task)
     task_queue.put("STOP")
-    worker(task_queue, result_queue, Value("f", -1))
+    worker(task_queue, result_queue, Value("I", WorkerStatus.IDLE.value))
     result_queue.put("STOP")
     monitor(result_queue)
     task_queue.close()
16 changes: 8 additions & 8 deletions django_q/tests/test_cluster.py
@@ -14,7 +14,7 @@
 sys.path.insert(0, myPath + "/../")
 
 from django_q.brokers import Broker, get_broker
-from django_q.cluster import Cluster, Sentinel, monitor, pusher, save_task, worker
+from django_q.cluster import Cluster, Sentinel, monitor, pusher, save_task, worker, WorkerStatus
 from django_q.conf import Conf
 from django_q.humanhash import DEFAULT_WORDLIST, uuid
 from django_q.models import Success, Task
@@ -124,7 +124,7 @@ def test_cluster(broker):
     assert queue_size(broker=broker) == 0
     # Test work
     task_queue.put("STOP")
-    worker(task_queue, result_queue, Value("f", -1))
+    worker(task_queue, result_queue, Value("I", WorkerStatus.IDLE.value))
     assert task_queue.qsize() == 0
     assert result_queue.qsize() == 1
     # Test monitor
@@ -227,7 +227,7 @@ def test_enqueue(broker, admin_user):
     assert fetch_group("test_j", count=2, wait=10) is None
     # let a worker handle them
     result_queue = Queue()
-    worker(task_queue, result_queue, Value("f", -1))
+    worker(task_queue, result_queue, Value("I", WorkerStatus.IDLE.value))
     assert result_queue.qsize() == task_count
     result_queue.put("STOP")
     # store the results
@@ -437,7 +437,7 @@ def test_recycle(broker, monkeypatch):
     pusher(task_queue, stop_event, broker=broker)
     pusher(task_queue, stop_event, broker=broker)
     # worker should exit on recycle
-    worker(task_queue, result_queue, Value("f", -1))
+    worker(task_queue, result_queue, Value("I", WorkerStatus.IDLE.value))
     # check if the work has been done
     assert result_queue.qsize() == 2
     # save_limit test
@@ -472,7 +472,7 @@ def test_max_rss(broker, monkeypatch):
     # push the task
     pusher(task_queue, stop_event, broker=broker)
     # worker should exit on recycle
-    worker(task_queue, result_queue, Value("f", -1))
+    worker(task_queue, result_queue, Value("I", WorkerStatus.IDLE.value))
     # check if the work has been done
     assert result_queue.qsize() == 1
     # save_limit test
@@ -508,7 +508,7 @@ def test_bad_secret(broker, monkeypatch):
     worker(
         task_queue,
         result_queue,
-        Value("f", -1),
+        Value("I", WorkerStatus.IDLE.value),
     )
     assert result_queue.qsize() == 0
     broker.delete_queue()
@@ -693,7 +693,7 @@ def handler(sender, task, func, **kwargs):
     event.set()
     pusher(task_queue, event, broker=broker)
     task_queue.put("STOP")
-    worker(task_queue, result_queue, Value("f", -1))
+    worker(task_queue, result_queue, Value("I", WorkerStatus.IDLE.value))
     result_queue.put("STOP")
     monitor(result_queue, broker)
     broker.delete_queue()
@@ -722,7 +722,7 @@ def handler(sender, task, **kwargs):
     event.set()
     pusher(task_queue, event, broker=broker)
     task_queue.put("STOP")
-    worker(task_queue, result_queue, Value("f", -1))
+    worker(task_queue, result_queue, Value("I", WorkerStatus.IDLE.value))
     result_queue.put("STOP")
     monitor(result_queue, broker)
     broker.delete_queue()
4 changes: 2 additions & 2 deletions django_q/tests/test_scheduler.py
@@ -12,7 +12,7 @@
 from django.utils.timezone import is_naive
 
 from django_q.brokers import Broker, get_broker
-from django_q.cluster import monitor, pusher, scheduler, worker, localtime
+from django_q.cluster import WorkerStatus, localtime, monitor, pusher, scheduler, worker
 from django_q.conf import Conf
 from django_q.queues import Queue
 from django_q.tasks import Schedule, fetch
@@ -118,7 +118,7 @@ def test_scheduler(broker, monkeypatch):
     task_queue.put("STOP")
     # let a worker handle them
     result_queue = Queue()
-    worker(task_queue, result_queue, Value("b", -1))
+    worker(task_queue, result_queue, Value("I", WorkerStatus.IDLE.value))
     assert result_queue.qsize() == 1
     result_queue.put("STOP")
     # store the results
66 changes: 66 additions & 0 deletions django_q/timeouts.py
@@ -0,0 +1,66 @@
+"""
+Using signal, implements an alarm-based callback after a certain amount of time.
+Borrowed from rq: https://github.com/rq/rq/blob/master/rq/timeouts.py
+"""
+import signal
+
+
+class JobTimeoutException(Exception):
+    """Raised when a job takes longer to complete than the allowed maximum
+    timeout value.
+    """
+    pass
+
+
+class BaseDeathPenalty:
+    """Base class to set up job timeouts."""
+
+    def __init__(self, timeout, exception=JobTimeoutException, **kwargs):
+        self._timeout = timeout
+        self._exception = exception
+
+    def __enter__(self):
+        self.setup_death_penalty()
+
+    def __exit__(self, type, value, traceback):
+        # Always cancel immediately, since we're done
+        try:
+            self.cancel_death_penalty()
+        except JobTimeoutException:
+            # Weird case: we're done with the with body, but now the alarm is
+            # fired. We may safely ignore this situation and consider the
+            # body done.
+            pass
+
+        # __exit__ may return True to suppress further exception handling. We
+        # don't want to suppress any exceptions here, since all errors should
+        # just pass through, JobTimeoutException being handled normally to the
+        # invoking context.
+        return False
+
+    def setup_death_penalty(self):
+        raise NotImplementedError()
+
+    def cancel_death_penalty(self):
+        raise NotImplementedError()
+
+
+class UnixSignalDeathPenalty(BaseDeathPenalty):
+
+    def handle_death_penalty(self, signum, frame):
+        raise self._exception('Task exceeded maximum timeout value '
+                              '({0} seconds)'.format(self._timeout))
+
+    def setup_death_penalty(self):
+        """Sets up an alarm signal and a signal handler that raises
+        an exception after the timeout amount (expressed in seconds).
+        """
+        signal.signal(signal.SIGALRM, self.handle_death_penalty)
+        signal.alarm(self._timeout)
+
+    def cancel_death_penalty(self):
+        """Removes the death penalty alarm and puts back the system into
+        default signal handling.
+        """
+        signal.alarm(0)
+        signal.signal(signal.SIGALRM, signal.SIG_DFL)
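A usage sketch for the context manager above, mirroring how the reworked worker() wraps task execution (Unix-only, since it relies on SIGALRM); the sleep stands in for a task that overruns its budget:

import time

from django_q.timeouts import JobTimeoutException, UnixSignalDeathPenalty

try:
    with UnixSignalDeathPenalty(timeout=2):
        time.sleep(5)  # interrupted by the alarm after 2 seconds
except JobTimeoutException as e:
    print(f"timed out: {e}")

Because __exit__ returns False, the JobTimeoutException raised by the alarm handler propagates to the caller; in cluster.py the worker catches it, flags WorkerStatus.TIMEOUT, and lets the guard reincarnate it.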
