Skip to content

Commit

Permalink
Designate a queue for running the pipeline tasks
Browse files Browse the repository at this point in the history
Fix syntax error and PEP8

add space

hardcode worker

fixup! hardcode worker

fix test

fix another test

fix test funtino

fixup! fix test funtino

fix monkeypatch

fixup! fix monkeypatch

fixup! fix monkeypatch

fix test again

fix test again

fix test again

fix test again

remove settings

fix routing

more fixes

more fixes

more fixes

more fixes on task

more fixes and new way of routing

re add ,

remove unused param

remove unused param 2

allow figures to route tasks to specific queues

undo task changes

undo settings changes

undo settings changes

fix settings name

fix settings name second time

test task

more concrete definition of tasks

fix method

fix method

try apply async

test order

use group

fix

fix

fix settings

try our router

fix char

fix char

try our router

try our router

remove comments

fix flake8

fix flake8 v2

remove unused args

add settings test

add default None
  • Loading branch information
melvinsoft committed Jun 4, 2021
1 parent 5a105a8 commit 0cfdcad
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 12 deletions.
10 changes: 9 additions & 1 deletion devsite/devsite/test_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
update_celerybeat_schedule,
# TODO: https://appsembler.atlassian.net/browse/RED-673
# update_webpack_loader,
update_celery_routes,
)


Expand Down Expand Up @@ -148,6 +149,13 @@ def root(*args):
'FIGURES': {}, # This variable is patched by the Figures' `lms_production.py` settings module.
}

PRJ_SETTINGS = {
'CELERY_ROUTES': "app.celery.routes"
}

FIGURES_PIPELINE_TASKS_ROUTING_KEY = ""

# TODO: https://appsembler.atlassian.net/browse/RED-673
# update_webpack_loader(WEBPACK_LOADER, ENV_TOKENS)
update_celerybeat_schedule(CELERYBEAT_SCHEDULE, ENV_TOKENS)
update_celerybeat_schedule(CELERYBEAT_SCHEDULE, ENV_TOKENS, FIGURES_PIPELINE_TASKS_ROUTING_KEY)
update_celery_routes(PRJ_SETTINGS, ENV_TOKENS, FIGURES_PIPELINE_TASKS_ROUTING_KEY)
44 changes: 42 additions & 2 deletions figures/settings/lms_production.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,18 @@
from celery.schedules import crontab


class FiguresRouter(object):

def __init__(self, figures_tasks_queue_name):
self.figures_tasks_queue_name = figures_tasks_queue_name

def route_for_task(self, task):
if task.startswith("figures.tasks."):
return self.figures_tasks_queue_name

return None


