This repository has been archived by the owner on Dec 16, 2022. It is now read-only.

Feature: Worker timeout improvements #2

Merged
merged 4 commits into from
Sep 3, 2022
Changes from 3 commits
83 changes: 54 additions & 29 deletions django_q/cluster.py
@@ -1,5 +1,6 @@
# Standard
import ast
import enum
import inspect
import pydoc
import signal
@@ -46,6 +47,15 @@
from django_q.signals import post_execute, pre_execute
from django_q.signing import BadSignature, SignedPackage
from django_q.status import Stat, Status
from django_q.timeouts import JobTimeoutException, UnixSignalDeathPenalty


class WorkerStatus(enum.IntEnum):
IDLE = 1
BUSY = 2
RECYCLE = 3
TIMEOUT = 4
STARTING = 5


class Cluster:
@@ -192,7 +202,7 @@ def spawn_process(self, target, *args) -> Process:
p.daemon = True
if target == worker:
p.daemon = Conf.DAEMONIZE_WORKERS
p.timer = args[2]
p.status = args[2]
self.pool.append(p)
p.start()
return p
@@ -202,7 +212,7 @@ def spawn_pusher(self) -> Process:

def spawn_worker(self):
self.spawn_process(
worker, self.task_queue, self.result_queue, Value("f", -1), self.timeout
worker, self.task_queue, self.result_queue, Value('I', WorkerStatus.IDLE.value), self.timeout
)

def spawn_monitor(self) -> Process:
@@ -225,11 +235,11 @@ def reincarnate(self, process):
else:
self.pool.remove(process)
self.spawn_worker()
if process.timer.value == 0:
if process.status.value == WorkerStatus.TIMEOUT.value:
# only need to terminate on timeout, otherwise we risk destabilizing the queues
process.terminate()
logger.warning(_(f"reincarnated worker {process.name} after timeout"))
elif int(process.timer.value) == -2:
elif process.status.value == WorkerStatus.RECYCLE.value:
logger.info(_(f"recycled worker {process.name}"))
else:
logger.error(_(f"reincarnated worker {process.name} after death"))
@@ -267,14 +277,11 @@ def guard(self):
while not self.stop_event.is_set() or not counter:
# Check Workers
for p in self.pool:
with p.timer.get_lock():
with p.status.get_lock():
# Are you alive?
if not p.is_alive() or p.timer.value == 0:
if not p.is_alive() or p.status.value in {WorkerStatus.TIMEOUT.value, WorkerStatus.RECYCLE.value}:
self.reincarnate(p)
continue
# Decrement timer if work is being done
if p.timer.value > 0:
p.timer.value -= cycle
# Check Monitor
if not self.monitor.is_alive():
self.reincarnate(self.monitor)
@@ -401,24 +408,24 @@ def monitor(result_queue: Queue, broker: Broker = None):


def worker(
task_queue: Queue, result_queue: Queue, timer: Value, timeout: int = Conf.TIMEOUT
task_queue: Queue, result_queue: Queue, status: Value, timeout: int = Conf.TIMEOUT
):
"""
Takes a task from the task queue, tries to execute it and puts the result back in the result queue
:param timeout: number of seconds to wait for a worker to finish.
:type task_queue: multiprocessing.Queue
:type result_queue: multiprocessing.Queue
:type timer: multiprocessing.Value
:type status: multiprocessing.Value wrapping an unsigned int (a WorkerStatus value)
"""
name = current_process().name
logger.info(_(f"{name} ready for work at {current_process().pid}"))
task_count = 0
if timeout is None:
timeout = -1
# Start reading the task queue
for task in iter(task_queue.get, "STOP"):
result = None
timer.value = -1 # Idle
timed_out = False
# Got a task package, but have not yet started the work
status.value = WorkerStatus.STARTING.value
task_count += 1
# Get the function from the task
logger.info(_(f'{name} processing [{task["name"]}]'))
@@ -427,31 +434,49 @@ def worker(
if not callable(task["func"]):
f = pydoc.locate(f)
close_old_django_connections()
timer_value = task.pop("timeout", timeout)
timeout = task.pop("timeout", timeout)
# signal execution
pre_execute.send(sender="django_q", func=f, task=task)
# execute the payload
timer.value = timer_value # Busy
status.value = WorkerStatus.BUSY.value
try:
res = f(*task["args"], **task["kwargs"])
result = (res, True)
with UnixSignalDeathPenalty(timeout=timeout):
res = f(*task["args"], **task["kwargs"])
result = (res, True)
except Exception as e:
# Minor QoL: walk the exception chain as far as
# possible to check whether this was a timeout or just an error
exception_chain = [e]
next_exception = e.__cause__
while next_exception is not None:
exception_chain.append(next_exception)
next_exception = next_exception.__cause__
if any(isinstance(x, JobTimeoutException) for x in exception_chain):
timed_out = True
result = (f"{e} : {traceback.format_exc()}", False)
if error_reporter:
error_reporter.report()
if task.get("sync", False):
raise
with timer.get_lock():
# Process result
task["result"] = result[0]
task["success"] = result[1]
task["stopped"] = timezone.now()
result_queue.put(task)
timer.value = -1 # Idle
# Recycle
if task_count == Conf.RECYCLE or rss_check():
timer.value = -2 # Recycled
break
finally:
with status.get_lock():
# Process result
if result is None:
result = (None, False)
task["result"] = result[0]
task["success"] = result[1]
task["stopped"] = timezone.now()
result_queue.put(task)

if timed_out:
status.value = WorkerStatus.TIMEOUT.value
break
elif task_count == Conf.RECYCLE or rss_check():
status.value = WorkerStatus.RECYCLE.value
break
else:
status.value = WorkerStatus.IDLE.value

logger.info(_(f"{name} stopped doing work"))


3 changes: 2 additions & 1 deletion django_q/tasks.py
@@ -756,13 +756,14 @@ def fetch_group(self, failures=True, wait=0, count=None):
def _sync(pack):
"""Simulate a package travelling through the cluster."""
from django_q.cluster import monitor, worker
from django_q.cluster import WorkerStatus

task_queue = Queue()
result_queue = Queue()
task = SignedPackage.loads(pack)
task_queue.put(task)
task_queue.put("STOP")
worker(task_queue, result_queue, Value("f", -1))
worker(task_queue, result_queue, Value("I", WorkerStatus.IDLE.value))
result_queue.put("STOP")
monitor(result_queue)
task_queue.close()
16 changes: 8 additions & 8 deletions django_q/tests/test_cluster.py
@@ -14,7 +14,7 @@
sys.path.insert(0, myPath + "/../")

from django_q.brokers import Broker, get_broker
from django_q.cluster import Cluster, Sentinel, monitor, pusher, save_task, worker
from django_q.cluster import Cluster, Sentinel, monitor, pusher, save_task, worker, WorkerStatus
from django_q.conf import Conf
from django_q.humanhash import DEFAULT_WORDLIST, uuid
from django_q.models import Success, Task
@@ -124,7 +124,7 @@ def test_cluster(broker):
assert queue_size(broker=broker) == 0
# Test work
task_queue.put("STOP")
worker(task_queue, result_queue, Value("f", -1))
worker(task_queue, result_queue, Value("I", WorkerStatus.IDLE.value))
assert task_queue.qsize() == 0
assert result_queue.qsize() == 1
# Test monitor
@@ -227,7 +227,7 @@ def test_enqueue(broker, admin_user):
assert fetch_group("test_j", count=2, wait=10) is None
# let a worker handle them
result_queue = Queue()
worker(task_queue, result_queue, Value("f", -1))
worker(task_queue, result_queue, Value("I", WorkerStatus.IDLE.value))
assert result_queue.qsize() == task_count
result_queue.put("STOP")
# store the results
@@ -437,7 +437,7 @@ def test_recycle(broker, monkeypatch):
pusher(task_queue, stop_event, broker=broker)
pusher(task_queue, stop_event, broker=broker)
# worker should exit on recycle
worker(task_queue, result_queue, Value("f", -1))
worker(task_queue, result_queue, Value("I", WorkerStatus.IDLE.value))
# check if the work has been done
assert result_queue.qsize() == 2
# save_limit test
@@ -472,7 +472,7 @@ def test_max_rss(broker, monkeypatch):
# push the task
pusher(task_queue, stop_event, broker=broker)
# worker should exit on recycle
worker(task_queue, result_queue, Value("f", -1))
worker(task_queue, result_queue, Value("I", WorkerStatus.IDLE.value))
# check if the work has been done
assert result_queue.qsize() == 1
# save_limit test
@@ -508,7 +508,7 @@ def test_bad_secret(broker, monkeypatch):
worker(
task_queue,
result_queue,
Value("f", -1),
Value("I", WorkerStatus.IDLE.value),
)
assert result_queue.qsize() == 0
broker.delete_queue()
@@ -693,7 +693,7 @@ def handler(sender, task, func, **kwargs):
event.set()
pusher(task_queue, event, broker=broker)
task_queue.put("STOP")
worker(task_queue, result_queue, Value("f", -1))
worker(task_queue, result_queue, Value("I", WorkerStatus.IDLE.value))
result_queue.put("STOP")
monitor(result_queue, broker)
broker.delete_queue()
@@ -722,7 +722,7 @@ def handler(sender, task, **kwargs):
event.set()
pusher(task_queue, event, broker=broker)
task_queue.put("STOP")
worker(task_queue, result_queue, Value("f", -1))
worker(task_queue, result_queue, Value("I", WorkerStatus.IDLE.value))
result_queue.put("STOP")
monitor(result_queue, broker)
broker.delete_queue()
4 changes: 2 additions & 2 deletions django_q/tests/test_scheduler.py
@@ -12,7 +12,7 @@
from django.utils.timezone import is_naive

from django_q.brokers import Broker, get_broker
from django_q.cluster import monitor, pusher, scheduler, worker, localtime
from django_q.cluster import WorkerStatus, localtime, monitor, pusher, scheduler, worker
from django_q.conf import Conf
from django_q.queues import Queue
from django_q.tasks import Schedule, fetch
@@ -118,7 +118,7 @@ def test_scheduler(broker, monkeypatch):
task_queue.put("STOP")
# let a worker handle them
result_queue = Queue()
worker(task_queue, result_queue, Value("b", -1))
worker(task_queue, result_queue, Value("I", WorkerStatus.IDLE.value))
assert result_queue.qsize() == 1
result_queue.put("STOP")
# store the results
70 changes: 70 additions & 0 deletions django_q/timeouts.py
@@ -0,0 +1,70 @@
"""
Using signal, implements an alarm-based callback after a certain amount of time.
Borrowed from rq: https://github.com/rq/rq/blob/master/rq/timeouts.py
"""
import signal


class JobTimeoutException(Exception):
"""Raised when a job takes longer to complete than the allowed maximum
timeout value.
"""
pass


class BaseDeathPenalty:
"""Base class to setup job timeouts."""

def __init__(self, timeout, exception=JobTimeoutException, **kwargs):
# If signal.alarm timeout is 0, no alarm will be scheduled
# by signal.alarm
if timeout is None:
timeout = 0
self._timeout = timeout
self._exception = exception

def __enter__(self):
self.setup_death_penalty()

def __exit__(self, type, value, traceback):
# Always cancel immediately, since we're done
try:
self.cancel_death_penalty()
except JobTimeoutException:
# Weird case: we're done with the with body, but now the alarm is
# fired. We may safely ignore this situation and consider the
# body done.
pass

# __exit__ may return True to suppress further exception handling. We
# don't want to suppress any exceptions here, since all errors should
# just pass through to the invoking context, with JobTimeoutException
# handled normally.
return False

def setup_death_penalty(self):
raise NotImplementedError()

def cancel_death_penalty(self):
raise NotImplementedError()


class UnixSignalDeathPenalty(BaseDeathPenalty):

def handle_death_penalty(self, signum, frame):
raise self._exception('Task exceeded maximum timeout value '
'({0} seconds)'.format(self._timeout))

def setup_death_penalty(self):
"""Sets up an alarm signal and a signal handler that raises
an exception after the timeout amount (expressed in seconds).
"""
signal.signal(signal.SIGALRM, self.handle_death_penalty)
signal.alarm(self._timeout)

def cancel_death_penalty(self):
"""Removes the death penalty alarm and puts back the system into
default signal handling.
"""
signal.alarm(0)
signal.signal(signal.SIGALRM, signal.SIG_DFL)
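
For context, a hedged usage sketch of the context manager above, mirroring how worker() wraps the task call in cluster.py; the slow_task function is made up for illustration, and SIGALRM-based timeouts only work on Unix and in the main thread.

```python
import time

from django_q.timeouts import JobTimeoutException, UnixSignalDeathPenalty


def slow_task():
    time.sleep(5)  # hypothetical task that exceeds the limit


try:
    # Arm a one-second alarm; the handler raises JobTimeoutException when it fires.
    with UnixSignalDeathPenalty(timeout=1):
        slow_task()
except JobTimeoutException as exc:
    print(f"task timed out: {exc}")
```

In the worker, a raised JobTimeoutException is additionally matched by walking exc.__cause__, so a timeout that a task wraps in another exception is still classified as a timeout rather than an ordinary failure.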