Merge "hotfixes" to "unstable" with tsvector command incompatibility fix #3936

Merged
merged 22 commits into from
Jan 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
7e3a110
Merge pull request #3864 from learningequality/hotfixes
bjester Dec 7, 2022
54f06ad
Add task signature and locking when processing fetch_or_enqueue
bjester Dec 9, 2022
00ca70f
Add status code to sentry error messages
bjester Dec 16, 2022
646bcb0
Report disallowed changes to sentry
bjester Dec 16, 2022
bfe9481
Chunk request params when loading clipboard nodes
bjester Dec 16, 2022
9e2a392
Merge pull request #3888 from bjester/sentry-insights-and-chunking
rtibbles Dec 16, 2022
f3c9a01
Merge branch 'master' into hotfixes
bjester Dec 16, 2022
e150a15
Merge pull request #3889 from learningequality/hotfixes
bjester Dec 16, 2022
0c20063
Optimized tsvectors insertion 🚀
vkWeb Dec 20, 2022
ec99d47
Don't create tsvectors for incomplete and unpublished nodes
vkWeb Dec 27, 2022
15971bb
Merge pull request #3892 from vkWeb/optimize-tsvectors
bjester Jan 5, 2023
154cb17
Correct integer bounds
bjester Jan 6, 2023
4f282bf
Reset elector on duplicate, and capture errors in Sentry
bjester Jan 13, 2023
a918204
Merge pull request #3907 from bjester/leader-election-fix
rtibbles Jan 13, 2023
dc85747
Use OrderedDict for task_kwargs and update docstrings
bjester Jan 17, 2023
b77bea1
Merge pull request #3875 from bjester/task-signature
bjester Jan 17, 2023
a1ba192
Merge branch 'master' into hotfixes
bjester Jan 17, 2023
90e41a9
Add line that prevents Django reapplying the migration
bjester Jan 18, 2023
395afee
Update translation in JSON file
bjester Jan 18, 2023
a32a357
Merge pull request #3911 from bjester/fix-task-signature-migration
bjester Jan 18, 2023
3d89859
Merge pull request #3913 from bjester/reflexionar-fix
bjester Jan 18, 2023
b8efa5e
Merge branch 'hotfixes' into hot-to-un
vkWeb Jan 28, 2023
5 changes: 5 additions & 0 deletions contentcuration/contentcuration/constants/locking.py
@@ -0,0 +1,5 @@
"""
Constants for locking behaviors, like advisory locking in Postgres, and mutexes
"""
TREE_LOCK = 1001
TASK_LOCK = 1002
32 changes: 29 additions & 3 deletions contentcuration/contentcuration/db/advisory_lock.py
@@ -6,11 +6,36 @@

logging = logger.getLogger(__name__)

# unsigned limits are 2**32 or 2**64, so the signed limits are one less
# power of 2 (half above 0, half below 0)
INT_32BIT = 2**31
INT_64BIT = 2**63


class AdvisoryLockBusy(RuntimeError):
    pass


def _prepare_keys(keys):
    """
    Ensures that integers do not exceed postgres constraints:
      - signed 64bit allowed with a single key
      - signed 32bit allowed with two keys
    :param keys: A list of unsigned integers
    :return: A list of signed integers
    """
    limit = INT_64BIT if len(keys) == 1 else INT_32BIT
    new_keys = []
    for key in keys:
        # if key is over the limit, convert to negative int since key should be unsigned int
        if key >= limit:
            key = limit - key
        if key < -limit or key >= limit:
            raise OverflowError(f"Advisory lock key '{key}' is too large")
        new_keys.append(key)
    return new_keys


@contextmanager
def execute_lock(key1, key2=None, unlock=False, session=False, shared=False, wait=True):
    """
@@ -32,6 +57,7 @@ def execute_lock(key1, key2=None, unlock=False, session=False, shared=False, wait=True):
    keys = [key1]
    if key2 is not None:
        keys.append(key2)
    keys = _prepare_keys(keys)

query = "SELECT pg{_try}_advisory_{xact_}{lock}{_shared}({keys}) AS lock;".format(
_try="" if wait else "_try",
Expand All @@ -41,11 +67,11 @@ def execute_lock(key1, key2=None, unlock=False, session=False, shared=False, wai
keys=", ".join(["%s" for i in range(0, 2 if key2 is not None else 1)])
)

log_query = "'{}' with params {}".format(query, keys)
logging.debug("Acquiring advisory lock: {}".format(query, log_query))
log_query = f"'{query}' with params {keys}"
logging.debug(f"Acquiring advisory lock: {log_query}")
with connection.cursor() as c:
c.execute(query, keys)
logging.debug("Acquired advisory lock: {}".format(query, log_query))
logging.debug(f"Acquired advisory lock: {log_query}")
yield c


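For reviewers skimming the hunk above, here is a minimal standalone sketch of the wrapping rule `_prepare_keys` applies (illustrative only, two-key 32-bit case; not part of the diff):

INT_32BIT = 2**31  # signed 32-bit bound used when two keys are passed

def wrap_key(key, limit=INT_32BIT):
    # Oversized unsigned keys fold into negative space, mirroring _prepare_keys;
    # anything still out of range after folding is a genuine overflow.
    if key >= limit:
        key = limit - key
    if key < -limit or key >= limit:
        raise OverflowError(f"Advisory lock key '{key}' is too large")
    return key

assert wrap_key(1001) == 1001               # e.g. TREE_LOCK passes through unchanged
assert wrap_key(2**32 - 1) == -(2**31 - 1)  # max unsigned 32-bit folds into range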
2 changes: 1 addition & 1 deletion contentcuration/contentcuration/db/models/manager.py
@@ -12,6 +12,7 @@
from mptt.managers import TreeManager
from mptt.signals import node_moved

from contentcuration.constants.locking import TREE_LOCK
from contentcuration.db.advisory_lock import advisory_lock
from contentcuration.db.models.query import CustomTreeQuerySet
from contentcuration.utils.cache import ResourceSizeCache
@@ -32,7 +33,6 @@
# The exact optimum batch size is probably highly dependent on tree
# topology also, so these rudimentary tests are likely insufficient
BATCH_SIZE = 100
-TREE_LOCK = 1001


class CustomManager(Manager.from_queryset(CTEQuerySet)):
@@ -1,5 +1,6 @@
import get from 'lodash/get';
import partition from 'lodash/partition';
import chunk from 'lodash/chunk';
import uniq from 'lodash/uniq';
import uniqBy from 'lodash/uniqBy';
import defer from 'lodash/defer';
@@ -83,12 +84,20 @@ export function loadClipboardNodes(context, { parent }) {
    const legacyNodeIds = legacyNodes.map(n => n.id);

    return Promise.all([
-      context.dispatch(
-        'contentNode/loadContentNodes',
-        { '[node_id+channel_id]__in': nodeIdChannelIdPairs },
-        { root }
+      // To avoid error code 414 URI Too Long errors, we chunk the pairs
+      // Given URI limit is 2000 chars:
+      // base URL at 100 chars + each pair at 70 chars = max 27 pairs
+      ...chunk(nodeIdChannelIdPairs, 25).map(chunkPairs =>
+        context.dispatch(
+          'contentNode/loadContentNodes',
+          { '[node_id+channel_id]__in': chunkPairs },
+          { root }
+        )
      ),
+      // Chunk legacy nodes, double the size since not pairs
+      ...chunk(legacyNodeIds, 50).map(legacyChunk =>
+        context.dispatch('contentNode/loadContentNodes', { id__in: legacyChunk }, { root })
+      ),
-      context.dispatch('contentNode/loadContentNodes', { id__in: legacyNodeIds }, { root }),
    ]).then(() => {
      return context.dispatch('addClipboardNodes', {
        nodes: clipboardNodes,
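The chunk sizes of 25 and 50 follow from the budget in the diff comment; a quick sanity check of that arithmetic (the character counts are the comment's estimates, not measured values):

URI_LIMIT = 2000      # conservative max URL length, per the diff comment
BASE_URL_CHARS = 100  # estimated base URL and query scaffolding
PAIR_CHARS = 70       # one encoded [node_id+channel_id] pair

max_pairs = (URI_LIMIT - BASE_URL_CHARS) // PAIR_CHARS
assert max_pairs == 27  # the PR rounds down to 25 for headroom

# Legacy ids are single ids (roughly half the width of a pair),
# hence the doubled chunk size of 50.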
2 changes: 1 addition & 1 deletion contentcuration/contentcuration/frontend/shared/client.js
@@ -58,7 +58,7 @@ client.interceptors.response.use(
}
}

-    message = message ? `${message}: ${url}` : `Network Error: ${url}`;
+    message = message ? `${message}: [${status}] ${url}` : `Network Error: [${status}] ${url}`;

if (process.env.NODE_ENV !== 'production') {
// In dev build log warnings to console for developer use
23 changes: 19 additions & 4 deletions contentcuration/contentcuration/frontend/shared/data/index.js
@@ -1,4 +1,5 @@
import Dexie from 'dexie';
import * as Sentry from '@sentry/vue';
import mapValues from 'lodash/mapValues';
import channel from './broadcastChannel';
import { CHANGES_TABLE, IGNORED_SOURCE, TABLE_NAMES } from './constants';
@@ -47,11 +48,25 @@ function runElection() {
  elector.awaitLeadership().then(startSyncing);
  elector.onduplicate = () => {
    stopSyncing();
-    elector.die().then(runElection);
+    elector
+      .die()
+      .then(() => {
+        // manually reset reference to dead elector on the channel
+        // which is set within `createLeaderElection` and whose
+        // presence is also validated against, requiring its removal
+        channel._leaderElector = null;
+        return runElection();
+      })
+      .catch(Sentry.captureException);
  };
}

-export function initializeDB() {
-  setupSchema();
-  return db.open().then(runElection);
+export async function initializeDB() {
+  try {
+    setupSchema();
+    await db.open();
+    await runElection();
+  } catch (e) {
+    Sentry.captureException(e);
+  }
}
15 changes: 15 additions & 0 deletions contentcuration/contentcuration/frontend/shared/data/serverSync.js
@@ -1,3 +1,4 @@
import * as Sentry from '@sentry/vue';
import debounce from 'lodash/debounce';
import findLastIndex from 'lodash/findLastIndex';
import get from 'lodash/get';
@@ -99,6 +100,20 @@ function handleDisallowed(response) {
  // that were rejected.
  const disallowed = get(response, ['data', 'disallowed'], []);
  if (disallowed.length) {
    // Capture occurrences of the api disallowing changes
    if (process.env.NODE_ENV === 'production') {
      Sentry.withScope(function(scope) {
        scope.addAttachment({
          filename: 'disallowed.json',
          data: JSON.stringify(disallowed),
          contentType: 'application/json',
        });
        Sentry.captureException(new Error('/api/sync returned disallowed changes'));
      });
    } else {
      console.warn('/api/sync returned disallowed changes:', disallowed); // eslint-disable-line no-console
    }

    // Collect all disallowed
    const disallowedRevs = disallowed.map(d => Number(d.rev));
    // Set the return error data onto the changes - this will update the change
@@ -0,0 +1,28 @@
# Generated by Django 3.2.14 on 2022-12-09 16:09
from django.db import migrations
from django.db import models


class Migration(migrations.Migration):

    replaces = [('django_celery_results', '0140_delete_task'),]

    def __init__(self, name, app_label):
        super(Migration, self).__init__(name, 'django_celery_results')

    dependencies = [
        ('contentcuration', '0141_soft_delete_user'),
        ('django_celery_results', '0011_taskresult_periodic_task_name'),
    ]

    operations = [
        migrations.AddField(
            model_name='taskresult',
            name='signature',
            field=models.CharField(max_length=32, null=True),
        ),
        migrations.AddIndex(
            model_name='taskresult',
            index=models.Index(condition=models.Q(('status__in', frozenset(['STARTED', 'REJECTED', 'RETRY', 'RECEIVED', 'PENDING']))), fields=['signature'], name='task_result_signature_idx'),
        ),
    ]
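One way to confirm the app_label override works as intended is Django's standard migration recorder; a sketch for verification from a shell, not part of the PR, and assuming the migration file is named `0141_add_task_signature` as the models.py docstring below suggests:

from django.db import connection
from django.db.migrations.recorder import MigrationRecorder

applied = MigrationRecorder(connection).applied_migrations()
# Because __init__ forces the app_label, the migration is recorded under
# django_celery_results rather than contentcuration, so subsequent
# `migrate` runs will not attempt to reapply it.
print(('django_celery_results', '0141_add_task_signature') in applied)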
40 changes: 39 additions & 1 deletion contentcuration/contentcuration/models.py
@@ -7,6 +7,7 @@
from datetime import datetime

import pytz
from celery import states as celery_states
from django.conf import settings
from django.contrib.auth.base_user import AbstractBaseUser
from django.contrib.auth.base_user import BaseUserManager
@@ -75,6 +76,7 @@
from contentcuration.db.models.manager import CustomManager
from contentcuration.statistics import record_channel_stats
from contentcuration.utils.cache import delete_public_channel_cache_keys
from contentcuration.utils.celery.tasks import generate_task_signature
from contentcuration.utils.parser import load_json_string
from contentcuration.viewsets.sync.constants import ALL_CHANGES
from contentcuration.viewsets.sync.constants import ALL_TABLES
@@ -2541,13 +2543,20 @@ def serialize_to_change_dict(self):
class TaskResultCustom(object):
    """
    Custom fields to add to django_celery_results's TaskResult model

    If adding fields to this class, run `makemigrations`, then move the generated migration
    from the `django_celery_results` app to the `contentcuration` app and override the
    constructor to change the app_label. See `0141_add_task_signature` for an example.
    """
    # user shouldn't be null, but in order to append the field, this needs to be allowed
    user = models.ForeignKey(settings.AUTH_USER_MODEL, related_name="tasks", on_delete=models.CASCADE, null=True)
    channel_id = DjangoUUIDField(db_index=True, null=True, blank=True)
    progress = models.IntegerField(null=True, blank=True, validators=[MinValueValidator(0), MaxValueValidator(100)])
    # a hash of the task name and kwargs for identifying repeat tasks
    signature = models.CharField(null=True, blank=False, max_length=32)

    super_as_dict = TaskResult.as_dict
    super_save = TaskResult.save

    def as_dict(self):
        """
@@ -2561,16 +2570,45 @@ def as_dict(self):
        )
        return super_dict

    def set_signature(self):
        """
        Generates and sets the signature for the task if it isn't set
        """
        if self.signature is not None:
            # nothing to do
            return
        self.signature = generate_task_signature(self.task_name, task_kwargs=self.task_kwargs, channel_id=self.channel_id)

    def save(self, *args, **kwargs):
        """
        Override save to ensure signature is generated
        """
        self.set_signature()
        return self.super_save(*args, **kwargs)

    @classmethod
    def contribute_to_class(cls, model_class=TaskResult):
        """
        Adds fields to model, by default TaskResult
        :param model_class: TaskResult model
        """
        for field in dir(cls):
-            if not field.startswith("_"):
+            if not field.startswith("_") and field not in ('contribute_to_class', 'Meta'):
                model_class.add_to_class(field, getattr(cls, field))

        # manually add Meta afterwards
        setattr(model_class._meta, 'indexes', getattr(model_class._meta, 'indexes', []) + cls.Meta.indexes)

    class Meta:
        indexes = [
            # add index that matches query usage for signature
            models.Index(
                fields=['signature'],
                name='task_result_signature_idx',
                condition=Q(status__in=celery_states.UNREADY_STATES),
            ),
        ]


# trigger class contributions immediately
TaskResultCustom.contribute_to_class()
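`generate_task_signature` itself is outside this diff, so the following is only a plausible sketch consistent with the 32-character column: an MD5 hex digest over the task name and its identifying kwargs (the switch to `OrderedDict` for `task_kwargs` in commit dc85747 suggests key order feeds the hash; this is not the repository's exact implementation):

import hashlib
import json

def generate_task_signature(task_name, task_kwargs=None, channel_id=None):
    # Hypothetical implementation: an MD5 hexdigest is exactly 32 chars,
    # matching signature = CharField(max_length=32) above.
    payload = json.dumps(
        {"task": task_name, "kwargs": task_kwargs, "channel_id": channel_id},
        sort_keys=True,
        default=str,
    )
    return hashlib.md5(payload.encode("utf-8")).hexdigest()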
3 changes: 2 additions & 1 deletion contentcuration/contentcuration/tests/test_asynctask.py
@@ -234,7 +234,8 @@ def test_fetch_or_enqueue_task__channel_id__uuid_then_hex(self):
        self.assertEqual(expected_task.task_id, async_result.task_id)

    def test_requeue_task(self):
-        existing_task_ids = requeue_test_task.find_ids()
+        signature = requeue_test_task._generate_signature({})
+        existing_task_ids = requeue_test_task.find_ids(signature)
        self.assertEqual(len(existing_task_ids), 0)

        first_async_result = requeue_test_task.enqueue(self.user, requeue=True)
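For context on what `find_ids(signature)` is likely doing, a hedged sketch of a signature-based lookup that the partial index added above would serve (hypothetical helper, not the repository's actual code):

from celery import states as celery_states

def find_incomplete_task_ids(TaskResult, signature):
    # The partial index task_result_signature_idx covers exactly this
    # filter: signature equality restricted to unready (incomplete) states.
    return TaskResult.objects.filter(
        signature=signature,
        status__in=celery_states.UNREADY_STATES,
    ).values_list("task_id", flat=True)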