Skip to content

Commit

Permalink
Use a custom Task Router to route tasks dynamically
Browse files Browse the repository at this point in the history
  • Loading branch information
humitos committed Apr 2, 2020
1 parent c969b18 commit 8be9f85
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 0 deletions.
116 changes: 116 additions & 0 deletions readthedocs/builds/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
import logging

from django.conf import settings
from django.db.models import Avg

from readthedocs.builds.models import Build, Version

log = logging.getLogger(__name__)

class TaskRouter:

"""
Celery tasks router.
It allows us to decide which queue is where we want to execute the task
based on project's settings but also in queue availability.
1. the project is using conda
2. new project with less than N successful builds
3. last N successful builds have a high time average
It ignores projects that have already set ``build_queue`` attribute.
https://docs.celeryproject.org/en/stable/userguide/routing.html#manual-routing
https://docs.celeryproject.org/en/stable/userguide/configuration.html#std:setting-task_routes
"""

n_builds = 5
n_last_builds = 15
time_average = 350

BUILD_DEFAULT_QUEUE = 'build:default'
BUILD_LARGE_QUEUE = 'build:large'


def route_for_task(self, task, args, kwargs, **__):
log.info('Executing TaskRouter. task=%s', task)
if task not in (
'readthedocs.projects.tasks.update_docs_task',
'readthedocs.projects.tasks.sync_repository_task',
):
log.info('No routing non-build task. task=%s', task)
return

version = self._get_version(task, args, kwargs)

# Do no route tasks for projects without the feature flag
if not version.project.has_feature(Feature.CELERY_ROUTER):
log.info('Project does not have the feature flag. No routing task. task=%s', task)
return

# Do not override the queue defined in the project itself
if version.project.build_queue:
log.info(
'Skipping routing task because project has a custom queue. queue=%s',
version.project.build_queue,
)
return version.project.build_queue

queryset = version.builds.filter(success=True).order_by('-date')
last_builds = queryset[:self.n_last_builds]

# Version has used conda in previous builds
for build in last_builds.iterator():
if build.config.get('conda', None):
log.info(
'Routing task because project uses conda. queue=%s',
self.BUILD_LARGE_QUEUE,
)
return self.BUILD_LARGE_QUEUE

# We do not have enough builds for this version yet
if queryset.count() < self.n_builds:
log.info(
'Routing task because it does not have enough success builds yet. queue=%s',
self.BUILD_LARGE_QUEUE,
)
return self.BUILD_LARGE_QUEUE

# Build time average is high
length_avg = queryset.filter(pk__in=last_builds).aggregate(Avg('length')).get('length__avg')
if length_avg > self.time_average:
log.info(
'Routing task because project has high time average. queue=%s',
self.BUILD_LARGE_QUEUE,
)
return self.BUILD_LARGE_QUEUE

log.info('Routing task to default queue. queue=%s', settings.CELERY_DEFAULT_QUEUE)
return

def _get_version(self, task, args, kwargs):
if task == 'readthedocs.projects.tasks.update_docs_task':
build_pk = kwargs.get('build_pk')
try:
build = Build.objects.get(pk=build_pk)
version = build.version
except Build.DoesNotExist:
log.info(
'Build does not exist. Routing task to default queue. build_pk=%s queue=%s',
build_pk,
settings.CELERY_DEFAULT_QUEUE,
)
return
elif task == 'readthedocs.projects.tasks.sync_repository_task':
version_pk = args[0]
try:
version = Version.objects.get(pk=version_pk)
except Version.DoesNotExist:
log.info(
'Version does not exist. Routing task to default queue. version_pk=%s queue=%s',
version_pk,
settings.CELERY_DEFAULT_QUEUE,
)
return
return version
5 changes: 5 additions & 0 deletions readthedocs/projects/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1515,6 +1515,7 @@ def add_features(sender, **kwargs):
SKIP_SYNC_TAGS = 'skip_sync_tags'
SKIP_SYNC_BRANCHES = 'skip_sync_branches'
CACHED_ENVIRONMENT = 'cached_environment'
CELERY_ROUTER = 'celery_router'

FEATURES = (
(USE_SPHINX_LATEST, _('Use latest version of Sphinx')),
Expand Down Expand Up @@ -1585,6 +1586,10 @@ def add_features(sender, **kwargs):
CACHED_ENVIRONMENT,
_('Cache the environment (virtualenv, conda, pip cache, repository) in storage'),
),
(
CELERY_ROUTER,
_('Route tasks using our custom task router'),
),
)

projects = models.ManyToManyField(
Expand Down
4 changes: 4 additions & 0 deletions readthedocs/settings/docker_compose.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ def DATABASES(self): # noqa
CELERY_ALWAYS_EAGER = False
CELERY_TASK_IGNORE_RESULT = False

CELERY_ROUTES = (
'readthedocs.builds.tasks.TaskRouter',
)

EMAIL_BACKEND = "django.core.mail.backends.console.EmailBackend"

# Avoid syncing to the web servers
Expand Down

0 comments on commit 8be9f85

Please sign in to comment.