-
-
Notifications
You must be signed in to change notification settings - Fork 3.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Use a custom Task Router to route tasks dynamically
- Loading branch information
Showing
4 changed files
with
227 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,124 @@ | ||
import logging | ||
|
||
from django.conf import settings | ||
from django.db.models import Avg | ||
|
||
from readthedocs.builds.models import Build, Version | ||
from readthedocs.projects.models import Feature | ||
|
||
log = logging.getLogger(__name__) | ||
|
||
class TaskRouter: | ||
|
||
""" | ||
Celery tasks router. | ||
It allows us to decide which queue is where we want to execute the task | ||
based on project's settings but also in queue availability. | ||
1. the project is using conda | ||
2. new project with less than N successful builds | ||
3. last N successful builds have a high time average | ||
It ignores projects that have already set ``build_queue`` attribute. | ||
https://docs.celeryproject.org/en/stable/userguide/routing.html#manual-routing | ||
https://docs.celeryproject.org/en/stable/userguide/configuration.html#std:setting-task_routes | ||
""" | ||
|
||
n_builds = 5 | ||
n_last_builds = 15 | ||
time_average = 350 | ||
|
||
BUILD_DEFAULT_QUEUE = 'build:default' | ||
BUILD_LARGE_QUEUE = 'build:large' | ||
|
||
|
||
def route_for_task(self, task, args, kwargs, **__): | ||
log.info('Executing TaskRouter. task=%s', task) | ||
if task not in ( | ||
'readthedocs.projects.tasks.update_docs_task', | ||
'readthedocs.projects.tasks.sync_repository_task', | ||
): | ||
log.info('Skipping routing non-build task. task=%s', task) | ||
return | ||
|
||
version = self._get_version(task, args, kwargs) | ||
if not version: | ||
log.info('No Build/Version found. No routing task. task=%s', task) | ||
return settings.CELERY_DEFAULT_QUEUE | ||
|
||
# Do no route tasks for projects without the feature flag | ||
if not version.project.has_feature(Feature.CELERY_ROUTER): | ||
log.info('Project does not have the feature flag. No routing task. task=%s', task) | ||
return version.project.build_queue or settings.CELERY_DEFAULT_QUEUE | ||
|
||
# Do not override the queue defined in the project itself | ||
if version.project.build_queue: | ||
log.info( | ||
'Skipping routing task because project has a custom queue. queue=%s', | ||
version.project.build_queue, | ||
) | ||
return version.project.build_queue | ||
|
||
queryset = version.builds.filter(success=True).order_by('-date') | ||
last_builds = queryset[:self.n_last_builds] | ||
|
||
# Version has used conda in previous builds | ||
for build in last_builds.iterator(): | ||
if build.config.get('conda', None): | ||
log.info( | ||
'Routing task because project uses conda. queue=%s', | ||
self.BUILD_LARGE_QUEUE, | ||
) | ||
return self.BUILD_LARGE_QUEUE | ||
|
||
# We do not have enough builds for this version yet | ||
if queryset.count() < self.n_builds: | ||
log.info( | ||
'Routing task because it does not have enough success builds yet. queue=%s', | ||
self.BUILD_LARGE_QUEUE, | ||
) | ||
return self.BUILD_LARGE_QUEUE | ||
|
||
# Build time average is high | ||
length_avg = queryset.filter(pk__in=last_builds).aggregate(Avg('length')).get('length__avg') | ||
if length_avg > self.time_average: | ||
log.info( | ||
'Routing task because project has high time average. queue=%s', | ||
self.BUILD_LARGE_QUEUE, | ||
) | ||
return self.BUILD_LARGE_QUEUE | ||
|
||
log.info( | ||
'Routing task to default queue because no conditions were met. queue=%s', | ||
settings.CELERY_DEFAULT_QUEUE, | ||
) | ||
return settings.CELERY_DEFAULT_QUEUE | ||
|
||
def _get_version(self, task, args, kwargs): | ||
if task == 'readthedocs.projects.tasks.update_docs_task': | ||
build_pk = kwargs.get('build_pk') | ||
try: | ||
build = Build.objects.get(pk=build_pk) | ||
version = build.version | ||
except Build.DoesNotExist: | ||
log.info( | ||
'Build does not exist. Routing task to default queue. build_pk=%s queue=%s', | ||
build_pk, | ||
settings.CELERY_DEFAULT_QUEUE, | ||
) | ||
return | ||
|
||
elif task == 'readthedocs.projects.tasks.sync_repository_task': | ||
version_pk = args[0] | ||
try: | ||
version = Version.objects.get(pk=version_pk) | ||
except Version.DoesNotExist: | ||
log.info( | ||
'Version does not exist. Routing task to default queue. version_pk=%s queue=%s', | ||
version_pk, | ||
settings.CELERY_DEFAULT_QUEUE, | ||
) | ||
return | ||
return version |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,94 @@ | ||
import django_dynamic_fixture as fixture | ||
from django.conf import settings | ||
from django.test import TestCase | ||
from django.test.utils import override_settings | ||
|
||
from readthedocs.builds.tasks import TaskRouter | ||
from readthedocs.builds.models import Build, Version | ||
from readthedocs.projects.models import Project, Feature | ||
|
||
|
||
|
||
class TaskRouterTests(TestCase): | ||
|
||
def setUp(self): | ||
self.project = fixture.get( | ||
Project, | ||
build_queue=None, | ||
) | ||
self.feature = fixture.get( | ||
Feature, | ||
feature_id=Feature.CELERY_ROUTER, | ||
) | ||
self.feature.projects.add(self.project) | ||
self.version = self.project.versions.first() | ||
self.build = fixture.get( | ||
Build, | ||
version=self.version, | ||
) | ||
for _ in range(TaskRouter.n_builds + 5): | ||
fixture.get( | ||
Build, | ||
version=self.version, | ||
|
||
) | ||
|
||
self.task = 'readthedocs.projects.tasks.update_docs_task' | ||
self.args = ( | ||
self.version.pk, | ||
) | ||
self.kwargs = { | ||
'build_pk': self.build.pk, | ||
} | ||
self.router = TaskRouter() | ||
|
||
def test_not_under_feature_flag(self): | ||
self.feature.projects.remove(self.project) | ||
self.assertEqual( | ||
self.router.route_for_task(self.task, self.args, self.kwargs), | ||
settings.CELERY_DEFAULT_QUEUE, | ||
) | ||
|
||
def test_project_custom_queue(self): | ||
self.project.build_queue = 'build:custom' | ||
self.project.save() | ||
|
||
self.assertEqual( | ||
self.router.route_for_task(self.task, self.args, self.kwargs), | ||
'build:custom', | ||
) | ||
|
||
def test_used_conda_in_last_builds(self): | ||
self.build._config = {'conda': {'file': 'docs/environment.yml'}} | ||
self.build.save() | ||
|
||
self.assertEqual( | ||
self.router.route_for_task(self.task, self.args, self.kwargs), | ||
TaskRouter.BUILD_LARGE_QUEUE, | ||
) | ||
|
||
def test_more_than_n_builds(self): | ||
self.assertEqual( | ||
self.router.route_for_task(self.task, self.args, self.kwargs), | ||
settings.CELERY_DEFAULT_QUEUE, | ||
) | ||
|
||
def test_non_build_task(self): | ||
self.assertEqual( | ||
self.router.route_for_task('non_build_task', self.args, self.kwargs), | ||
settings.CELERY_DEFAULT_QUEUE, | ||
) | ||
|
||
def test_no_build_pk(self): | ||
self.assertEqual( | ||
self.router.route_for_task(self.task, self.args, {}), | ||
settings.CELERY_DEFAULT_QUEUE, | ||
) | ||
|
||
def test_build_length_high_average(self): | ||
high_length = TaskRouter.time_average + 50 | ||
self.version.builds.update(length=high_length) | ||
self.assertEqual( | ||
self.router.route_for_task(self.task, self.args, self.kwargs), | ||
TaskRouter.BUILD_LARGE_QUEUE, | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters