Merge pull request #3936 from vkWeb/hot-to-un
Merge "hotfixes" to "unstable" with tsvector command incompatibility fix
bjester authored Jan 30, 2023
2 parents b448703 + b8efa5e commit 4cfcb13
Showing 15 changed files with 289 additions and 126 deletions.
5 changes: 5 additions & 0 deletions contentcuration/contentcuration/constants/locking.py
@@ -0,0 +1,5 @@
"""
Constants for locking behaviors, like advisory locking in Postgres, and mutexes
"""
TREE_LOCK = 1001
TASK_LOCK = 1002
32 changes: 29 additions & 3 deletions contentcuration/contentcuration/db/advisory_lock.py
@@ -6,11 +6,36 @@

logging = logger.getLogger(__name__)

# unsigned limits are 2**32 or 2**64, so the signed limits are one less
# power of 2 (half above 0, half below 0)
INT_32BIT = 2**31
INT_64BIT = 2**63


class AdvisoryLockBusy(RuntimeError):
pass


def _prepare_keys(keys):
"""
Ensures that integers do not exceed postgres constraints:
- signed 64bit allowed with single key
- signed 32bit allowed with two keys
:param keys: A list of unsigned integers
:return: A list of signed integers
"""
limit = INT_64BIT if len(keys) == 1 else INT_32BIT
new_keys = []
for key in keys:
# if key is over the limit, convert to negative int since key should be unsigned int
if key >= limit:
key = limit - key
if key < -limit or key >= limit:
raise OverflowError(f"Advisory lock key '{key}' is too large")
new_keys.append(key)
return new_keys
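
For intuition, a couple of illustrative calls and their expected results (not part of the diff):

    _prepare_keys([2**31 + 5, 7])   # -> [-5, 7]: with two keys, 2**31 - (2**31 + 5) == -5
    _prepare_keys([2**63 - 1])      # -> [2**63 - 1]: a single key may use the full signed 64-bit range
    _prepare_keys([2**32 + 1, 0])   # raises OverflowError: too large even after folding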


@contextmanager
def execute_lock(key1, key2=None, unlock=False, session=False, shared=False, wait=True):
"""
@@ -32,6 +57,7 @@ def execute_lock(key1, key2=None, unlock=False, session=False, shared=False, wait=True):
keys = [key1]
if key2 is not None:
keys.append(key2)
keys = _prepare_keys(keys)

query = "SELECT pg{_try}_advisory_{xact_}{lock}{_shared}({keys}) AS lock;".format(
_try="" if wait else "_try",
@@ -41,11 +67,11 @@ def execute_lock(key1, key2=None, unlock=False, session=False, shared=False, wait=True):
keys=", ".join(["%s" for i in range(0, 2 if key2 is not None else 1)])
)

-    log_query = "'{}' with params {}".format(query, keys)
-    logging.debug("Acquiring advisory lock: {}".format(query, log_query))
+    log_query = f"'{query}' with params {keys}"
+    logging.debug(f"Acquiring advisory lock: {log_query}")
     with connection.cursor() as c:
         c.execute(query, keys)
-        logging.debug("Acquired advisory lock: {}".format(query, log_query))
+        logging.debug(f"Acquired advisory lock: {log_query}")
         yield c


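For context, the query template above renders to statements like "SELECT pg_advisory_xact_lock(%s, %s) AS lock;" depending on the flags. A minimal usage sketch, where tree_id is a hypothetical second key:

    from contentcuration.constants.locking import TREE_LOCK
    from contentcuration.db.advisory_lock import execute_lock

    # transaction-scoped exclusive lock on the (TREE_LOCK, tree_id) pair;
    # blocks until acquired, since wait=True is the default
    with execute_lock(TREE_LOCK, key2=tree_id, session=False, shared=False) as cursor:
        pass  # perform tree mutations while the lock is held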
2 changes: 1 addition & 1 deletion contentcuration/contentcuration/db/models/manager.py
@@ -12,6 +12,7 @@
from mptt.managers import TreeManager
from mptt.signals import node_moved

from contentcuration.constants.locking import TREE_LOCK
from contentcuration.db.advisory_lock import advisory_lock
from contentcuration.db.models.query import CustomTreeQuerySet
from contentcuration.utils.cache import ResourceSizeCache
@@ -32,7 +33,6 @@
# The exact optimum batch size is probably highly dependent on tree
# topology also, so these rudimentary tests are likely insufficient
BATCH_SIZE = 100
TREE_LOCK = 1001


class CustomManager(Manager.from_queryset(CTEQuerySet)):
@@ -1,5 +1,6 @@
import get from 'lodash/get';
import partition from 'lodash/partition';
import chunk from 'lodash/chunk';
import uniq from 'lodash/uniq';
import uniqBy from 'lodash/uniqBy';
import defer from 'lodash/defer';
@@ -83,12 +84,20 @@ export function loadClipboardNodes(context, { parent }) {
const legacyNodeIds = legacyNodes.map(n => n.id);

return Promise.all([
-      context.dispatch(
-        'contentNode/loadContentNodes',
-        { '[node_id+channel_id]__in': nodeIdChannelIdPairs },
-        { root }
-      ),
+      // To avoid error code 414 URI Too Long errors, we chunk the pairs
+      // Given URI limit is 2000 chars:
+      // base URL at 100 chars + each pair at 70 chars = max 27 pairs
+      ...chunk(nodeIdChannelIdPairs, 25).map(chunkPairs =>
+        context.dispatch(
+          'contentNode/loadContentNodes',
+          { '[node_id+channel_id]__in': chunkPairs },
+          { root }
+        )
+      ),
+      // Chunk legacy nodes, double the size since not pairs
+      ...chunk(legacyNodeIds, 50).map(legacyChunk =>
+        context.dispatch('contentNode/loadContentNodes', { id__in: legacyChunk }, { root })
+      ),
-      context.dispatch('contentNode/loadContentNodes', { id__in: legacyNodeIds }, { root }),
]).then(() => {
return context.dispatch('addClipboardNodes', {
nodes: clipboardNodes,
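As a quick sanity check on the chunking arithmetic in the comment above (illustrative only):

    URI_LIMIT, BASE_URL, PAIR = 2000, 100, 70
    assert (URI_LIMIT - BASE_URL) // PAIR == 27  # theoretical max pairs per request
    # 25 pairs per chunk leaves headroom; legacy ids are single values rather
    # than pairs, so their chunk size is doubled to 50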
2 changes: 1 addition & 1 deletion contentcuration/contentcuration/frontend/shared/client.js
@@ -58,7 +58,7 @@ client.interceptors.response.use(
}
}

-    message = message ? `${message}: ${url}` : `Network Error: ${url}`;
+    message = message ? `${message}: [${status}] ${url}` : `Network Error: [${status}] ${url}`;

if (process.env.NODE_ENV !== 'production') {
// In dev build log warnings to console for developer use
23 changes: 19 additions & 4 deletions contentcuration/contentcuration/frontend/shared/data/index.js
@@ -1,4 +1,5 @@
import Dexie from 'dexie';
import * as Sentry from '@sentry/vue';
import mapValues from 'lodash/mapValues';
import channel from './broadcastChannel';
import { CHANGES_TABLE, IGNORED_SOURCE, TABLE_NAMES } from './constants';
@@ -47,11 +48,25 @@ function runElection() {
elector.awaitLeadership().then(startSyncing);
elector.onduplicate = () => {
stopSyncing();
-    elector.die().then(runElection);
+    elector
+      .die()
+      .then(() => {
+        // manually reset reference to dead elector on the channel
+        // which is set within `createLeaderElection` and whose
+        // presence is also validated against, requiring its removal
+        channel._leaderElector = null;
+        return runElection();
+      })
+      .catch(Sentry.captureException);
};
}

-export function initializeDB() {
-  setupSchema();
-  return db.open().then(runElection);
+export async function initializeDB() {
+  try {
+    setupSchema();
+    await db.open();
+    await runElection();
+  } catch (e) {
+    Sentry.captureException(e);
+  }
 }
15 changes: 15 additions & 0 deletions contentcuration/contentcuration/frontend/shared/data/serverSync.js
@@ -1,3 +1,4 @@
import * as Sentry from '@sentry/vue';
import debounce from 'lodash/debounce';
import findLastIndex from 'lodash/findLastIndex';
import get from 'lodash/get';
@@ -99,6 +100,20 @@ function handleDisallowed(response) {
// that were rejected.
const disallowed = get(response, ['data', 'disallowed'], []);
if (disallowed.length) {
// Capture occurrences of the api disallowing changes
if (process.env.NODE_ENV === 'production') {
Sentry.withScope(function(scope) {
scope.addAttachment({
filename: 'disallowed.json',
data: JSON.stringify(disallowed),
contentType: 'application/json',
});
Sentry.captureException(new Error('/api/sync returned disallowed changes'));
});
} else {
console.warn('/api/sync returned disallowed changes:', disallowed); // eslint-disable-line no-console
}

// Collect all disallowed
const disallowedRevs = disallowed.map(d => Number(d.rev));
// Set the return error data onto the changes - this will update the change
@@ -0,0 +1,28 @@
# Generated by Django 3.2.14 on 2022-12-09 16:09
from django.db import migrations
from django.db import models


class Migration(migrations.Migration):

replaces = [('django_celery_results', '0140_delete_task'),]

def __init__(self, name, app_label):
super(Migration, self).__init__(name, 'django_celery_results')

dependencies = [
('contentcuration', '0141_soft_delete_user'),
('django_celery_results', '0011_taskresult_periodic_task_name'),
]

operations = [
migrations.AddField(
model_name='taskresult',
name='signature',
field=models.CharField(max_length=32, null=True),
),
migrations.AddIndex(
model_name='taskresult',
index=models.Index(condition=models.Q(('status__in', frozenset(['STARTED', 'REJECTED', 'RETRY', 'RECEIVED', 'PENDING']))), fields=['signature'], name='task_result_signature_idx'),
),
]
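
The constructor override is what lets this migration live in the contentcuration app while being recorded against django_celery_results. Conceptually (an illustrative check; the migration name is assumed from the models.py docstring below):

    m = Migration('0141_add_task_signature', 'contentcuration')
    assert m.app_label == 'django_celery_results'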
40 changes: 39 additions & 1 deletion contentcuration/contentcuration/models.py
@@ -7,6 +7,7 @@
from datetime import datetime

import pytz
from celery import states as celery_states
from django.conf import settings
from django.contrib.auth.base_user import AbstractBaseUser
from django.contrib.auth.base_user import BaseUserManager
@@ -75,6 +76,7 @@
from contentcuration.db.models.manager import CustomManager
from contentcuration.statistics import record_channel_stats
from contentcuration.utils.cache import delete_public_channel_cache_keys
from contentcuration.utils.celery.tasks import generate_task_signature
from contentcuration.utils.parser import load_json_string
from contentcuration.viewsets.sync.constants import ALL_CHANGES
from contentcuration.viewsets.sync.constants import ALL_TABLES
@@ -2541,13 +2543,20 @@ def serialize_to_change_dict(self):
class TaskResultCustom(object):
"""
Custom fields to add to django_celery_results's TaskResult model
If adding fields to this class, run `makemigrations` then move the generated migration from the
`django_celery_results` app to the `contentcuration` app and override the constructor to change
the app_label. See `0141_add_task_signature` for an example
"""
# user shouldn't be null, but in order to append the field, this needs to be allowed
user = models.ForeignKey(settings.AUTH_USER_MODEL, related_name="tasks", on_delete=models.CASCADE, null=True)
channel_id = DjangoUUIDField(db_index=True, null=True, blank=True)
progress = models.IntegerField(null=True, blank=True, validators=[MinValueValidator(0), MaxValueValidator(100)])
# a hash of the task name and kwargs for identifying repeat tasks
signature = models.CharField(null=True, blank=False, max_length=32)

super_as_dict = TaskResult.as_dict
super_save = TaskResult.save

def as_dict(self):
"""
@@ -2561,16 +2570,45 @@ def as_dict(self):
)
return super_dict

def set_signature(self):
"""
Generates and sets the signature for the task if it isn't set
"""
if self.signature is not None:
# nothing to do
return
self.signature = generate_task_signature(self.task_name, task_kwargs=self.task_kwargs, channel_id=self.channel_id)

def save(self, *args, **kwargs):
"""
Override save to ensure signature is generated
"""
self.set_signature()
return self.super_save(*args, **kwargs)

@classmethod
def contribute_to_class(cls, model_class=TaskResult):
"""
Adds fields to model, by default TaskResult
:param model_class: TaskResult model
"""
for field in dir(cls):
-            if not field.startswith("_"):
+            if not field.startswith("_") and field not in ('contribute_to_class', 'Meta'):
model_class.add_to_class(field, getattr(cls, field))

# manually add Meta afterwards
setattr(model_class._meta, 'indexes', getattr(model_class._meta, 'indexes', []) + cls.Meta.indexes)

class Meta:
indexes = [
# add index that matches query usage for signature
models.Index(
fields=['signature'],
name='task_result_signature_idx',
condition=Q(status__in=celery_states.UNREADY_STATES),
),
]


# trigger class contributions immediately
TaskResultCustom.contribute_to_class()
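
generate_task_signature itself is imported from contentcuration.utils.celery.tasks and is not shown in this diff. Given the CharField(max_length=32), a plausible sketch is an md5 hex digest over the task's identifying parts (an assumption for illustration, not the actual implementation):

    import hashlib
    import json

    def generate_task_signature(task_name, task_kwargs=None, channel_id=None):
        # hash the task name plus its identifying arguments so repeat
        # enqueues of the same work can be detected
        md5 = hashlib.md5()
        md5.update(task_name.encode())
        md5.update(json.dumps(task_kwargs, sort_keys=True).encode())
        md5.update(str(channel_id).encode())
        return md5.hexdigest()  # 32 hex chars, matching max_length=32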
3 changes: 2 additions & 1 deletion contentcuration/contentcuration/tests/test_asynctask.py
@@ -234,7 +234,8 @@ def test_fetch_or_enqueue_task__channel_id__uuid_then_hex(self):
self.assertEqual(expected_task.task_id, async_result.task_id)

def test_requeue_task(self):
-        existing_task_ids = requeue_test_task.find_ids()
+        signature = requeue_test_task._generate_signature({})
+        existing_task_ids = requeue_test_task.find_ids(signature)
self.assertEqual(len(existing_task_ids), 0)

first_async_result = requeue_test_task.enqueue(self.user, requeue=True)