Skip to content

Commit

Permalink
Use a custom Task Router to route tasks dynamically
Browse files Browse the repository at this point in the history
  • Loading branch information
humitos committed Apr 2, 2020
1 parent c969b18 commit e876613
Show file tree
Hide file tree
Showing 4 changed files with 229 additions and 1 deletion.
7 changes: 6 additions & 1 deletion dockerfiles/settings/web.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@


class WebDevSettings(DockerBaseSettings):
pass

# Router is useful from webs only because they have access to the database.
# Builders will use the same queue that was assigned the first time on retry
CELERY_ROUTES = (
'readthedocs.builds.tasks.TaskRouter',
)


WebDevSettings.load_settings(__name__)
124 changes: 124 additions & 0 deletions readthedocs/builds/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
import logging

from django.conf import settings
from django.db.models import Avg

from readthedocs.builds.models import Build, Version
from readthedocs.projects.models import Feature

log = logging.getLogger(__name__)

class TaskRouter:

"""
Celery tasks router.
It allows us to decide which queue is where we want to execute the task
based on project's settings but also in queue availability.
1. the project is using conda
2. new project with less than N successful builds
3. last N successful builds have a high time average
It ignores projects that have already set ``build_queue`` attribute.
https://docs.celeryproject.org/en/stable/userguide/routing.html#manual-routing
https://docs.celeryproject.org/en/stable/userguide/configuration.html#std:setting-task_routes
"""

n_builds = 5
n_last_builds = 15
time_average = 350

BUILD_DEFAULT_QUEUE = 'build:default'
BUILD_LARGE_QUEUE = 'build:large'


def route_for_task(self, task, args, kwargs, **__):
log.info('Executing TaskRouter. task=%s', task)
if task not in (
'readthedocs.projects.tasks.update_docs_task',
'readthedocs.projects.tasks.sync_repository_task',
):
log.info('Skipping routing non-build task. task=%s', task)
return

version = self._get_version(task, args, kwargs)
if not version:
log.info('No Build/Version found. No routing task. task=%s', task)
return settings.CELERY_DEFAULT_QUEUE

# Do no route tasks for projects without the feature flag
if not version.project.has_feature(Feature.CELERY_ROUTER):
log.info('Project does not have the feature flag. No routing task. task=%s', task)
return version.project.build_queue or settings.CELERY_DEFAULT_QUEUE

# Do not override the queue defined in the project itself
if version.project.build_queue:
log.info(
'Skipping routing task because project has a custom queue. queue=%s',
version.project.build_queue,
)
return version.project.build_queue

queryset = version.builds.filter(success=True).order_by('-date')
last_builds = queryset[:self.n_last_builds]

# Version has used conda in previous builds
for build in last_builds.iterator():
if build.config.get('conda', None):
log.info(
'Routing task because project uses conda. queue=%s',
self.BUILD_LARGE_QUEUE,
)
return self.BUILD_LARGE_QUEUE

# We do not have enough builds for this version yet
if queryset.count() < self.n_builds:
log.info(
'Routing task because it does not have enough success builds yet. queue=%s',
self.BUILD_LARGE_QUEUE,
)
return self.BUILD_LARGE_QUEUE

# Build time average is high
length_avg = queryset.filter(pk__in=last_builds).aggregate(Avg('length')).get('length__avg')
if length_avg > self.time_average:
log.info(
'Routing task because project has high time average. queue=%s',
self.BUILD_LARGE_QUEUE,
)
return self.BUILD_LARGE_QUEUE

log.info(
'Routing task to default queue because no conditions were met. queue=%s',
settings.CELERY_DEFAULT_QUEUE,
)
return settings.CELERY_DEFAULT_QUEUE

def _get_version(self, task, args, kwargs):
if task == 'readthedocs.projects.tasks.update_docs_task':
build_pk = kwargs.get('build_pk')
try:
build = Build.objects.get(pk=build_pk)
version = build.version
except Build.DoesNotExist:
log.info(
'Build does not exist. Routing task to default queue. build_pk=%s queue=%s',
build_pk,
settings.CELERY_DEFAULT_QUEUE,
)
return

