Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use a custom Task Router to route tasks dynamically #6849

Merged
merged 11 commits into from
Apr 16, 2020
7 changes: 6 additions & 1 deletion dockerfiles/settings/web.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@


class WebDevSettings(DockerBaseSettings):
pass

# Router is useful from webs only because they have access to the database.
# Builders will use the same queue that was assigned the first time on retry
CELERY_ROUTES = (
'readthedocs.builds.tasks.TaskRouter',
)


WebDevSettings.load_settings(__name__)
118 changes: 118 additions & 0 deletions readthedocs/builds/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import logging

from django.db.models import Avg

from readthedocs.builds.models import Build, Version
from readthedocs.projects.models import Feature

log = logging.getLogger(__name__)


class TaskRouter:

"""
Celery tasks router.

It allows us to decide which queue is where we want to execute the task
based on project's settings but also in queue availability.

1. the project is using conda
2. new project with less than N successful builds
3. last N successful builds have a high time average

It ignores projects that have already set ``build_queue`` attribute.

https://docs.celeryproject.org/en/stable/userguide/routing.html#manual-routing
https://docs.celeryproject.org/en/stable/userguide/configuration.html#std:setting-task_routes
"""

N_BUILDS = 5
N_LAST_BUILDS = 15
TIME_AVERAGE = 350

BUILD_DEFAULT_QUEUE = 'build:default'
BUILD_LARGE_QUEUE = 'build:large'

def route_for_task(self, task, args, kwargs, **__):
log.info('Executing TaskRouter. task=%s', task)
if task not in (
'readthedocs.projects.tasks.update_docs_task',
'readthedocs.projects.tasks.sync_repository_task',
):
log.info('Skipping routing non-build task. task=%s', task)
return

version = self._get_version(task, args, kwargs)
humitos marked this conversation as resolved.
Show resolved Hide resolved
if not version:
log.info('No Build/Version found. No routing task. task=%s', task)
return

# Do no route tasks for projects without the feature flag
if not version.project.has_feature(Feature.CELERY_ROUTER):
log.info('Project does not have the feature flag. No routing task. task=%s', task)
return version.project.build_queue or None

# Do not override the queue defined in the project itself
if version.project.build_queue:
log.info(
'Skipping routing task because project has a custom queue. queue=%s',
version.project.build_queue,
)
return version.project.build_queue

queryset = version.builds.filter(success=True).order_by('-date')
last_builds = queryset[:self.N_LAST_BUILDS]

# Version has used conda in previous builds
for build in last_builds.iterator():
if build.config.get('conda', None):
log.info(
'Routing task because project uses conda. queue=%s',
self.BUILD_LARGE_QUEUE,
)
return self.BUILD_LARGE_QUEUE

# We do not have enough builds for this version yet
if queryset.count() < self.N_BUILDS:
log.info(
'Routing task because it does not have enough success builds yet. queue=%s',
self.BUILD_LARGE_QUEUE,
)
return self.BUILD_LARGE_QUEUE

# Build time average is high
length_avg = queryset.filter(pk__in=last_builds).aggregate(Avg('length')).get('length__avg')
if length_avg > self.TIME_AVERAGE:
log.info(
'Routing task because project has high time average. queue=%s',
self.BUILD_LARGE_QUEUE,
)
return self.BUILD_LARGE_QUEUE

log.info('No routing task because no conditions were met.')
return

def _get_version(self, task, args, kwargs):
if task == 'readthedocs.projects.tasks.update_docs_task':
build_pk = kwargs.get('build_pk')
try:
build = Build.objects.get(pk=build_pk)
version = build.version
except Build.DoesNotExist:
log.info(
'Build does not exist. Routing task to default queue. build_pk=%s',
build_pk,
)
return

elif task == 'readthedocs.projects.tasks.sync_repository_task':
version_pk = args[0]
try:
version = Version.objects.get(pk=version_pk)
except Version.DoesNotExist:
log.info(
'Version does not exist. Routing task to default queue. version_pk=%s',
version_pk,
)
return
return version
90 changes: 90 additions & 0 deletions readthedocs/builds/tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import django_dynamic_fixture as fixture
from django.conf import settings
from django.test import TestCase
from django.test.utils import override_settings

from readthedocs.builds.tasks import TaskRouter
from readthedocs.builds.models import Build, Version
from readthedocs.projects.models import Project, Feature



class TaskRouterTests(TestCase):

def setUp(self):
self.project = fixture.get(
Project,
build_queue=None,
)
self.feature = fixture.get(
Feature,
feature_id=Feature.CELERY_ROUTER,
)
self.feature.projects.add(self.project)
self.version = self.project.versions.first()
self.build = fixture.get(
Build,
version=self.version,
)
for _ in range(TaskRouter.N_BUILDS + 5):
fixture.get(
Build,
version=self.version,

)

self.task = 'readthedocs.projects.tasks.update_docs_task'
self.args = (
self.version.pk,
)
self.kwargs = {
'build_pk': self.build.pk,
}
self.router = TaskRouter()

def test_not_under_feature_flag(self):
self.feature.projects.remove(self.project)
self.assertIsNone(
self.router.route_for_task(self.task, self.args, self.kwargs),
)

def test_project_custom_queue(self):
self.project.build_queue = 'build:custom'
self.project.save()

self.assertEqual(
self.router.route_for_task(self.task, self.args, self.kwargs),
'build:custom',
)

def test_used_conda_in_last_builds(self):
self.build._config = {'conda': {'file': 'docs/environment.yml'}}
self.build.save()

self.assertEqual(
self.router.route_for_task(self.task, self.args, self.kwargs),
TaskRouter.BUILD_LARGE_QUEUE,
)

def test_more_than_n_builds(self):
self.assertIsNone(
self.router.route_for_task(self.task, self.args, self.kwargs),
)

def test_non_build_task(self):
self.assertIsNone(
self.router.route_for_task('non_build_task', self.args, self.kwargs),
)

def test_no_build_pk(self):
self.assertIsNone(
self.router.route_for_task(self.task, self.args, {}),
)

def test_build_length_high_average(self):
high_length = TaskRouter.TIME_AVERAGE + 50
self.version.builds.update(length=high_length)
self.assertEqual(
self.router.route_for_task(self.task, self.args, self.kwargs),
TaskRouter.BUILD_LARGE_QUEUE,
)
5 changes: 5 additions & 0 deletions readthedocs/projects/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1514,6 +1514,7 @@ def add_features(sender, **kwargs):
SKIP_SYNC_TAGS = 'skip_sync_tags'
SKIP_SYNC_BRANCHES = 'skip_sync_branches'
CACHED_ENVIRONMENT = 'cached_environment'
CELERY_ROUTER = 'celery_router'
LIMIT_CONCURRENT_BUILDS = 'limit_concurrent_builds'

FEATURES = (
Expand Down Expand Up @@ -1585,6 +1586,10 @@ def add_features(sender, **kwargs):
CACHED_ENVIRONMENT,
_('Cache the environment (virtualenv, conda, pip cache, repository) in storage'),
),
(
CELERY_ROUTER,
_('Route tasks using our custom task router'),
),
(
LIMIT_CONCURRENT_BUILDS,
_('Limit the amount of concurrent builds'),
Expand Down