Skip to content

Commit

Permalink
Updated documentation and added support for multiple queues.
Browse files Browse the repository at this point in the history
  • Loading branch information
ipartola committed Jan 5, 2017
1 parent 2a5a98d commit ddcf235
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 35 deletions.
45 changes: 37 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,37 +40,66 @@ is that you have already configured the Django cache.
...
}

You can use whatever method you'd like for sending out the notification, but `mail_admins` is one of the simplest.
You can use whatever method you'd like for sending out the notification, but
`mail_admins` is one of the simplest.

**Step 4.**: Configure the timestamp update task.

You can do this using either your system's cron daemon, or using Celery's built-in scheduler (Celery beat). To do it using cron:
You can do this using either your system's cron daemon, or using Celery's
built-in scheduler (Celery beat). To do it using cron:

* * * * * APP_USER_NAME /path/to/manage.py trigger_celery_heartbeat

If you would prefer to set it up using Celery beat, add this to your settings.py:


CELERYBEAT_SCHEDULE = {
...
'heartbeat': {
'task': 'endorfyn.heartbeat',
'task': 'celery_heartbeat.start_update_heartbeat',
'schedule': crontab(minute='*'),
},
...
}

**Step 5.**: Configure the heartbeat check command to run periodically.

This command cannot be run by Celery beat because if celery is falling behind, it would not be run with the correct frequency. Your system's cron daemon is probably the simplest choice:
This command cannot be run by Celery beat because if Celery is falling behind,
it would not be run with the correct frequency, always producing false
negatives. Your system's cron daemon is probably the simplest choice:

* * * * * APP_USER_NAME /path/to/manage.py check_celery_heartbeat

NOTE: You will want to run this command only one at a time. If you run it in
parallel on multiple servers, you will likely get multiple notifications about
a queue falling behind.

## Running without cron

If your application runs in an environment that does not allow you to use
cron, you will need to find another task scheduling service. For example,
Heroku offers the [Heroku Scheduler](https://devcenter.heroku.com/articles/scheduler)
as well as [Custom Clock Processes](https://devcenter.heroku.com/articles/scheduled-jobs-custom-clock-processes).

Either of these are fine choices for both the trigger command and the
check command.

## Monitoring multiple queues

By default, django-celery-heartbeat will try to monitor all the queues it
can find. If you use more than one queue, you should specify which ones you
want to be monitored using the `CELERY_HEARTBEAT_MONITORED_QUEUES` options
in your settings.py:

CELERY_HEARTBEAT_MONITORED_QUEUES = ['celery', 'image_processing', 'video_encoding']

## Additional options

django-celery-heartbeat has the following options you can specify in your settings.py
django-celery-heartbeat has the following options you can specify in your
settings.py

`CELERY_HEARTBEAT_CACHE_KEY` specifies the name of the key in your cache to use for storing the timestamp. The default is `CELERY_HEARTBEAT_CACHE_KEY`.
`CELERY_HEARTBEAT_CACHE_KEY` specifies the name of the key in your cache to
use for storing the timestamp. The default is `CELERY_HEARTBEAT_CACHE_KEY`.

`CELERY_HEARTBEAT_DELAY_THRESHOLD` specifies the number of seconds by which celery can fall behind before the error is logged. The default is 5 minutes.
`CELERY_HEARTBEAT_DELAY_THRESHOLD` specifies the number of seconds by which
celery can fall behind before the error is logged. The default is 5 minutes.

21 changes: 2 additions & 19 deletions celery_heartbeat/management/commands/check_celery_heartbeat.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,10 @@
from django.core.cache import cache
from django.conf import settings

from celery_heartbeat.utils import get_cache_key

import time
import logging

logger = logging.getLogger('celery_heartbeat')
from celery_heartbeat.utils import check_and_log_heartbeat


class Command(BaseCommand):
DEFAULT_CELERY_HEARTBEAT_DELAY_THRESHOLD = 5# * 60

def handle(self, *args, **options):
cache_key = get_cache_key()
heartbeat = cache.get(cache_key)
if not heartbeat:
print('hello')
return

delta = long(time.time() - heartbeat)
threshold = getattr(settings, 'CELERY_HEARTBEAT_DELAY_THRESHOLD', self.DEFAULT_CELERY_HEARTBEAT_DELAY_THRESHOLD) or self.DEFAULT_CELERY_HEARTBEAT_DELAY_THRESHOLD

if delta > threshold:
logger.error('Celery is behind by {} seconds.'.format(delta))

check_and_log_heartbeat()
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from __future__ import unicode_literals, print_function, absolute_import, division

from django.core.management.base import BaseCommand
from celery_heartbeat.tasks import update_heartbeat
from celery_heartbeat.tasks import start_update_heartbeat


class Command(BaseCommand):

def handle(self, *args, **options):
update_heartbeat.delay()
start_update_heartbeat.delay()
18 changes: 14 additions & 4 deletions celery_heartbeat/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,20 @@

import time

from celery_heartbeat.utils import get_cache_key
from celery_heartbeat.utils import get_cache_key, get_known_queues


@shared_task(ignore_result=True, name='celery_heartbeat.shared_task')
def update_heartbeat():
cache_key = get_cache_key()
@shared_task(ignore_result=True, name='celery_heartbeat.update_heartbeat')
def update_heartbeat(queue_name):
'''Updates the given queue heartbeat value in the cache with the current timestamp.'''

cache_key = get_cache_key(queue_name)
cache.set(cache_key, long(time.time()), None)


@shared_task(ignore_result=True, name='celery_heartbeat.start_update_heartbeat')
def start_update_heartbeat():
'''Schedules all heartbeat updates for all the queues.'''

for queue in get_known_queues():
update_heartbeat.apply_async(args=[queue], queue=queue)
69 changes: 67 additions & 2 deletions celery_heartbeat/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,71 @@
from __future__ import unicode_literals, print_function, absolute_import, division

from django.conf import settings
from django.core.cache import cache
from celery import Celery

import time
import logging

def get_cache_key():
return getattr(settings, 'CELERY_HEARTBEAT_CACHE_KEY', 'CELERY_HEARTBEAT_CACHE_KEY') or 'CELERY_HEARTBEAT_CACHE_KEY'

logger = logging.getLogger('celery_heartbeat')
DEFAULT_CELERY_HEARTBEAT_DELAY_THRESHOLD = 5 * 60


def get_cache_key(queue_name):
'''Returns the name of the cache key to use for the given queue.'''

prefix = getattr(settings, 'CELERY_HEARTBEAT_CACHE_KEY', 'CELERY_HEARTBEAT_CACHE_KEY')
prefix = prefix or 'CELERY_HEARTBEAT_CACHE_KEY'

return '{}-{}'.format(prefix, queue_name)


def get_known_queues():
'''Returns a list of monitored celery queues.'''

app = Celery()
app.config_from_object('django.conf:settings')

queues = list(set(getattr(settings, 'CELERY_HEARTBEAT_MONITORED_QUEUES', []) or []))
if not queues:
queues = list(app.amqp.queues.keys())

return queues


def check_heartbeat(threshold):
'''Checks if any of the queues have fallen behind by more than threshold
seconds and returns human-readable status.'''

errors = []

for queue in get_known_queues():

cache_key = get_cache_key(queue)
heartbeat = cache.get(cache_key)
if not heartbeat:
continue

delta = long(time.time() - heartbeat)

if delta > threshold:
errors.append('Celery queue "{}" has fallen behind by {} seconds.'.format(queue, delta))

return errors


def check_and_log_heartbeat():
'''Checks if any of the queues have fallen behind by more than threshold
seconds and logs any queues that did.'''

errors = check_heartbeat(get_threshold())
if errors:
logger.error('\n'.join(errors))


def get_threshold():
'''Returns the configured or default threshold for queues falling behind, in seconds.'''

threshold = getattr(settings, 'CELERY_HEARTBEAT_DELAY_THRESHOLD', DEFAULT_CELERY_HEARTBEAT_DELAY_THRESHOLD)
return threshold or DEFAULT_CELERY_HEARTBEAT_DELAY_THRESHOLD

0 comments on commit ddcf235

Please sign in to comment.