elif task == 'readthedocs.projects.tasks.sync_repository_task':
version_pk = args[0]
try:
version = Version.objects.get(pk=version_pk)
except Version.DoesNotExist:
log.info(
'Version does not exist. Routing task to default queue. version_pk=%s queue=%s',
version_pk,
settings.CELERY_DEFAULT_QUEUE,
)
return
return version
94 changes: 94 additions & 0 deletions readthedocs/builds/tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import django_dynamic_fixture as fixture
from django.conf import settings
from django.test import TestCase
from django.test.utils import override_settings

from readthedocs.builds.tasks import TaskRouter
from readthedocs.builds.models import Build, Version
from readthedocs.projects.models import Project, Feature



class TaskRouterTests(TestCase):

def setUp(self):
self.project = fixture.get(
Project,
build_queue=None,
)
self.feature = fixture.get(
Feature,
feature_id=Feature.CELERY_ROUTER,
)
self.feature.projects.add(self.project)
self.version = self.project.versions.first()
self.build = fixture.get(
Build,
version=self.version,
)
for _ in range(TaskRouter.n_builds + 5):
fixture.get(
Build,
version=self.version,

)

self.task = 'readthedocs.projects.tasks.update_docs_task'
self.args = (
self.version.pk,
)
self.kwargs = {
'build_pk': self.build.pk,
}
self.router = TaskRouter()

def test_not_under_feature_flag(self):
self.feature.projects.remove(self.project)
self.assertEqual(
self.router.route_for_task(self.task, self.args, self.kwargs),
settings.CELERY_DEFAULT_QUEUE,
)

def test_project_custom_queue(self):
self.project.build_queue = 'build:custom'
self.project.save()

self.assertEqual(
self.router.route_for_task(self.task, self.args, self.kwargs),
'build:custom',
)

def test_used_conda_in_last_builds(self):
self.build._config = {'conda': {'file': 'docs/environment.yml'}}
self.build.save()

self.assertEqual(
self.router.route_for_task(self.task, self.args, self.kwargs),
TaskRouter.BUILD_LARGE_QUEUE,
)

def test_more_than_n_builds(self):
self.assertEqual(
self.router.route_for_task(self.task, self.args, self.kwargs),
settings.CELERY_DEFAULT_QUEUE,
)

def test_non_build_task(self):
self.assertEqual(
self.router.route_for_task('non_build_task', self.args, self.kwargs),
settings.CELERY_DEFAULT_QUEUE,
)

def test_no_build_pk(self):
self.assertEqual(
self.router.route_for_task(self.task, self.args, {}),
settings.CELERY_DEFAULT_QUEUE,
)

def test_build_length_high_average(self):
high_length = TaskRouter.time_average + 50
self.version.builds.update(length=high_length)
self.assertEqual(
self.router.route_for_task(self.task, self.args, self.kwargs),
TaskRouter.BUILD_LARGE_QUEUE,
)
5 changes: 5 additions & 0 deletions readthedocs/projects/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1515,6 +1515,7 @@ def add_features(sender, **kwargs):
SKIP_SYNC_TAGS = 'skip_sync_tags'
SKIP_SYNC_BRANCHES = 'skip_sync_branches'
CACHED_ENVIRONMENT = 'cached_environment'
CELERY_ROUTER = 'celery_router'

FEATURES = (
(USE_SPHINX_LATEST, _('Use latest version of Sphinx')),
Expand Down Expand Up @@ -1585,6 +1586,10 @@ def add_features(sender, **kwargs):
CACHED_ENVIRONMENT,
_('Cache the environment (virtualenv, conda, pip cache, repository) in storage'),
),
(
CELERY_ROUTER,
_('Route tasks using our custom task router'),
),
)

projects = models.ManyToManyField(
Expand Down

0 comments on commit e876613

Please sign in to comment.