def update_webpack_loader(webpack_loader_settings, figures_env_tokens):
"""
Update the WEBPACK_LOADER in the settings.
Expand All @@ -23,14 +35,21 @@ def update_webpack_loader(webpack_loader_settings, figures_env_tokens):
})


def update_celerybeat_schedule(celerybeat_schedule_settings, figures_env_tokens):
def update_celerybeat_schedule(
celerybeat_schedule_settings,
figures_env_tokens,
figures_tasks_queue):
"""
Figures pipeline job schedule configuration in CELERYBEAT_SCHEDULE.
Daily metrics pipeline scheduler is on by default
Course MAU metrics pipeline scheduler is off by default
TODO: Language improvement: Change the "IMPORT" to "CAPTURE" or "EXTRACT"
We need to set the celery queue for each scheduled task again here, celery
beat does not check CELERY_ROUTES for tasks scheduling:
https://stackoverflow.com/questions/51631455/how-to-route-tasks-to-different-queues-with-celery-and-django
"""
if figures_env_tokens.get('ENABLE_DAILY_METRICS_IMPORT', True):
celerybeat_schedule_settings['figures-populate-daily-metrics'] = {
Expand All @@ -39,6 +58,7 @@ def update_celerybeat_schedule(celerybeat_schedule_settings, figures_env_tokens)
hour=figures_env_tokens.get('DAILY_METRICS_IMPORT_HOUR', 2),
minute=figures_env_tokens.get('DAILY_METRICS_IMPORT_MINUTE', 0),
),
'options': {'queue': figures_tasks_queue},
}

if figures_env_tokens.get('ENABLE_DAILY_MAU_IMPORT', False):
Expand All @@ -48,15 +68,26 @@ def update_celerybeat_schedule(celerybeat_schedule_settings, figures_env_tokens)
hour=figures_env_tokens.get('DAILY_MAU_IMPORT_HOUR', 0),
minute=figures_env_tokens.get('DAILY_MAU_IMPORT_MINUTE', 0),
),
'options': {'queue': figures_tasks_queue},
}

if figures_env_tokens.get('ENABLE_FIGURES_MONTHLY_METRICS', True):
celerybeat_schedule_settings['figures-monthly-metrics'] = {
'task': 'figures.tasks.run_figures_monthly_metrics',
'schedule': crontab(0, 0, day_of_month=1),
'options': {'queue': figures_tasks_queue},
}


def update_celery_routes(platform_settings, figures_env_tokens, celery_tasks_queue):
"""
https://docs.celeryproject.org/en/3.1/userguide/routing.html#manual-routing
"""
if figures_env_tokens.get('FIGURES_PIPELINE_TASKS_ROUTING_KEY', False):
figures_router = FiguresRouter(celery_tasks_queue)
platform_settings.CELERY_ROUTES = (platform_settings.CELERY_ROUTES, figures_router)


def plugin_settings(settings):
"""
Update the LMS/Production (aka AWS) settings to use Figures properly.
Expand All @@ -78,8 +109,17 @@ def plugin_settings(settings):
"""
settings.ENV_TOKENS.setdefault('FIGURES', {})
figures_tasks_default_queue = settings.ENV_TOKENS['FIGURES'].get(
'FIGURES_PIPELINE_TASKS_ROUTING_KEY',
settings.CELERY_DEFAULT_ROUTING_KEY
)
update_webpack_loader(settings.WEBPACK_LOADER, settings.ENV_TOKENS['FIGURES'])
update_celerybeat_schedule(settings.CELERYBEAT_SCHEDULE, settings.ENV_TOKENS['FIGURES'])
update_celerybeat_schedule(
settings.CELERYBEAT_SCHEDULE,
settings.ENV_TOKENS['FIGURES'],
figures_tasks_default_queue
)
update_celery_routes(settings, settings.ENV_TOKENS['FIGURES'], figures_tasks_default_queue)

settings.CELERY_IMPORTS += (
"figures.tasks",
Expand Down
6 changes: 3 additions & 3 deletions figures/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from django.contrib.sites.models import Site
from django.utils.timezone import utc

from celery import chord
from celery import chord, group
from celery.app import shared_task
from celery.utils.log import get_task_logger

Expand Down Expand Up @@ -379,5 +379,5 @@ def run_figures_monthly_metrics():
Populate monthly metrics for all sites.
"""
logger.info('Starting figures.tasks.run_figures_monthly_metrics...')
for site in get_sites():
populate_monthly_metrics_for_site.delay(site_id=site.id)
all_sites_jobs = group(populate_monthly_metrics_for_site.s(site.id) for site in get_sites())
all_sites_jobs.delay()
10 changes: 5 additions & 5 deletions tests/tasks/test_monthly_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,14 @@ def test_run_figures_monthly_metrics_with_faked_subtask(transactional_db, monkey
Faking the subtask for the function under test
"""
expected_sites = Site.objects.all()
assert expected_sites.count()
assert expected_sites
sites_visited = []

def fake_populate_monthly_metrics_for_site(site_id):
sites_visited.append(site_id)
def fake_populate_monthly_metrics_for_site(celery_task_group):
for t in celery_task_group.tasks:
sites_visited.extend(t.args)

monkeypatch.setattr('figures.tasks.populate_monthly_metrics_for_site.delay',
fake_populate_monthly_metrics_for_site)
monkeypatch.setattr('celery.group.delay', fake_populate_monthly_metrics_for_site)

run_figures_monthly_metrics()

Expand Down
2 changes: 1 addition & 1 deletion tests/test_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ def test_daily_mau_pipeline_flag_enabled(self):
self.settings.ENV_TOKENS['FIGURES'] = { 'ENABLE_DAILY_MAU_IMPORT': True }
plugin_settings(self.settings)
assert self.TASK_NAME in self.settings.CELERYBEAT_SCHEDULE
assert set(['task', 'schedule']) == set(
assert set(['task', 'schedule', 'options']) == set(
self.settings.CELERYBEAT_SCHEDULE[self.TASK_NAME].keys())

def test_daily_mau_pipeline_flag_disabled(self):
Expand Down

0 comments on commit 0cfdcad

Please sign in to comment.