Skip to content

Commit

Permalink
[learningequality#3827][learningequality#3828] Add probers for alerti…
Browse files Browse the repository at this point in the history
…ng to task and change abnormalities
  • Loading branch information
bjester committed Dec 2, 2022
1 parent 9d84374 commit bafe68c
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,7 @@ def handle(self, *args, **options):
from contentcuration.tasks import apply_channel_changes_task
from contentcuration.tasks import apply_user_changes_task

active_task_ids = []
for worker_name, tasks in app.control.inspect().active().items():
active_task_ids.extend(task['id'] for task in tasks)
for worker_name, tasks in app.control.inspect().reserved().items():
active_task_ids.extend(task['id'] for task in tasks)
active_task_ids = [task['id'] for task in app.get_active_and_reserved_tasks()]

channel_changes = Change.objects.filter(channel_id__isnull=False, applied=False, errored=False) \
.order_by('channel_id', 'created_by_id') \
Expand Down
2 changes: 2 additions & 0 deletions contentcuration/contentcuration/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ def get_redirect_url(self, *args, **kwargs):
re_path(r'^api/probers/get_prober_channel', views.get_prober_channel, name='get_prober_channel'),
re_path(r'^api/probers/publishing_status', views.publishing_status, name='publishing_status'),
re_path(r'^api/probers/celery_worker_status', views.celery_worker_status, name='celery_worker_status'),
re_path(r'^api/probers/task_queue_status', views.task_queue_status, name='task_queue_status'),
re_path(r'^api/probers/unapplied_changes_status', views.unapplied_changes_status, name='unapplied_changes_status'),
re_path(r'^api/sync/$', SyncView.as_view(), name="sync"),
]

Expand Down
26 changes: 24 additions & 2 deletions contentcuration/contentcuration/utils/celery/app.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import base64
import json

import redis.exceptions
from celery import Celery

from contentcuration.utils.celery.tasks import CeleryTask
from contentcuration.utils.sentry import report_exception


class CeleryApp(Celery):
Expand Down Expand Up @@ -50,10 +52,30 @@ def count_queued_tasks(self, queue_name="celery"):
:param queue_name: The queue name, defaults to the default "celery" queue
:return: int
"""
with self.pool.acquire(block=True) as conn:
count = conn.default_channel.client.llen(queue_name)
count = 0
try:
with self.pool.acquire(block=True) as conn:
count = conn.default_channel.client.llen(queue_name)
except redis.exceptions.RedisError as e:
# log these so we can get visibility into the reliability of the redis connection
report_exception(e)
pass
return count

def get_active_and_reserved_tasks(self):
"""
Iterate over active and reserved tasks
:return: A list of dictionaries
"""
active = self.control.inspect().active() or {}
for _, tasks in active.items():
for task in tasks:
yield task
reserved = self.control.inspect().reserved() or {}
for _, tasks in reserved.items():
for task in tasks:
yield task

def decode_result(self, result, status=None):
"""
Decodes the celery result, like the raw result from the database, using celery tools
Expand Down
33 changes: 33 additions & 0 deletions contentcuration/contentcuration/views/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,39 @@ def celery_worker_status(request):
return Response(app.control.inspect().ping() or {})


@api_view(["GET"])
@authentication_classes((TokenAuthentication, SessionAuthentication))
@permission_classes((IsAuthenticated,))
def task_queue_status(request):
if not request.user.is_admin:
return HttpResponseForbidden()

from contentcuration.celery import app

return Response({
'queued_task_count': app.count_queued_tasks(),
})


@api_view(["GET"])
@authentication_classes((TokenAuthentication, SessionAuthentication))
@permission_classes((IsAuthenticated,))
def unapplied_changes_status(request):
if not request.user.is_admin:
return HttpResponseForbidden()

from contentcuration.celery import app

active_task_count = 0
for _ in app.get_active_and_reserved_tasks():
active_task_count += 1

return Response({
'active_task_count': active_task_count,
'unapplied_changes_count': Change.objects.filter(applied=False, errored=False).count(),
})


""" END HEALTH CHECKS """


Expand Down
24 changes: 24 additions & 0 deletions deploy/cloudprober.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -160,4 +160,28 @@ probe {
timeout_msec: 10000 # 10s
}

probe {
name: "unapplied_changes_status"
type: EXTERNAL
targets { dummy_targets {} }
external_probe {
mode: ONCE
command: "./probers/unapplied_changes_probe.py"
}
interval_msec: 1800000 # 30 minutes
timeout_msec: 20000 # 20s
}

probe {
name: "task_queue_status"
type: EXTERNAL
targets { dummy_targets {} }
external_probe {
mode: ONCE
command: "./probers/task_queue_probe.py"
}
interval_msec: 600000 # 10 minutes
timeout_msec: 10000 # 10s
}

# Note: When deploying on GKE, the error logs can be found under GCE VM instance.
21 changes: 21 additions & 0 deletions deploy/probers/task_queue_probe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#!/usr/bin/env python
from base import BaseProbe


class TaskQueueProbe(BaseProbe):

metric = "task_queue_ping_latency_msec"
threshold = 50

def do_probe(self):
r = self.request("api/probers/task_queue_status/")
r.raise_for_status()
results = r.json()

task_count = results.get('queued_task_count', 0)
if task_count >= self.threshold:
raise Exception("Task queue length is over threshold! {} > {}".format(task_count, self.threshold))


if __name__ == "__main__":
TaskQueueProbe().run()
22 changes: 22 additions & 0 deletions deploy/probers/unapplied_changes_probe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#!/usr/bin/env python
from base import BaseProbe


class UnappliedChangesProbe(BaseProbe):

metric = "unapplied__changes_ping_latency_msec"

def do_probe(self):
r = self.request("api/probers/unapplied_changes_status/")
r.raise_for_status()
results = r.json()

active_task_count = results.get('active_task_count', 0)
unapplied_changes_count = results.get('unapplied_changes_count', 0)

if active_task_count == 0 and unapplied_changes_count > 0:
raise Exception("There are unapplied changes and no active tasks! {} unapplied changes".format(unapplied_changes_count))


if __name__ == "__main__":
UnappliedChangesProbe().run()

0 comments on commit bafe68c

Please sign in to comment.