Skip to content

Commit

Permalink
Merge branch 'robertatakenaka-celery_beat'
Browse files Browse the repository at this point in the history
  • Loading branch information
gitnnolabs committed Oct 16, 2022
2 parents b1e29db + c1c765a commit 67aff2e
Show file tree
Hide file tree
Showing 40 changed files with 4,757 additions and 2 deletions.
10 changes: 9 additions & 1 deletion config/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,9 +312,17 @@
CELERY_TASK_TIME_LIMIT = 5 * 60
# http://docs.celeryproject.org/en/latest/userguide/configuration.html#task-soft-time-limit
# TODO: set to whatever value is adequate in your circumstances
CELERY_TASK_SOFT_TIME_LIMIT = 60
CELERY_TASK_SOFT_TIME_LIMIT = 36000
# http://docs.celeryproject.org/en/latest/userguide/configuration.html#beat-scheduler
CELERY_BEAT_SCHEDULER = "django_celery_beat.schedulers:DatabaseScheduler"

# Celery Results
# ------------------------------------------------------------------------------
# https: // django-celery-results.readthedocs.io/en/latest/getting_started.html
CELERY_RESULT_BACKEND = 'django-db'
CELERY_CACHE_BACKEND = 'django-cache'
CELERY_RESULT_EXTENDED = True

# django-allauth
# ------------------------------------------------------------------------------
ACCOUNT_ALLOW_REGISTRATION = env.bool("DJANGO_ACCOUNT_ALLOW_REGISTRATION", True)
Expand Down
35 changes: 35 additions & 0 deletions django_celery_beat/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
"""Database-backed Periodic Tasks."""
# :copyright: (c) 2016, Ask Solem.
# All rights reserved.
# :license: BSD (3 Clause), see LICENSE for more details.
import re

from collections import namedtuple

import django

__version__ = '2.2.1'
__author__ = 'Asif Saif Uddin, Ask Solem'
__contact__ = '[email protected], [email protected]'
__homepage__ = 'https://github.com/celery/django-celery-beat'
__docformat__ = 'restructuredtext'

# -eof meta-

version_info_t = namedtuple('version_info_t', (
'major', 'minor', 'micro', 'releaselevel', 'serial',
))

# bumpversion can only search for {current_version}
# so we have to parse the version here.
_temp = re.match(
r'(\d+)\.(\d+).(\d+)(.+)?', __version__).groups()
VERSION = version_info = version_info_t(
int(_temp[0]), int(_temp[1]), int(_temp[2]), _temp[3] or '', '')
del(_temp)
del(re)

__all__ = []

if django.VERSION < (3, 2):
default_app_config = 'django_celery_beat.apps.BeatConfig'
253 changes: 253 additions & 0 deletions django_celery_beat/admin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,253 @@
"""Periodic Task Admin interface."""
from django import forms
from django.conf import settings
from django.contrib import admin, messages
from django.db.models import When, Value, Case
from django.forms.widgets import Select
from django.template.defaultfilters import pluralize
from django.utils.translation import gettext_lazy as _

from celery import current_app
from celery.utils import cached_property
from kombu.utils.json import loads

from .models import (
PeriodicTask, PeriodicTasks,
IntervalSchedule, CrontabSchedule,
SolarSchedule, ClockedSchedule
)
from .utils import is_database_scheduler


class TaskSelectWidget(Select):
"""Widget that lets you choose between task names."""

celery_app = current_app
_choices = None

def tasks_as_choices(self):
_ = self._modules # noqa
tasks = list(sorted(name for name in self.celery_app.tasks
if not name.startswith('celery.')))
return (('', ''), ) + tuple(zip(tasks, tasks))

@property
def choices(self):
if self._choices is None:
self._choices = self.tasks_as_choices()
return self._choices

@choices.setter
def choices(self, _):
# ChoiceField.__init__ sets ``self.choices = choices``
# which would override ours.
pass

@cached_property
def _modules(self):
self.celery_app.loader.import_default_modules()


class TaskChoiceField(forms.ChoiceField):
"""Field that lets you choose between task names."""

widget = TaskSelectWidget

def valid_value(self, value):
return True


class PeriodicTaskForm(forms.ModelForm):
"""Form that lets you create and modify periodic tasks."""

regtask = TaskChoiceField(
label=_('Task (registered)'),
required=False,
)
task = forms.CharField(
label=_('Task (custom)'),
required=False,
max_length=200,
)

class Meta:
"""Form metadata."""

model = PeriodicTask
exclude = ()

def clean(self):
data = super().clean()
regtask = data.get('regtask')
if regtask:
data['task'] = regtask
if not data['task']:
exc = forms.ValidationError(_('Need name of task'))
self._errors['task'] = self.error_class(exc.messages)
raise exc

if data.get('expire_seconds') is not None and data.get('expires'):
raise forms.ValidationError(
_('Only one can be set, in expires and expire_seconds')
)
return data

def _clean_json(self, field):
value = self.cleaned_data[field]
try:
loads(value)
except ValueError as exc:
raise forms.ValidationError(
_('Unable to parse JSON: %s') % exc,
)
return value

def clean_args(self):
return self._clean_json('args')

def clean_kwargs(self):
return self._clean_json('kwargs')


class PeriodicTaskAdmin(admin.ModelAdmin):
"""Admin-interface for periodic tasks."""

