Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add task signature and locking when processing fetch_or_enqueue #3875

Merged
merged 3 commits into from
Jan 17, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions contentcuration/contentcuration/constants/locking.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""
Constants for locking behaviors, like advisory locking in Postgres, and mutexes
"""
TREE_LOCK = 1001
TASK_LOCK = 1002
32 changes: 29 additions & 3 deletions contentcuration/contentcuration/db/advisory_lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,36 @@

logging = logger.getLogger(__name__)

# signed limits are 2**32 or 2**64, so one less power of 2
# to become unsigned limits (half above 0, half below 0)
INT_32BIT = 2**31
INT_64BIT = 2**63


class AdvisoryLockBusy(RuntimeError):
pass


def _prepare_keys(keys):
"""
Ensures that integers do not exceed postgres constraints:
- signed 64bit allowed with single key
- signed 32bit allowed with two keys
:param keys: A list of unsigned integers
:return: A list of signed integers
"""
limit = INT_64BIT if len(keys) == 1 else INT_32BIT
new_keys = []
for key in keys:
# if key is over the limit, convert to negative int since key should be unsigned int
if key >= limit:
key = limit - key
if key < -limit or key >= limit:
raise OverflowError(f"Advisory lock key '{key}' is too large")
new_keys.append(key)
return new_keys


@contextmanager
def execute_lock(key1, key2=None, unlock=False, session=False, shared=False, wait=True):
"""
Expand All @@ -32,6 +57,7 @@ def execute_lock(key1, key2=None, unlock=False, session=False, shared=False, wai
keys = [key1]
if key2 is not None:
keys.append(key2)
keys = _prepare_keys(keys)

query = "SELECT pg{_try}_advisory_{xact_}{lock}{_shared}({keys}) AS lock;".format(
_try="" if wait else "_try",
Expand All @@ -41,11 +67,11 @@ def execute_lock(key1, key2=None, unlock=False, session=False, shared=False, wai
keys=", ".join(["%s" for i in range(0, 2 if key2 is not None else 1)])
)

log_query = "'{}' with params {}".format(query, keys)
logging.debug("Acquiring advisory lock: {}".format(query, log_query))
log_query = f"'{query}' with params {keys}"
logging.debug(f"Acquiring advisory lock: {log_query}")
with connection.cursor() as c:
c.execute(query, keys)
logging.debug("Acquired advisory lock: {}".format(query, log_query))
logging.debug(f"Acquired advisory lock: {log_query}")
yield c


Expand Down
2 changes: 1 addition & 1 deletion contentcuration/contentcuration/db/models/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from mptt.managers import TreeManager
from mptt.signals import node_moved

from contentcuration.constants.locking import TREE_LOCK
from contentcuration.db.advisory_lock import advisory_lock
from contentcuration.db.models.query import CustomTreeQuerySet
from contentcuration.utils.cache import ResourceSizeCache
Expand All @@ -32,7 +33,6 @@
# The exact optimum batch size is probably highly dependent on tree
# topology also, so these rudimentary tests are likely insufficient
BATCH_SIZE = 100
TREE_LOCK = 1001


class CustomManager(Manager.from_queryset(CTEQuerySet)):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Generated by Django 3.2.14 on 2022-12-09 16:09
from django.db import migrations
from django.db import models


class Migration(migrations.Migration):

def __init__(self, name, app_label):
super(Migration, self).__init__(name, 'django_celery_results')

dependencies = [
('contentcuration', '0140_delete_task'),
('django_celery_results', '0011_taskresult_periodic_task_name'),
]

operations = [
migrations.AddField(
model_name='taskresult',
name='signature',
field=models.CharField(max_length=32, null=True),
),
migrations.AddIndex(
model_name='taskresult',
index=models.Index(condition=models.Q(('status__in', frozenset(['STARTED', 'REJECTED', 'RETRY', 'RECEIVED', 'PENDING']))), fields=['signature'], name='task_result_signature_idx'),
),
]
40 changes: 39 additions & 1 deletion contentcuration/contentcuration/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from datetime import datetime

import pytz
from celery import states as celery_states
from django.conf import settings
from django.contrib.auth.base_user import AbstractBaseUser
from django.contrib.auth.base_user import BaseUserManager
Expand Down Expand Up @@ -74,6 +75,7 @@
from contentcuration.db.models.manager import CustomManager
from contentcuration.statistics import record_channel_stats
from contentcuration.utils.cache import delete_public_channel_cache_keys
from contentcuration.utils.celery.tasks import generate_task_signature
from contentcuration.utils.parser import load_json_string
from contentcuration.viewsets.sync.constants import ALL_CHANGES
from contentcuration.viewsets.sync.constants import ALL_TABLES
Expand Down Expand Up @@ -2436,13 +2438,20 @@ def serialize_to_change_dict(self):
class TaskResultCustom(object):
"""
Custom fields to add to django_celery_results's TaskResult model

If adding fields to this class, run `makemigrations` then move the generated migration from the
`django_celery_results` app to the `contentcuration` app and override the constructor to change
the app_label. See `0141_add_task_signature` for an example
"""
# user shouldn't be null, but in order to append the field, this needs to be allowed
user = models.ForeignKey(settings.AUTH_USER_MODEL, related_name="tasks", on_delete=models.CASCADE, null=True)
channel_id = DjangoUUIDField(db_index=True, null=True, blank=True)
progress = models.IntegerField(null=True, blank=True, validators=[MinValueValidator(0), MaxValueValidator(100)])
# a hash of the task name and kwargs for identifying repeat tasks
signature = models.CharField(null=True, blank=False, max_length=32)

super_as_dict = TaskResult.as_dict
super_save = TaskResult.save

def as_dict(self):
"""
Expand All @@ -2456,16 +2465,45 @@ def as_dict(self):
)
return super_dict

def set_signature(self):
"""
Generates and sets the signature for the task if it isn't set
"""
if self.signature is not None:
# nothing to do
return
self.signature = generate_task_signature(self.task_name, task_kwargs=self.task_kwargs, channel_id=self.channel_id)

def save(self, *args, **kwargs):
"""
Override save to ensure signature is generated
"""
self.set_signature()
return self.super_save(*args, **kwargs)

@classmethod
def contribute_to_class(cls, model_class=TaskResult):
"""
Adds fields to model, by default TaskResult
:param model_class: TaskResult model
"""
for field in dir(cls):
if not field.startswith("_"):
if not field.startswith("_") and field not in ('contribute_to_class', 'Meta'):
model_class.add_to_class(field, getattr(cls, field))

# manually add Meta afterwards
setattr(model_class._meta, 'indexes', getattr(model_class._meta, 'indexes', []) + cls.Meta.indexes)

class Meta:
indexes = [
# add index that matches query usage for signature
models.Index(
fields=['signature'],
name='task_result_signature_idx',
condition=Q(status__in=celery_states.UNREADY_STATES),
),
]


# trigger class contributions immediately
TaskResultCustom.contribute_to_class()
3 changes: 2 additions & 1 deletion contentcuration/contentcuration/tests/test_asynctask.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,8 @@ def test_fetch_or_enqueue_task__channel_id__uuid_then_hex(self):
self.assertEqual(expected_task.task_id, async_result.task_id)

def test_requeue_task(self):
existing_task_ids = requeue_test_task.find_ids()
signature = requeue_test_task._generate_signature({})
existing_task_ids = requeue_test_task.find_ids(signature)
self.assertEqual(len(existing_task_ids), 0)

first_async_result = requeue_test_task.enqueue(self.user, requeue=True)
Expand Down
Loading