From 0cfdcade74cfffb98f831f46c6681f5f2a33dde2 Mon Sep 17 00:00:00 2001 From: melvinsoft Date: Fri, 21 May 2021 12:51:55 -0300 Subject: [PATCH] Designate a queue for running the pipeline tasks Fix syntax error and PEP8 add space hardcode worker fixup! hardcode worker fix test fix another test fix test funtino fixup! fix test funtino fix monkeypatch fixup! fix monkeypatch fixup! fix monkeypatch fix test again fix test again fix test again fix test again remove settings fix routing more fixes more fixes more fixes more fixes on task more fixes and new way of routing re add , remove unused param remove unused param 2 allow figures to route tasks to specific queues undo task changes undo settings changes undo settings changes fix settings name fix settings name second time test task more concrete definition of tasks fix method fix method try apply async test order use group fix fix fix settings try our router fix char fix char try our router try our router remove comments fix flake8 fix flake8 v2 remove unused args add settings test add default None --- devsite/devsite/test_settings.py | 10 ++++++- figures/settings/lms_production.py | 44 ++++++++++++++++++++++++++++-- figures/tasks.py | 6 ++-- tests/tasks/test_monthly_tasks.py | 10 +++---- tests/test_settings.py | 2 +- 5 files changed, 60 insertions(+), 12 deletions(-) diff --git a/devsite/devsite/test_settings.py b/devsite/devsite/test_settings.py index 3e03d0b8..3ff3acd9 100644 --- a/devsite/devsite/test_settings.py +++ b/devsite/devsite/test_settings.py @@ -13,6 +13,7 @@ update_celerybeat_schedule, # TODO: https://appsembler.atlassian.net/browse/RED-673 # update_webpack_loader, + update_celery_routes, ) @@ -148,6 +149,13 @@ def root(*args): 'FIGURES': {}, # This variable is patched by the Figures' `lms_production.py` settings module. } +PRJ_SETTINGS = { + 'CELERY_ROUTES': "app.celery.routes" +} + +FIGURES_PIPELINE_TASKS_ROUTING_KEY = "" + # TODO: https://appsembler.atlassian.net/browse/RED-673 # update_webpack_loader(WEBPACK_LOADER, ENV_TOKENS) -update_celerybeat_schedule(CELERYBEAT_SCHEDULE, ENV_TOKENS) +update_celerybeat_schedule(CELERYBEAT_SCHEDULE, ENV_TOKENS, FIGURES_PIPELINE_TASKS_ROUTING_KEY) +update_celery_routes(PRJ_SETTINGS, ENV_TOKENS, FIGURES_PIPELINE_TASKS_ROUTING_KEY) diff --git a/figures/settings/lms_production.py b/figures/settings/lms_production.py index 5f9217f5..cfb70fe8 100644 --- a/figures/settings/lms_production.py +++ b/figures/settings/lms_production.py @@ -6,6 +6,18 @@ from celery.schedules import crontab +class FiguresRouter(object): + + def __init__(self, figures_tasks_queue_name): + self.figures_tasks_queue_name = figures_tasks_queue_name + + def route_for_task(self, task): + if task.startswith("figures.tasks."): + return self.figures_tasks_queue_name + + return None + + def update_webpack_loader(webpack_loader_settings, figures_env_tokens): """ Update the WEBPACK_LOADER in the settings. @@ -23,7 +35,10 @@ def update_webpack_loader(webpack_loader_settings, figures_env_tokens): }) -def update_celerybeat_schedule(celerybeat_schedule_settings, figures_env_tokens): +def update_celerybeat_schedule( + celerybeat_schedule_settings, + figures_env_tokens, + figures_tasks_queue): """ Figures pipeline job schedule configuration in CELERYBEAT_SCHEDULE. @@ -31,6 +46,10 @@ def update_celerybeat_schedule(celerybeat_schedule_settings, figures_env_tokens) Course MAU metrics pipeline scheduler is off by default TODO: Language improvement: Change the "IMPORT" to "CAPTURE" or "EXTRACT" + + We need to set the celery queue for each scheduled task again here, celery + beat does not check CELERY_ROUTES for tasks scheduling: + https://stackoverflow.com/questions/51631455/how-to-route-tasks-to-different-queues-with-celery-and-django """ if figures_env_tokens.get('ENABLE_DAILY_METRICS_IMPORT', True): celerybeat_schedule_settings['figures-populate-daily-metrics'] = { @@ -39,6 +58,7 @@ def update_celerybeat_schedule(celerybeat_schedule_settings, figures_env_tokens) hour=figures_env_tokens.get('DAILY_METRICS_IMPORT_HOUR', 2), minute=figures_env_tokens.get('DAILY_METRICS_IMPORT_MINUTE', 0), ), + 'options': {'queue': figures_tasks_queue}, } if figures_env_tokens.get('ENABLE_DAILY_MAU_IMPORT', False): @@ -48,15 +68,26 @@ def update_celerybeat_schedule(celerybeat_schedule_settings, figures_env_tokens) hour=figures_env_tokens.get('DAILY_MAU_IMPORT_HOUR', 0), minute=figures_env_tokens.get('DAILY_MAU_IMPORT_MINUTE', 0), ), + 'options': {'queue': figures_tasks_queue}, } if figures_env_tokens.get('ENABLE_FIGURES_MONTHLY_METRICS', True): celerybeat_schedule_settings['figures-monthly-metrics'] = { 'task': 'figures.tasks.run_figures_monthly_metrics', 'schedule': crontab(0, 0, day_of_month=1), + 'options': {'queue': figures_tasks_queue}, } +def update_celery_routes(platform_settings, figures_env_tokens, celery_tasks_queue): + """ + https://docs.celeryproject.org/en/3.1/userguide/routing.html#manual-routing + """ + if figures_env_tokens.get('FIGURES_PIPELINE_TASKS_ROUTING_KEY', False): + figures_router = FiguresRouter(celery_tasks_queue) + platform_settings.CELERY_ROUTES = (platform_settings.CELERY_ROUTES, figures_router) + + def plugin_settings(settings): """ Update the LMS/Production (aka AWS) settings to use Figures properly. @@ -78,8 +109,17 @@ def plugin_settings(settings): """ settings.ENV_TOKENS.setdefault('FIGURES', {}) + figures_tasks_default_queue = settings.ENV_TOKENS['FIGURES'].get( + 'FIGURES_PIPELINE_TASKS_ROUTING_KEY', + settings.CELERY_DEFAULT_ROUTING_KEY + ) update_webpack_loader(settings.WEBPACK_LOADER, settings.ENV_TOKENS['FIGURES']) - update_celerybeat_schedule(settings.CELERYBEAT_SCHEDULE, settings.ENV_TOKENS['FIGURES']) + update_celerybeat_schedule( + settings.CELERYBEAT_SCHEDULE, + settings.ENV_TOKENS['FIGURES'], + figures_tasks_default_queue + ) + update_celery_routes(settings, settings.ENV_TOKENS['FIGURES'], figures_tasks_default_queue) settings.CELERY_IMPORTS += ( "figures.tasks", diff --git a/figures/tasks.py b/figures/tasks.py index bba37868..e5624e48 100644 --- a/figures/tasks.py +++ b/figures/tasks.py @@ -13,7 +13,7 @@ from django.contrib.sites.models import Site from django.utils.timezone import utc -from celery import chord +from celery import chord, group from celery.app import shared_task from celery.utils.log import get_task_logger @@ -379,5 +379,5 @@ def run_figures_monthly_metrics(): Populate monthly metrics for all sites. """ logger.info('Starting figures.tasks.run_figures_monthly_metrics...') - for site in get_sites(): - populate_monthly_metrics_for_site.delay(site_id=site.id) + all_sites_jobs = group(populate_monthly_metrics_for_site.s(site.id) for site in get_sites()) + all_sites_jobs.delay() diff --git a/tests/tasks/test_monthly_tasks.py b/tests/tasks/test_monthly_tasks.py index 5f39877f..742fb08e 100644 --- a/tests/tasks/test_monthly_tasks.py +++ b/tests/tasks/test_monthly_tasks.py @@ -79,14 +79,14 @@ def test_run_figures_monthly_metrics_with_faked_subtask(transactional_db, monkey Faking the subtask for the function under test """ expected_sites = Site.objects.all() - assert expected_sites.count() + assert expected_sites sites_visited = [] - def fake_populate_monthly_metrics_for_site(site_id): - sites_visited.append(site_id) + def fake_populate_monthly_metrics_for_site(celery_task_group): + for t in celery_task_group.tasks: + sites_visited.extend(t.args) - monkeypatch.setattr('figures.tasks.populate_monthly_metrics_for_site.delay', - fake_populate_monthly_metrics_for_site) + monkeypatch.setattr('celery.group.delay', fake_populate_monthly_metrics_for_site) run_figures_monthly_metrics() diff --git a/tests/test_settings.py b/tests/test_settings.py index 4771727a..e9192377 100644 --- a/tests/test_settings.py +++ b/tests/test_settings.py @@ -118,7 +118,7 @@ def test_daily_mau_pipeline_flag_enabled(self): self.settings.ENV_TOKENS['FIGURES'] = { 'ENABLE_DAILY_MAU_IMPORT': True } plugin_settings(self.settings) assert self.TASK_NAME in self.settings.CELERYBEAT_SCHEDULE - assert set(['task', 'schedule']) == set( + assert set(['task', 'schedule', 'options']) == set( self.settings.CELERYBEAT_SCHEDULE[self.TASK_NAME].keys()) def test_daily_mau_pipeline_flag_disabled(self):