form = PeriodicTaskForm
model = PeriodicTask
celery_app = current_app
date_hierarchy = 'start_time'
list_display = ('__str__', 'enabled', 'interval', 'start_time',
'last_run_at', 'one_off')
list_filter = ['enabled', 'one_off', 'task', 'start_time', 'last_run_at']
actions = ('enable_tasks', 'disable_tasks', 'toggle_tasks', 'run_tasks')
search_fields = ('name',)
fieldsets = (
(None, {
'fields': ('name', 'regtask', 'task', 'enabled', 'description',),
'classes': ('extrapretty', 'wide'),
}),
('Schedule', {
'fields': ('interval', 'crontab', 'solar', 'clocked',
'start_time', 'last_run_at', 'one_off'),
'classes': ('extrapretty', 'wide'),
}),
('Arguments', {
'fields': ('args', 'kwargs'),
'classes': ('extrapretty', 'wide', 'collapse', 'in'),
}),
('Execution Options', {
'fields': ('expires', 'expire_seconds', 'queue', 'exchange',
'routing_key', 'priority', 'headers'),
'classes': ('extrapretty', 'wide', 'collapse', 'in'),
}),
)
readonly_fields = (
'last_run_at',
)

def changelist_view(self, request, extra_context=None):
extra_context = extra_context or {}
scheduler = getattr(settings, 'CELERY_BEAT_SCHEDULER', None)
extra_context['wrong_scheduler'] = not is_database_scheduler(scheduler)
return super(PeriodicTaskAdmin, self).changelist_view(
request, extra_context)

def get_queryset(self, request):
qs = super().get_queryset(request)
return qs.select_related('interval', 'crontab', 'solar', 'clocked')

def _message_user_about_update(self, request, rows_updated, verb):
"""Send message about action to user.
`verb` should shortly describe what have changed (e.g. 'enabled').
"""
self.message_user(
request,
_('{0} task{1} {2} successfully {3}').format(
rows_updated,
pluralize(rows_updated),
pluralize(rows_updated, _('was,were')),
verb,
),
)

def enable_tasks(self, request, queryset):
rows_updated = queryset.update(enabled=True)
PeriodicTasks.update_changed()
self._message_user_about_update(request, rows_updated, 'enabled')
enable_tasks.short_description = _('Enable selected tasks')

def disable_tasks(self, request, queryset):
rows_updated = queryset.update(enabled=False)
PeriodicTasks.update_changed()
self._message_user_about_update(request, rows_updated, 'disabled')
disable_tasks.short_description = _('Disable selected tasks')

def _toggle_tasks_activity(self, queryset):
return queryset.update(enabled=Case(
When(enabled=True, then=Value(False)),
default=Value(True),
))

def toggle_tasks(self, request, queryset):
rows_updated = self._toggle_tasks_activity(queryset)
PeriodicTasks.update_changed()
self._message_user_about_update(request, rows_updated, 'toggled')
toggle_tasks.short_description = _('Toggle activity of selected tasks')

def run_tasks(self, request, queryset):
self.celery_app.loader.import_default_modules()
tasks = [(self.celery_app.tasks.get(task.task),
loads(task.args),
loads(task.kwargs),
task.queue)
for task in queryset]

if any(t[0] is None for t in tasks):
for i, t in enumerate(tasks):
if t[0] is None:
break

# variable "i" will be set because list "tasks" is not empty
not_found_task_name = queryset[i].task

self.message_user(
request,
_('task "{0}" not found'.format(not_found_task_name)),
level=messages.ERROR,
)
return

task_ids = [task.apply_async(args=args, kwargs=kwargs, queue=queue)
if queue and len(queue)
else task.apply_async(args=args, kwargs=kwargs)
for task, args, kwargs, queue in tasks]
tasks_run = len(task_ids)
self.message_user(
request,
_('{0} task{1} {2} successfully run').format(
tasks_run,
pluralize(tasks_run),
pluralize(tasks_run, _('was,were')),
),
)
run_tasks.short_description = _('Run selected tasks')


class ClockedScheduleAdmin(admin.ModelAdmin):
"""Admin-interface for clocked schedules."""

fields = (
'clocked_time',
)
list_display = (
'clocked_time',
)


admin.site.register(IntervalSchedule)
admin.site.register(CrontabSchedule)
admin.site.register(SolarSchedule)
admin.site.register(ClockedSchedule, ClockedScheduleAdmin)
admin.site.register(PeriodicTask, PeriodicTaskAdmin)
14 changes: 14 additions & 0 deletions django_celery_beat/apps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
"""Django Application configuration."""
from django.apps import AppConfig
from django.utils.translation import gettext_lazy as _

__all__ = ['BeatConfig']


class BeatConfig(AppConfig):
"""Default configuration for django_celery_beat app."""

name = 'django_celery_beat'
label = 'django_celery_beat'
verbose_name = _('Periodic Tasks')
default_auto_field = 'django.db.models.AutoField'
36 changes: 36 additions & 0 deletions django_celery_beat/button_helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from django.utils.translation import gettext as _
from wagtail.contrib.modeladmin.helpers import ButtonHelper

from django.urls import reverse


class PeriodicTaskHelper(ButtonHelper):

# Define classes for our button, here we can set an icon for example
run_button_classnames = ["button-small", "icon", ]

def run_button(self, obj):
# Define a label for our button
text = _("Run")
return {
"url": reverse("django_celery_beat:task_run") + "?task_id=%s" % str(obj.id),
"label": text,
"classname": self.finalise_classname(self.run_button_classnames),
"title": text,
}


def get_buttons_for_obj(
self, obj, exclude=None, classnames_add=None, classnames_exclude=None
):
"""
This function is used to gather all available buttons.
We append our custom button to the btns list.
"""
btns = super().get_buttons_for_obj(
obj, exclude, classnames_add, classnames_exclude
)
if "run" not in (exclude or []):
btns.append(self.run_button(obj))

return btns
Loading

0 comments on commit 67aff2e

Please sign in to comment.