diff --git a/contentcuration/contentcuration/management/commands/reconcile_change_tasks.py b/contentcuration/contentcuration/management/commands/reconcile_change_tasks.py index 333ca221a8..6fde77c781 100644 --- a/contentcuration/contentcuration/management/commands/reconcile_change_tasks.py +++ b/contentcuration/contentcuration/management/commands/reconcile_change_tasks.py @@ -19,11 +19,7 @@ def handle(self, *args, **options): from contentcuration.tasks import apply_channel_changes_task from contentcuration.tasks import apply_user_changes_task - active_task_ids = [] - for worker_name, tasks in app.control.inspect().active().items(): - active_task_ids.extend(task['id'] for task in tasks) - for worker_name, tasks in app.control.inspect().reserved().items(): - active_task_ids.extend(task['id'] for task in tasks) + active_task_ids = [task['id'] for task in app.get_active_and_reserved_tasks()] channel_changes = Change.objects.filter(channel_id__isnull=False, applied=False, errored=False) \ .order_by('channel_id', 'created_by_id') \ diff --git a/contentcuration/contentcuration/urls.py b/contentcuration/contentcuration/urls.py index 4f0e5358e6..a39f750883 100644 --- a/contentcuration/contentcuration/urls.py +++ b/contentcuration/contentcuration/urls.py @@ -78,6 +78,8 @@ def get_redirect_url(self, *args, **kwargs): re_path(r'^api/probers/get_prober_channel', views.get_prober_channel, name='get_prober_channel'), re_path(r'^api/probers/publishing_status', views.publishing_status, name='publishing_status'), re_path(r'^api/probers/celery_worker_status', views.celery_worker_status, name='celery_worker_status'), + re_path(r'^api/probers/task_queue_status', views.task_queue_status, name='task_queue_status'), + re_path(r'^api/probers/unapplied_changes_status', views.unapplied_changes_status, name='unapplied_changes_status'), re_path(r'^api/sync/$', SyncView.as_view(), name="sync"), ] diff --git a/contentcuration/contentcuration/utils/celery/app.py 
b/contentcuration/contentcuration/utils/celery/app.py index 521584bd91..9d8c260700 100644 --- a/contentcuration/contentcuration/utils/celery/app.py +++ b/contentcuration/contentcuration/utils/celery/app.py @@ -1,9 +1,11 @@ import base64 import json +import redis.exceptions from celery import Celery from contentcuration.utils.celery.tasks import CeleryTask +from contentcuration.utils.sentry import report_exception class CeleryApp(Celery): @@ -50,10 +52,30 @@ def count_queued_tasks(self, queue_name="celery"): :param queue_name: The queue name, defaults to the default "celery" queue :return: int """ - with self.pool.acquire(block=True) as conn: - count = conn.default_channel.client.llen(queue_name) + count = 0 + try: + with self.pool.acquire(block=True) as conn: + count = conn.default_channel.client.llen(queue_name) + except redis.exceptions.RedisError as e: + # log these so we can get visibility into the reliability of the redis connection + report_exception(e) + pass return count + def get_active_and_reserved_tasks(self): + """ + Iterate over active and reserved tasks + :return: A generator yielding task dictionaries + """ + active = self.control.inspect().active() or {} + for _, tasks in active.items(): + for task in tasks: + yield task + reserved = self.control.inspect().reserved() or {} + for _, tasks in reserved.items(): + for task in tasks: + yield task + def decode_result(self, result, status=None): """ Decodes the celery result, like the raw result from the database, using celery tools diff --git a/contentcuration/contentcuration/views/base.py b/contentcuration/contentcuration/views/base.py index 3347513239..71b99a6fcd 100644 --- a/contentcuration/contentcuration/views/base.py +++ b/contentcuration/contentcuration/views/base.py @@ -173,6 +173,39 @@ def celery_worker_status(request): return Response(app.control.inspect().ping() or {}) +@api_view(["GET"]) +@authentication_classes((TokenAuthentication, SessionAuthentication)) +@permission_classes((IsAuthenticated,)) +def 
task_queue_status(request): + if not request.user.is_admin: + return HttpResponseForbidden() + + from contentcuration.celery import app + + return Response({ + 'queued_task_count': app.count_queued_tasks(), + }) + + +@api_view(["GET"]) +@authentication_classes((TokenAuthentication, SessionAuthentication)) +@permission_classes((IsAuthenticated,)) +def unapplied_changes_status(request): + if not request.user.is_admin: + return HttpResponseForbidden() + + from contentcuration.celery import app + + active_task_count = 0 + for _ in app.get_active_and_reserved_tasks(): + active_task_count += 1 + + return Response({ + 'active_task_count': active_task_count, + 'unapplied_changes_count': Change.objects.filter(applied=False, errored=False).count(), + }) + + """ END HEALTH CHECKS """ diff --git a/deploy/cloudprober.cfg b/deploy/cloudprober.cfg index 57bd0084db..c5a129455e 100644 --- a/deploy/cloudprober.cfg +++ b/deploy/cloudprober.cfg @@ -160,4 +160,28 @@ probe { timeout_msec: 10000 # 10s } +probe { + name: "unapplied_changes_status" + type: EXTERNAL + targets { dummy_targets {} } + external_probe { + mode: ONCE + command: "./probers/unapplied_changes_probe.py" + } + interval_msec: 1800000 # 30 minutes + timeout_msec: 20000 # 20s +} + +probe { + name: "task_queue_status" + type: EXTERNAL + targets { dummy_targets {} } + external_probe { + mode: ONCE + command: "./probers/task_queue_probe.py" + } + interval_msec: 600000 # 10 minutes + timeout_msec: 10000 # 10s +} + # Note: When deploying on GKE, the error logs can be found under GCE VM instance. 
diff --git a/deploy/probers/task_queue_probe.py b/deploy/probers/task_queue_probe.py new file mode 100755 index 0000000000..3a3b02cfed --- /dev/null +++ b/deploy/probers/task_queue_probe.py @@ -0,0 +1,21 @@ +#!/usr/bin/env python +from base import BaseProbe + + +class TaskQueueProbe(BaseProbe): + + metric = "task_queue_ping_latency_msec" + threshold = 50 + + def do_probe(self): + r = self.request("api/probers/task_queue_status/") + r.raise_for_status() + results = r.json() + + task_count = results.get('queued_task_count', 0) + if task_count >= self.threshold: + raise Exception("Task queue length is over threshold! {} >= {}".format(task_count, self.threshold)) + + +if __name__ == "__main__": + TaskQueueProbe().run() diff --git a/deploy/probers/unapplied_changes_probe.py b/deploy/probers/unapplied_changes_probe.py new file mode 100755 index 0000000000..a3ceee915e --- /dev/null +++ b/deploy/probers/unapplied_changes_probe.py @@ -0,0 +1,22 @@ +#!/usr/bin/env python +from base import BaseProbe + + +class UnappliedChangesProbe(BaseProbe): + + metric = "unapplied_changes_ping_latency_msec" + + def do_probe(self): + r = self.request("api/probers/unapplied_changes_status/") + r.raise_for_status() + results = r.json() + + active_task_count = results.get('active_task_count', 0) + unapplied_changes_count = results.get('unapplied_changes_count', 0) + + if active_task_count == 0 and unapplied_changes_count > 0: + raise Exception("There are unapplied changes and no active tasks! {} unapplied changes".format(unapplied_changes_count)) + + +if __name__ == "__main__": + UnappliedChangesProbe().run()